From 9939833c2536b21bdc85b9a8ef2f62586a87ed1c Mon Sep 17 00:00:00 2001 From: gbrodman Date: Fri, 15 Apr 2022 15:46:35 -0400 Subject: [PATCH] Create a Dataflow pipeline to resave EPP resources (#1553) * Create a Dataflow pipeline to resave EPP resources This has two modes. If `fast` is false, then we will just load all EPP resources, project them to the current time, and save them. If `fast` is true, we will attempt to intelligently load and save only resources that we expect to have changes applied when we project them to the current time. This means resources with pending transfers that have expired, domains with expired grace periods, and non-deleted domains that have expired (we expect that they autorenewed). --- core/build.gradle | 5 + .../google/registry/batch/BatchModule.java | 6 + .../ResaveAllEppResourcesPipelineAction.java | 129 +++++++++++ .../beam/initsql/InitSqlPipeline.java | 2 +- .../resave/ResaveAllEppResourcesPipeline.java | 174 +++++++++++++++ .../ResaveAllEppResourcesPipelineOptions.java | 26 +++ .../env/common/backend/WEB-INF/web.xml | 6 + .../backend/BackendRequestComponent.java | 3 + .../resave_all_epp_resources_pipeline.json | 30 +++ .../ResaveAllEppResourcesActionTest.java | 4 + ...saveAllEppResourcesPipelineActionTest.java | 84 ++++++++ .../ResaveAllEppResourcesPipelineTest.java | 204 ++++++++++++++++++ .../module/backend/backend_routing.txt | 1 + release/cloudbuild-nomulus.yaml | 4 +- 14 files changed, 676 insertions(+), 2 deletions(-) create mode 100644 core/src/main/java/google/registry/batch/ResaveAllEppResourcesPipelineAction.java create mode 100644 core/src/main/java/google/registry/beam/resave/ResaveAllEppResourcesPipeline.java create mode 100644 core/src/main/java/google/registry/beam/resave/ResaveAllEppResourcesPipelineOptions.java create mode 100644 core/src/main/resources/google/registry/beam/resave_all_epp_resources_pipeline.json create mode 100644 core/src/test/java/google/registry/batch/ResaveAllEppResourcesPipelineActionTest.java create mode 100644 core/src/test/java/google/registry/beam/resave/ResaveAllEppResourcesPipelineTest.java diff --git a/core/build.gradle b/core/build.gradle index b98a8219c..60ffbba89 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -800,6 +800,11 @@ if (environment == 'alpha') { mainClass: 'google.registry.beam.comparedb.ValidateDatabasePipeline', metaData: 'google/registry/beam/validate_database_pipeline_metadata.json' ], + resaveAllEppResources: + [ + mainClass: 'google.registry.beam.resave.ResaveAllEppResourcesPipeline', + metaData: 'google/registry/beam/resave_all_epp_resources_pipeline_metadata.json' + ], ] project.tasks.create("stageBeamPipelines") { doLast { diff --git a/core/src/main/java/google/registry/batch/BatchModule.java b/core/src/main/java/google/registry/batch/BatchModule.java index 8e12a4f9f..18cb2fec7 100644 --- a/core/src/main/java/google/registry/batch/BatchModule.java +++ b/core/src/main/java/google/registry/batch/BatchModule.java @@ -112,6 +112,12 @@ public class BatchModule { req, ExpandRecurringBillingEventsAction.PARAM_CURSOR_TIME); } + @Provides + @Parameter(ResaveAllEppResourcesPipelineAction.PARAM_FAST) + static Optional provideIsFast(HttpServletRequest req) { + return extractOptionalBooleanParameter(req, ResaveAllEppResourcesPipelineAction.PARAM_FAST); + } + @Provides @Named(QUEUE_ASYNC_ACTIONS) static Queue provideAsyncActionsPushQueue() { diff --git a/core/src/main/java/google/registry/batch/ResaveAllEppResourcesPipelineAction.java b/core/src/main/java/google/registry/batch/ResaveAllEppResourcesPipelineAction.java new file mode 100644 index 000000000..b23d0413f --- /dev/null +++ b/core/src/main/java/google/registry/batch/ResaveAllEppResourcesPipelineAction.java @@ -0,0 +1,129 @@ +// Copyright 2022 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.batch; + +import static google.registry.beam.BeamUtils.createJobName; +import static javax.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR; +import static javax.servlet.http.HttpServletResponse.SC_OK; + +import com.google.api.services.dataflow.Dataflow; +import com.google.api.services.dataflow.model.LaunchFlexTemplateParameter; +import com.google.api.services.dataflow.model.LaunchFlexTemplateRequest; +import com.google.api.services.dataflow.model.LaunchFlexTemplateResponse; +import com.google.common.collect.ImmutableMap; +import com.google.common.flogger.FluentLogger; +import com.google.common.net.MediaType; +import google.registry.config.RegistryConfig.Config; +import google.registry.config.RegistryEnvironment; +import google.registry.request.Action; +import google.registry.request.Parameter; +import google.registry.request.Response; +import google.registry.request.auth.Auth; +import google.registry.util.Clock; +import java.util.Optional; +import javax.inject.Inject; + +/** + * Starts a Dataflow pipeline that resaves all EPP resources projected to the current time. + * + *

This is useful for a few situations. First, as a fallback option for resource transfers that + * have expired pending transfers (this will resolve them), just in case the enqueued action fails. + * Second, it will reflect domain autorenews that have happened. Third, it will remove any expired + * grace periods. + * + *

There's also the general principle that it's good to have the data in the database remain as + * current as is reasonably possible. + * + *

If the ?isFast=true query string parameter is passed as true, the pipeline will + * only attempt to load, project, and resave entities where we expect one of the previous situations + * has occurred. Otherwise, we will load, project, and resave all EPP resources. + * + *

This runs the {@link google.registry.beam.resave.ResaveAllEppResourcesPipeline}. + */ +@Action( + service = Action.Service.BACKEND, + path = ResaveAllEppResourcesPipelineAction.PATH, + auth = Auth.AUTH_INTERNAL_OR_ADMIN) +public class ResaveAllEppResourcesPipelineAction implements Runnable { + + private static final FluentLogger logger = FluentLogger.forEnclosingClass(); + + static final String PATH = "/_dr/task/resaveAllEppResourcesPipeline"; + static final String PIPELINE_NAME = "resave_all_epp_resources_pipeline"; + + public static final String PARAM_FAST = "fast"; + + private final String projectId; + private final String jobRegion; + private final String stagingBucketUrl; + private final boolean fast; + private final Clock clock; + private final Response response; + private final Dataflow dataflow; + + @Inject + ResaveAllEppResourcesPipelineAction( + @Config("projectId") String projectId, + @Config("defaultJobRegion") String jobRegion, + @Config("beamStagingBucketUrl") String stagingBucketUrl, + @Parameter(PARAM_FAST) Optional fast, + Clock clock, + Response response, + Dataflow dataflow) { + this.projectId = projectId; + this.jobRegion = jobRegion; + this.stagingBucketUrl = stagingBucketUrl; + this.fast = fast.orElse(false); + this.clock = clock; + this.response = response; + this.dataflow = dataflow; + } + + @Override + public void run() { + response.setContentType(MediaType.PLAIN_TEXT_UTF_8); + logger.atInfo().log("Launching ResaveAllEppResourcesPipeline"); + try { + LaunchFlexTemplateParameter parameter = + new LaunchFlexTemplateParameter() + .setJobName(createJobName("resave-all-epp-resources", clock)) + .setContainerSpecGcsPath( + String.format("%s/%s_metadata.json", stagingBucketUrl, PIPELINE_NAME)) + .setParameters( + new ImmutableMap.Builder() + .put(PARAM_FAST, Boolean.toString(fast)) + .put("registryEnvironment", RegistryEnvironment.get().name()) + .build()); + LaunchFlexTemplateResponse launchResponse = + dataflow + .projects() + .locations() + .flexTemplates() + .launch( + projectId, + jobRegion, + new LaunchFlexTemplateRequest().setLaunchParameter(parameter)) + .execute(); + logger.atInfo().log("Got response: %s", launchResponse.getJob().toPrettyString()); + String jobId = launchResponse.getJob().getId(); + response.setStatus(SC_OK); + response.setPayload(String.format("Launched resaveAllEppResources pipeline: %s", jobId)); + } catch (Exception e) { + logger.atSevere().withCause(e).log("Template Launch failed."); + response.setStatus(SC_INTERNAL_SERVER_ERROR); + response.setPayload(String.format("Pipeline launch failed: %s", e.getMessage())); + } + } +} diff --git a/core/src/main/java/google/registry/beam/initsql/InitSqlPipeline.java b/core/src/main/java/google/registry/beam/initsql/InitSqlPipeline.java index e042d219a..3f17dcc3e 100644 --- a/core/src/main/java/google/registry/beam/initsql/InitSqlPipeline.java +++ b/core/src/main/java/google/registry/beam/initsql/InitSqlPipeline.java @@ -158,7 +158,7 @@ public class InitSqlPipeline implements Serializable { .addAll(toKindStrings(PHASE_TWO_ORDERED)) .build())); - // Set up the pipeline to write entity kinds from PHASE_ONE_ORDERED to SQL. Return a object + // Set up the pipeline to write entity kinds from PHASE_ONE_ORDERED to SQL. Return an object // that signals the completion of the phase. PCollection blocker = scheduleOnePhaseWrites(datastoreSnapshot, PHASE_ONE_ORDERED, Optional.empty(), null); diff --git a/core/src/main/java/google/registry/beam/resave/ResaveAllEppResourcesPipeline.java b/core/src/main/java/google/registry/beam/resave/ResaveAllEppResourcesPipeline.java new file mode 100644 index 000000000..a2ad13fcf --- /dev/null +++ b/core/src/main/java/google/registry/beam/resave/ResaveAllEppResourcesPipeline.java @@ -0,0 +1,174 @@ +// Copyright 2022 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.beam.resave; + +import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; +import static org.apache.beam.sdk.values.TypeDescriptors.integers; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import google.registry.beam.common.RegistryJpaIO; +import google.registry.beam.common.RegistryJpaIO.Read; +import google.registry.model.EppResource; +import google.registry.model.contact.ContactResource; +import google.registry.model.domain.DomainBase; +import google.registry.model.host.HostResource; +import google.registry.persistence.PersistenceModule.TransactionIsolationLevel; +import google.registry.persistence.transaction.CriteriaQueryBuilder; +import google.registry.util.DateTimeUtils; +import java.io.Serializable; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupIntoBatches; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.util.ShardedKey; +import org.apache.beam.sdk.values.KV; +import org.joda.time.DateTime; + +/** + * A Dataflow Flex pipeline that resaves changed EPP resources in SQL. + * + *

