Create a Dataflow pipeline to resave EPP resources (#1553)

* Create a Dataflow pipeline to resave EPP resources

The pipeline has two modes.

If `fast` is false, we simply load all EPP resources, project them to the current time, and save them.

If `fast` is true, we attempt to load and save only the resources that we expect to change when projected to the current time: resources with expired pending transfers, domains with expired grace periods, and non-deleted domains that are past their expiration time (which we expect to have autorenewed).
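
For illustration, the `fast` filter for domains amounts to a predicate like the sketch below. This is not code from this change (the pipeline expresses these conditions as a single SQL query), and the `DomainBase` accessor names are assumptions:

```java
// Hypothetical sketch of the three "fast" conditions for domains. The real pipeline
// filters in SQL (see DOMAINS_TO_PROJECT_QUERY below); accessor names are assumed.
import google.registry.model.domain.DomainBase;
import google.registry.model.transfer.TransferStatus;
import google.registry.util.DateTimeUtils;
import org.joda.time.DateTime;

final class FastModeDomainFilterSketch {

  static boolean likelyChangedByProjection(DomainBase domain, DateTime now) {
    // 1. A pending transfer has passed its expiration time and should be resolved.
    boolean expiredPendingTransfer =
        TransferStatus.PENDING.equals(domain.getTransferData().getTransferStatus())
            && domain.getTransferData().getPendingTransferExpirationTime().isBefore(now);
    // 2. The domain is past its registration expiration but not deleted (it autorenewed).
    boolean autorenewed =
        domain.getRegistrationExpirationTime().isBefore(now)
            && domain.getDeletionTime().isEqual(DateTimeUtils.END_OF_TIME);
    // 3. At least one grace period has expired and should be removed.
    boolean expiredGracePeriod =
        domain.getGracePeriods().stream().anyMatch(gp -> gp.getExpirationTime().isBefore(now));
    return expiredPendingTransfer || autorenewed || expiredGracePeriod;
  }

  private FastModeDomainFilterSketch() {}
}
```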
gbrodman 2022-04-15 15:46:35 -04:00 committed by GitHub
parent 94017b694e
commit 9939833c25
14 changed files with 676 additions and 2 deletions

@@ -800,6 +800,11 @@ if (environment == 'alpha') {
mainClass: 'google.registry.beam.comparedb.ValidateDatabasePipeline',
metaData: 'google/registry/beam/validate_database_pipeline_metadata.json'
],
resaveAllEppResources:
[
mainClass: 'google.registry.beam.resave.ResaveAllEppResourcesPipeline',
metaData: 'google/registry/beam/resave_all_epp_resources_pipeline_metadata.json'
],
]
project.tasks.create("stageBeamPipelines") {
doLast {

@@ -112,6 +112,12 @@ public class BatchModule {
req, ExpandRecurringBillingEventsAction.PARAM_CURSOR_TIME);
}
@Provides
@Parameter(ResaveAllEppResourcesPipelineAction.PARAM_FAST)
static Optional<Boolean> provideIsFast(HttpServletRequest req) {
return extractOptionalBooleanParameter(req, ResaveAllEppResourcesPipelineAction.PARAM_FAST);
}
@Provides
@Named(QUEUE_ASYNC_ACTIONS)
static Queue provideAsyncActionsPushQueue() {

@@ -0,0 +1,129 @@
// Copyright 2022 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.batch;
import static google.registry.beam.BeamUtils.createJobName;
import static javax.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
import static javax.servlet.http.HttpServletResponse.SC_OK;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.LaunchFlexTemplateParameter;
import com.google.api.services.dataflow.model.LaunchFlexTemplateRequest;
import com.google.api.services.dataflow.model.LaunchFlexTemplateResponse;
import com.google.common.collect.ImmutableMap;
import com.google.common.flogger.FluentLogger;
import com.google.common.net.MediaType;
import google.registry.config.RegistryConfig.Config;
import google.registry.config.RegistryEnvironment;
import google.registry.request.Action;
import google.registry.request.Parameter;
import google.registry.request.Response;
import google.registry.request.auth.Auth;
import google.registry.util.Clock;
import java.util.Optional;
import javax.inject.Inject;
/**
* Starts a Dataflow pipeline that resaves all EPP resources projected to the current time.
*
 * <p>This is useful in a few situations. First, it acts as a fallback for resources that have
 * expired pending transfers (resaving resolves them), in case the enqueued action fails.
* Second, it will reflect domain autorenews that have happened. Third, it will remove any expired
* grace periods.
*
* <p>There's also the general principle that it's good to have the data in the database remain as
* current as is reasonably possible.
*
 * <p>If the <code>?fast=true</code> query string parameter is passed, the pipeline will only
 * attempt to load, project, and resave entities where we expect one of the previous situations to
 * have occurred. Otherwise, it will load, project, and resave all EPP resources.
*
* <p>This runs the {@link google.registry.beam.resave.ResaveAllEppResourcesPipeline}.
*/
@Action(
service = Action.Service.BACKEND,
path = ResaveAllEppResourcesPipelineAction.PATH,
auth = Auth.AUTH_INTERNAL_OR_ADMIN)
public class ResaveAllEppResourcesPipelineAction implements Runnable {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
static final String PATH = "/_dr/task/resaveAllEppResourcesPipeline";
static final String PIPELINE_NAME = "resave_all_epp_resources_pipeline";
public static final String PARAM_FAST = "fast";
private final String projectId;
private final String jobRegion;
private final String stagingBucketUrl;
private final boolean fast;
private final Clock clock;
private final Response response;
private final Dataflow dataflow;
@Inject
ResaveAllEppResourcesPipelineAction(
@Config("projectId") String projectId,
@Config("defaultJobRegion") String jobRegion,
@Config("beamStagingBucketUrl") String stagingBucketUrl,
@Parameter(PARAM_FAST) Optional<Boolean> fast,
Clock clock,
Response response,
Dataflow dataflow) {
this.projectId = projectId;
this.jobRegion = jobRegion;
this.stagingBucketUrl = stagingBucketUrl;
this.fast = fast.orElse(false);
this.clock = clock;
this.response = response;
this.dataflow = dataflow;
}
@Override
public void run() {
response.setContentType(MediaType.PLAIN_TEXT_UTF_8);
logger.atInfo().log("Launching ResaveAllEppResourcesPipeline");
try {
LaunchFlexTemplateParameter parameter =
new LaunchFlexTemplateParameter()
.setJobName(createJobName("resave-all-epp-resources", clock))
.setContainerSpecGcsPath(
String.format("%s/%s_metadata.json", stagingBucketUrl, PIPELINE_NAME))
.setParameters(
new ImmutableMap.Builder<String, String>()
.put(PARAM_FAST, Boolean.toString(fast))
.put("registryEnvironment", RegistryEnvironment.get().name())
.build());
LaunchFlexTemplateResponse launchResponse =
dataflow
.projects()
.locations()
.flexTemplates()
.launch(
projectId,
jobRegion,
new LaunchFlexTemplateRequest().setLaunchParameter(parameter))
.execute();
logger.atInfo().log("Got response: %s", launchResponse.getJob().toPrettyString());
String jobId = launchResponse.getJob().getId();
response.setStatus(SC_OK);
response.setPayload(String.format("Launched resaveAllEppResources pipeline: %s", jobId));
} catch (Exception e) {
      logger.atSevere().withCause(e).log("Template launch failed.");
response.setStatus(SC_INTERNAL_SERVER_ERROR);
response.setPayload(String.format("Pipeline launch failed: %s", e.getMessage()));
}
}
}
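
A note on invocation (not part of this change): triggering the action is an authenticated GET to the path above, with the optional `fast` query parameter. A minimal sketch, with a placeholder hostname and the required INTERNAL_OR_ADMIN credentials omitted:

```java
// Illustrative only: what a request to this endpoint might look like. The hostname is
// a placeholder and authentication headers are omitted.
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

final class TriggerResavePipelineSketch {
  public static void main(String[] args) throws Exception {
    HttpClient client = HttpClient.newHttpClient();
    HttpRequest request =
        HttpRequest.newBuilder()
            .uri(
                URI.create(
                    "https://backend.example.test/_dr/task/resaveAllEppResourcesPipeline?fast=true"))
            .GET()
            .build();
    HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
    // On success the action returns 200 with "Launched resaveAllEppResources pipeline: <jobId>".
    System.out.println(response.statusCode() + ": " + response.body());
  }
}
```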

@@ -158,7 +158,7 @@ public class InitSqlPipeline implements Serializable {
.addAll(toKindStrings(PHASE_TWO_ORDERED))
.build()));
-    // Set up the pipeline to write entity kinds from PHASE_ONE_ORDERED to SQL. Return a object
+    // Set up the pipeline to write entity kinds from PHASE_ONE_ORDERED to SQL. Return an object
// that signals the completion of the phase.
PCollection<Void> blocker =
scheduleOnePhaseWrites(datastoreSnapshot, PHASE_ONE_ORDERED, Optional.empty(), null);

@@ -0,0 +1,174 @@
// Copyright 2022 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.beam.resave;
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
import static org.apache.beam.sdk.values.TypeDescriptors.integers;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import google.registry.beam.common.RegistryJpaIO;
import google.registry.beam.common.RegistryJpaIO.Read;
import google.registry.model.EppResource;
import google.registry.model.contact.ContactResource;
import google.registry.model.domain.DomainBase;
import google.registry.model.host.HostResource;
import google.registry.persistence.PersistenceModule.TransactionIsolationLevel;
import google.registry.persistence.transaction.CriteriaQueryBuilder;
import google.registry.util.DateTimeUtils;
import java.io.Serializable;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.util.ShardedKey;
import org.apache.beam.sdk.values.KV;
import org.joda.time.DateTime;
/**
* A Dataflow Flex pipeline that resaves changed EPP resources in SQL.
*
* <p>Due to the way that Hibernate works, if an entity is unchanged by {@link
* EppResource#cloneProjectedAtTime(DateTime)} it will not actually be re-persisted to the database.
 * Thus, writes only occur for objects that are actually changed by projecting them to now, such as
 * when a pending transfer is resolved.
*/
public class ResaveAllEppResourcesPipeline implements Serializable {
private static final ImmutableSet<Class<? extends EppResource>> EPP_RESOURCE_CLASSES =
ImmutableSet.of(ContactResource.class, DomainBase.class, HostResource.class);
/**
* There exist three possible situations where we know we'll want to project domains to the
* current point in time:
*
* <ul>
* <li>A pending domain transfer has expired.
* <li>A domain is past its expiration time without being deleted (this means it autorenewed).
* <li>A domain has expired grace periods.
* </ul>
*
 * <p>This query covers all three scenarios so that we avoid querying the Domain table multiple
 * times and avoid projecting and resaving the same domain more than once.
*/
private static final String DOMAINS_TO_PROJECT_QUERY =
"FROM Domain d WHERE (d.transferData.transferStatus = 'PENDING' AND"
+ " d.transferData.pendingTransferExpirationTime < current_timestamp()) OR"
+ " (d.registrationExpirationTime < current_timestamp() AND d.deletionTime ="
+ " (:END_OF_TIME)) OR (EXISTS (SELECT 1 FROM GracePeriod gp WHERE gp.domainRepoId ="
+ " d.repoId AND gp.expirationTime < current_timestamp()))";
private final ResaveAllEppResourcesPipelineOptions options;
ResaveAllEppResourcesPipeline(ResaveAllEppResourcesPipelineOptions options) {
this.options = options;
}
PipelineResult run() {
Pipeline pipeline = Pipeline.create(options);
setupPipeline(pipeline);
return pipeline.run();
}
void setupPipeline(Pipeline pipeline) {
options.setIsolationOverride(TransactionIsolationLevel.TRANSACTION_READ_COMMITTED);
if (options.getFast()) {
fastResaveContacts(pipeline);
fastResaveDomains(pipeline);
} else {
EPP_RESOURCE_CLASSES.forEach(clazz -> forceResaveAllResources(pipeline, clazz));
}
}
/** Projects to the current time and saves any contacts with expired transfers. */
private void fastResaveContacts(Pipeline pipeline) {
Read<ContactResource, ContactResource> read =
RegistryJpaIO.read(
"FROM Contact WHERE transferData.transferStatus = 'PENDING' AND"
+ " transferData.pendingTransferExpirationTime < current_timestamp()",
ContactResource.class,
c -> c);
projectAndResaveResources(pipeline, ContactResource.class, read);
}
/**
* Projects to the current time and saves any domains with expired pending actions (e.g.
* transfers, grace periods).
*
* <p>The logic of what might have changed is paraphrased from {@link
* google.registry.model.domain.DomainContent#cloneProjectedAtTime(DateTime)}.
*/
private void fastResaveDomains(Pipeline pipeline) {
Read<DomainBase, DomainBase> read =
RegistryJpaIO.read(
DOMAINS_TO_PROJECT_QUERY,
ImmutableMap.of("END_OF_TIME", DateTimeUtils.END_OF_TIME),
DomainBase.class,
d -> d);
projectAndResaveResources(pipeline, DomainBase.class, read);
}
/** Projects all resources to the current time and saves them. */
private <T extends EppResource> void forceResaveAllResources(Pipeline pipeline, Class<T> clazz) {
Read<T, T> read = RegistryJpaIO.read(() -> CriteriaQueryBuilder.create(clazz).build());
projectAndResaveResources(pipeline, clazz, read);
}
/** Projects and re-saves the result of the provided {@link Read}. */
private <T extends EppResource> void projectAndResaveResources(
Pipeline pipeline, Class<T> clazz, Read<?, T> read) {
int numShards = options.getSqlWriteShards();
int batchSize = options.getSqlWriteBatchSize();
String className = clazz.getSimpleName();
pipeline
.apply("Read " + className, read)
.apply(
"Shard data for class" + className,
WithKeys.<Integer, T>of(e -> ThreadLocalRandom.current().nextInt(numShards))
.withKeyType(integers()))
.apply(
"Group into batches for class" + className,
GroupIntoBatches.<Integer, T>ofSize(batchSize).withShardedKey())
.apply("Map " + className + " to now", ParDo.of(new BatchedProjectionFunction<>()))
.apply(
"Write transformed " + className,
RegistryJpaIO.<EppResource>write()
.withName("Write transformed " + className)
.withBatchSize(batchSize)
.withShards(numShards));
}
private static class BatchedProjectionFunction<T extends EppResource>
extends DoFn<KV<ShardedKey<Integer>, Iterable<T>>, EppResource> {
@ProcessElement
public void processElement(
@Element KV<ShardedKey<Integer>, Iterable<T>> element,
OutputReceiver<EppResource> outputReceiver) {
jpaTm()
.transact(
() ->
element
.getValue()
.forEach(
resource ->
outputReceiver.output(
resource.cloneProjectedAtTime(jpaTm().getTransactionTime()))));
}
}
}
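
To make the shard-and-batch flow in `projectAndResaveResources` concrete, here is a self-contained sketch of the same Beam pattern (illustrative only; it uses plain strings instead of EPP resources and omits the JPA read and write stages):

```java
// Standalone sketch of the shard -> batch -> transform pattern used above.
import static org.apache.beam.sdk.values.TypeDescriptors.integers;

import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.util.ShardedKey;
import org.apache.beam.sdk.values.KV;

final class ShardAndBatchSketch {
  public static void main(String[] args) {
    int numShards = 4;
    int batchSize = 10;
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    pipeline
        .apply("Create input", Create.of("a", "b", "c", "d"))
        // Assign a random shard key so batches are spread across workers.
        .apply(
            "Shard",
            WithKeys.<Integer, String>of(e -> ThreadLocalRandom.current().nextInt(numShards))
                .withKeyType(integers()))
        // Group elements into bounded batches per sharded key, as the pipeline does before
        // writing to SQL.
        .apply("Batch", GroupIntoBatches.<Integer, String>ofSize(batchSize).withShardedKey())
        // Process each batch in a single DoFn invocation (the pipeline projects each batch
        // inside one SQL transaction here).
        .apply(
            "Process batch",
            ParDo.of(
                new DoFn<KV<ShardedKey<Integer>, Iterable<String>>, String>() {
                  @ProcessElement
                  public void processElement(
                      @Element KV<ShardedKey<Integer>, Iterable<String>> element,
                      OutputReceiver<String> out) {
                    element.getValue().forEach(v -> out.output(v.toUpperCase()));
                  }
                }));
    pipeline.run().waitUntilFinish();
  }
}
```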

@@ -0,0 +1,26 @@
// Copyright 2022 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.beam.resave;
import google.registry.beam.common.RegistryPipelineOptions;
import org.apache.beam.sdk.options.Description;
public interface ResaveAllEppResourcesPipelineOptions extends RegistryPipelineOptions {
@Description("True if we should attempt to run only over potentially out-of-date EPP resources")
boolean getFast();
void setFast(boolean fast);
}
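
For reference, a quick sketch (not from this change) of how this option is parsed with the standard Beam option factory; `--fast=true` maps onto `getFast()`/`setFast(boolean)`:

```java
// Hypothetical local usage of the options interface; validation and option
// registration details are elided.
import google.registry.beam.resave.ResaveAllEppResourcesPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

final class ResaveOptionsParsingSketch {
  public static void main(String[] args) {
    ResaveAllEppResourcesPipelineOptions options =
        PipelineOptionsFactory.fromArgs("--fast=true")
            .as(ResaveAllEppResourcesPipelineOptions.class);
    // Prints "true"; omitting --fast leaves the primitive boolean at its default of false.
    System.out.println(options.getFast());
  }
}
```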

@@ -316,6 +316,12 @@
<url-pattern>/_dr/task/resaveAllEppResources</url-pattern>
</servlet-mapping>
<!-- Dataflow pipeline to re-save all EPP resources. -->
<servlet-mapping>
<servlet-name>backend-servlet</servlet-name>
<url-pattern>/_dr/task/resaveAllEppResourcesPipeline</url-pattern>
</servlet-mapping>
<!-- Reread all Registrar RDAP Base Urls from the ICANN endpoint. -->
<servlet-mapping>
<servlet-name>backend-servlet</servlet-name>

@@ -31,6 +31,7 @@ import google.registry.batch.ExpandRecurringBillingEventsAction;
import google.registry.batch.RefreshDnsOnHostRenameAction;
import google.registry.batch.RelockDomainAction;
import google.registry.batch.ResaveAllEppResourcesAction;
import google.registry.batch.ResaveAllEppResourcesPipelineAction;
import google.registry.batch.ResaveEntityAction;
import google.registry.batch.SendExpiringCertificateNotificationEmailAction;
import google.registry.batch.WipeOutCloudSqlAction;
@@ -196,6 +197,8 @@ interface BackendRequestComponent {
ResaveAllEppResourcesAction resaveAllEppResourcesAction();
ResaveAllEppResourcesPipelineAction resaveAllEppResourcesPipelineAction();
ResaveEntityAction resaveEntityAction();
SendExpiringCertificateNotificationEmailAction sendExpiringCertificateNotificationEmailAction();

@@ -0,0 +1,30 @@
{
"name": "Resave all EPP Resources",
"description": "An Apache Beam pipeline that resaves all (or potentially only changed) EPP resources",
"parameters": [
{
"name": "registryEnvironment",
"label": "The Registry environment.",
"helpText": "The Registry environment.",
"is_optional": false,
"regexes": [
"^PRODUCTION|SANDBOX|CRASH|QA|ALPHA$"
]
},
{
"name": "isolationOverride",
"label": "The desired SQL transaction isolation level.",
"helpText": "The desired SQL transaction isolation level.",
"is_optional": true,
"regexes": [
"^[0-9A-Z_]+$"
]
},
{
"name": "fast",
"label": "Whether or not to attempt to only save changed resources",
"helpText": "If true, we will attempt to only save resources that possibly have expired transfers, grace periods, etc",
"is_optional": false
}
]
}

@@ -20,6 +20,7 @@ import static google.registry.testing.DatabaseHelper.persistActiveContact;
import static google.registry.testing.DatabaseHelper.persistContactWithPendingTransfer;
import static org.joda.time.DateTimeZone.UTC;
import google.registry.model.annotations.DeleteAfterMigration;
import google.registry.model.contact.ContactResource;
import google.registry.model.transfer.TransferStatus;
import google.registry.testing.FakeResponse;
@@ -29,6 +30,9 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
/** Unit tests for {@link ResaveAllEppResourcesAction}. */
// No longer needed in SQL. Subject to future removal.
@Deprecated
@DeleteAfterMigration
class ResaveAllEppResourcesActionTest extends MapreduceTestCase<ResaveAllEppResourcesAction> {
@BeforeEach

@@ -0,0 +1,84 @@
// Copyright 2022 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.batch;
import static com.google.common.truth.Truth.assertThat;
import static google.registry.batch.ResaveAllEppResourcesPipelineAction.PARAM_FAST;
import static google.registry.batch.ResaveAllEppResourcesPipelineAction.PIPELINE_NAME;
import static google.registry.beam.BeamUtils.createJobName;
import static javax.servlet.http.HttpServletResponse.SC_OK;
import static org.mockito.Mockito.verify;
import com.google.api.services.dataflow.model.LaunchFlexTemplateParameter;
import com.google.api.services.dataflow.model.LaunchFlexTemplateRequest;
import com.google.common.collect.ImmutableMap;
import google.registry.beam.BeamActionTestBase;
import google.registry.config.RegistryEnvironment;
import google.registry.testing.AppEngineExtension;
import google.registry.testing.FakeClock;
import java.util.Optional;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
/** Unit tests for {@link ResaveAllEppResourcesPipelineAction}. */
public class ResaveAllEppResourcesPipelineActionTest extends BeamActionTestBase {
@RegisterExtension
final AppEngineExtension appEngine =
AppEngineExtension.builder().withDatastoreAndCloudSql().build();
private final FakeClock fakeClock = new FakeClock();
private ResaveAllEppResourcesPipelineAction createAction(boolean isFast) {
return new ResaveAllEppResourcesPipelineAction(
"test-project",
"test-region",
"staging-bucket",
Optional.of(isFast),
fakeClock,
response,
dataflow);
}
@Test
void testLaunch_notFast() throws Exception {
createAction(false).run();
assertThat(response.getStatus()).isEqualTo(SC_OK);
assertThat(response.getPayload()).isEqualTo("Launched resaveAllEppResources pipeline: jobid");
verify(templates).launch("test-project", "test-region", createLaunchTemplateRequest(false));
}
@Test
void testLaunch_fast() throws Exception {
createAction(true).run();
assertThat(response.getStatus()).isEqualTo(SC_OK);
assertThat(response.getPayload()).isEqualTo("Launched resaveAllEppResources pipeline: jobid");
verify(templates).launch("test-project", "test-region", createLaunchTemplateRequest(true));
}
private LaunchFlexTemplateRequest createLaunchTemplateRequest(boolean isFast) {
return new LaunchFlexTemplateRequest()
.setLaunchParameter(
new LaunchFlexTemplateParameter()
.setJobName(createJobName("resave-all-epp-resources", fakeClock))
.setContainerSpecGcsPath(
String.format("%s/%s_metadata.json", "staging-bucket", PIPELINE_NAME))
.setParameters(
new ImmutableMap.Builder<String, String>()
.put(PARAM_FAST, Boolean.toString(isFast))
.put("registryEnvironment", RegistryEnvironment.get().name())
.build()));
}
}

@@ -0,0 +1,204 @@
// Copyright 2022 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.beam.resave;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.truth.Truth.assertThat;
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
import static google.registry.testing.DatabaseHelper.createTld;
import static google.registry.testing.DatabaseHelper.loadAllOf;
import static google.registry.testing.DatabaseHelper.loadByEntity;
import static google.registry.testing.DatabaseHelper.persistActiveContact;
import static google.registry.testing.DatabaseHelper.persistActiveDomain;
import static google.registry.testing.DatabaseHelper.persistContactWithPendingTransfer;
import static google.registry.testing.DatabaseHelper.persistDomainWithDependentResources;
import static google.registry.testing.DatabaseHelper.persistDomainWithPendingTransfer;
import static google.registry.testing.DatabaseHelper.persistNewRegistrars;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import google.registry.beam.TestPipelineExtension;
import google.registry.model.EppResource;
import google.registry.model.contact.ContactResource;
import google.registry.model.domain.DomainBase;
import google.registry.model.domain.GracePeriod;
import google.registry.model.eppcommon.StatusValue;
import google.registry.persistence.transaction.JpaTestExtensions;
import google.registry.persistence.transaction.JpaTestExtensions.JpaIntegrationTestExtension;
import google.registry.persistence.transaction.JpaTransactionManager;
import google.registry.persistence.transaction.TransactionManagerFactory;
import google.registry.testing.DatastoreEntityExtension;
import google.registry.testing.FakeClock;
import google.registry.testing.TmOverrideExtension;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.ArgumentCaptor;
/** Tests for {@link ResaveAllEppResourcesPipeline}. */
public class ResaveAllEppResourcesPipelineTest {
private final FakeClock fakeClock = new FakeClock(DateTime.parse("2020-03-10T00:00:00.000Z"));
@RegisterExtension
@Order(Order.DEFAULT - 1)
final transient DatastoreEntityExtension datastore =
new DatastoreEntityExtension().allThreads(true);
@RegisterExtension
final TestPipelineExtension testPipeline =
TestPipelineExtension.create().enableAbandonedNodeEnforcement(true);
@RegisterExtension
final JpaIntegrationTestExtension database =
new JpaTestExtensions.Builder().withClock(fakeClock).buildIntegrationTestExtension();
@RegisterExtension
@Order(Order.DEFAULT + 1)
TmOverrideExtension tmOverrideExtension = TmOverrideExtension.withJpa();
private final ResaveAllEppResourcesPipelineOptions options =
PipelineOptionsFactory.create().as(ResaveAllEppResourcesPipelineOptions.class);
@BeforeEach
void beforeEach() {
options.setFast(true);
persistNewRegistrars("TheRegistrar", "NewRegistrar");
createTld("tld");
}
@Test
void testPipeline_unchangedEntity() {
ContactResource contact = persistActiveContact("test123");
DateTime creationTime = contact.getUpdateTimestamp().getTimestamp();
fakeClock.advanceOneMilli();
assertThat(loadByEntity(contact).getUpdateTimestamp().getTimestamp()).isEqualTo(creationTime);
fakeClock.advanceOneMilli();
runPipeline();
assertThat(loadByEntity(contact)).isEqualTo(contact);
}
@Test
void testPipeline_fulfilledContactTransfer() {
ContactResource contact = persistActiveContact("test123");
DateTime now = fakeClock.nowUtc();
contact = persistContactWithPendingTransfer(contact, now, now.plusDays(5), now);
fakeClock.advanceBy(Duration.standardDays(10));
assertThat(loadByEntity(contact).getStatusValues()).contains(StatusValue.PENDING_TRANSFER);
runPipeline();
assertThat(loadByEntity(contact).getStatusValues())
.doesNotContain(StatusValue.PENDING_TRANSFER);
}
@Test
void testPipeline_fulfilledDomainTransfer() {
options.setFast(true);
DateTime now = fakeClock.nowUtc();
DomainBase domain =
persistDomainWithPendingTransfer(
persistDomainWithDependentResources(
"domain",
"tld",
persistActiveContact("jd1234"),
now.minusDays(5),
now.minusDays(5),
now.plusYears(2)),
now.minusDays(4),
now.minusDays(1),
now.plusYears(2));
assertThat(domain.getStatusValues()).contains(StatusValue.PENDING_TRANSFER);
assertThat(domain.getUpdateTimestamp().getTimestamp()).isEqualTo(now);
fakeClock.advanceOneMilli();
runPipeline();
DomainBase postPipeline = loadByEntity(domain);
assertThat(postPipeline.getStatusValues()).doesNotContain(StatusValue.PENDING_TRANSFER);
assertThat(postPipeline.getUpdateTimestamp().getTimestamp()).isEqualTo(fakeClock.nowUtc());
}
@Test
void testPipeline_autorenewedDomain() {
DateTime now = fakeClock.nowUtc();
DomainBase domain =
persistDomainWithDependentResources(
"domain", "tld", persistActiveContact("jd1234"), now, now, now.plusYears(1));
assertThat(domain.getRegistrationExpirationTime()).isEqualTo(now.plusYears(1));
fakeClock.advanceBy(Duration.standardDays(500));
runPipeline();
DomainBase postPipeline = loadByEntity(domain);
assertThat(postPipeline.getRegistrationExpirationTime()).isEqualTo(now.plusYears(2));
}
@Test
void testPipeline_expiredGracePeriod() {
DateTime now = fakeClock.nowUtc();
persistDomainWithDependentResources(
"domain", "tld", persistActiveContact("jd1234"), now, now, now.plusYears(1));
assertThat(loadAllOf(GracePeriod.class)).hasSize(1);
fakeClock.advanceBy(Duration.standardDays(500));
runPipeline();
assertThat(loadAllOf(GracePeriod.class)).isEmpty();
}
@Test
void testPipeline_fastOnlySavesChanged() {
DateTime now = fakeClock.nowUtc();
ContactResource contact = persistActiveContact("jd1234");
persistDomainWithDependentResources("renewed", "tld", contact, now, now, now.plusYears(1));
persistActiveDomain("nonrenewed.tld", now, now.plusYears(20));
// Spy the transaction manager so we can be sure we're only saving the renewed domain
JpaTransactionManager spy = spy(jpaTm());
TransactionManagerFactory.setJpaTm(() -> spy);
ArgumentCaptor<DomainBase> domainPutCaptor = ArgumentCaptor.forClass(DomainBase.class);
runPipeline();
// We should only be attempting to put the one changed domain into the DB
verify(spy).put(domainPutCaptor.capture());
assertThat(domainPutCaptor.getValue().getDomainName()).isEqualTo("renewed.tld");
}
@Test
void testPipeline_notFastResavesAll() {
options.setFast(false);
DateTime now = fakeClock.nowUtc();
ContactResource contact = persistActiveContact("jd1234");
DomainBase renewed =
persistDomainWithDependentResources("renewed", "tld", contact, now, now, now.plusYears(1));
DomainBase nonRenewed =
persistDomainWithDependentResources(
"nonrenewed", "tld", contact, now, now, now.plusYears(20));
// Spy the transaction manager so we can be sure we're attempting to save everything
JpaTransactionManager spy = spy(jpaTm());
TransactionManagerFactory.setJpaTm(() -> spy);
ArgumentCaptor<EppResource> eppResourcePutCaptor = ArgumentCaptor.forClass(EppResource.class);
runPipeline();
// We should be attempting to put both domains (and the contact) in, even the unchanged ones
verify(spy, times(3)).put(eppResourcePutCaptor.capture());
assertThat(
eppResourcePutCaptor.getAllValues().stream()
.map(EppResource::getRepoId)
.collect(toImmutableSet()))
.containsExactly(contact.getRepoId(), renewed.getRepoId(), nonRenewed.getRepoId());
}
private void runPipeline() {
ResaveAllEppResourcesPipeline pipeline = new ResaveAllEppResourcesPipeline(options);
pipeline.setupPipeline(testPipeline);
testPipeline.run().waitUntilFinish();
}
}

@@ -37,6 +37,7 @@ PATH CLASS
/_dr/task/relockDomain RelockDomainAction POST y INTERNAL,API APP ADMIN
/_dr/task/replayCommitLogsToSql ReplayCommitLogsToSqlAction POST y INTERNAL,API APP ADMIN
/_dr/task/resaveAllEppResources ResaveAllEppResourcesAction GET n INTERNAL,API APP ADMIN
/_dr/task/resaveAllEppResourcesPipeline ResaveAllEppResourcesPipelineAction GET n INTERNAL,API APP ADMIN
/_dr/task/resaveEntity ResaveEntityAction POST n INTERNAL,API APP ADMIN
/_dr/task/sendExpiringCertificateNotificationEmail SendExpiringCertificateNotificationEmailAction GET n INTERNAL,API APP ADMIN
/_dr/task/syncDatastoreToSqlSnapshot SyncDatastoreToSqlSnapshotAction POST n INTERNAL,API APP ADMIN

@@ -98,7 +98,9 @@ steps:
google.registry.beam.rde.RdePipeline \
google/registry/beam/rde_pipeline_metadata.json \
google.registry.beam.comparedb.ValidateDatabasePipeline \
-      google/registry/beam/validate_database_pipeline_metadata.json
+      google/registry/beam/validate_database_pipeline_metadata.json \
+      google.registry.beam.resave.ResaveAllEppResourcesPipeline \
+      google/registry/beam/resave_all_epp_resources_pipeline_metadata.json
# Tentatively build and publish Cloud SQL schema jar here, before schema release
# process is finalized. Also publish nomulus:core jars that are needed for
# server/schema compatibility tests.