diff --git a/core/build.gradle b/core/build.gradle index b98a8219c..60ffbba89 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -800,6 +800,11 @@ if (environment == 'alpha') { mainClass: 'google.registry.beam.comparedb.ValidateDatabasePipeline', metaData: 'google/registry/beam/validate_database_pipeline_metadata.json' ], + resaveAllEppResources: + [ + mainClass: 'google.registry.beam.resave.ResaveAllEppResourcesPipeline', + metaData: 'google/registry/beam/resave_all_epp_resources_pipeline_metadata.json' + ], ] project.tasks.create("stageBeamPipelines") { doLast { diff --git a/core/src/main/java/google/registry/batch/BatchModule.java b/core/src/main/java/google/registry/batch/BatchModule.java index 8e12a4f9f..18cb2fec7 100644 --- a/core/src/main/java/google/registry/batch/BatchModule.java +++ b/core/src/main/java/google/registry/batch/BatchModule.java @@ -112,6 +112,12 @@ public class BatchModule { req, ExpandRecurringBillingEventsAction.PARAM_CURSOR_TIME); } + @Provides + @Parameter(ResaveAllEppResourcesPipelineAction.PARAM_FAST) + static Optional provideIsFast(HttpServletRequest req) { + return extractOptionalBooleanParameter(req, ResaveAllEppResourcesPipelineAction.PARAM_FAST); + } + @Provides @Named(QUEUE_ASYNC_ACTIONS) static Queue provideAsyncActionsPushQueue() { diff --git a/core/src/main/java/google/registry/batch/ResaveAllEppResourcesPipelineAction.java b/core/src/main/java/google/registry/batch/ResaveAllEppResourcesPipelineAction.java new file mode 100644 index 000000000..b23d0413f --- /dev/null +++ b/core/src/main/java/google/registry/batch/ResaveAllEppResourcesPipelineAction.java @@ -0,0 +1,129 @@ +// Copyright 2022 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.batch; + +import static google.registry.beam.BeamUtils.createJobName; +import static javax.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR; +import static javax.servlet.http.HttpServletResponse.SC_OK; + +import com.google.api.services.dataflow.Dataflow; +import com.google.api.services.dataflow.model.LaunchFlexTemplateParameter; +import com.google.api.services.dataflow.model.LaunchFlexTemplateRequest; +import com.google.api.services.dataflow.model.LaunchFlexTemplateResponse; +import com.google.common.collect.ImmutableMap; +import com.google.common.flogger.FluentLogger; +import com.google.common.net.MediaType; +import google.registry.config.RegistryConfig.Config; +import google.registry.config.RegistryEnvironment; +import google.registry.request.Action; +import google.registry.request.Parameter; +import google.registry.request.Response; +import google.registry.request.auth.Auth; +import google.registry.util.Clock; +import java.util.Optional; +import javax.inject.Inject; + +/** + * Starts a Dataflow pipeline that resaves all EPP resources projected to the current time. + * + *

This is useful for a few situations. First, as a fallback option for resource transfers that + * have expired pending transfers (this will resolve them), just in case the enqueued action fails. + * Second, it will reflect domain autorenews that have happened. Third, it will remove any expired + * grace periods. + * + *

There's also the general principle that it's good to have the data in the database remain as + * current as is reasonably possible. + * + *

If the ?isFast=true query string parameter is passed as true, the pipeline will + * only attempt to load, project, and resave entities where we expect one of the previous situations + * has occurred. Otherwise, we will load, project, and resave all EPP resources. + * + *

This runs the {@link google.registry.beam.resave.ResaveAllEppResourcesPipeline}. + */ +@Action( + service = Action.Service.BACKEND, + path = ResaveAllEppResourcesPipelineAction.PATH, + auth = Auth.AUTH_INTERNAL_OR_ADMIN) +public class ResaveAllEppResourcesPipelineAction implements Runnable { + + private static final FluentLogger logger = FluentLogger.forEnclosingClass(); + + static final String PATH = "/_dr/task/resaveAllEppResourcesPipeline"; + static final String PIPELINE_NAME = "resave_all_epp_resources_pipeline"; + + public static final String PARAM_FAST = "fast"; + + private final String projectId; + private final String jobRegion; + private final String stagingBucketUrl; + private final boolean fast; + private final Clock clock; + private final Response response; + private final Dataflow dataflow; + + @Inject + ResaveAllEppResourcesPipelineAction( + @Config("projectId") String projectId, + @Config("defaultJobRegion") String jobRegion, + @Config("beamStagingBucketUrl") String stagingBucketUrl, + @Parameter(PARAM_FAST) Optional fast, + Clock clock, + Response response, + Dataflow dataflow) { + this.projectId = projectId; + this.jobRegion = jobRegion; + this.stagingBucketUrl = stagingBucketUrl; + this.fast = fast.orElse(false); + this.clock = clock; + this.response = response; + this.dataflow = dataflow; + } + + @Override + public void run() { + response.setContentType(MediaType.PLAIN_TEXT_UTF_8); + logger.atInfo().log("Launching ResaveAllEppResourcesPipeline"); + try { + LaunchFlexTemplateParameter parameter = + new LaunchFlexTemplateParameter() + .setJobName(createJobName("resave-all-epp-resources", clock)) + .setContainerSpecGcsPath( + String.format("%s/%s_metadata.json", stagingBucketUrl, PIPELINE_NAME)) + .setParameters( + new ImmutableMap.Builder() + .put(PARAM_FAST, Boolean.toString(fast)) + .put("registryEnvironment", RegistryEnvironment.get().name()) + .build()); + LaunchFlexTemplateResponse launchResponse = + dataflow + .projects() + .locations() + .flexTemplates() + .launch( + projectId, + jobRegion, + new LaunchFlexTemplateRequest().setLaunchParameter(parameter)) + .execute(); + logger.atInfo().log("Got response: %s", launchResponse.getJob().toPrettyString()); + String jobId = launchResponse.getJob().getId(); + response.setStatus(SC_OK); + response.setPayload(String.format("Launched resaveAllEppResources pipeline: %s", jobId)); + } catch (Exception e) { + logger.atSevere().withCause(e).log("Template Launch failed."); + response.setStatus(SC_INTERNAL_SERVER_ERROR); + response.setPayload(String.format("Pipeline launch failed: %s", e.getMessage())); + } + } +} diff --git a/core/src/main/java/google/registry/beam/initsql/InitSqlPipeline.java b/core/src/main/java/google/registry/beam/initsql/InitSqlPipeline.java index e042d219a..3f17dcc3e 100644 --- a/core/src/main/java/google/registry/beam/initsql/InitSqlPipeline.java +++ b/core/src/main/java/google/registry/beam/initsql/InitSqlPipeline.java @@ -158,7 +158,7 @@ public class InitSqlPipeline implements Serializable { .addAll(toKindStrings(PHASE_TWO_ORDERED)) .build())); - // Set up the pipeline to write entity kinds from PHASE_ONE_ORDERED to SQL. Return a object + // Set up the pipeline to write entity kinds from PHASE_ONE_ORDERED to SQL. Return an object // that signals the completion of the phase. PCollection blocker = scheduleOnePhaseWrites(datastoreSnapshot, PHASE_ONE_ORDERED, Optional.empty(), null); diff --git a/core/src/main/java/google/registry/beam/resave/ResaveAllEppResourcesPipeline.java b/core/src/main/java/google/registry/beam/resave/ResaveAllEppResourcesPipeline.java new file mode 100644 index 000000000..a2ad13fcf --- /dev/null +++ b/core/src/main/java/google/registry/beam/resave/ResaveAllEppResourcesPipeline.java @@ -0,0 +1,174 @@ +// Copyright 2022 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.beam.resave; + +import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; +import static org.apache.beam.sdk.values.TypeDescriptors.integers; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import google.registry.beam.common.RegistryJpaIO; +import google.registry.beam.common.RegistryJpaIO.Read; +import google.registry.model.EppResource; +import google.registry.model.contact.ContactResource; +import google.registry.model.domain.DomainBase; +import google.registry.model.host.HostResource; +import google.registry.persistence.PersistenceModule.TransactionIsolationLevel; +import google.registry.persistence.transaction.CriteriaQueryBuilder; +import google.registry.util.DateTimeUtils; +import java.io.Serializable; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupIntoBatches; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.util.ShardedKey; +import org.apache.beam.sdk.values.KV; +import org.joda.time.DateTime; + +/** + * A Dataflow Flex pipeline that resaves changed EPP resources in SQL. + * + *

Due to the way that Hibernate works, if an entity is unchanged by {@link + * EppResource#cloneProjectedAtTime(DateTime)} it will not actually be re-persisted to the database. + * Thus, the only actual changes occur when objects are changed by projecting them to now, such as + * when a pending transfer is resolved. + */ +public class ResaveAllEppResourcesPipeline implements Serializable { + + private static final ImmutableSet> EPP_RESOURCE_CLASSES = + ImmutableSet.of(ContactResource.class, DomainBase.class, HostResource.class); + + /** + * There exist three possible situations where we know we'll want to project domains to the + * current point in time: + * + *

+ * + *

This command contains all three scenarios so that we can avoid querying the Domain table + * multiple times, and to avoid projecting and resaving the same domain multiple times. + */ + private static final String DOMAINS_TO_PROJECT_QUERY = + "FROM Domain d WHERE (d.transferData.transferStatus = 'PENDING' AND" + + " d.transferData.pendingTransferExpirationTime < current_timestamp()) OR" + + " (d.registrationExpirationTime < current_timestamp() AND d.deletionTime =" + + " (:END_OF_TIME)) OR (EXISTS (SELECT 1 FROM GracePeriod gp WHERE gp.domainRepoId =" + + " d.repoId AND gp.expirationTime < current_timestamp()))"; + + private final ResaveAllEppResourcesPipelineOptions options; + + ResaveAllEppResourcesPipeline(ResaveAllEppResourcesPipelineOptions options) { + this.options = options; + } + + PipelineResult run() { + Pipeline pipeline = Pipeline.create(options); + setupPipeline(pipeline); + return pipeline.run(); + } + + void setupPipeline(Pipeline pipeline) { + options.setIsolationOverride(TransactionIsolationLevel.TRANSACTION_READ_COMMITTED); + if (options.getFast()) { + fastResaveContacts(pipeline); + fastResaveDomains(pipeline); + } else { + EPP_RESOURCE_CLASSES.forEach(clazz -> forceResaveAllResources(pipeline, clazz)); + } + } + + /** Projects to the current time and saves any contacts with expired transfers. */ + private void fastResaveContacts(Pipeline pipeline) { + Read read = + RegistryJpaIO.read( + "FROM Contact WHERE transferData.transferStatus = 'PENDING' AND" + + " transferData.pendingTransferExpirationTime < current_timestamp()", + ContactResource.class, + c -> c); + projectAndResaveResources(pipeline, ContactResource.class, read); + } + + /** + * Projects to the current time and saves any domains with expired pending actions (e.g. + * transfers, grace periods). + * + *

The logic of what might have changed is paraphrased from {@link + * google.registry.model.domain.DomainContent#cloneProjectedAtTime(DateTime)}. + */ + private void fastResaveDomains(Pipeline pipeline) { + Read read = + RegistryJpaIO.read( + DOMAINS_TO_PROJECT_QUERY, + ImmutableMap.of("END_OF_TIME", DateTimeUtils.END_OF_TIME), + DomainBase.class, + d -> d); + projectAndResaveResources(pipeline, DomainBase.class, read); + } + + /** Projects all resources to the current time and saves them. */ + private void forceResaveAllResources(Pipeline pipeline, Class clazz) { + Read read = RegistryJpaIO.read(() -> CriteriaQueryBuilder.create(clazz).build()); + projectAndResaveResources(pipeline, clazz, read); + } + + /** Projects and re-saves the result of the provided {@link Read}. */ + private void projectAndResaveResources( + Pipeline pipeline, Class clazz, Read read) { + int numShards = options.getSqlWriteShards(); + int batchSize = options.getSqlWriteBatchSize(); + String className = clazz.getSimpleName(); + pipeline + .apply("Read " + className, read) + .apply( + "Shard data for class" + className, + WithKeys.of(e -> ThreadLocalRandom.current().nextInt(numShards)) + .withKeyType(integers())) + .apply( + "Group into batches for class" + className, + GroupIntoBatches.ofSize(batchSize).withShardedKey()) + .apply("Map " + className + " to now", ParDo.of(new BatchedProjectionFunction<>())) + .apply( + "Write transformed " + className, + RegistryJpaIO.write() + .withName("Write transformed " + className) + .withBatchSize(batchSize) + .withShards(numShards)); + } + + private static class BatchedProjectionFunction + extends DoFn, Iterable>, EppResource> { + + @ProcessElement + public void processElement( + @Element KV, Iterable> element, + OutputReceiver outputReceiver) { + jpaTm() + .transact( + () -> + element + .getValue() + .forEach( + resource -> + outputReceiver.output( + resource.cloneProjectedAtTime(jpaTm().getTransactionTime())))); + } + } +} diff --git a/core/src/main/java/google/registry/beam/resave/ResaveAllEppResourcesPipelineOptions.java b/core/src/main/java/google/registry/beam/resave/ResaveAllEppResourcesPipelineOptions.java new file mode 100644 index 000000000..4dc57cb7e --- /dev/null +++ b/core/src/main/java/google/registry/beam/resave/ResaveAllEppResourcesPipelineOptions.java @@ -0,0 +1,26 @@ +// Copyright 2022 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.beam.resave; + +import google.registry.beam.common.RegistryPipelineOptions; +import org.apache.beam.sdk.options.Description; + +public interface ResaveAllEppResourcesPipelineOptions extends RegistryPipelineOptions { + + @Description("True if we should attempt to run only over potentially out-of-date EPP resources") + boolean getFast(); + + void setFast(boolean fast); +} diff --git a/core/src/main/java/google/registry/env/common/backend/WEB-INF/web.xml b/core/src/main/java/google/registry/env/common/backend/WEB-INF/web.xml index f6b48a58a..a381b00ec 100644 --- a/core/src/main/java/google/registry/env/common/backend/WEB-INF/web.xml +++ b/core/src/main/java/google/registry/env/common/backend/WEB-INF/web.xml @@ -316,6 +316,12 @@ /_dr/task/resaveAllEppResources + + + backend-servlet + /_dr/task/resaveAllEppResourcesPipeline + + backend-servlet diff --git a/core/src/main/java/google/registry/module/backend/BackendRequestComponent.java b/core/src/main/java/google/registry/module/backend/BackendRequestComponent.java index 1f4580051..ce9822a1c 100644 --- a/core/src/main/java/google/registry/module/backend/BackendRequestComponent.java +++ b/core/src/main/java/google/registry/module/backend/BackendRequestComponent.java @@ -31,6 +31,7 @@ import google.registry.batch.ExpandRecurringBillingEventsAction; import google.registry.batch.RefreshDnsOnHostRenameAction; import google.registry.batch.RelockDomainAction; import google.registry.batch.ResaveAllEppResourcesAction; +import google.registry.batch.ResaveAllEppResourcesPipelineAction; import google.registry.batch.ResaveEntityAction; import google.registry.batch.SendExpiringCertificateNotificationEmailAction; import google.registry.batch.WipeOutCloudSqlAction; @@ -196,6 +197,8 @@ interface BackendRequestComponent { ResaveAllEppResourcesAction resaveAllEppResourcesAction(); + ResaveAllEppResourcesPipelineAction resaveAllEppResourcesPipelineAction(); + ResaveEntityAction resaveEntityAction(); SendExpiringCertificateNotificationEmailAction sendExpiringCertificateNotificationEmailAction(); diff --git a/core/src/main/resources/google/registry/beam/resave_all_epp_resources_pipeline.json b/core/src/main/resources/google/registry/beam/resave_all_epp_resources_pipeline.json new file mode 100644 index 000000000..5899e2488 --- /dev/null +++ b/core/src/main/resources/google/registry/beam/resave_all_epp_resources_pipeline.json @@ -0,0 +1,30 @@ +{ + "name": "Resave all EPP Resources", + "description": "An Apache Beam pipeline that resaves all (or potentially only changed) EPP resources", + "parameters": [ + { + "name": "registryEnvironment", + "label": "The Registry environment.", + "helpText": "The Registry environment.", + "is_optional": false, + "regexes": [ + "^PRODUCTION|SANDBOX|CRASH|QA|ALPHA$" + ] + }, + { + "name": "isolationOverride", + "label": "The desired SQL transaction isolation level.", + "helpText": "The desired SQL transaction isolation level.", + "is_optional": true, + "regexes": [ + "^[0-9A-Z_]+$" + ] + }, + { + "name": "fast", + "label": "Whether or not to attempt to only save changed resources", + "helpText": "If true, we will attempt to only save resources that possibly have expired transfers, grace periods, etc", + "is_optional": false + } + ] +} diff --git a/core/src/test/java/google/registry/batch/ResaveAllEppResourcesActionTest.java b/core/src/test/java/google/registry/batch/ResaveAllEppResourcesActionTest.java index d11fc7799..a4cda8f28 100644 --- a/core/src/test/java/google/registry/batch/ResaveAllEppResourcesActionTest.java +++ b/core/src/test/java/google/registry/batch/ResaveAllEppResourcesActionTest.java @@ -20,6 +20,7 @@ import static google.registry.testing.DatabaseHelper.persistActiveContact; import static google.registry.testing.DatabaseHelper.persistContactWithPendingTransfer; import static org.joda.time.DateTimeZone.UTC; +import google.registry.model.annotations.DeleteAfterMigration; import google.registry.model.contact.ContactResource; import google.registry.model.transfer.TransferStatus; import google.registry.testing.FakeResponse; @@ -29,6 +30,9 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; /** Unit tests for {@link ResaveAllEppResourcesAction}. */ +// No longer needed in SQL. Subject to future removal. +@Deprecated +@DeleteAfterMigration class ResaveAllEppResourcesActionTest extends MapreduceTestCase { @BeforeEach diff --git a/core/src/test/java/google/registry/batch/ResaveAllEppResourcesPipelineActionTest.java b/core/src/test/java/google/registry/batch/ResaveAllEppResourcesPipelineActionTest.java new file mode 100644 index 000000000..76fd3de31 --- /dev/null +++ b/core/src/test/java/google/registry/batch/ResaveAllEppResourcesPipelineActionTest.java @@ -0,0 +1,84 @@ +// Copyright 2022 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.batch; + +import static com.google.common.truth.Truth.assertThat; +import static google.registry.batch.ResaveAllEppResourcesPipelineAction.PARAM_FAST; +import static google.registry.batch.ResaveAllEppResourcesPipelineAction.PIPELINE_NAME; +import static google.registry.beam.BeamUtils.createJobName; +import static javax.servlet.http.HttpServletResponse.SC_OK; +import static org.mockito.Mockito.verify; + +import com.google.api.services.dataflow.model.LaunchFlexTemplateParameter; +import com.google.api.services.dataflow.model.LaunchFlexTemplateRequest; +import com.google.common.collect.ImmutableMap; +import google.registry.beam.BeamActionTestBase; +import google.registry.config.RegistryEnvironment; +import google.registry.testing.AppEngineExtension; +import google.registry.testing.FakeClock; +import java.util.Optional; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +/** Unit tests for {@link ResaveAllEppResourcesPipelineAction}. */ +public class ResaveAllEppResourcesPipelineActionTest extends BeamActionTestBase { + + @RegisterExtension + final AppEngineExtension appEngine = + AppEngineExtension.builder().withDatastoreAndCloudSql().build(); + + private final FakeClock fakeClock = new FakeClock(); + + private ResaveAllEppResourcesPipelineAction createAction(boolean isFast) { + return new ResaveAllEppResourcesPipelineAction( + "test-project", + "test-region", + "staging-bucket", + Optional.of(isFast), + fakeClock, + response, + dataflow); + } + + @Test + void testLaunch_notFast() throws Exception { + createAction(false).run(); + assertThat(response.getStatus()).isEqualTo(SC_OK); + assertThat(response.getPayload()).isEqualTo("Launched resaveAllEppResources pipeline: jobid"); + verify(templates).launch("test-project", "test-region", createLaunchTemplateRequest(false)); + } + + @Test + void testLaunch_fast() throws Exception { + createAction(true).run(); + assertThat(response.getStatus()).isEqualTo(SC_OK); + assertThat(response.getPayload()).isEqualTo("Launched resaveAllEppResources pipeline: jobid"); + verify(templates).launch("test-project", "test-region", createLaunchTemplateRequest(true)); + } + + private LaunchFlexTemplateRequest createLaunchTemplateRequest(boolean isFast) { + return new LaunchFlexTemplateRequest() + .setLaunchParameter( + new LaunchFlexTemplateParameter() + .setJobName(createJobName("resave-all-epp-resources", fakeClock)) + .setContainerSpecGcsPath( + String.format("%s/%s_metadata.json", "staging-bucket", PIPELINE_NAME)) + .setParameters( + new ImmutableMap.Builder() + .put(PARAM_FAST, Boolean.toString(isFast)) + .put("registryEnvironment", RegistryEnvironment.get().name()) + .build())); + } +} diff --git a/core/src/test/java/google/registry/beam/resave/ResaveAllEppResourcesPipelineTest.java b/core/src/test/java/google/registry/beam/resave/ResaveAllEppResourcesPipelineTest.java new file mode 100644 index 000000000..f5f12978a --- /dev/null +++ b/core/src/test/java/google/registry/beam/resave/ResaveAllEppResourcesPipelineTest.java @@ -0,0 +1,204 @@ +// Copyright 2022 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.beam.resave; + +import static com.google.common.collect.ImmutableSet.toImmutableSet; +import static com.google.common.truth.Truth.assertThat; +import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; +import static google.registry.testing.DatabaseHelper.createTld; +import static google.registry.testing.DatabaseHelper.loadAllOf; +import static google.registry.testing.DatabaseHelper.loadByEntity; +import static google.registry.testing.DatabaseHelper.persistActiveContact; +import static google.registry.testing.DatabaseHelper.persistActiveDomain; +import static google.registry.testing.DatabaseHelper.persistContactWithPendingTransfer; +import static google.registry.testing.DatabaseHelper.persistDomainWithDependentResources; +import static google.registry.testing.DatabaseHelper.persistDomainWithPendingTransfer; +import static google.registry.testing.DatabaseHelper.persistNewRegistrars; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import google.registry.beam.TestPipelineExtension; +import google.registry.model.EppResource; +import google.registry.model.contact.ContactResource; +import google.registry.model.domain.DomainBase; +import google.registry.model.domain.GracePeriod; +import google.registry.model.eppcommon.StatusValue; +import google.registry.persistence.transaction.JpaTestExtensions; +import google.registry.persistence.transaction.JpaTestExtensions.JpaIntegrationTestExtension; +import google.registry.persistence.transaction.JpaTransactionManager; +import google.registry.persistence.transaction.TransactionManagerFactory; +import google.registry.testing.DatastoreEntityExtension; +import google.registry.testing.FakeClock; +import google.registry.testing.TmOverrideExtension; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.joda.time.DateTime; +import org.joda.time.Duration; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.mockito.ArgumentCaptor; + +/** Tests for {@link ResaveAllEppResourcesPipeline}. */ +public class ResaveAllEppResourcesPipelineTest { + + private final FakeClock fakeClock = new FakeClock(DateTime.parse("2020-03-10T00:00:00.000Z")); + + @RegisterExtension + @Order(Order.DEFAULT - 1) + final transient DatastoreEntityExtension datastore = + new DatastoreEntityExtension().allThreads(true); + + @RegisterExtension + final TestPipelineExtension testPipeline = + TestPipelineExtension.create().enableAbandonedNodeEnforcement(true); + + @RegisterExtension + final JpaIntegrationTestExtension database = + new JpaTestExtensions.Builder().withClock(fakeClock).buildIntegrationTestExtension(); + + @RegisterExtension + @Order(Order.DEFAULT + 1) + TmOverrideExtension tmOverrideExtension = TmOverrideExtension.withJpa(); + + private final ResaveAllEppResourcesPipelineOptions options = + PipelineOptionsFactory.create().as(ResaveAllEppResourcesPipelineOptions.class); + + @BeforeEach + void beforeEach() { + options.setFast(true); + persistNewRegistrars("TheRegistrar", "NewRegistrar"); + createTld("tld"); + } + + @Test + void testPipeline_unchangedEntity() { + ContactResource contact = persistActiveContact("test123"); + DateTime creationTime = contact.getUpdateTimestamp().getTimestamp(); + fakeClock.advanceOneMilli(); + assertThat(loadByEntity(contact).getUpdateTimestamp().getTimestamp()).isEqualTo(creationTime); + fakeClock.advanceOneMilli(); + runPipeline(); + assertThat(loadByEntity(contact)).isEqualTo(contact); + } + + @Test + void testPipeline_fulfilledContactTransfer() { + ContactResource contact = persistActiveContact("test123"); + DateTime now = fakeClock.nowUtc(); + contact = persistContactWithPendingTransfer(contact, now, now.plusDays(5), now); + fakeClock.advanceBy(Duration.standardDays(10)); + assertThat(loadByEntity(contact).getStatusValues()).contains(StatusValue.PENDING_TRANSFER); + runPipeline(); + assertThat(loadByEntity(contact).getStatusValues()) + .doesNotContain(StatusValue.PENDING_TRANSFER); + } + + @Test + void testPipeline_fulfilledDomainTransfer() { + options.setFast(true); + DateTime now = fakeClock.nowUtc(); + DomainBase domain = + persistDomainWithPendingTransfer( + persistDomainWithDependentResources( + "domain", + "tld", + persistActiveContact("jd1234"), + now.minusDays(5), + now.minusDays(5), + now.plusYears(2)), + now.minusDays(4), + now.minusDays(1), + now.plusYears(2)); + assertThat(domain.getStatusValues()).contains(StatusValue.PENDING_TRANSFER); + assertThat(domain.getUpdateTimestamp().getTimestamp()).isEqualTo(now); + fakeClock.advanceOneMilli(); + runPipeline(); + DomainBase postPipeline = loadByEntity(domain); + assertThat(postPipeline.getStatusValues()).doesNotContain(StatusValue.PENDING_TRANSFER); + assertThat(postPipeline.getUpdateTimestamp().getTimestamp()).isEqualTo(fakeClock.nowUtc()); + } + + @Test + void testPipeline_autorenewedDomain() { + DateTime now = fakeClock.nowUtc(); + DomainBase domain = + persistDomainWithDependentResources( + "domain", "tld", persistActiveContact("jd1234"), now, now, now.plusYears(1)); + assertThat(domain.getRegistrationExpirationTime()).isEqualTo(now.plusYears(1)); + fakeClock.advanceBy(Duration.standardDays(500)); + runPipeline(); + DomainBase postPipeline = loadByEntity(domain); + assertThat(postPipeline.getRegistrationExpirationTime()).isEqualTo(now.plusYears(2)); + } + + @Test + void testPipeline_expiredGracePeriod() { + DateTime now = fakeClock.nowUtc(); + persistDomainWithDependentResources( + "domain", "tld", persistActiveContact("jd1234"), now, now, now.plusYears(1)); + assertThat(loadAllOf(GracePeriod.class)).hasSize(1); + fakeClock.advanceBy(Duration.standardDays(500)); + runPipeline(); + assertThat(loadAllOf(GracePeriod.class)).isEmpty(); + } + + @Test + void testPipeline_fastOnlySavesChanged() { + DateTime now = fakeClock.nowUtc(); + ContactResource contact = persistActiveContact("jd1234"); + persistDomainWithDependentResources("renewed", "tld", contact, now, now, now.plusYears(1)); + persistActiveDomain("nonrenewed.tld", now, now.plusYears(20)); + // Spy the transaction manager so we can be sure we're only saving the renewed domain + JpaTransactionManager spy = spy(jpaTm()); + TransactionManagerFactory.setJpaTm(() -> spy); + ArgumentCaptor domainPutCaptor = ArgumentCaptor.forClass(DomainBase.class); + runPipeline(); + // We should only be attempting to put the one changed domain into the DB + verify(spy).put(domainPutCaptor.capture()); + assertThat(domainPutCaptor.getValue().getDomainName()).isEqualTo("renewed.tld"); + } + + @Test + void testPipeline_notFastResavesAll() { + options.setFast(false); + DateTime now = fakeClock.nowUtc(); + ContactResource contact = persistActiveContact("jd1234"); + DomainBase renewed = + persistDomainWithDependentResources("renewed", "tld", contact, now, now, now.plusYears(1)); + DomainBase nonRenewed = + persistDomainWithDependentResources( + "nonrenewed", "tld", contact, now, now, now.plusYears(20)); + // Spy the transaction manager so we can be sure we're attempting to save everything + JpaTransactionManager spy = spy(jpaTm()); + TransactionManagerFactory.setJpaTm(() -> spy); + ArgumentCaptor eppResourcePutCaptor = ArgumentCaptor.forClass(EppResource.class); + runPipeline(); + // We should be attempting to put both domains (and the contact) in, even the unchanged ones + verify(spy, times(3)).put(eppResourcePutCaptor.capture()); + assertThat( + eppResourcePutCaptor.getAllValues().stream() + .map(EppResource::getRepoId) + .collect(toImmutableSet())) + .containsExactly(contact.getRepoId(), renewed.getRepoId(), nonRenewed.getRepoId()); + } + + private void runPipeline() { + ResaveAllEppResourcesPipeline pipeline = new ResaveAllEppResourcesPipeline(options); + pipeline.setupPipeline(testPipeline); + testPipeline.run().waitUntilFinish(); + } +} diff --git a/core/src/test/resources/google/registry/module/backend/backend_routing.txt b/core/src/test/resources/google/registry/module/backend/backend_routing.txt index 3228ab536..41b3ce86d 100644 --- a/core/src/test/resources/google/registry/module/backend/backend_routing.txt +++ b/core/src/test/resources/google/registry/module/backend/backend_routing.txt @@ -37,6 +37,7 @@ PATH CLASS /_dr/task/relockDomain RelockDomainAction POST y INTERNAL,API APP ADMIN /_dr/task/replayCommitLogsToSql ReplayCommitLogsToSqlAction POST y INTERNAL,API APP ADMIN /_dr/task/resaveAllEppResources ResaveAllEppResourcesAction GET n INTERNAL,API APP ADMIN +/_dr/task/resaveAllEppResourcesPipeline ResaveAllEppResourcesPipelineAction GET n INTERNAL,API APP ADMIN /_dr/task/resaveEntity ResaveEntityAction POST n INTERNAL,API APP ADMIN /_dr/task/sendExpiringCertificateNotificationEmail SendExpiringCertificateNotificationEmailAction GET n INTERNAL,API APP ADMIN /_dr/task/syncDatastoreToSqlSnapshot SyncDatastoreToSqlSnapshotAction POST n INTERNAL,API APP ADMIN diff --git a/release/cloudbuild-nomulus.yaml b/release/cloudbuild-nomulus.yaml index 0903cca92..3fa4023f5 100644 --- a/release/cloudbuild-nomulus.yaml +++ b/release/cloudbuild-nomulus.yaml @@ -98,7 +98,9 @@ steps: google.registry.beam.rde.RdePipeline \ google/registry/beam/rde_pipeline_metadata.json \ google.registry.beam.comparedb.ValidateDatabasePipeline \ - google/registry/beam/validate_database_pipeline_metadata.json + google/registry/beam/validate_database_pipeline_metadata.json \ + google.registry.beam.resave.ResaveAllEppResourcesPipeline \ + google/registry/beam/resave_all_epp_resources_pipeline_metadata.json # Tentatively build and publish Cloud SQL schema jar here, before schema release # process is finalized. Also publish nomulus:core jars that are needed for # server/schema compatibility tests.