Due to the way that Hibernate works, if an entity is unchanged by {@link + * EppResource#cloneProjectedAtTime(DateTime)} it will not actually be re-persisted to the database. + * Thus, the only actual changes occur when objects are changed by projecting them to now, such as + * when a pending transfer is resolved. + */ +public class ResaveAllEppResourcesPipeline implements Serializable { + + private static final ImmutableSet> EPP_RESOURCE_CLASSES = + ImmutableSet.of(ContactResource.class, DomainBase.class, HostResource.class); + + /** + * There exist three possible situations where we know we'll want to project domains to the + * current point in time: + * + *

+ * + *

This command contains all three scenarios so that we can avoid querying the Domain table + * multiple times, and to avoid projecting and resaving the same domain multiple times. + */ + private static final String DOMAINS_TO_PROJECT_QUERY = + "FROM Domain d WHERE (d.transferData.transferStatus = 'PENDING' AND" + + " d.transferData.pendingTransferExpirationTime < current_timestamp()) OR" + + " (d.registrationExpirationTime < current_timestamp() AND d.deletionTime =" + + " (:END_OF_TIME)) OR (EXISTS (SELECT 1 FROM GracePeriod gp WHERE gp.domainRepoId =" + + " d.repoId AND gp.expirationTime < current_timestamp()))"; + + private final ResaveAllEppResourcesPipelineOptions options; + + ResaveAllEppResourcesPipeline(ResaveAllEppResourcesPipelineOptions options) { + this.options = options; + } + + PipelineResult run() { + Pipeline pipeline = Pipeline.create(options); + setupPipeline(pipeline); + return pipeline.run(); + } + + void setupPipeline(Pipeline pipeline) { + options.setIsolationOverride(TransactionIsolationLevel.TRANSACTION_READ_COMMITTED); + if (options.getFast()) { + fastResaveContacts(pipeline); + fastResaveDomains(pipeline); + } else { + EPP_RESOURCE_CLASSES.forEach(clazz -> forceResaveAllResources(pipeline, clazz)); + } + } + + /** Projects to the current time and saves any contacts with expired transfers. */ + private void fastResaveContacts(Pipeline pipeline) { + Read read = + RegistryJpaIO.read( + "FROM Contact WHERE transferData.transferStatus = 'PENDING' AND" + + " transferData.pendingTransferExpirationTime < current_timestamp()", + ContactResource.class, + c -> c); + projectAndResaveResources(pipeline, ContactResource.class, read); + } + + /** + * Projects to the current time and saves any domains with expired pending actions (e.g. + * transfers, grace periods). + * + *

The logic of what might have changed is paraphrased from {@link + * google.registry.model.domain.DomainContent#cloneProjectedAtTime(DateTime)}. + */ + private void fastResaveDomains(Pipeline pipeline) { + Read read = + RegistryJpaIO.read( + DOMAINS_TO_PROJECT_QUERY, + ImmutableMap.of("END_OF_TIME", DateTimeUtils.END_OF_TIME), + DomainBase.class, + d -> d); + projectAndResaveResources(pipeline, DomainBase.class, read); + } + + /** Projects all resources to the current time and saves them. */ + private void forceResaveAllResources(Pipeline pipeline, Class clazz) { + Read read = RegistryJpaIO.read(() -> CriteriaQueryBuilder.create(clazz).build()); + projectAndResaveResources(pipeline, clazz, read); + } + + /** Projects and re-saves the result of the provided {@link Read}. */ + private void projectAndResaveResources( + Pipeline pipeline, Class clazz, Read read) { + int numShards = options.getSqlWriteShards(); + int batchSize = options.getSqlWriteBatchSize(); + String className = clazz.getSimpleName(); + pipeline + .apply("Read " + className, read) + .apply( + "Shard data for class" + className, + WithKeys.of(e -> ThreadLocalRandom.current().nextInt(numShards)) + .withKeyType(integers())) + .apply( + "Group into batches for class" + className, + GroupIntoBatches.ofSize(batchSize).withShardedKey()) + .apply("Map " + className + " to now", ParDo.of(new BatchedProjectionFunction<>())) + .apply( + "Write transformed " + className, + RegistryJpaIO.write() + .withName("Write transformed " + className) + .withBatchSize(batchSize) + .withShards(numShards)); + } + + private static class BatchedProjectionFunction + extends DoFn, Iterable>, EppResource> { + + @ProcessElement + public void processElement( + @Element KV, Iterable> element, + OutputReceiver outputReceiver) { + jpaTm() + .transact( + () -> + element + .getValue() + .forEach( + resource -> + outputReceiver.output( + resource.cloneProjectedAtTime(jpaTm().getTransactionTime())))); + } + } +} diff --git a/core/src/main/java/google/registry/beam/resave/ResaveAllEppResourcesPipelineOptions.java b/core/src/main/java/google/registry/beam/resave/ResaveAllEppResourcesPipelineOptions.java new file mode 100644 index 000000000..4dc57cb7e --- /dev/null +++ b/core/src/main/java/google/registry/beam/resave/ResaveAllEppResourcesPipelineOptions.java @@ -0,0 +1,26 @@ +// Copyright 2022 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.beam.resave; + +import google.registry.beam.common.RegistryPipelineOptions; +import org.apache.beam.sdk.options.Description; + +public interface ResaveAllEppResourcesPipelineOptions extends RegistryPipelineOptions { + + @Description("True if we should attempt to run only over potentially out-of-date EPP resources") + boolean getFast(); + + void setFast(boolean fast); +} diff --git a/core/src/main/java/google/registry/env/common/backend/WEB-INF/web.xml b/core/src/main/java/google/registry/env/common/backend/WEB-INF/web.xml index f6b48a58a..a381b00ec 100644 --- a/core/src/main/java/google/registry/env/common/backend/WEB-INF/web.xml +++ b/core/src/main/java/google/registry/env/common/backend/WEB-INF/web.xml @@ -316,6 +316,12 @@ /_dr/task/resaveAllEppResources + + + backend-servlet + /_dr/task/resaveAllEppResourcesPipeline + + backend-servlet diff --git a/core/src/main/java/google/registry/module/backend/BackendRequestComponent.java b/core/src/main/java/google/registry/module/backend/BackendRequestComponent.java index 1f4580051..ce9822a1c 100644 --- a/core/src/main/java/google/registry/module/backend/BackendRequestComponent.java +++ b/core/src/main/java/google/registry/module/backend/BackendRequestComponent.java @@ -31,6 +31,7 @@ import google.registry.batch.ExpandRecurringBillingEventsAction; import google.registry.batch.RefreshDnsOnHostRenameAction; import google.registry.batch.RelockDomainAction; import google.registry.batch.ResaveAllEppResourcesAction; +import google.registry.batch.ResaveAllEppResourcesPipelineAction; import google.registry.batch.ResaveEntityAction; import google.registry.batch.SendExpiringCertificateNotificationEmailAction; import google.registry.batch.WipeOutCloudSqlAction; @@ -196,6 +197,8 @@ interface BackendRequestComponent { ResaveAllEppResourcesAction resaveAllEppResourcesAction(); + ResaveAllEppResourcesPipelineAction resaveAllEppResourcesPipelineAction(); + ResaveEntityAction resaveEntityAction(); SendExpiringCertificateNotificationEmailAction sendExpiringCertificateNotificationEmailAction(); diff --git a/core/src/main/resources/google/registry/beam/resave_all_epp_resources_pipeline.json b/core/src/main/resources/google/registry/beam/resave_all_epp_resources_pipeline.json new file mode 100644 index 000000000..5899e2488 --- /dev/null +++ b/core/src/main/resources/google/registry/beam/resave_all_epp_resources_pipeline.json @@ -0,0 +1,30 @@ +{ + "name": "Resave all EPP Resources", + "description": "An Apache Beam pipeline that resaves all (or potentially only changed) EPP resources", + "parameters": [ + { + "name": "registryEnvironment", + "label": "The Registry environment.", + "helpText": "The Registry environment.", + "is_optional": false, + "regexes": [ + "^PRODUCTION|SANDBOX|CRASH|QA|ALPHA$" + ] + }, + { + "name": "isolationOverride", + "label": "The desired SQL transaction isolation level.", + "helpText": "The desired SQL transaction isolation level.", + "is_optional": true, + "regexes": [ + "^[0-9A-Z_]+$" + ] + }, + { + "name": "fast", + "label": "Whether or not to attempt to only save changed resources", + "helpText": "If true, we will attempt to only save resources that possibly have expired transfers, grace periods, etc", + "is_optional": false + } + ] +} diff --git a/core/src/test/java/google/registry/batch/ResaveAllEppResourcesActionTest.java b/core/src/test/java/google/registry/batch/ResaveAllEppResourcesActionTest.java index d11fc7799..a4cda8f28 100644 --- a/core/src/test/java/google/registry/batch/ResaveAllEppResourcesActionTest.java +++ b/core/src/test/java/google/registry/batch/ResaveAllEppResourcesActionTest.java @@ -20,6 +20,7 @@ import static google.registry.testing.DatabaseHelper.persistActiveContact; import static google.registry.testing.DatabaseHelper.persistContactWithPendingTransfer; import static org.joda.time.DateTimeZone.UTC; +import google.registry.model.annotations.DeleteAfterMigration; import google.registry.model.contact.ContactResource; import google.registry.model.transfer.TransferStatus; import google.registry.testing.FakeResponse; @@ -29,6 +30,9 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; /** Unit tests for {@link ResaveAllEppResourcesAction}. */ +// No longer needed in SQL. Subject to future removal. +@Deprecated +@DeleteAfterMigration class ResaveAllEppResourcesActionTest extends MapreduceTestCase { @BeforeEach diff --git a/core/src/test/java/google/registry/batch/ResaveAllEppResourcesPipelineActionTest.java b/core/src/test/java/google/registry/batch/ResaveAllEppResourcesPipelineActionTest.java new file mode 100644 index 000000000..76fd3de31 --- /dev/null +++ b/core/src/test/java/google/registry/batch/ResaveAllEppResourcesPipelineActionTest.java @@ -0,0 +1,84 @@ +// Copyright 2022 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.batch; + +import static com.google.common.truth.Truth.assertThat; +import static google.registry.batch.ResaveAllEppResourcesPipelineAction.PARAM_FAST; +import static google.registry.batch.ResaveAllEppResourcesPipelineAction.PIPELINE_NAME; +import static google.registry.beam.BeamUtils.createJobName; +import static javax.servlet.http.HttpServletResponse.SC_OK; +import static org.mockito.Mockito.verify; + +import com.google.api.services.dataflow.model.LaunchFlexTemplateParameter; +import com.google.api.services.dataflow.model.LaunchFlexTemplateRequest; +import com.google.common.collect.ImmutableMap; +import google.registry.beam.BeamActionTestBase; +import google.registry.config.RegistryEnvironment; +import google.registry.testing.AppEngineExtension; +import google.registry.testing.FakeClock; +import java.util.Optional; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +/** Unit tests for {@link ResaveAllEppResourcesPipelineAction}. */ +public class ResaveAllEppResourcesPipelineActionTest extends BeamActionTestBase { + + @RegisterExtension + final AppEngineExtension appEngine = + AppEngineExtension.builder().withDatastoreAndCloudSql().build(); + + private final FakeClock fakeClock = new FakeClock(); + + private ResaveAllEppResourcesPipelineAction createAction(boolean isFast) { + return new ResaveAllEppResourcesPipelineAction( + "test-project", + "test-region", + "staging-bucket", + Optional.of(isFast), + fakeClock, + response, + dataflow); + } + + @Test + void testLaunch_notFast() throws Exception { + createAction(false).run(); + assertThat(response.getStatus()).isEqualTo(SC_OK); + assertThat(response.getPayload()).isEqualTo("Launched resaveAllEppResources pipeline: jobid"); + verify(templates).launch("test-project", "test-region", createLaunchTemplateRequest(false)); + } + + @Test + void testLaunch_fast() throws Exception { + createAction(true).run(); + assertThat(response.getStatus()).isEqualTo(SC_OK); + assertThat(response.getPayload()).isEqualTo("Launched resaveAllEppResources pipeline: jobid"); + verify(templates).launch("test-project", "test-region", createLaunchTemplateRequest(true)); + } + + private LaunchFlexTemplateRequest createLaunchTemplateRequest(boolean isFast) { + return new LaunchFlexTemplateRequest() + .setLaunchParameter( + new LaunchFlexTemplateParameter() + .setJobName(createJobName("resave-all-epp-resources", fakeClock)) + .setContainerSpecGcsPath( + String.format("%s/%s_metadata.json", "staging-bucket", PIPELINE_NAME)) + .setParameters( + new ImmutableMap.Builder() + .put(PARAM_FAST, Boolean.toString(isFast)) + .put("registryEnvironment", RegistryEnvironment.get().name()) + .build())); + } +} diff --git a/core/src/test/java/google/registry/beam/resave/ResaveAllEppResourcesPipelineTest.java b/core/src/test/java/google/registry/beam/resave/ResaveAllEppResourcesPipelineTest.java new file mode 100644 index 000000000..f5f12978a --- /dev/null +++ b/core/src/test/java/google/registry/beam/resave/ResaveAllEppResourcesPipelineTest.java @@ -0,0 +1,204 @@ +// Copyright 2022 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.beam.resave; + +import static com.google.common.collect.ImmutableSet.toImmutableSet; +import static com.google.common.truth.Truth.assertThat; +import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; +import static google.registry.testing.DatabaseHelper.createTld; +import static google.registry.testing.DatabaseHelper.loadAllOf; +import static google.registry.testing.DatabaseHelper.loadByEntity; +import static google.registry.testing.DatabaseHelper.persistActiveContact; +import static google.registry.testing.DatabaseHelper.persistActiveDomain; +import static google.registry.testing.DatabaseHelper.persistContactWithPendingTransfer; +import static google.registry.testing.DatabaseHelper.persistDomainWithDependentResources; +import static google.registry.testing.DatabaseHelper.persistDomainWithPendingTransfer; +import static google.registry.testing.DatabaseHelper.persistNewRegistrars; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import google.registry.beam.TestPipelineExtension; +import google.registry.model.EppResource; +import google.registry.model.contact.ContactResource; +import google.registry.model.domain.DomainBase; +import google.registry.model.domain.GracePeriod; +import google.registry.model.eppcommon.StatusValue; +import google.registry.persistence.transaction.JpaTestExtensions; +import google.registry.persistence.transaction.JpaTestExtensions.JpaIntegrationTestExtension; +import google.registry.persistence.transaction.JpaTransactionManager; +import google.registry.persistence.transaction.TransactionManagerFactory; +import google.registry.testing.DatastoreEntityExtension; +import google.registry.testing.FakeClock; +import google.registry.testing.TmOverrideExtension; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.joda.time.DateTime; +import org.joda.time.Duration; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.mockito.ArgumentCaptor; + +/** Tests for {@link ResaveAllEppResourcesPipeline}. */ +public class ResaveAllEppResourcesPipelineTest { + + private final FakeClock fakeClock = new FakeClock(DateTime.parse("2020-03-10T00:00:00.000Z")); + + @RegisterExtension + @Order(Order.DEFAULT - 1) + final transient DatastoreEntityExtension datastore = + new DatastoreEntityExtension().allThreads(true); + + @RegisterExtension + final TestPipelineExtension testPipeline = + TestPipelineExtension.create().enableAbandonedNodeEnforcement(true); + + @RegisterExtension + final JpaIntegrationTestExtension database = + new JpaTestExtensions.Builder().withClock(fakeClock).buildIntegrationTestExtension(); + + @RegisterExtension + @Order(Order.DEFAULT + 1) + TmOverrideExtension tmOverrideExtension = TmOverrideExtension.withJpa(); + + private final ResaveAllEppResourcesPipelineOptions options = + PipelineOptionsFactory.create().as(ResaveAllEppResourcesPipelineOptions.class); + + @BeforeEach + void beforeEach() { + options.setFast(true); + persistNewRegistrars("TheRegistrar", "NewRegistrar"); + createTld("tld"); + } + + @Test + void testPipeline_unchangedEntity() { + ContactResource contact = persistActiveContact("test123"); + DateTime creationTime = contact.getUpdateTimestamp().getTimestamp(); + fakeClock.advanceOneMilli(); + assertThat(loadByEntity(contact).getUpdateTimestamp().getTimestamp()).isEqualTo(creationTime); + fakeClock.advanceOneMilli(); + runPipeline(); + assertThat(loadByEntity(contact)).isEqualTo(contact); + } + + @Test + void testPipeline_fulfilledContactTransfer() { + ContactResource contact = persistActiveContact("test123"); + DateTime now = fakeClock.nowUtc(); + contact = persistContactWithPendingTransfer(contact, now, now.plusDays(5), now); + fakeClock.advanceBy(Duration.standardDays(10)); + assertThat(loadByEntity(contact).getStatusValues()).contains(StatusValue.PENDING_TRANSFER); + runPipeline(); + assertThat(loadByEntity(contact).getStatusValues()) + .doesNotContain(StatusValue.PENDING_TRANSFER); + } + + @Test + void testPipeline_fulfilledDomainTransfer() { + options.setFast(true); + DateTime now = fakeClock.nowUtc(); + DomainBase domain = + persistDomainWithPendingTransfer( + persistDomainWithDependentResources( + "domain", + "tld", + persistActiveContact("jd1234"), + now.minusDays(5), + now.minusDays(5), + now.plusYears(2)), + now.minusDays(4), + now.minusDays(1), + now.plusYears(2)); + assertThat(domain.getStatusValues()).contains(StatusValue.PENDING_TRANSFER); + assertThat(domain.getUpdateTimestamp().getTimestamp()).isEqualTo(now); + fakeClock.advanceOneMilli(); + runPipeline(); + DomainBase postPipeline = loadByEntity(domain); + assertThat(postPipeline.getStatusValues()).doesNotContain(StatusValue.PENDING_TRANSFER); + assertThat(postPipeline.getUpdateTimestamp().getTimestamp()).isEqualTo(fakeClock.nowUtc()); + } + + @Test + void testPipeline_autorenewedDomain() { + DateTime now = fakeClock.nowUtc(); + DomainBase domain = + persistDomainWithDependentResources( + "domain", "tld", persistActiveContact("jd1234"), now, now, now.plusYears(1)); + assertThat(domain.getRegistrationExpirationTime()).isEqualTo(now.plusYears(1)); + fakeClock.advanceBy(Duration.standardDays(500)); + runPipeline(); + DomainBase postPipeline = loadByEntity(domain); + assertThat(postPipeline.getRegistrationExpirationTime()).isEqualTo(now.plusYears(2)); + } + + @Test + void testPipeline_expiredGracePeriod() { + DateTime now = fakeClock.nowUtc(); + persistDomainWithDependentResources( + "domain", "tld", persistActiveContact("jd1234"), now, now, now.plusYears(1)); + assertThat(loadAllOf(GracePeriod.class)).hasSize(1); + fakeClock.advanceBy(Duration.standardDays(500)); + runPipeline(); + assertThat(loadAllOf(GracePeriod.class)).isEmpty(); + } + + @Test + void testPipeline_fastOnlySavesChanged() { + DateTime now = fakeClock.nowUtc(); + ContactResource contact = persistActiveContact("jd1234"); + persistDomainWithDependentResources("renewed", "tld", contact, now, now, now.plusYears(1)); + persistActiveDomain("nonrenewed.tld", now, now.plusYears(20)); + // Spy the transaction manager so we can be sure we're only saving the renewed domain + JpaTransactionManager spy = spy(jpaTm()); + TransactionManagerFactory.setJpaTm(() -> spy); + ArgumentCaptor domainPutCaptor = ArgumentCaptor.forClass(DomainBase.class); + runPipeline(); + // We should only be attempting to put the one changed domain into the DB + verify(spy).put(domainPutCaptor.capture()); + assertThat(domainPutCaptor.getValue().getDomainName()).isEqualTo("renewed.tld"); + } + + @Test + void testPipeline_notFastResavesAll() { + options.setFast(false); + DateTime now = fakeClock.nowUtc(); + ContactResource contact = persistActiveContact("jd1234"); + DomainBase renewed = + persistDomainWithDependentResources("renewed", "tld", contact, now, now, now.plusYears(1)); + DomainBase nonRenewed = + persistDomainWithDependentResources( + "nonrenewed", "tld", contact, now, now, now.plusYears(20)); + // Spy the transaction manager so we can be sure we're attempting to save everything + JpaTransactionManager spy = spy(jpaTm()); + TransactionManagerFactory.setJpaTm(() -> spy); + ArgumentCaptor eppResourcePutCaptor = ArgumentCaptor.forClass(EppResource.class); + runPipeline(); + // We should be attempting to put both domains (and the contact) in, even the unchanged ones + verify(spy, times(3)).put(eppResourcePutCaptor.capture()); + assertThat( + eppResourcePutCaptor.getAllValues().stream() + .map(EppResource::getRepoId) + .collect(toImmutableSet())) + .containsExactly(contact.getRepoId(), renewed.getRepoId(), nonRenewed.getRepoId()); + } + + private void runPipeline() { + ResaveAllEppResourcesPipeline pipeline = new ResaveAllEppResourcesPipeline(options); + pipeline.setupPipeline(testPipeline); + testPipeline.run().waitUntilFinish(); + } +} diff --git a/core/src/test/resources/google/registry/module/backend/backend_routing.txt b/core/src/test/resources/google/registry/module/backend/backend_routing.txt index 3228ab536..41b3ce86d 100644 --- a/core/src/test/resources/google/registry/module/backend/backend_routing.txt +++ b/core/src/test/resources/google/registry/module/backend/backend_routing.txt @@ -37,6 +37,7 @@ PATH CLASS /_dr/task/relockDomain RelockDomainAction POST y INTERNAL,API APP ADMIN /_dr/task/replayCommitLogsToSql ReplayCommitLogsToSqlAction POST y INTERNAL,API APP ADMIN /_dr/task/resaveAllEppResources ResaveAllEppResourcesAction GET n INTERNAL,API APP ADMIN +/_dr/task/resaveAllEppResourcesPipeline ResaveAllEppResourcesPipelineAction GET n INTERNAL,API APP ADMIN /_dr/task/resaveEntity ResaveEntityAction POST n INTERNAL,API APP ADMIN /_dr/task/sendExpiringCertificateNotificationEmail SendExpiringCertificateNotificationEmailAction GET n INTERNAL,API APP ADMIN /_dr/task/syncDatastoreToSqlSnapshot SyncDatastoreToSqlSnapshotAction POST n INTERNAL,API APP ADMIN diff --git a/release/cloudbuild-nomulus.yaml b/release/cloudbuild-nomulus.yaml index 0903cca92..3fa4023f5 100644 --- a/release/cloudbuild-nomulus.yaml +++ b/release/cloudbuild-nomulus.yaml @@ -98,7 +98,9 @@ steps: google.registry.beam.rde.RdePipeline \ google/registry/beam/rde_pipeline_metadata.json \ google.registry.beam.comparedb.ValidateDatabasePipeline \ - google/registry/beam/validate_database_pipeline_metadata.json + google/registry/beam/validate_database_pipeline_metadata.json \ + google.registry.beam.resave.ResaveAllEppResourcesPipeline \ + google/registry/beam/resave_all_epp_resources_pipeline_metadata.json # Tentatively build and publish Cloud SQL schema jar here, before schema release # process is finalized. Also publish nomulus:core jars that are needed for # server/schema compatibility tests.