diff --git a/core/build.gradle b/core/build.gradle index 1ad2e9743..40dc6541f 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -703,6 +703,12 @@ createToolTask( 'google.registry.tools.DevTool', sourceSets.nonprod) +createToolTask( + 'initSqlPipeline', 'google.registry.beam.initsql.InitSqlPipeline') + +createToolTask( + 'validateSqlPipeline', 'google.registry.beam.comparedb.ValidateSqlPipeline') + createToolTask( 'jpaDemoPipeline', 'google.registry.beam.common.JpaDemoPipeline') @@ -711,30 +717,6 @@ createToolTask( 'createSyntheticHistoryEntries', 'google.registry.tools.javascrap.CreateSyntheticHistoryEntriesPipeline') -project.tasks.create('initSqlPipeline', JavaExec) { - main = 'google.registry.beam.initsql.InitSqlPipeline' - - doFirst { - getToolArgsList().ifPresent { - args it - } - - def isDirectRunner = - args.contains('DirectRunner') || args.contains('--runner=DirectRunner') - // The dependency containing DirectRunner is intentionally excluded from the - // production binary, so that it won't be chosen by mistake: we definitely do - // not want to use it for the real jobs, yet DirectRunner is the default if - // the user forgets to override it. - // DirectRunner is required for tests and is already on testRuntimeClasspath. - // For simplicity, we add testRuntimeClasspath to this task's classpath instead - // of defining a new configuration just for the DirectRunner dependency. - classpath = - isDirectRunner - ? sourceSets.main.runtimeClasspath.plus(sourceSets.test.runtimeClasspath) - : sourceSets.main.runtimeClasspath - } -} - // Caller must provide projectId, GCP region, runner, and the kinds to delete // (comma-separated kind names or '*' for all). E.g.: // nom_build :core:bulkDeleteDatastore --args="--project=domain-registry-crash \ diff --git a/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java b/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java index b8441cb81..93c141915 100644 --- a/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java +++ b/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java @@ -175,7 +175,7 @@ public final class RegistryJpaIO { * JpaTransactionManager#setDatabaseSnapshot}. */ // TODO(b/193662898): vendor-independent support for richer transaction semantics. - public Read withSnapshot(String snapshotId) { + public Read withSnapshot(@Nullable String snapshotId) { return toBuilder().snapshotId(snapshotId).build(); } diff --git a/core/src/main/java/google/registry/beam/common/RegistryPipelineWorkerInitializer.java b/core/src/main/java/google/registry/beam/common/RegistryPipelineWorkerInitializer.java index 3020ba5ec..a5e01ef95 100644 --- a/core/src/main/java/google/registry/beam/common/RegistryPipelineWorkerInitializer.java +++ b/core/src/main/java/google/registry/beam/common/RegistryPipelineWorkerInitializer.java @@ -63,7 +63,8 @@ public class RegistryPipelineWorkerInitializer implements JvmInitializer { TransactionManagerFactory.setJpaTmOnBeamWorker(transactionManagerLazy::get); // Masquerade all threads as App Engine threads so we can create Ofy keys in the pipeline. Also // loads all ofy entities. - new AppEngineEnvironment("Beam").setEnvironmentForAllThreads(); + new AppEngineEnvironment("s~" + registryPipelineComponent.getProjectId()) + .setEnvironmentForAllThreads(); // Set the system property so that we can call IdService.allocateId() without access to // datastore. SystemPropertySetter.PRODUCTION_IMPL.setProperty(PROPERTY, "true"); diff --git a/core/src/main/java/google/registry/beam/comparedb/DatastoreSnapshots.java b/core/src/main/java/google/registry/beam/comparedb/DatastoreSnapshots.java new file mode 100644 index 000000000..2a4a32675 --- /dev/null +++ b/core/src/main/java/google/registry/beam/comparedb/DatastoreSnapshots.java @@ -0,0 +1,169 @@ +// Copyright 2021 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.beam.comparedb; + +import static google.registry.beam.comparedb.ValidateSqlUtils.createSqlEntityTupleTag; +import static google.registry.beam.initsql.Transforms.createTagForKind; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Verify; +import com.google.common.collect.ImmutableSet; +import com.googlecode.objectify.Key; +import google.registry.backup.VersionedEntity; +import google.registry.beam.initsql.Transforms; +import google.registry.model.billing.BillingEvent; +import google.registry.model.common.Cursor; +import google.registry.model.contact.ContactHistory; +import google.registry.model.contact.ContactResource; +import google.registry.model.domain.DomainBase; +import google.registry.model.domain.DomainHistory; +import google.registry.model.domain.token.AllocationToken; +import google.registry.model.host.HostHistory; +import google.registry.model.host.HostResource; +import google.registry.model.poll.PollMessage; +import google.registry.model.registrar.Registrar; +import google.registry.model.registrar.RegistrarContact; +import google.registry.model.replay.SqlEntity; +import google.registry.model.reporting.HistoryEntry; +import google.registry.model.tld.Registry; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.joda.time.DateTime; + +/** Utilities for loading Datastore snapshots. */ +public final class DatastoreSnapshots { + + private DatastoreSnapshots() {} + + /** + * Datastore kinds eligible for validation. This set must be consistent with {@link + * SqlSnapshots#ALL_SQL_ENTITIES}. + */ + @VisibleForTesting + static final ImmutableSet> ALL_DATASTORE_KINDS = + ImmutableSet.of( + Registry.class, + Cursor.class, + Registrar.class, + ContactResource.class, + RegistrarContact.class, + HostResource.class, + HistoryEntry.class, + AllocationToken.class, + BillingEvent.Recurring.class, + BillingEvent.OneTime.class, + BillingEvent.Cancellation.class, + PollMessage.class, + DomainBase.class); + + /** + * Returns the Datastore snapshot right before {@code commitLogToTime} for the user specified + * {@code kinds}. The resulting snapshot has all changes that happened before {@code + * commitLogToTime}, and none at or after {@code commitLogToTime}. + * + *

If {@code HistoryEntry} is included in {@code kinds}, the result will contain {@code + * PCollections} for the child entities, {@code DomainHistory}, {@code ContactHistory}, and {@code + * HostHistory}. + */ + static PCollectionTuple loadDatastoreSnapshotByKind( + Pipeline pipeline, + String exportDir, + String commitLogDir, + DateTime commitLogFromTime, + DateTime commitLogToTime, + Set> kinds) { + PCollectionTuple snapshot = + pipeline.apply( + "Load Datastore snapshot.", + Transforms.loadDatastoreSnapshot( + exportDir, + commitLogDir, + commitLogFromTime, + commitLogToTime, + kinds.stream().map(Key::getKind).collect(ImmutableSet.toImmutableSet()))); + + PCollectionTuple perTypeSnapshots = PCollectionTuple.empty(pipeline); + for (Class kind : kinds) { + PCollection perKindSnapshot = + snapshot.get(createTagForKind(Key.getKind(kind))); + if (SqlEntity.class.isAssignableFrom(kind)) { + perTypeSnapshots = + perTypeSnapshots.and( + createSqlEntityTupleTag((Class) kind), + datastoreEntityToPojo(perKindSnapshot, kind.getSimpleName())); + continue; + } + Verify.verify(kind == HistoryEntry.class, "Unexpected Non-SqlEntity class: %s", kind); + PCollectionTuple historyEntriesByType = splitHistoryEntry(perKindSnapshot); + for (Map.Entry, PCollection> entry : + historyEntriesByType.getAll().entrySet()) { + perTypeSnapshots = perTypeSnapshots.and(entry.getKey().getId(), entry.getValue()); + } + } + return perTypeSnapshots; + } + + /** + * Splits a {@link PCollection} of {@link HistoryEntry HistoryEntries} into three collections of + * its child entities by type. + */ + static PCollectionTuple splitHistoryEntry(PCollection historyEntries) { + return historyEntries.apply( + "Split HistoryEntry by Resource Type", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement( + @Element VersionedEntity historyEntry, MultiOutputReceiver out) { + Optional.ofNullable(Transforms.convertVersionedEntityToSqlEntity(historyEntry)) + .ifPresent( + sqlEntity -> + out.get(createSqlEntityTupleTag(sqlEntity.getClass())) + .output(sqlEntity)); + } + }) + .withOutputTags( + createSqlEntityTupleTag(DomainHistory.class), + TupleTagList.of(createSqlEntityTupleTag(ContactHistory.class)) + .and(createSqlEntityTupleTag(HostHistory.class)))); + } + + /** + * Transforms a {@link PCollection} of {@link VersionedEntity VersionedEntities} to Ofy Java + * objects. + */ + static PCollection datastoreEntityToPojo( + PCollection entities, String desc) { + return entities.apply( + "Datastore Entity to Pojo " + desc, + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement( + @Element VersionedEntity entity, OutputReceiver out) { + Optional.ofNullable(Transforms.convertVersionedEntityToSqlEntity(entity)) + .ifPresent(out::output); + } + })); + } +} diff --git a/core/src/main/java/google/registry/beam/comparedb/LatestDatastoreSnapshotFinder.java b/core/src/main/java/google/registry/beam/comparedb/LatestDatastoreSnapshotFinder.java new file mode 100644 index 000000000..795c50cfc --- /dev/null +++ b/core/src/main/java/google/registry/beam/comparedb/LatestDatastoreSnapshotFinder.java @@ -0,0 +1,145 @@ +// Copyright 2021 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.beam.comparedb; + +import com.google.auto.value.AutoValue; +import com.google.cloud.storage.BlobId; +import com.google.cloud.storage.BlobInfo; +import dagger.Component; +import google.registry.config.CloudTasksUtilsModule; +import google.registry.config.CredentialModule; +import google.registry.config.RegistryConfig; +import google.registry.config.RegistryConfig.Config; +import google.registry.config.RegistryConfig.ConfigModule; +import google.registry.gcs.GcsUtils; +import google.registry.util.Clock; +import google.registry.util.UtilsModule; +import java.io.IOException; +import java.util.Comparator; +import java.util.NoSuchElementException; +import java.util.Optional; +import javax.inject.Inject; +import javax.inject.Singleton; +import org.joda.time.DateTime; +import org.joda.time.Instant; +import org.joda.time.Interval; + +/** Finds the necessary information for loading the most recent Datastore snapshot. */ +public class LatestDatastoreSnapshotFinder { + private final String projectId; + private final GcsUtils gcsUtils; + private final Clock clock; + + @Inject + LatestDatastoreSnapshotFinder( + @Config("projectId") String projectId, GcsUtils gcsUtils, Clock clock) { + this.projectId = projectId; + this.gcsUtils = gcsUtils; + this.clock = clock; + } + + /** + * Finds information of the most recent Datastore snapshot, including the GCS folder of the + * exported data files and the start and stop times of the export. The folder of the CommitLogs is + * also included in the return. + */ + public DatastoreSnapshotInfo getSnapshotInfo() { + String bucketName = RegistryConfig.getDatastoreBackupsBucket().substring("gs://".length()); + /** + * Find the bucket-relative path to the overall metadata file of the last Datastore export. + * Since Datastore export is saved daily, we may need to look back to yesterday. If found, the + * return value is like + * "2021-11-19T06:00:00_76493/2021-11-19T06:00:00_76493.overall_export_metadata". + */ + Optional metaFilePathOptional = findMostRecentExportMetadataFile(bucketName, 2); + if (!metaFilePathOptional.isPresent()) { + throw new NoSuchElementException("No exports found over the past 2 days."); + } + String metaFilePath = metaFilePathOptional.get(); + String metaFileFolder = metaFilePath.substring(0, metaFilePath.indexOf('/')); + Instant exportStartTime = Instant.parse(metaFileFolder.replace('_', '.') + 'Z'); + BlobInfo blobInfo = gcsUtils.getBlobInfo(BlobId.of(bucketName, metaFilePath)); + Instant exportEndTime = new Instant(blobInfo.getCreateTime()); + return DatastoreSnapshotInfo.create( + String.format("gs://%s/%s", bucketName, metaFileFolder), + getCommitLogDir(), + new Interval(exportStartTime, exportEndTime)); + } + + public String getCommitLogDir() { + return "gs://" + projectId + "-commits"; + } + + /** + * Finds the bucket-relative path of the overall export metadata file, in the given bucket, + * searching back up to {@code lookBackDays} days, including today. + * + *

The overall export metadata file is the last file created during a Datastore export. All + * data has been exported by the creation time of this file. The name of this file, like that of + * all files in the same export, begins with the timestamp when the export starts. + * + *

An example return value: {@code + * 2021-11-19T06:00:00_76493/2021-11-19T06:00:00_76493.overall_export_metadata}. + */ + private Optional findMostRecentExportMetadataFile(String bucketName, int lookBackDays) { + DateTime today = clock.nowUtc(); + for (int day = 0; day < lookBackDays; day++) { + String dateString = today.minusDays(day).toString("yyyy-MM-dd"); + try { + Optional metaFilePath = + gcsUtils.listFolderObjects(bucketName, dateString).stream() + .filter(s -> s.endsWith("overall_export_metadata")) + .map(s -> dateString + s) + .sorted(Comparator.naturalOrder().reversed()) + .findFirst(); + if (metaFilePath.isPresent()) { + return metaFilePath; + } + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } + } + return Optional.empty(); + } + + /** Holds information about a Datastore snapshot. */ + @AutoValue + abstract static class DatastoreSnapshotInfo { + abstract String exportDir(); + + abstract String commitLogDir(); + + abstract Interval exportInterval(); + + static DatastoreSnapshotInfo create( + String exportDir, String commitLogDir, Interval exportOperationInterval) { + return new AutoValue_LatestDatastoreSnapshotFinder_DatastoreSnapshotInfo( + exportDir, commitLogDir, exportOperationInterval); + } + } + + @Singleton + @Component( + modules = { + CredentialModule.class, + ConfigModule.class, + CloudTasksUtilsModule.class, + UtilsModule.class + }) + interface LatestDatastoreSnapshotFinderFinderComponent { + + LatestDatastoreSnapshotFinder datastoreSnapshotInfoFinder(); + } +} diff --git a/core/src/main/java/google/registry/beam/comparedb/SqlSnapshots.java b/core/src/main/java/google/registry/beam/comparedb/SqlSnapshots.java new file mode 100644 index 000000000..82674968d --- /dev/null +++ b/core/src/main/java/google/registry/beam/comparedb/SqlSnapshots.java @@ -0,0 +1,384 @@ +// Copyright 2021 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.beam.comparedb; + +import static google.registry.beam.comparedb.ValidateSqlUtils.createSqlEntityTupleTag; +import static google.registry.beam.comparedb.ValidateSqlUtils.getMedianIdForHistoryTable; + +import com.google.common.base.Verify; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.ImmutableSetMultimap; +import com.google.common.collect.Streams; +import google.registry.beam.common.RegistryJpaIO; +import google.registry.model.billing.BillingEvent; +import google.registry.model.bulkquery.BulkQueryEntities; +import google.registry.model.bulkquery.DomainBaseLite; +import google.registry.model.bulkquery.DomainHistoryHost; +import google.registry.model.bulkquery.DomainHistoryLite; +import google.registry.model.bulkquery.DomainHost; +import google.registry.model.common.Cursor; +import google.registry.model.contact.ContactHistory; +import google.registry.model.contact.ContactResource; +import google.registry.model.domain.DomainBase; +import google.registry.model.domain.DomainHistory; +import google.registry.model.domain.DomainHistory.DomainHistoryId; +import google.registry.model.domain.GracePeriod; +import google.registry.model.domain.GracePeriod.GracePeriodHistory; +import google.registry.model.domain.secdns.DelegationSignerData; +import google.registry.model.domain.secdns.DomainDsDataHistory; +import google.registry.model.domain.token.AllocationToken; +import google.registry.model.host.HostHistory; +import google.registry.model.host.HostResource; +import google.registry.model.poll.PollMessage; +import google.registry.model.registrar.Registrar; +import google.registry.model.registrar.RegistrarContact; +import google.registry.model.replay.SqlEntity; +import google.registry.model.reporting.DomainTransactionRecord; +import google.registry.model.tld.Registry; +import google.registry.persistence.transaction.CriteriaQueryBuilder; +import java.io.Serializable; +import java.util.Optional; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.TypeDescriptors; + +/** + * Utilities for loading SQL snapshots. + * + *

For {@link DomainBase} and {@link DomainHistory}, this class assumes the presence of the + * {@link google.registry.persistence.PersistenceModule.JpaTransactionManagerType#BULK_QUERY + * bulk-query-capable JpaTransactionManager}, and takes advantage of it for higher throughput. + * + *

For now this class is meant for use during the database migration period only. Therefore, it + * contains optimizations specifically for the production database at the current size, e.g., + * parallel queries for select tables. + */ +public final class SqlSnapshots { + + private SqlSnapshots() {} + + /** + * SQL entity types that are eligible for validation. This set must be consistent with {@link + * DatastoreSnapshots#ALL_DATASTORE_KINDS}. + */ + static final ImmutableSet> ALL_SQL_ENTITIES = + ImmutableSet.of( + Registry.class, + Cursor.class, + Registrar.class, + ContactResource.class, + RegistrarContact.class, + HostResource.class, + AllocationToken.class, + BillingEvent.Recurring.class, + BillingEvent.OneTime.class, + BillingEvent.Cancellation.class, + PollMessage.class, + DomainBase.class, + ContactHistory.class, + HostHistory.class, + DomainHistory.class); + + /** + * Loads a SQL snapshot for the given {@code sqlEntityTypes}. + * + *

If {@code snapshotId} is present, all queries use the specified database snapshot, + * guaranteeing a consistent result. + */ + public static PCollectionTuple loadCloudSqlSnapshotByType( + Pipeline pipeline, + ImmutableSet> sqlEntityTypes, + Optional snapshotId) { + PCollectionTuple perTypeSnapshots = PCollectionTuple.empty(pipeline); + for (Class clazz : sqlEntityTypes) { + if (clazz == DomainBase.class) { + perTypeSnapshots = + perTypeSnapshots.and( + createSqlEntityTupleTag(DomainBase.class), + loadAndAssembleDomainBase(pipeline, snapshotId)); + continue; + } + if (clazz == DomainHistory.class) { + perTypeSnapshots = + perTypeSnapshots.and( + createSqlEntityTupleTag(DomainHistory.class), + loadAndAssembleDomainHistory(pipeline, snapshotId)); + continue; + } + if (clazz == ContactHistory.class) { + perTypeSnapshots = + perTypeSnapshots.and( + createSqlEntityTupleTag(ContactHistory.class), + loadContactHistory(pipeline, snapshotId)); + continue; + } + perTypeSnapshots = + perTypeSnapshots.and( + createSqlEntityTupleTag(clazz), + pipeline.apply( + "SQL Load " + clazz.getSimpleName(), + RegistryJpaIO.read( + () -> CriteriaQueryBuilder.create(clazz).build(), SqlEntity.class::cast) + .withSnapshot(snapshotId.orElse(null)))); + } + return perTypeSnapshots; + } + + /** + * Bulk-loads parts of {@link DomainBase} and assembles them in the pipeline. + * + * @see BulkQueryEntities + */ + public static PCollection loadAndAssembleDomainBase( + Pipeline pipeline, Optional snapshotId) { + PCollection> baseObjects = + readAllAndAssignKey(pipeline, DomainBaseLite.class, DomainBaseLite::getRepoId, snapshotId); + PCollection> gracePeriods = + readAllAndAssignKey(pipeline, GracePeriod.class, GracePeriod::getDomainRepoId, snapshotId); + PCollection> delegationSigners = + readAllAndAssignKey( + pipeline, + DelegationSignerData.class, + DelegationSignerData::getDomainRepoId, + snapshotId); + PCollection> domainHosts = + readAllAndAssignKey(pipeline, DomainHost.class, DomainHost::getDomainRepoId, snapshotId); + + return PCollectionList.of( + ImmutableList.of(baseObjects, gracePeriods, delegationSigners, domainHosts)) + .apply("SQL Merge DomainBase parts", Flatten.pCollections()) + .apply("Group by Domain Parts by RepoId", GroupByKey.create()) + .apply( + "Assemble DomainBase", + ParDo.of( + new DoFn>, SqlEntity>() { + @ProcessElement + public void processElement( + @Element KV> kv, + OutputReceiver outputReceiver) { + TypedClassifier partsByType = new TypedClassifier(kv.getValue()); + ImmutableSet baseObjects = + partsByType.getAllOf(DomainBaseLite.class); + Verify.verify( + baseObjects.size() == 1, + "Expecting one DomainBaseLite object per repoId: " + kv.getKey()); + outputReceiver.output( + BulkQueryEntities.assembleDomainBase( + baseObjects.iterator().next(), + partsByType.getAllOf(GracePeriod.class), + partsByType.getAllOf(DelegationSignerData.class), + partsByType.getAllOf(DomainHost.class).stream() + .map(DomainHost::getHostVKey) + .collect(ImmutableSet.toImmutableSet()))); + } + })); + } + + /** + * Loads all {@link ContactHistory} entities from the database. + * + *

This method uses two queries to load data in parallel. This is a performance optimization + * specifically for the production database. + */ + static PCollection loadContactHistory(Pipeline pipeline, Optional snapshotId) { + long medianId = + getMedianIdForHistoryTable("ContactHistory") + .orElseThrow( + () -> new IllegalStateException("Not a valid database: no ContactHistory.")); + PCollection part1 = + pipeline.apply( + "SQL Load ContactHistory first half", + RegistryJpaIO.read( + String.format("select c from ContactHistory c where id <= %s", medianId), + false, + SqlEntity.class::cast) + .withSnapshot(snapshotId.orElse(null))); + PCollection part2 = + pipeline.apply( + "SQL Load ContactHistory second half", + RegistryJpaIO.read( + String.format("select c from ContactHistory c where id > %s", medianId), + false, + SqlEntity.class::cast) + .withSnapshot(snapshotId.orElse(null))); + return PCollectionList.of(part1) + .and(part2) + .apply("Combine ContactHistory parts", Flatten.pCollections()); + } + + /** + * Bulk-loads all parts of {@link DomainHistory} and assembles them in the pipeline. + * + *

This method uses two queries to load {@link DomainBaseLite} in parallel. This is a + * performance optimization specifically for the production database. + * + * @see BulkQueryEntities + */ + static PCollection loadAndAssembleDomainHistory( + Pipeline pipeline, Optional snapshotId) { + long medianId = + getMedianIdForHistoryTable("DomainHistory") + .orElseThrow( + () -> new IllegalStateException("Not a valid database: no DomainHistory.")); + PCollection> baseObjectsPart1 = + queryAndAssignKey( + pipeline, + "first half", + String.format("select c from DomainHistory c where id <= %s", medianId), + DomainHistoryLite.class, + compose(DomainHistoryLite::getDomainHistoryId, DomainHistoryId::toString), + snapshotId); + PCollection> baseObjectsPart2 = + queryAndAssignKey( + pipeline, + "second half", + String.format("select c from DomainHistory c where id > %s", medianId), + DomainHistoryLite.class, + compose(DomainHistoryLite::getDomainHistoryId, DomainHistoryId::toString), + snapshotId); + PCollection> gracePeriods = + readAllAndAssignKey( + pipeline, + GracePeriodHistory.class, + compose(GracePeriodHistory::getDomainHistoryId, DomainHistoryId::toString), + snapshotId); + PCollection> delegationSigners = + readAllAndAssignKey( + pipeline, + DomainDsDataHistory.class, + compose(DomainDsDataHistory::getDomainHistoryId, DomainHistoryId::toString), + snapshotId); + PCollection> domainHosts = + readAllAndAssignKey( + pipeline, + DomainHistoryHost.class, + compose(DomainHistoryHost::getDomainHistoryId, DomainHistoryId::toString), + snapshotId); + PCollection> transactionRecords = + readAllAndAssignKey( + pipeline, + DomainTransactionRecord.class, + compose(DomainTransactionRecord::getDomainHistoryId, DomainHistoryId::toString), + snapshotId); + + return PCollectionList.of( + ImmutableList.of( + baseObjectsPart1, + baseObjectsPart2, + gracePeriods, + delegationSigners, + domainHosts, + transactionRecords)) + .apply("Merge DomainHistory parts", Flatten.pCollections()) + .apply("Group by DomainHistory Parts by DomainHistoryId string", GroupByKey.create()) + .apply( + "Assemble DomainHistory", + ParDo.of( + new DoFn>, SqlEntity>() { + @ProcessElement + public void processElement( + @Element KV> kv, + OutputReceiver outputReceiver) { + TypedClassifier partsByType = new TypedClassifier(kv.getValue()); + ImmutableSet baseObjects = + partsByType.getAllOf(DomainHistoryLite.class); + Verify.verify( + baseObjects.size() == 1, + "Expecting one DomainHistoryLite object per domainHistoryId: " + + kv.getKey()); + outputReceiver.output( + BulkQueryEntities.assembleDomainHistory( + baseObjects.iterator().next(), + partsByType.getAllOf(DomainDsDataHistory.class), + partsByType.getAllOf(DomainHistoryHost.class).stream() + .map(DomainHistoryHost::getHostVKey) + .collect(ImmutableSet.toImmutableSet()), + partsByType.getAllOf(GracePeriodHistory.class), + partsByType.getAllOf(DomainTransactionRecord.class))); + } + })); + } + + static PCollection> readAllAndAssignKey( + Pipeline pipeline, + Class type, + SerializableFunction keyFunction, + Optional snapshotId) { + return pipeline + .apply( + "SQL Load " + type.getSimpleName(), + RegistryJpaIO.read(() -> CriteriaQueryBuilder.create(type).build()) + .withSnapshot(snapshotId.orElse(null))) + .apply( + "Assign Key to " + type.getSimpleName(), + MapElements.into( + TypeDescriptors.kvs( + TypeDescriptors.strings(), TypeDescriptor.of(Serializable.class))) + .via(obj -> KV.of(keyFunction.apply(obj), (Serializable) obj))); + } + + static PCollection> queryAndAssignKey( + Pipeline pipeline, + String diffrentiator, + String jplQuery, + Class type, + SerializableFunction keyFunction, + Optional snapshotId) { + return pipeline + .apply( + "SQL Load " + type.getSimpleName() + " " + diffrentiator, + RegistryJpaIO.read(jplQuery, false, type::cast).withSnapshot(snapshotId.orElse(null))) + .apply( + "Assign Key to " + type.getSimpleName() + " " + diffrentiator, + MapElements.into( + TypeDescriptors.kvs( + TypeDescriptors.strings(), TypeDescriptor.of(Serializable.class))) + .via(obj -> KV.of(keyFunction.apply(obj), (Serializable) obj))); + } + + // TODO(b/205988530): don't use beam serializablefunction, make one that extends Java's Function. + private static SerializableFunction compose( + SerializableFunction f1, SerializableFunction f2) { + return r -> f2.apply(f1.apply(r)); + } + + /** Container that receives mixed-typed data and groups them by {@link Class}. */ + static class TypedClassifier { + private final ImmutableSetMultimap, Object> classifiedEntities; + + TypedClassifier(Iterable inputs) { + this.classifiedEntities = + Streams.stream(inputs) + .collect(ImmutableSetMultimap.toImmutableSetMultimap(Object::getClass, x -> x)); + } + + ImmutableSet getAllOf(Class clazz) { + return classifiedEntities.get(clazz).stream() + .map(clazz::cast) + .collect(ImmutableSet.toImmutableSet()); + } + } +} diff --git a/core/src/main/java/google/registry/beam/comparedb/ValidateSqlPipeline.java b/core/src/main/java/google/registry/beam/comparedb/ValidateSqlPipeline.java new file mode 100644 index 000000000..636c1b243 --- /dev/null +++ b/core/src/main/java/google/registry/beam/comparedb/ValidateSqlPipeline.java @@ -0,0 +1,159 @@ +// Copyright 2021 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.beam.comparedb; + +import static com.google.common.base.Verify.verify; +import static org.apache.beam.sdk.values.TypeDescriptors.strings; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import google.registry.beam.common.DatabaseSnapshot; +import google.registry.beam.common.RegistryPipelineWorkerInitializer; +import google.registry.beam.comparedb.LatestDatastoreSnapshotFinder.DatastoreSnapshotInfo; +import google.registry.beam.comparedb.ValidateSqlUtils.CompareSqlEntity; +import google.registry.model.domain.DomainBase; +import google.registry.model.domain.DomainHistory; +import google.registry.model.replay.SqlEntity; +import google.registry.model.replay.SqlReplayCheckpoint; +import google.registry.persistence.PersistenceModule.JpaTransactionManagerType; +import google.registry.persistence.PersistenceModule.TransactionIsolationLevel; +import google.registry.persistence.transaction.TransactionManagerFactory; +import java.io.Serializable; +import java.util.Optional; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult.State; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.joda.time.DateTime; + +/** + * Validates the asynchronous data replication process from Datastore (primary storage) to Cloud SQL + * (secondary storage). + */ +public class ValidateSqlPipeline { + + /** Specifies the extra CommitLogs to load before the start of a Database export. */ + private static final int COMMIT_LOG_MARGIN_MINUTES = 10; + + private final ValidateSqlPipelineOptions options; + private final DatastoreSnapshotInfo mostRecentExport; + + public ValidateSqlPipeline( + ValidateSqlPipelineOptions options, DatastoreSnapshotInfo mostRecentExport) { + this.options = options; + this.mostRecentExport = mostRecentExport; + } + + void run() { + run(Pipeline.create(options)); + } + + @VisibleForTesting + void run(Pipeline pipeline) { + // TODO(weiminyu): Acquire the commit log replay lock when the lock release bug is fixed. + DateTime latestCommitLogTime = + TransactionManagerFactory.jpaTm().transact(() -> SqlReplayCheckpoint.get()); + Preconditions.checkState( + latestCommitLogTime.isAfter(mostRecentExport.exportInterval().getEnd()), + "Cannot recreate Datastore snapshot since target time is in the middle of an export."); + try (DatabaseSnapshot databaseSnapshot = DatabaseSnapshot.createSnapshot()) { + setupPipeline(pipeline, Optional.of(databaseSnapshot.getSnapshotId()), latestCommitLogTime); + State state = pipeline.run().waitUntilFinish(); + if (!State.DONE.equals(state)) { + throw new IllegalStateException("Unexpected pipeline state: " + state); + } + } + } + + void setupPipeline( + Pipeline pipeline, Optional sqlSnapshotId, DateTime latestCommitLogTime) { + pipeline + .getCoderRegistry() + .registerCoderForClass(SqlEntity.class, SerializableCoder.of(Serializable.class)); + + PCollectionTuple datastoreSnapshot = + DatastoreSnapshots.loadDatastoreSnapshotByKind( + pipeline, + mostRecentExport.exportDir(), + mostRecentExport.commitLogDir(), + mostRecentExport.exportInterval().getStart().minusMinutes(COMMIT_LOG_MARGIN_MINUTES), + // Increase by 1ms since we want to include commitLogs latestCommitLogTime but + // this parameter is exclusive. + latestCommitLogTime.plusMillis(1), + DatastoreSnapshots.ALL_DATASTORE_KINDS); + + PCollectionTuple cloudSqlSnapshot = + SqlSnapshots.loadCloudSqlSnapshotByType( + pipeline, SqlSnapshots.ALL_SQL_ENTITIES, sqlSnapshotId); + + verify( + datastoreSnapshot.getAll().keySet().equals(cloudSqlSnapshot.getAll().keySet()), + "Expecting the same set of types in both snapshots."); + + for (Class clazz : SqlSnapshots.ALL_SQL_ENTITIES) { + TupleTag tag = ValidateSqlUtils.createSqlEntityTupleTag(clazz); + verify( + datastoreSnapshot.has(tag), "Missing %s in Datastore snapshot.", clazz.getSimpleName()); + verify(cloudSqlSnapshot.has(tag), "Missing %s in Cloud SQL snapshot.", clazz.getSimpleName()); + PCollectionList.of(datastoreSnapshot.get(tag)) + .and(cloudSqlSnapshot.get(tag)) + .apply("Combine from both snapshots: " + clazz.getSimpleName(), Flatten.pCollections()) + .apply( + "Assign primary key to merged " + clazz.getSimpleName(), + WithKeys.of(ValidateSqlPipeline::getPrimaryKeyString).withKeyType(strings())) + .apply("Group by primary key " + clazz.getSimpleName(), GroupByKey.create()) + .apply("Compare " + clazz.getSimpleName(), ParDo.of(new CompareSqlEntity())); + } + } + + private static String getPrimaryKeyString(SqlEntity sqlEntity) { + // SqlEntity.getPrimaryKeyString only works with entities registered with Hibernate. + // We are using the BulkQueryJpaTransactionManager, which does not recognize DomainBase and + // DomainHistory. See BulkQueryEntities.java for more information. + if (sqlEntity instanceof DomainBase) { + return "DomainBase_" + ((DomainBase) sqlEntity).getRepoId(); + } + if (sqlEntity instanceof DomainHistory) { + return "DomainHistory_" + ((DomainHistory) sqlEntity).getDomainHistoryId().toString(); + } + return sqlEntity.getPrimaryKeyString(); + } + + public static void main(String[] args) { + ValidateSqlPipelineOptions options = + PipelineOptionsFactory.fromArgs(args).withValidation().as(ValidateSqlPipelineOptions.class); + + // Defensively set important options. + options.setIsolationOverride(TransactionIsolationLevel.TRANSACTION_REPEATABLE_READ); + options.setJpaTransactionManagerType(JpaTransactionManagerType.BULK_QUERY); + + // Reuse Dataflow worker initialization code to set up JPA in the pipeline harness. + new RegistryPipelineWorkerInitializer().beforeProcessing(options); + + DatastoreSnapshotInfo mostRecentExport = + DaggerLatestDatastoreSnapshotFinder_LatestDatastoreSnapshotFinderFinderComponent.create() + .datastoreSnapshotInfoFinder() + .getSnapshotInfo(); + + new ValidateSqlPipeline(options, mostRecentExport).run(Pipeline.create(options)); + } +} diff --git a/core/src/main/java/google/registry/beam/comparedb/ValidateSqlPipelineOptions.java b/core/src/main/java/google/registry/beam/comparedb/ValidateSqlPipelineOptions.java new file mode 100644 index 000000000..e108108a0 --- /dev/null +++ b/core/src/main/java/google/registry/beam/comparedb/ValidateSqlPipelineOptions.java @@ -0,0 +1,20 @@ +// Copyright 2021 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.beam.comparedb; + +import google.registry.beam.common.RegistryPipelineOptions; + +/** BEAM pipeline options for {@link ValidateSqlPipeline}. */ +public interface ValidateSqlPipelineOptions extends RegistryPipelineOptions {} diff --git a/core/src/main/java/google/registry/beam/comparedb/ValidateSqlUtils.java b/core/src/main/java/google/registry/beam/comparedb/ValidateSqlUtils.java new file mode 100644 index 000000000..130f2de51 --- /dev/null +++ b/core/src/main/java/google/registry/beam/comparedb/ValidateSqlUtils.java @@ -0,0 +1,321 @@ +// Copyright 2021 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.beam.comparedb; + +import static com.google.common.base.Verify.verify; +import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSortedMap; +import com.google.common.collect.Maps; +import com.google.common.flogger.FluentLogger; +import google.registry.model.EppResource; +import google.registry.model.ImmutableObject; +import google.registry.model.billing.BillingEvent.OneTime; +import google.registry.model.contact.ContactBase; +import google.registry.model.contact.ContactHistory; +import google.registry.model.domain.DomainContent; +import google.registry.model.domain.DomainHistory; +import google.registry.model.eppcommon.AuthInfo; +import google.registry.model.host.HostHistory; +import google.registry.model.poll.PollMessage; +import google.registry.model.replay.SqlEntity; +import google.registry.model.reporting.HistoryEntry; +import google.registry.model.tld.Registry; +import java.lang.reflect.Field; +import java.math.BigInteger; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TupleTag; +import org.joda.money.Money; + +/** Helpers for use by {@link ValidateSqlPipeline}. */ +final class ValidateSqlUtils { + private static final FluentLogger logger = FluentLogger.forEnclosingClass(); + + private ValidateSqlUtils() {} + + /** + * Query template for finding the median value of the {@code history_revision_id} column in one of + * the History tables. + * + *

The {@link ValidateSqlPipeline} uses this query to parallelize the query to some of the + * history tables. Although the {@code repo_id} column is the leading column in the primary keys + * of these tables, in practice and with production data, division by {@code history_revision_id} + * works slightly faster for unknown reasons. + */ + private static final String MEDIAN_ID_QUERY_TEMPLATE = + "SELECT history_revision_id FROM ( " + + " SELECT" + + " ROW_NUMBER() OVER (ORDER BY history_revision_id ASC) AS rownumber," + + " history_revision_id" + + " FROM \"%TABLE%\"" + + ") AS foo\n" + + "WHERE rownumber in (select count(*) / 2 + 1 from \"%TABLE%\")"; + + static Optional getMedianIdForHistoryTable(String tableName) { + Preconditions.checkArgument( + tableName.endsWith("History"), "Table must be one of the History tables."); + String sqlText = MEDIAN_ID_QUERY_TEMPLATE.replace("%TABLE%", tableName); + List results = + jpaTm() + .transact(() -> jpaTm().getEntityManager().createNativeQuery(sqlText).getResultList()); + verify(results.size() < 2, "MidPoint query should have at most one result."); + if (results.isEmpty()) { + return Optional.empty(); + } + return Optional.of(((BigInteger) results.get(0)).longValue()); + } + + static TupleTag createSqlEntityTupleTag(Class actualType) { + return new TupleTag(actualType.getSimpleName()) {}; + } + + static class CompareSqlEntity extends DoFn>, Void> { + private final HashMap totalCounters = new HashMap<>(); + private final HashMap missingCounters = new HashMap<>(); + private final HashMap unequalCounters = new HashMap<>(); + private final HashMap badEntityCounters = new HashMap<>(); + + private volatile boolean logPrinted = false; + + private String getCounterKey(Class clazz) { + return PollMessage.class.isAssignableFrom(clazz) ? "PollMessage" : clazz.getSimpleName(); + } + + private synchronized void ensureCounterExists(String counterKey) { + if (totalCounters.containsKey(counterKey)) { + return; + } + totalCounters.put(counterKey, Metrics.counter("CompareDB", "Total Compared: " + counterKey)); + missingCounters.put( + counterKey, Metrics.counter("CompareDB", "Missing In One DB: " + counterKey)); + unequalCounters.put(counterKey, Metrics.counter("CompareDB", "Not Equal:" + counterKey)); + badEntityCounters.put(counterKey, Metrics.counter("CompareDB", "Bad Entities:" + counterKey)); + } + + /** + * A rudimentary debugging helper that prints the first pair of unequal entities in each worker. + * This will be removed when we start exporting such entities to GCS. + */ + void logDiff(String key, Object entry0, Object entry1) { + if (logPrinted) { + return; + } + logPrinted = true; + Map fields0 = ((ImmutableObject) entry0).toDiffableFieldMap(); + Map fields1 = ((ImmutableObject) entry1).toDiffableFieldMap(); + StringBuilder sb = new StringBuilder(); + fields0.forEach( + (field, value) -> { + if (fields1.containsKey(field)) { + if (!Objects.equals(value, fields1.get(field))) { + sb.append(field + " not match: " + value + " -> " + fields1.get(field) + "\n"); + } + } else { + sb.append(field + "Not found in entity 2\n"); + } + }); + fields1.forEach( + (field, value) -> { + if (!fields0.containsKey(field)) { + sb.append(field + "Not found in entity 1\n"); + } + }); + logger.atWarning().log(key + " " + sb.toString()); + } + + @ProcessElement + public void processElement(@Element KV> kv) { + ImmutableList entities = ImmutableList.copyOf(kv.getValue()); + + verify(!entities.isEmpty(), "Can't happen: no value for key %s.", kv.getKey()); + verify(entities.size() <= 2, "Unexpected duplicates for key %s", kv.getKey()); + + String counterKey = getCounterKey(entities.get(0).getClass()); + ensureCounterExists(counterKey); + totalCounters.get(counterKey).inc(); + + if (entities.size() == 1) { + missingCounters.get(counterKey).inc(); + // Temporary debugging help. See logDiff() above. + if (!logPrinted) { + logPrinted = true; + logger.atWarning().log("Unexpected single entity: %s", kv.getKey()); + } + return; + } + SqlEntity entity0; + SqlEntity entity1; + + try { + entity0 = normalizeEntity(entities.get(0)); + entity1 = normalizeEntity(entities.get(1)); + } catch (Exception e) { + // Temporary debugging help. See logDiff() above. + if (!logPrinted) { + logPrinted = true; + badEntityCounters.get(counterKey).inc(); + } + return; + } + + if (!Objects.equals(entity0, entity1)) { + unequalCounters.get(counterKey).inc(); + logDiff(kv.getKey(), entities.get(0), entities.get(1)); + } + } + } + + static SqlEntity normalizeEntity(SqlEntity sqlEntity) { + if (sqlEntity instanceof EppResource) { + return normalizeEppResource(sqlEntity); + } + if (sqlEntity instanceof HistoryEntry) { + return (SqlEntity) normalizeHistoryEntry((HistoryEntry) sqlEntity); + } + if (sqlEntity instanceof Registry) { + return normalizeRegistry((Registry) sqlEntity); + } + if (sqlEntity instanceof OneTime) { + return normalizeOnetime((OneTime) sqlEntity); + } + return sqlEntity; + } + + /** + * Normalizes an {@link EppResource} instance for comparison. + * + *

This method may modify the input object using reflection instead of making a copy with + * {@code eppResource.asBuilder().build()}, because when {@code eppResource} is a {@link + * google.registry.model.domain.DomainBase}, the {@code build} method accesses the Database, which + * we want to avoid. + */ + static SqlEntity normalizeEppResource(SqlEntity eppResource) { + try { + Field authField = + eppResource instanceof DomainContent + ? DomainContent.class.getDeclaredField("authInfo") + : eppResource instanceof ContactBase + ? ContactBase.class.getDeclaredField("authInfo") + : null; + if (authField != null) { + authField.setAccessible(true); + AuthInfo authInfo = (AuthInfo) authField.get(eppResource); + // When AuthInfo is missing, the authInfo field is null if the object is loaded from + // Datastore, or a PasswordAuth with null properties if loaded from SQL. In the second case + // we set the authInfo field to null. + if (authInfo != null + && authInfo.getPw() != null + && authInfo.getPw().getRepoId() == null + && authInfo.getPw().getValue() == null) { + authField.set(eppResource, null); + } + } + + Field field = EppResource.class.getDeclaredField("revisions"); + field.setAccessible(true); + field.set(eppResource, null); + return eppResource; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Normalizes a {@link HistoryEntry} for comparison. + * + *

This method modifies the input using reflection because relevant builder methods performs + * unwanted checks and changes. + */ + static HistoryEntry normalizeHistoryEntry(HistoryEntry historyEntry) { + // History objects from Datastore do not have details of their EppResource objects + // (domainContent, contactBase, hostBase). + try { + if (historyEntry instanceof DomainHistory) { + Field domainContent = DomainHistory.class.getDeclaredField("domainContent"); + domainContent.setAccessible(true); + domainContent.set(historyEntry, null); + Field domainTransactionRecords = + HistoryEntry.class.getDeclaredField("domainTransactionRecords"); + domainTransactionRecords.setAccessible(true); + Set domainTransactionRecordsValue = (Set) domainTransactionRecords.get(historyEntry); + if (domainTransactionRecordsValue != null && domainTransactionRecordsValue.isEmpty()) { + domainTransactionRecords.set(historyEntry, null); + } + } else if (historyEntry instanceof ContactHistory) { + Field contactBase = ContactHistory.class.getDeclaredField("contactBase"); + contactBase.setAccessible(true); + contactBase.set(historyEntry, null); + } else if (historyEntry instanceof HostHistory) { + Field hostBase = HostHistory.class.getDeclaredField("hostBase"); + hostBase.setAccessible(true); + hostBase.set(historyEntry, null); + } + return historyEntry; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + static Registry normalizeRegistry(Registry registry) { + if (registry.getStandardCreateCost().getAmount().scale() == 0) { + return registry; + } + return registry + .asBuilder() + .setCreateBillingCost(normalizeMoney(registry.getStandardCreateCost())) + .setRestoreBillingCost(normalizeMoney(registry.getStandardRestoreCost())) + .setServerStatusChangeBillingCost(normalizeMoney(registry.getServerStatusChangeCost())) + .setRegistryLockOrUnlockBillingCost( + normalizeMoney(registry.getRegistryLockOrUnlockBillingCost())) + .setRenewBillingCostTransitions( + ImmutableSortedMap.copyOf( + Maps.transformValues( + registry.getRenewBillingCostTransitions(), ValidateSqlUtils::normalizeMoney))) + .setEapFeeSchedule( + ImmutableSortedMap.copyOf( + Maps.transformValues( + registry.getEapFeeScheduleAsMap(), ValidateSqlUtils::normalizeMoney))) + .build(); + } + + /** Normalizes an {@link OneTime} instance for comparison. */ + static OneTime normalizeOnetime(OneTime oneTime) { + Money cost = oneTime.getCost(); + if (cost.getAmount().scale() == 0) { + return oneTime; + } + try { + return oneTime.asBuilder().setCost(normalizeMoney(oneTime.getCost())).build(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + static Money normalizeMoney(Money original) { + // Strips ".00" from the amount. + return Money.of(original.getCurrencyUnit(), original.getAmount().stripTrailingZeros()); + } +} diff --git a/core/src/main/java/google/registry/beam/initsql/Transforms.java b/core/src/main/java/google/registry/beam/initsql/Transforms.java index 0c502b0a9..821dacf26 100644 --- a/core/src/main/java/google/registry/beam/initsql/Transforms.java +++ b/core/src/main/java/google/registry/beam/initsql/Transforms.java @@ -101,8 +101,9 @@ public final class Transforms { } /** - * Composite {@link PTransform transform} that loads the Datastore snapshot at {@code - * commitLogToTime} for caller specified {@code kinds}. + * Composite {@link PTransform transform} that loads the Datastore snapshot right before {@code + * commitLogToTime} for caller specified {@code kinds}. The resulting snapshot has all changes + * that happened before {@code commitLogToTime}, and none at or after {@code commitLogToTime}. * *

Caller must provide the location of a Datastore export that started AFTER {@code * commitLogFromTime} and completed BEFORE {@code commitLogToTime}, as well as the root directory @@ -363,7 +364,7 @@ public final class Transforms { * to make Optional work with BEAM) */ @Nullable - public static Object convertVersionedEntityToSqlEntity(VersionedEntity dsEntity) { + public static SqlEntity convertVersionedEntityToSqlEntity(VersionedEntity dsEntity) { return dsEntity .getEntity() .filter(Transforms::isMigratable) diff --git a/core/src/main/java/google/registry/model/ImmutableObject.java b/core/src/main/java/google/registry/model/ImmutableObject.java index 7144c8a85..fb615c714 100644 --- a/core/src/main/java/google/registry/model/ImmutableObject.java +++ b/core/src/main/java/google/registry/model/ImmutableObject.java @@ -156,6 +156,10 @@ public abstract class ImmutableObject implements Cloneable { * } * } * + * + *

This method makes use of {@link #toStringHelper}, which embeds {@link + * System#identityHashCode} in the output string. Subclasses that require deterministic string + * representations across JVM instances should override this method. */ @Override public String toString() { diff --git a/core/src/main/java/google/registry/model/bulkquery/DomainHistoryLite.java b/core/src/main/java/google/registry/model/bulkquery/DomainHistoryLite.java index b102edcdd..4891031b7 100644 --- a/core/src/main/java/google/registry/model/bulkquery/DomainHistoryLite.java +++ b/core/src/main/java/google/registry/model/bulkquery/DomainHistoryLite.java @@ -107,6 +107,10 @@ public class DomainHistoryLite extends HistoryEntry implements SqlOnlyEntity { return VKey.create(DomainBase.class, getDomainRepoId()); } + public DomainHistoryId getDomainHistoryId() { + return new DomainHistoryId(getDomainRepoId(), getId()); + } + @PostLoad void postLoad() { if (domainContent == null) { diff --git a/core/src/main/java/google/registry/model/common/Cursor.java b/core/src/main/java/google/registry/model/common/Cursor.java index 2aca2ad80..c3fb5b85d 100644 --- a/core/src/main/java/google/registry/model/common/Cursor.java +++ b/core/src/main/java/google/registry/model/common/Cursor.java @@ -19,7 +19,9 @@ import static com.google.common.base.Preconditions.checkNotNull; import static google.registry.model.common.EntityGroupRoot.getCrossTldKey; import static google.registry.util.DateTimeUtils.START_OF_TIME; +import com.google.common.base.Joiner; import com.google.common.base.Splitter; +import com.google.common.collect.ImmutableSortedMap; import com.googlecode.objectify.Key; import com.googlecode.objectify.annotation.Entity; import com.googlecode.objectify.annotation.Id; @@ -294,5 +296,22 @@ public class Cursor extends ImmutableObject implements DatastoreAndSqlEntity, Un this.type = type; this.scope = scope; } + + /** + * A deterministic string representation of a {@link CursorId}. See {@link + * ImmutableObject#toString} for more information. + */ + @Override + public String toString() { + return String.format( + "%s: {\n%s", + getClass().getSimpleName(), + Joiner.on('\n') + .join( + ImmutableSortedMap.of("scope", scope, "type", type) + .entrySet())) + .replaceAll("\n", "\n ") + + "\n}"; + } } } diff --git a/core/src/main/java/google/registry/model/contact/ContactHistory.java b/core/src/main/java/google/registry/model/contact/ContactHistory.java index ab2f6de18..7a8f2061c 100644 --- a/core/src/main/java/google/registry/model/contact/ContactHistory.java +++ b/core/src/main/java/google/registry/model/contact/ContactHistory.java @@ -16,6 +16,8 @@ package google.registry.model.contact; import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableSortedMap; import com.googlecode.objectify.Key; import com.googlecode.objectify.annotation.EntitySubclass; import google.registry.model.EppResource; @@ -200,6 +202,24 @@ public class ContactHistory extends HistoryEntry implements SqlEntity, UnsafeSer private void setId(long id) { this.id = id; } + + /** + * A deterministic string representation of a {@link ContactHistoryId}. See {@link + * ImmutableObject#toString} for more information. + */ + @Override + public String toString() { + return String.format( + "%s: {\n%s", + getClass().getSimpleName(), + Joiner.on('\n') + .join( + ImmutableSortedMap.of( + "contactRepoId", getContactRepoId(), "id", getId()) + .entrySet())) + .replaceAll("\n", "\n ") + + "\n}"; + } } @Override diff --git a/core/src/main/java/google/registry/model/domain/DomainHistory.java b/core/src/main/java/google/registry/model/domain/DomainHistory.java index 79b14da47..666741a28 100644 --- a/core/src/main/java/google/registry/model/domain/DomainHistory.java +++ b/core/src/main/java/google/registry/model/domain/DomainHistory.java @@ -18,7 +18,9 @@ import static com.google.common.collect.ImmutableSet.toImmutableSet; import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; import static google.registry.util.CollectionUtils.nullToEmptyImmutableCopy; +import com.google.common.base.Joiner; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.ImmutableSortedMap; import com.googlecode.objectify.Key; import com.googlecode.objectify.annotation.EntitySubclass; import google.registry.model.EppResource; @@ -382,6 +384,24 @@ public class DomainHistory extends HistoryEntry implements SqlEntity { private void setId(long id) { this.id = id; } + + /** + * A deterministic string representation of a {@link DomainHistoryId}. See {@link + * ImmutableObject#toString} for more information. + */ + @Override + public String toString() { + return String.format( + "%s: {\n%s", + getClass().getSimpleName(), + Joiner.on('\n') + .join( + ImmutableSortedMap.of( + "domainRepoId", getDomainRepoId(), "id", getId()) + .entrySet())) + .replaceAll("\n", "\n ") + + "\n}"; + } } @Override diff --git a/core/src/main/java/google/registry/model/host/HostHistory.java b/core/src/main/java/google/registry/model/host/HostHistory.java index 0831a65e6..247f9e700 100644 --- a/core/src/main/java/google/registry/model/host/HostHistory.java +++ b/core/src/main/java/google/registry/model/host/HostHistory.java @@ -16,6 +16,8 @@ package google.registry.model.host; import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableSortedMap; import com.googlecode.objectify.Key; import com.googlecode.objectify.annotation.EntitySubclass; import google.registry.model.EppResource; @@ -203,6 +205,24 @@ public class HostHistory extends HistoryEntry implements SqlEntity, UnsafeSerial private void setId(long id) { this.id = id; } + + /** + * A deterministic string representation of a {@link HostHistoryId}. See {@link + * ImmutableObject#toString} for more information. + */ + @Override + public String toString() { + return String.format( + "%s: {\n%s", + getClass().getSimpleName(), + Joiner.on('\n') + .join( + ImmutableSortedMap.of( + "hostRepoId", getHostRepoId(), "id", getId()) + .entrySet())) + .replaceAll("\n", "\n ") + + "\n}"; + } } @Override diff --git a/core/src/main/java/google/registry/model/registrar/RegistrarContact.java b/core/src/main/java/google/registry/model/registrar/RegistrarContact.java index d4aefc041..647cf4214 100644 --- a/core/src/main/java/google/registry/model/registrar/RegistrarContact.java +++ b/core/src/main/java/google/registry/model/registrar/RegistrarContact.java @@ -32,7 +32,9 @@ import static java.util.stream.Collectors.joining; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Enums; +import com.google.common.base.Joiner; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.ImmutableSortedMap; import com.google.common.collect.ImmutableSortedSet; import com.google.common.collect.Streams; import com.googlecode.objectify.Key; @@ -413,6 +415,24 @@ public class RegistrarContact extends ImmutableObject this.emailAddress = emailAddress; this.registrarId = registrarId; } + + /** + * A deterministic string representation of a {@link RegistrarPocId}. See {@link + * ImmutableObject#toString} for more information. + */ + @Override + public String toString() { + return String.format( + "%s: {\n%s", + getClass().getSimpleName(), + Joiner.on('\n') + .join( + ImmutableSortedMap.of( + "emailAddress", emailAddress, "registrarId", registrarId) + .entrySet())) + .replaceAll("\n", "\n ") + + "\n}"; + } } /** A builder for constructing a {@link RegistrarContact}, since it is immutable. */ diff --git a/core/src/test/java/google/registry/beam/common/RegistryJpaWriteTest.java b/core/src/test/java/google/registry/beam/common/RegistryJpaWriteTest.java index 499fa2f71..366d1ab4f 100644 --- a/core/src/test/java/google/registry/beam/common/RegistryJpaWriteTest.java +++ b/core/src/test/java/google/registry/beam/common/RegistryJpaWriteTest.java @@ -55,7 +55,8 @@ class RegistryJpaWriteTest implements Serializable { @RegisterExtension @Order(Order.DEFAULT - 1) - final transient DatastoreEntityExtension datastore = new DatastoreEntityExtension(); + final transient DatastoreEntityExtension datastore = + new DatastoreEntityExtension().allThreads(true); @RegisterExtension final transient InjectExtension injectExtension = new InjectExtension(); diff --git a/core/src/test/java/google/registry/beam/comparedb/DatastoreSnapshotsTest.java b/core/src/test/java/google/registry/beam/comparedb/DatastoreSnapshotsTest.java new file mode 100644 index 000000000..2fb62a5f7 --- /dev/null +++ b/core/src/test/java/google/registry/beam/comparedb/DatastoreSnapshotsTest.java @@ -0,0 +1,93 @@ +// Copyright 2021 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.beam.comparedb; + +import com.google.common.collect.ImmutableSet; +import google.registry.beam.TestPipelineExtension; +import google.registry.beam.initsql.DatastoreSetupHelper; +import google.registry.model.domain.DomainHistory; +import google.registry.model.ofy.Ofy; +import google.registry.model.registrar.Registrar; +import google.registry.model.replay.SqlEntity; +import google.registry.persistence.transaction.JpaTestExtensions; +import google.registry.persistence.transaction.JpaTestExtensions.JpaIntegrationTestExtension; +import google.registry.testing.DatastoreEntityExtension; +import google.registry.testing.FakeClock; +import google.registry.testing.InjectExtension; +import java.io.Serializable; +import java.nio.file.Path; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.joda.time.DateTime; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; + +/** Unit tests for {@link DatastoreSnapshots}. */ +class DatastoreSnapshotsTest { + static final DateTime START_TIME = DateTime.parse("2000-01-01T00:00:00.0Z"); + + private FakeClock fakeClock = new FakeClock(START_TIME); + + @RegisterExtension + @Order(Order.DEFAULT - 1) + final transient DatastoreEntityExtension datastore = + new DatastoreEntityExtension().allThreads(true); + + @RegisterExtension final transient InjectExtension injectExtension = new InjectExtension(); + + @SuppressWarnings("WeakerAccess") + @TempDir + transient Path tmpDir; + + @RegisterExtension + final transient TestPipelineExtension testPipeline = + TestPipelineExtension.create().enableAbandonedNodeEnforcement(true); + + @RegisterExtension + final transient JpaIntegrationTestExtension database = + new JpaTestExtensions.Builder().withClock(fakeClock).buildIntegrationTestExtension(); + + DatastoreSetupHelper setupHelper; + + @BeforeEach + void beforeEach() throws Exception { + injectExtension.setStaticField(Ofy.class, "clock", fakeClock); + setupHelper = new DatastoreSetupHelper(tmpDir, fakeClock).initializeData(); + testPipeline + .getCoderRegistry() + .registerCoderForClass(SqlEntity.class, SerializableCoder.of(Serializable.class)); + } + + @Test + void loadDatastoreSnapshotByKind() { + PCollectionTuple tuple = + DatastoreSnapshots.loadDatastoreSnapshotByKind( + testPipeline, + setupHelper.exportDir.getAbsolutePath(), + setupHelper.commitLogDir.getAbsolutePath(), + START_TIME, + fakeClock.nowUtc().plusMillis(1), + ImmutableSet.copyOf(DatastoreSetupHelper.ALL_KINDS)); + PAssert.that(tuple.get(ValidateSqlUtils.createSqlEntityTupleTag(Registrar.class))) + .containsInAnyOrder(setupHelper.registrar1, setupHelper.registrar2); + PAssert.that(tuple.get(ValidateSqlUtils.createSqlEntityTupleTag(DomainHistory.class))) + .containsInAnyOrder(setupHelper.historyEntry); + testPipeline.run(); + } +} diff --git a/core/src/test/java/google/registry/beam/comparedb/SqlSnapshotsTest.java b/core/src/test/java/google/registry/beam/comparedb/SqlSnapshotsTest.java new file mode 100644 index 000000000..fcc168aa1 --- /dev/null +++ b/core/src/test/java/google/registry/beam/comparedb/SqlSnapshotsTest.java @@ -0,0 +1,100 @@ +// Copyright 2021 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.beam.comparedb; + +import static google.registry.beam.comparedb.ValidateSqlUtils.createSqlEntityTupleTag; +import static org.joda.time.DateTimeZone.UTC; + +import com.google.common.collect.ImmutableSet; +import google.registry.beam.TestPipelineExtension; +import google.registry.model.bulkquery.TestSetupHelper; +import google.registry.model.contact.ContactResource; +import google.registry.model.domain.DomainBase; +import google.registry.model.domain.DomainHistory; +import google.registry.model.host.HostResource; +import google.registry.model.registrar.Registrar; +import google.registry.model.replay.SqlEntity; +import google.registry.model.tld.Registry; +import google.registry.testing.AppEngineExtension; +import google.registry.testing.DatastoreEntityExtension; +import google.registry.testing.FakeClock; +import java.io.Serializable; +import java.util.Optional; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.joda.time.DateTime; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +/** Unit tests for {@link SqlSnapshots}. */ +class SqlSnapshotsTest { + + protected FakeClock fakeClock = new FakeClock(DateTime.now(UTC)); + + @RegisterExtension + @Order(Order.DEFAULT - 1) + final transient DatastoreEntityExtension datastore = + new DatastoreEntityExtension().allThreads(true); + + @RegisterExtension + public final AppEngineExtension appEngine = + AppEngineExtension.builder().withDatastoreAndCloudSql().withClock(fakeClock).build(); + + @RegisterExtension + final transient TestPipelineExtension testPipeline = + TestPipelineExtension.create().enableAbandonedNodeEnforcement(true); + + private final TestSetupHelper setupHelper = new TestSetupHelper(fakeClock); + + @BeforeEach + void setUp() { + testPipeline + .getCoderRegistry() + .registerCoderForClass(SqlEntity.class, SerializableCoder.of(Serializable.class)); + setupHelper.initializeAllEntities(); + setupHelper.setupBulkQueryJpaTm(appEngine); + } + + @AfterEach + void afterEach() { + setupHelper.tearDownBulkQueryJpaTm(); + } + + @Test + void loadCloudSqlSnapshotByType() { + PCollectionTuple sqlSnapshot = + SqlSnapshots.loadCloudSqlSnapshotByType( + testPipeline, + ImmutableSet.of( + Registry.class, + Registrar.class, + DomainBase.class, + DomainHistory.class, + ContactResource.class, + HostResource.class), + Optional.empty()); + PAssert.that(sqlSnapshot.get(createSqlEntityTupleTag(Registry.class))) + .containsInAnyOrder(setupHelper.registry); + PAssert.that(sqlSnapshot.get(createSqlEntityTupleTag(DomainBase.class))) + .containsInAnyOrder(setupHelper.domain); + PAssert.that(sqlSnapshot.get(createSqlEntityTupleTag(DomainHistory.class))) + .containsInAnyOrder(setupHelper.domainHistory); + testPipeline.run(); + } +} diff --git a/core/src/test/java/google/registry/beam/comparedb/ValidateSqlUtilsTest.java b/core/src/test/java/google/registry/beam/comparedb/ValidateSqlUtilsTest.java new file mode 100644 index 000000000..c45af8d68 --- /dev/null +++ b/core/src/test/java/google/registry/beam/comparedb/ValidateSqlUtilsTest.java @@ -0,0 +1,63 @@ +// Copyright 2021 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.beam.comparedb; + +import static com.google.common.truth.Truth8.assertThat; +import static google.registry.beam.comparedb.ValidateSqlUtils.getMedianIdForHistoryTable; +import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; +import static org.joda.time.DateTimeZone.UTC; + +import com.google.common.truth.Truth; +import google.registry.model.bulkquery.TestSetupHelper; +import google.registry.model.domain.DomainHistory; +import google.registry.testing.AppEngineExtension; +import google.registry.testing.FakeClock; +import org.joda.time.DateTime; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +/** Unit tests for {@link ValidateSqlUtils}. */ +class ValidateSqlUtilsTest { + + private final FakeClock fakeClock = new FakeClock(DateTime.now(UTC)); + + private final TestSetupHelper setupHelper = new TestSetupHelper(fakeClock); + + @RegisterExtension + public final AppEngineExtension appEngine = + AppEngineExtension.builder().withDatastoreAndCloudSql().withClock(fakeClock).build(); + + @Test + void getMedianIdForHistoryTable_emptyTable() { + assertThat(getMedianIdForHistoryTable("DomainHistory")).isEmpty(); + } + + @Test + void getMedianIdForHistoryTable_oneRow() { + setupHelper.initializeAllEntities(); + Truth.assertThat(jpaTm().transact(() -> jpaTm().loadAllOf(DomainHistory.class))).hasSize(1); + assertThat(getMedianIdForHistoryTable("DomainHistory")) + .hasValue(setupHelper.domainHistory.getId()); + } + + @Test + void getMedianIdForHistoryTable_twoRows() { + setupHelper.initializeAllEntities(); + setupHelper.applyChangeToDomainAndHistory(); + Truth.assertThat(jpaTm().transact(() -> jpaTm().loadAllOf(DomainHistory.class))).hasSize(2); + assertThat(getMedianIdForHistoryTable("DomainHistory")) + .hasValue(setupHelper.domainHistory.getId()); + } +} diff --git a/core/src/test/java/google/registry/beam/initsql/DatastoreSetupHelper.java b/core/src/test/java/google/registry/beam/initsql/DatastoreSetupHelper.java new file mode 100644 index 000000000..750a49245 --- /dev/null +++ b/core/src/test/java/google/registry/beam/initsql/DatastoreSetupHelper.java @@ -0,0 +1,295 @@ +// Copyright 2021 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.beam.initsql; + +import static google.registry.model.common.Cursor.CursorType.BRDA; +import static google.registry.model.common.Cursor.CursorType.RECURRING_BILLING; +import static google.registry.model.domain.token.AllocationToken.TokenType.SINGLE_USE; +import static google.registry.testing.DatabaseHelper.newRegistry; +import static google.registry.testing.DatabaseHelper.persistResource; +import static google.registry.testing.DatabaseHelper.persistSimpleResource; +import static google.registry.util.DateTimeUtils.END_OF_TIME; +import static google.registry.util.DateTimeUtils.START_OF_TIME; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.googlecode.objectify.Key; +import google.registry.flows.domain.DomainFlowUtils; +import google.registry.model.billing.BillingEvent; +import google.registry.model.billing.BillingEvent.Flag; +import google.registry.model.billing.BillingEvent.Reason; +import google.registry.model.common.Cursor; +import google.registry.model.contact.ContactResource; +import google.registry.model.domain.DesignatedContact; +import google.registry.model.domain.DomainAuthInfo; +import google.registry.model.domain.DomainBase; +import google.registry.model.domain.DomainHistory; +import google.registry.model.domain.GracePeriod; +import google.registry.model.domain.launch.LaunchNotice; +import google.registry.model.domain.rgp.GracePeriodStatus; +import google.registry.model.domain.secdns.DelegationSignerData; +import google.registry.model.domain.token.AllocationToken; +import google.registry.model.eppcommon.AuthInfo.PasswordAuth; +import google.registry.model.eppcommon.StatusValue; +import google.registry.model.eppcommon.Trid; +import google.registry.model.host.HostResource; +import google.registry.model.poll.PollMessage; +import google.registry.model.registrar.Registrar; +import google.registry.model.registrar.RegistrarContact; +import google.registry.model.reporting.HistoryEntry; +import google.registry.model.tld.Registry; +import google.registry.model.transfer.DomainTransferData; +import google.registry.model.transfer.TransferStatus; +import google.registry.persistence.VKey; +import google.registry.testing.AppEngineExtension; +import google.registry.testing.FakeClock; +import java.io.File; +import java.nio.file.Files; +import java.nio.file.Path; +import org.joda.money.Money; + +/** + * Sets up a test scenario in Datastore. + * + *

The {@link #initializeData} populates Datastore with test data, including {@link DomainBase}, + * {@link DomainHistory}, and commit logs. The up-to-date version of the relevant entities are saved + * in public instance variables (e.g., {@link #domain} for easy access. + */ +public class DatastoreSetupHelper { + + /** + * All kinds of entities to be set up in the Datastore. Must contain all kinds known to {@link + * InitSqlPipeline}. + */ + public static final ImmutableList> ALL_KINDS = + ImmutableList.of( + Registry.class, + Cursor.class, + Registrar.class, + ContactResource.class, + RegistrarContact.class, + DomainBase.class, + HostResource.class, + HistoryEntry.class, + AllocationToken.class, + BillingEvent.Recurring.class, + BillingEvent.OneTime.class, + BillingEvent.Cancellation.class, + PollMessage.class); + + private final Path tmpDir; + private final FakeClock fakeClock; + + public File exportRootDir; + public File exportDir; + public File commitLogDir; + + public Registrar registrar1; + public Registrar registrar2; + public DomainBase domain; + public ContactResource contact1; + public ContactResource contact2; + public HostResource hostResource; + + public DomainHistory historyEntry; + + public Cursor globalCursor; + public Cursor tldCursor; + + public DatastoreSetupHelper(Path tempDir, FakeClock fakeClock) { + this.tmpDir = tempDir; + this.fakeClock = fakeClock; + } + + public DatastoreSetupHelper initializeData() throws Exception { + try (BackupTestStore store = new BackupTestStore(fakeClock)) { + exportRootDir = Files.createDirectory(tmpDir.resolve("exports")).toFile(); + + persistResource(newRegistry("com", "COM")); + registrar1 = persistResource(AppEngineExtension.makeRegistrar1()); + registrar2 = persistResource(AppEngineExtension.makeRegistrar2()); + Key domainKey = Key.create(null, DomainBase.class, "4-COM"); + hostResource = + persistResource( + new HostResource.Builder() + .setHostName("ns1.example.com") + .setSuperordinateDomain(VKey.from(domainKey)) + .setRepoId("1-COM") + .setCreationRegistrarId(registrar1.getRegistrarId()) + .setPersistedCurrentSponsorRegistrarId(registrar2.getRegistrarId()) + .build()); + contact1 = + persistResource( + new ContactResource.Builder() + .setContactId("contact_id1") + .setRepoId("2-COM") + .setCreationRegistrarId(registrar1.getRegistrarId()) + .setPersistedCurrentSponsorRegistrarId(registrar2.getRegistrarId()) + .build()); + contact2 = + persistResource( + new ContactResource.Builder() + .setContactId("contact_id2") + .setRepoId("3-COM") + .setCreationRegistrarId(registrar1.getRegistrarId()) + .setPersistedCurrentSponsorRegistrarId(registrar1.getRegistrarId()) + .build()); + persistSimpleResource( + new RegistrarContact.Builder() + .setParent(registrar1) + .setName("John Abused") + .setEmailAddress("johnabuse@example.com") + .setVisibleInWhoisAsAdmin(true) + .setVisibleInWhoisAsTech(false) + .setPhoneNumber("+1.2125551213") + .setFaxNumber("+1.2125551213") + .setTypes(ImmutableSet.of(RegistrarContact.Type.ABUSE, RegistrarContact.Type.ADMIN)) + .build()); + historyEntry = + persistResource( + new DomainHistory.Builder() + .setDomainRepoId(domainKey.getName()) + .setModificationTime(fakeClock.nowUtc()) + .setRegistrarId(registrar1.getRegistrarId()) + .setType(HistoryEntry.Type.DOMAIN_CREATE) + .build()); + persistResource( + new AllocationToken.Builder().setToken("abc123").setTokenType(SINGLE_USE).build()); + Key historyEntryKey = Key.create(historyEntry); + BillingEvent.OneTime onetimeBillEvent = + new BillingEvent.OneTime.Builder() + .setId(1) + .setReason(Reason.RENEW) + .setTargetId("example.com") + .setRegistrarId("TheRegistrar") + .setCost(Money.parse("USD 44.00")) + .setPeriodYears(4) + .setEventTime(fakeClock.nowUtc()) + .setBillingTime(fakeClock.nowUtc()) + .setParent(historyEntryKey) + .build(); + persistResource(onetimeBillEvent); + Key oneTimeBillKey = Key.create(onetimeBillEvent); + BillingEvent.Recurring recurringBillEvent = + new BillingEvent.Recurring.Builder() + .setId(2) + .setReason(Reason.RENEW) + .setFlags(ImmutableSet.of(Flag.AUTO_RENEW)) + .setTargetId("example.com") + .setRegistrarId("TheRegistrar") + .setEventTime(fakeClock.nowUtc()) + .setRecurrenceEndTime(END_OF_TIME) + .setParent(historyEntryKey) + .build(); + persistResource(recurringBillEvent); + VKey recurringBillKey = recurringBillEvent.createVKey(); + PollMessage.Autorenew autorenewPollMessage = + new PollMessage.Autorenew.Builder() + .setId(3L) + .setTargetId("example.com") + .setRegistrarId("TheRegistrar") + .setEventTime(fakeClock.nowUtc()) + .setMsg("Domain was auto-renewed.") + .setParent(historyEntry) + .build(); + persistResource(autorenewPollMessage); + VKey autorenewPollKey = autorenewPollMessage.createVKey(); + PollMessage.OneTime oneTimePollMessage = + new PollMessage.OneTime.Builder() + .setId(1L) + .setParent(historyEntry) + .setEventTime(fakeClock.nowUtc()) + .setRegistrarId("TheRegistrar") + .setMsg(DomainFlowUtils.COLLISION_MESSAGE) + .build(); + persistResource(oneTimePollMessage); + VKey onetimePollKey = oneTimePollMessage.createVKey(); + domain = + persistResource( + new DomainBase.Builder() + .setDomainName("example.com") + .setRepoId("4-COM") + .setCreationRegistrarId(registrar1.getRegistrarId()) + .setLastEppUpdateTime(fakeClock.nowUtc()) + .setLastEppUpdateRegistrarId(registrar2.getRegistrarId()) + .setLastTransferTime(fakeClock.nowUtc()) + .setStatusValues( + ImmutableSet.of( + StatusValue.CLIENT_DELETE_PROHIBITED, + StatusValue.SERVER_DELETE_PROHIBITED, + StatusValue.SERVER_TRANSFER_PROHIBITED, + StatusValue.SERVER_UPDATE_PROHIBITED, + StatusValue.SERVER_RENEW_PROHIBITED, + StatusValue.SERVER_HOLD)) + .setRegistrant(contact1.createVKey()) + .setContacts( + ImmutableSet.of( + DesignatedContact.create( + DesignatedContact.Type.ADMIN, contact2.createVKey()))) + .setNameservers(ImmutableSet.of(hostResource.createVKey())) + .setSubordinateHosts(ImmutableSet.of("ns1.example.com")) + .setPersistedCurrentSponsorRegistrarId(registrar2.getRegistrarId()) + .setRegistrationExpirationTime(fakeClock.nowUtc().plusYears(1)) + .setAuthInfo(DomainAuthInfo.create(PasswordAuth.create("password"))) + .setDsData( + ImmutableSet.of(DelegationSignerData.create(1, 2, 3, new byte[] {0, 1, 2}))) + .setLaunchNotice( + LaunchNotice.create("tcnid", "validatorId", START_OF_TIME, START_OF_TIME)) + .setTransferData( + new DomainTransferData.Builder() + .setGainingRegistrarId(registrar1.getRegistrarId()) + .setLosingRegistrarId(registrar2.getRegistrarId()) + .setPendingTransferExpirationTime(fakeClock.nowUtc()) + .setServerApproveEntities( + ImmutableSet.of( + VKey.from(oneTimeBillKey), recurringBillKey, autorenewPollKey)) + .setServerApproveBillingEvent(VKey.from(oneTimeBillKey)) + .setServerApproveAutorenewEvent(recurringBillKey) + .setServerApproveAutorenewPollMessage(autorenewPollKey) + .setTransferRequestTime(fakeClock.nowUtc().plusDays(1)) + .setTransferStatus(TransferStatus.SERVER_APPROVED) + .setTransferRequestTrid(Trid.create("client-trid", "server-trid")) + .build()) + .setDeletePollMessage(onetimePollKey) + .setAutorenewBillingEvent(recurringBillKey) + .setAutorenewPollMessage(autorenewPollKey) + .setSmdId("smdid") + .addGracePeriod( + GracePeriod.create( + GracePeriodStatus.ADD, + "4-COM", + fakeClock.nowUtc().plusDays(1), + "TheRegistrar", + null)) + .build()); + persistResource( + new BillingEvent.Cancellation.Builder() + .setReason(Reason.RENEW) + .setTargetId(domain.getDomainName()) + .setRegistrarId(domain.getCurrentSponsorRegistrarId()) + .setEventTime(fakeClock.nowUtc()) + .setBillingTime(fakeClock.nowUtc()) + .setRecurringEventKey(recurringBillEvent.createVKey()) + .setParent(historyEntryKey) + .build()); + globalCursor = persistResource(Cursor.createGlobal(RECURRING_BILLING, fakeClock.nowUtc())); + tldCursor = persistResource(Cursor.create(BRDA, fakeClock.nowUtc(), Registry.get("com"))); + exportDir = store.export(exportRootDir.getAbsolutePath(), ALL_KINDS, ImmutableSet.of()); + commitLogDir = Files.createDirectory(tmpDir.resolve("commits")).toFile(); + fakeClock.advanceOneMilli(); + } + return this; + } +} diff --git a/core/src/test/java/google/registry/beam/initsql/ExportLoadingTransformsTest.java b/core/src/test/java/google/registry/beam/initsql/ExportLoadingTransformsTest.java index 109ff89da..ebcad614f 100644 --- a/core/src/test/java/google/registry/beam/initsql/ExportLoadingTransformsTest.java +++ b/core/src/test/java/google/registry/beam/initsql/ExportLoadingTransformsTest.java @@ -79,7 +79,7 @@ class ExportLoadingTransformsTest implements Serializable { @RegisterExtension @Order(value = 1) final transient DatastoreEntityExtension datastoreEntityExtension = - new DatastoreEntityExtension(); + new DatastoreEntityExtension().allThreads(true); @RegisterExtension final transient TestPipelineExtension testPipeline = diff --git a/core/src/test/java/google/registry/beam/initsql/InitSqlPipelineTest.java b/core/src/test/java/google/registry/beam/initsql/InitSqlPipelineTest.java index e740822e3..d08e69304 100644 --- a/core/src/test/java/google/registry/beam/initsql/InitSqlPipelineTest.java +++ b/core/src/test/java/google/registry/beam/initsql/InitSqlPipelineTest.java @@ -17,59 +17,22 @@ package google.registry.beam.initsql; import static com.google.common.truth.Truth.assertThat; import static google.registry.model.ImmutableObjectSubject.assertAboutImmutableObjects; import static google.registry.model.ImmutableObjectSubject.immutableObjectCorrespondence; -import static google.registry.model.common.Cursor.CursorType.BRDA; -import static google.registry.model.common.Cursor.CursorType.RECURRING_BILLING; -import static google.registry.model.domain.token.AllocationToken.TokenType.SINGLE_USE; import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; -import static google.registry.testing.DatabaseHelper.newRegistry; -import static google.registry.testing.DatabaseHelper.persistResource; -import static google.registry.testing.DatabaseHelper.persistSimpleResource; -import static google.registry.util.DateTimeUtils.END_OF_TIME; -import static google.registry.util.DateTimeUtils.START_OF_TIME; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import com.googlecode.objectify.Key; import google.registry.beam.TestPipelineExtension; -import google.registry.flows.domain.DomainFlowUtils; -import google.registry.model.billing.BillingEvent; -import google.registry.model.billing.BillingEvent.Flag; -import google.registry.model.billing.BillingEvent.Reason; import google.registry.model.common.Cursor; import google.registry.model.contact.ContactResource; -import google.registry.model.domain.DesignatedContact; -import google.registry.model.domain.DomainAuthInfo; import google.registry.model.domain.DomainBase; -import google.registry.model.domain.DomainHistory; -import google.registry.model.domain.GracePeriod; -import google.registry.model.domain.launch.LaunchNotice; -import google.registry.model.domain.rgp.GracePeriodStatus; -import google.registry.model.domain.secdns.DelegationSignerData; -import google.registry.model.domain.token.AllocationToken; -import google.registry.model.eppcommon.AuthInfo.PasswordAuth; -import google.registry.model.eppcommon.StatusValue; -import google.registry.model.eppcommon.Trid; import google.registry.model.host.HostResource; import google.registry.model.ofy.Ofy; -import google.registry.model.poll.PollMessage; import google.registry.model.registrar.Registrar; -import google.registry.model.registrar.RegistrarContact; -import google.registry.model.reporting.HistoryEntry; -import google.registry.model.tld.Registry; -import google.registry.model.transfer.DomainTransferData; -import google.registry.model.transfer.TransferStatus; -import google.registry.persistence.VKey; import google.registry.persistence.transaction.JpaTestExtensions; import google.registry.persistence.transaction.JpaTestExtensions.JpaIntegrationTestExtension; -import google.registry.testing.AppEngineExtension; import google.registry.testing.DatastoreEntityExtension; import google.registry.testing.FakeClock; import google.registry.testing.InjectExtension; -import java.io.File; -import java.nio.file.Files; import java.nio.file.Path; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.joda.money.Money; import org.joda.time.DateTime; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Order; @@ -81,27 +44,7 @@ import org.junit.jupiter.api.io.TempDir; class InitSqlPipelineTest { private static final DateTime START_TIME = DateTime.parse("2000-01-01T00:00:00.0Z"); - /** - * All kinds of entities to be set up in the Datastore. Must contain all kinds known to {@link - * InitSqlPipeline}. - */ - private static final ImmutableList> ALL_KINDS = - ImmutableList.of( - Registry.class, - Cursor.class, - Registrar.class, - ContactResource.class, - RegistrarContact.class, - DomainBase.class, - HostResource.class, - HistoryEntry.class, - AllocationToken.class, - BillingEvent.Recurring.class, - BillingEvent.OneTime.class, - BillingEvent.Cancellation.class, - PollMessage.class); - - private transient FakeClock fakeClock = new FakeClock(START_TIME); + private FakeClock fakeClock = new FakeClock(START_TIME); @RegisterExtension @Order(Order.DEFAULT - 1) @@ -122,201 +65,12 @@ class InitSqlPipelineTest { final transient JpaIntegrationTestExtension database = new JpaTestExtensions.Builder().withClock(fakeClock).buildIntegrationTestExtension(); - private File exportRootDir; - private File exportDir; - private File commitLogDir; - - private transient Registrar registrar1; - private transient Registrar registrar2; - private transient DomainBase domain; - private transient ContactResource contact1; - private transient ContactResource contact2; - private transient HostResource hostResource; - - private transient DomainHistory historyEntry; - - private transient Cursor globalCursor; - private transient Cursor tldCursor; + DatastoreSetupHelper setupHelper; @BeforeEach void beforeEach() throws Exception { - try (BackupTestStore store = new BackupTestStore(fakeClock)) { - injectExtension.setStaticField(Ofy.class, "clock", fakeClock); - exportRootDir = Files.createDirectory(tmpDir.resolve("exports")).toFile(); - - persistResource(newRegistry("com", "COM")); - registrar1 = persistResource(AppEngineExtension.makeRegistrar1()); - registrar2 = persistResource(AppEngineExtension.makeRegistrar2()); - Key domainKey = Key.create(null, DomainBase.class, "4-COM"); - hostResource = - persistResource( - new HostResource.Builder() - .setHostName("ns1.example.com") - .setSuperordinateDomain(VKey.from(domainKey)) - .setRepoId("1-COM") - .setCreationRegistrarId(registrar1.getRegistrarId()) - .setPersistedCurrentSponsorRegistrarId(registrar2.getRegistrarId()) - .build()); - contact1 = - persistResource( - new ContactResource.Builder() - .setContactId("contact_id1") - .setRepoId("2-COM") - .setCreationRegistrarId(registrar1.getRegistrarId()) - .setPersistedCurrentSponsorRegistrarId(registrar2.getRegistrarId()) - .build()); - contact2 = - persistResource( - new ContactResource.Builder() - .setContactId("contact_id2") - .setRepoId("3-COM") - .setCreationRegistrarId(registrar1.getRegistrarId()) - .setPersistedCurrentSponsorRegistrarId(registrar1.getRegistrarId()) - .build()); - persistSimpleResource( - new RegistrarContact.Builder() - .setParent(registrar1) - .setName("John Abused") - .setEmailAddress("johnabuse@example.com") - .setVisibleInWhoisAsAdmin(true) - .setVisibleInWhoisAsTech(false) - .setPhoneNumber("+1.2125551213") - .setFaxNumber("+1.2125551213") - .setTypes(ImmutableSet.of(RegistrarContact.Type.ABUSE, RegistrarContact.Type.ADMIN)) - .build()); - historyEntry = - persistResource( - new DomainHistory.Builder() - .setDomainRepoId(domainKey.getName()) - .setModificationTime(fakeClock.nowUtc()) - .setRegistrarId(registrar1.getRegistrarId()) - .setType(HistoryEntry.Type.DOMAIN_CREATE) - .build()); - persistResource( - new AllocationToken.Builder().setToken("abc123").setTokenType(SINGLE_USE).build()); - Key historyEntryKey = Key.create(historyEntry); - BillingEvent.OneTime onetimeBillEvent = - new BillingEvent.OneTime.Builder() - .setId(1) - .setReason(Reason.RENEW) - .setTargetId("example.com") - .setRegistrarId("TheRegistrar") - .setCost(Money.parse("USD 44.00")) - .setPeriodYears(4) - .setEventTime(fakeClock.nowUtc()) - .setBillingTime(fakeClock.nowUtc()) - .setParent(historyEntryKey) - .build(); - persistResource(onetimeBillEvent); - Key oneTimeBillKey = Key.create(onetimeBillEvent); - BillingEvent.Recurring recurringBillEvent = - new BillingEvent.Recurring.Builder() - .setId(2) - .setReason(Reason.RENEW) - .setFlags(ImmutableSet.of(Flag.AUTO_RENEW)) - .setTargetId("example.com") - .setRegistrarId("TheRegistrar") - .setEventTime(fakeClock.nowUtc()) - .setRecurrenceEndTime(END_OF_TIME) - .setParent(historyEntryKey) - .build(); - persistResource(recurringBillEvent); - VKey recurringBillKey = recurringBillEvent.createVKey(); - PollMessage.Autorenew autorenewPollMessage = - new PollMessage.Autorenew.Builder() - .setId(3L) - .setTargetId("example.com") - .setRegistrarId("TheRegistrar") - .setEventTime(fakeClock.nowUtc()) - .setMsg("Domain was auto-renewed.") - .setParent(historyEntry) - .build(); - persistResource(autorenewPollMessage); - VKey autorenewPollKey = autorenewPollMessage.createVKey(); - PollMessage.OneTime oneTimePollMessage = - new PollMessage.OneTime.Builder() - .setId(1L) - .setParent(historyEntry) - .setEventTime(fakeClock.nowUtc()) - .setRegistrarId("TheRegistrar") - .setMsg(DomainFlowUtils.COLLISION_MESSAGE) - .build(); - persistResource(oneTimePollMessage); - VKey onetimePollKey = oneTimePollMessage.createVKey(); - domain = - persistResource( - new DomainBase.Builder() - .setDomainName("example.com") - .setRepoId("4-COM") - .setCreationRegistrarId(registrar1.getRegistrarId()) - .setLastEppUpdateTime(fakeClock.nowUtc()) - .setLastEppUpdateRegistrarId(registrar2.getRegistrarId()) - .setLastTransferTime(fakeClock.nowUtc()) - .setStatusValues( - ImmutableSet.of( - StatusValue.CLIENT_DELETE_PROHIBITED, - StatusValue.SERVER_DELETE_PROHIBITED, - StatusValue.SERVER_TRANSFER_PROHIBITED, - StatusValue.SERVER_UPDATE_PROHIBITED, - StatusValue.SERVER_RENEW_PROHIBITED, - StatusValue.SERVER_HOLD)) - .setRegistrant(contact1.createVKey()) - .setContacts( - ImmutableSet.of( - DesignatedContact.create( - DesignatedContact.Type.ADMIN, contact2.createVKey()))) - .setNameservers(ImmutableSet.of(hostResource.createVKey())) - .setSubordinateHosts(ImmutableSet.of("ns1.example.com")) - .setPersistedCurrentSponsorRegistrarId(registrar2.getRegistrarId()) - .setRegistrationExpirationTime(fakeClock.nowUtc().plusYears(1)) - .setAuthInfo(DomainAuthInfo.create(PasswordAuth.create("password"))) - .setDsData( - ImmutableSet.of(DelegationSignerData.create(1, 2, 3, new byte[] {0, 1, 2}))) - .setLaunchNotice( - LaunchNotice.create("tcnid", "validatorId", START_OF_TIME, START_OF_TIME)) - .setTransferData( - new DomainTransferData.Builder() - .setGainingRegistrarId(registrar1.getRegistrarId()) - .setLosingRegistrarId(registrar2.getRegistrarId()) - .setPendingTransferExpirationTime(fakeClock.nowUtc()) - .setServerApproveEntities( - ImmutableSet.of( - VKey.from(oneTimeBillKey), recurringBillKey, autorenewPollKey)) - .setServerApproveBillingEvent(VKey.from(oneTimeBillKey)) - .setServerApproveAutorenewEvent(recurringBillKey) - .setServerApproveAutorenewPollMessage(autorenewPollKey) - .setTransferRequestTime(fakeClock.nowUtc().plusDays(1)) - .setTransferStatus(TransferStatus.SERVER_APPROVED) - .setTransferRequestTrid(Trid.create("client-trid", "server-trid")) - .build()) - .setDeletePollMessage(onetimePollKey) - .setAutorenewBillingEvent(recurringBillKey) - .setAutorenewPollMessage(autorenewPollKey) - .setSmdId("smdid") - .addGracePeriod( - GracePeriod.create( - GracePeriodStatus.ADD, - "4-COM", - fakeClock.nowUtc().plusDays(1), - "TheRegistrar", - null)) - .build()); - persistResource( - new BillingEvent.Cancellation.Builder() - .setReason(Reason.RENEW) - .setTargetId(domain.getDomainName()) - .setRegistrarId(domain.getCurrentSponsorRegistrarId()) - .setEventTime(fakeClock.nowUtc()) - .setBillingTime(fakeClock.nowUtc()) - .setRecurringEventKey(recurringBillEvent.createVKey()) - .setParent(historyEntryKey) - .build()); - globalCursor = persistResource(Cursor.createGlobal(RECURRING_BILLING, fakeClock.nowUtc())); - tldCursor = persistResource(Cursor.create(BRDA, fakeClock.nowUtc(), Registry.get("com"))); - exportDir = store.export(exportRootDir.getAbsolutePath(), ALL_KINDS, ImmutableSet.of()); - commitLogDir = Files.createDirectory(tmpDir.resolve("commits")).toFile(); - fakeClock.advanceOneMilli(); - } + injectExtension.setStaticField(Ofy.class, "clock", fakeClock); + setupHelper = new DatastoreSetupHelper(tmpDir, fakeClock).initializeData(); } @Test @@ -325,24 +79,27 @@ class InitSqlPipelineTest { PipelineOptionsFactory.fromArgs( "--commitLogStartTimestamp=" + START_TIME, "--commitLogEndTimestamp=" + fakeClock.nowUtc().plusMillis(1), - "--datastoreExportDir=" + exportDir.getAbsolutePath(), - "--commitLogDir=" + commitLogDir.getAbsolutePath()) + "--datastoreExportDir=" + setupHelper.exportDir.getAbsolutePath(), + "--commitLogDir=" + setupHelper.commitLogDir.getAbsolutePath()) .withValidation() .as(InitSqlPipelineOptions.class); InitSqlPipeline initSqlPipeline = new InitSqlPipeline(options); initSqlPipeline.run(testPipeline).waitUntilFinish(); assertHostResourceEquals( - jpaTm().transact(() -> jpaTm().loadByKey(hostResource.createVKey())), hostResource); + jpaTm().transact(() -> jpaTm().loadByKey(setupHelper.hostResource.createVKey())), + setupHelper.hostResource); assertThat(jpaTm().transact(() -> jpaTm().loadAllOf(Registrar.class))) .comparingElementsUsing(immutableObjectCorrespondence("lastUpdateTime")) - .containsExactly(registrar1, registrar2); + .containsExactly(setupHelper.registrar1, setupHelper.registrar2); assertThat(jpaTm().transact(() -> jpaTm().loadAllOf(ContactResource.class))) .comparingElementsUsing(immutableObjectCorrespondence("revisions", "updateTimestamp")) - .containsExactly(contact1, contact2); - assertDomainEquals(jpaTm().transact(() -> jpaTm().loadByKey(domain.createVKey())), domain); + .containsExactly(setupHelper.contact1, setupHelper.contact2); + assertDomainEquals( + jpaTm().transact(() -> jpaTm().loadByKey(setupHelper.domain.createVKey())), + setupHelper.domain); assertThat(jpaTm().transact(() -> jpaTm().loadAllOf(Cursor.class))) .comparingElementsUsing(immutableObjectCorrespondence()) - .containsExactly(globalCursor, tldCursor); + .containsExactly(setupHelper.globalCursor, setupHelper.tldCursor); } private static void assertHostResourceEquals(HostResource actual, HostResource expected) { diff --git a/core/src/test/java/google/registry/model/bulkquery/TestSetupHelper.java b/core/src/test/java/google/registry/model/bulkquery/TestSetupHelper.java index 1b66d4c7e..687cddc37 100644 --- a/core/src/test/java/google/registry/model/bulkquery/TestSetupHelper.java +++ b/core/src/test/java/google/registry/model/bulkquery/TestSetupHelper.java @@ -50,8 +50,18 @@ import google.registry.testing.AppEngineExtension; import google.registry.testing.DatabaseHelper; import google.registry.testing.FakeClock; -/** Entity creation utilities for domain-related tests. */ -class TestSetupHelper { +/** + * Utilities for managing domain-related SQL test scenarios. + * + *

The {@link #initializeAllEntities} method initializes the database, and the {@link + * #applyChangeToDomainAndHistory} makes one change to the domain, generating an additional history + * event. The most up-to-date values of the relevant entities are saved in public instance + * variables, {@link #domain} etc. + * + *

This class makes use of {@link DatabaseHelper}, which requires Datastore. Tests that use this + * class should use {@link AppEngineExtension}. + */ +public final class TestSetupHelper { public static final String TLD = "tld"; public static final String DOMAIN_REPO_ID = "4-TLD"; @@ -60,21 +70,21 @@ class TestSetupHelper { private final FakeClock fakeClock; - Registry registry; - Registrar registrar; - ContactResource contact; - DomainBase domain; - DomainHistory domainHistory; - HostResource host; + public Registry registry; + public Registrar registrar; + public ContactResource contact; + public DomainBase domain; + public DomainHistory domainHistory; + public HostResource host; private JpaTransactionManager originalJpaTm; private JpaTransactionManager bulkQueryJpaTm; - TestSetupHelper(FakeClock fakeClock) { + public TestSetupHelper(FakeClock fakeClock) { this.fakeClock = fakeClock; } - void initializeAllEntities() { + public void initializeAllEntities() { registry = putInDb(DatabaseHelper.newRegistry(TLD, Ascii.toUpperCase(TLD))); registrar = saveRegistrar(REGISTRAR_ID); contact = putInDb(createContact(DOMAIN_REPO_ID, REGISTRAR_ID)); @@ -83,12 +93,12 @@ class TestSetupHelper { host = putInDb(createHost()); } - void applyChangeToDomainAndHistory() { + public void applyChangeToDomainAndHistory() { domain = putInDb(createFullDomain(contact, host, fakeClock)); domainHistory = putInDb(createFullHistory(domain, fakeClock)); } - void setupBulkQueryJpaTm(AppEngineExtension appEngineExtension) { + public void setupBulkQueryJpaTm(AppEngineExtension appEngineExtension) { bulkQueryJpaTm = BulkQueryJpaFactory.createBulkQueryJpaTransactionManager( appEngineExtension @@ -101,7 +111,7 @@ class TestSetupHelper { TransactionManagerFactory.setJpaTm(() -> bulkQueryJpaTm); } - void tearDownBulkQueryJpaTm() { + public void tearDownBulkQueryJpaTm() { if (bulkQueryJpaTm != null) { bulkQueryJpaTm.teardown(); TransactionManagerFactory.setJpaTm(() -> originalJpaTm);