Validate SQL with Datastore being Primary (#1436)

* Validate SQL with Datastore being primary

Validates the data asynchronously replicated from Datastore to SQL.
This is a short term tool optimized for the current production database.

Tested in production.
This commit is contained in:
Weimin Yu 2021-11-30 12:57:49 -05:00 committed by GitHub
parent 52ef8592fc
commit 72be02916e
25 changed files with 1909 additions and 301 deletions

View file

@ -703,6 +703,12 @@ createToolTask(
'google.registry.tools.DevTool', 'google.registry.tools.DevTool',
sourceSets.nonprod) sourceSets.nonprod)
createToolTask(
'initSqlPipeline', 'google.registry.beam.initsql.InitSqlPipeline')
createToolTask(
'validateSqlPipeline', 'google.registry.beam.comparedb.ValidateSqlPipeline')
createToolTask( createToolTask(
'jpaDemoPipeline', 'google.registry.beam.common.JpaDemoPipeline') 'jpaDemoPipeline', 'google.registry.beam.common.JpaDemoPipeline')
@ -711,30 +717,6 @@ createToolTask(
'createSyntheticHistoryEntries', 'createSyntheticHistoryEntries',
'google.registry.tools.javascrap.CreateSyntheticHistoryEntriesPipeline') 'google.registry.tools.javascrap.CreateSyntheticHistoryEntriesPipeline')
project.tasks.create('initSqlPipeline', JavaExec) {
main = 'google.registry.beam.initsql.InitSqlPipeline'
doFirst {
getToolArgsList().ifPresent {
args it
}
def isDirectRunner =
args.contains('DirectRunner') || args.contains('--runner=DirectRunner')
// The dependency containing DirectRunner is intentionally excluded from the
// production binary, so that it won't be chosen by mistake: we definitely do
// not want to use it for the real jobs, yet DirectRunner is the default if
// the user forgets to override it.
// DirectRunner is required for tests and is already on testRuntimeClasspath.
// For simplicity, we add testRuntimeClasspath to this task's classpath instead
// of defining a new configuration just for the DirectRunner dependency.
classpath =
isDirectRunner
? sourceSets.main.runtimeClasspath.plus(sourceSets.test.runtimeClasspath)
: sourceSets.main.runtimeClasspath
}
}
// Caller must provide projectId, GCP region, runner, and the kinds to delete // Caller must provide projectId, GCP region, runner, and the kinds to delete
// (comma-separated kind names or '*' for all). E.g.: // (comma-separated kind names or '*' for all). E.g.:
// nom_build :core:bulkDeleteDatastore --args="--project=domain-registry-crash \ // nom_build :core:bulkDeleteDatastore --args="--project=domain-registry-crash \

View file

@ -175,7 +175,7 @@ public final class RegistryJpaIO {
* JpaTransactionManager#setDatabaseSnapshot}. * JpaTransactionManager#setDatabaseSnapshot}.
*/ */
// TODO(b/193662898): vendor-independent support for richer transaction semantics. // TODO(b/193662898): vendor-independent support for richer transaction semantics.
public Read<R, T> withSnapshot(String snapshotId) { public Read<R, T> withSnapshot(@Nullable String snapshotId) {
return toBuilder().snapshotId(snapshotId).build(); return toBuilder().snapshotId(snapshotId).build();
} }

View file

@ -63,7 +63,8 @@ public class RegistryPipelineWorkerInitializer implements JvmInitializer {
TransactionManagerFactory.setJpaTmOnBeamWorker(transactionManagerLazy::get); TransactionManagerFactory.setJpaTmOnBeamWorker(transactionManagerLazy::get);
// Masquerade all threads as App Engine threads so we can create Ofy keys in the pipeline. Also // Masquerade all threads as App Engine threads so we can create Ofy keys in the pipeline. Also
// loads all ofy entities. // loads all ofy entities.
new AppEngineEnvironment("Beam").setEnvironmentForAllThreads(); new AppEngineEnvironment("s~" + registryPipelineComponent.getProjectId())
.setEnvironmentForAllThreads();
// Set the system property so that we can call IdService.allocateId() without access to // Set the system property so that we can call IdService.allocateId() without access to
// datastore. // datastore.
SystemPropertySetter.PRODUCTION_IMPL.setProperty(PROPERTY, "true"); SystemPropertySetter.PRODUCTION_IMPL.setProperty(PROPERTY, "true");

View file

@ -0,0 +1,169 @@
// Copyright 2021 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.beam.comparedb;
import static google.registry.beam.comparedb.ValidateSqlUtils.createSqlEntityTupleTag;
import static google.registry.beam.initsql.Transforms.createTagForKind;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableSet;
import com.googlecode.objectify.Key;
import google.registry.backup.VersionedEntity;
import google.registry.beam.initsql.Transforms;
import google.registry.model.billing.BillingEvent;
import google.registry.model.common.Cursor;
import google.registry.model.contact.ContactHistory;
import google.registry.model.contact.ContactResource;
import google.registry.model.domain.DomainBase;
import google.registry.model.domain.DomainHistory;
import google.registry.model.domain.token.AllocationToken;
import google.registry.model.host.HostHistory;
import google.registry.model.host.HostResource;
import google.registry.model.poll.PollMessage;
import google.registry.model.registrar.Registrar;
import google.registry.model.registrar.RegistrarContact;
import google.registry.model.replay.SqlEntity;
import google.registry.model.reporting.HistoryEntry;
import google.registry.model.tld.Registry;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.joda.time.DateTime;
/** Utilities for loading Datastore snapshots. */
public final class DatastoreSnapshots {
private DatastoreSnapshots() {}
/**
* Datastore kinds eligible for validation. This set must be consistent with {@link
* SqlSnapshots#ALL_SQL_ENTITIES}.
*/
@VisibleForTesting
static final ImmutableSet<Class<?>> ALL_DATASTORE_KINDS =
ImmutableSet.of(
Registry.class,
Cursor.class,
Registrar.class,
ContactResource.class,
RegistrarContact.class,
HostResource.class,
HistoryEntry.class,
AllocationToken.class,
BillingEvent.Recurring.class,
BillingEvent.OneTime.class,
BillingEvent.Cancellation.class,
PollMessage.class,
DomainBase.class);
/**
* Returns the Datastore snapshot right before {@code commitLogToTime} for the user specified
* {@code kinds}. The resulting snapshot has all changes that happened before {@code
* commitLogToTime}, and none at or after {@code commitLogToTime}.
*
* <p>If {@code HistoryEntry} is included in {@code kinds}, the result will contain {@code
* PCollections} for the child entities, {@code DomainHistory}, {@code ContactHistory}, and {@code
* HostHistory}.
*/
static PCollectionTuple loadDatastoreSnapshotByKind(
Pipeline pipeline,
String exportDir,
String commitLogDir,
DateTime commitLogFromTime,
DateTime commitLogToTime,
Set<Class<?>> kinds) {
PCollectionTuple snapshot =
pipeline.apply(
"Load Datastore snapshot.",
Transforms.loadDatastoreSnapshot(
exportDir,
commitLogDir,
commitLogFromTime,
commitLogToTime,
kinds.stream().map(Key::getKind).collect(ImmutableSet.toImmutableSet())));
PCollectionTuple perTypeSnapshots = PCollectionTuple.empty(pipeline);
for (Class<?> kind : kinds) {
PCollection<VersionedEntity> perKindSnapshot =
snapshot.get(createTagForKind(Key.getKind(kind)));
if (SqlEntity.class.isAssignableFrom(kind)) {
perTypeSnapshots =
perTypeSnapshots.and(
createSqlEntityTupleTag((Class<? extends SqlEntity>) kind),
datastoreEntityToPojo(perKindSnapshot, kind.getSimpleName()));
continue;
}
Verify.verify(kind == HistoryEntry.class, "Unexpected Non-SqlEntity class: %s", kind);
PCollectionTuple historyEntriesByType = splitHistoryEntry(perKindSnapshot);
for (Map.Entry<TupleTag<?>, PCollection<?>> entry :
historyEntriesByType.getAll().entrySet()) {
perTypeSnapshots = perTypeSnapshots.and(entry.getKey().getId(), entry.getValue());
}
}
return perTypeSnapshots;
}
/**
* Splits a {@link PCollection} of {@link HistoryEntry HistoryEntries} into three collections of
* its child entities by type.
*/
static PCollectionTuple splitHistoryEntry(PCollection<VersionedEntity> historyEntries) {
return historyEntries.apply(
"Split HistoryEntry by Resource Type",
ParDo.of(
new DoFn<VersionedEntity, SqlEntity>() {
@ProcessElement
public void processElement(
@Element VersionedEntity historyEntry, MultiOutputReceiver out) {
Optional.ofNullable(Transforms.convertVersionedEntityToSqlEntity(historyEntry))
.ifPresent(
sqlEntity ->
out.get(createSqlEntityTupleTag(sqlEntity.getClass()))
.output(sqlEntity));
}
})
.withOutputTags(
createSqlEntityTupleTag(DomainHistory.class),
TupleTagList.of(createSqlEntityTupleTag(ContactHistory.class))
.and(createSqlEntityTupleTag(HostHistory.class))));
}
/**
* Transforms a {@link PCollection} of {@link VersionedEntity VersionedEntities} to Ofy Java
* objects.
*/
static PCollection<SqlEntity> datastoreEntityToPojo(
PCollection<VersionedEntity> entities, String desc) {
return entities.apply(
"Datastore Entity to Pojo " + desc,
ParDo.of(
new DoFn<VersionedEntity, SqlEntity>() {
@ProcessElement
public void processElement(
@Element VersionedEntity entity, OutputReceiver<SqlEntity> out) {
Optional.ofNullable(Transforms.convertVersionedEntityToSqlEntity(entity))
.ifPresent(out::output);
}
}));
}
}

View file

@ -0,0 +1,145 @@
// Copyright 2021 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.beam.comparedb;
import com.google.auto.value.AutoValue;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import dagger.Component;
import google.registry.config.CloudTasksUtilsModule;
import google.registry.config.CredentialModule;
import google.registry.config.RegistryConfig;
import google.registry.config.RegistryConfig.Config;
import google.registry.config.RegistryConfig.ConfigModule;
import google.registry.gcs.GcsUtils;
import google.registry.util.Clock;
import google.registry.util.UtilsModule;
import java.io.IOException;
import java.util.Comparator;
import java.util.NoSuchElementException;
import java.util.Optional;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.joda.time.DateTime;
import org.joda.time.Instant;
import org.joda.time.Interval;
/** Finds the necessary information for loading the most recent Datastore snapshot. */
public class LatestDatastoreSnapshotFinder {
private final String projectId;
private final GcsUtils gcsUtils;
private final Clock clock;
@Inject
LatestDatastoreSnapshotFinder(
@Config("projectId") String projectId, GcsUtils gcsUtils, Clock clock) {
this.projectId = projectId;
this.gcsUtils = gcsUtils;
this.clock = clock;
}
/**
* Finds information of the most recent Datastore snapshot, including the GCS folder of the
* exported data files and the start and stop times of the export. The folder of the CommitLogs is
* also included in the return.
*/
public DatastoreSnapshotInfo getSnapshotInfo() {
String bucketName = RegistryConfig.getDatastoreBackupsBucket().substring("gs://".length());
/**
* Find the bucket-relative path to the overall metadata file of the last Datastore export.
* Since Datastore export is saved daily, we may need to look back to yesterday. If found, the
* return value is like
* "2021-11-19T06:00:00_76493/2021-11-19T06:00:00_76493.overall_export_metadata".
*/
Optional<String> metaFilePathOptional = findMostRecentExportMetadataFile(bucketName, 2);
if (!metaFilePathOptional.isPresent()) {
throw new NoSuchElementException("No exports found over the past 2 days.");
}
String metaFilePath = metaFilePathOptional.get();
String metaFileFolder = metaFilePath.substring(0, metaFilePath.indexOf('/'));
Instant exportStartTime = Instant.parse(metaFileFolder.replace('_', '.') + 'Z');
BlobInfo blobInfo = gcsUtils.getBlobInfo(BlobId.of(bucketName, metaFilePath));
Instant exportEndTime = new Instant(blobInfo.getCreateTime());
return DatastoreSnapshotInfo.create(
String.format("gs://%s/%s", bucketName, metaFileFolder),
getCommitLogDir(),
new Interval(exportStartTime, exportEndTime));
}
public String getCommitLogDir() {
return "gs://" + projectId + "-commits";
}
/**
* Finds the bucket-relative path of the overall export metadata file, in the given bucket,
* searching back up to {@code lookBackDays} days, including today.
*
* <p>The overall export metadata file is the last file created during a Datastore export. All
* data has been exported by the creation time of this file. The name of this file, like that of
* all files in the same export, begins with the timestamp when the export starts.
*
* <p>An example return value: {@code
* 2021-11-19T06:00:00_76493/2021-11-19T06:00:00_76493.overall_export_metadata}.
*/
private Optional<String> findMostRecentExportMetadataFile(String bucketName, int lookBackDays) {
DateTime today = clock.nowUtc();
for (int day = 0; day < lookBackDays; day++) {
String dateString = today.minusDays(day).toString("yyyy-MM-dd");
try {
Optional<String> metaFilePath =
gcsUtils.listFolderObjects(bucketName, dateString).stream()
.filter(s -> s.endsWith("overall_export_metadata"))
.map(s -> dateString + s)
.sorted(Comparator.<String>naturalOrder().reversed())
.findFirst();
if (metaFilePath.isPresent()) {
return metaFilePath;
}
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}
return Optional.empty();
}
/** Holds information about a Datastore snapshot. */
@AutoValue
abstract static class DatastoreSnapshotInfo {
abstract String exportDir();
abstract String commitLogDir();
abstract Interval exportInterval();
static DatastoreSnapshotInfo create(
String exportDir, String commitLogDir, Interval exportOperationInterval) {
return new AutoValue_LatestDatastoreSnapshotFinder_DatastoreSnapshotInfo(
exportDir, commitLogDir, exportOperationInterval);
}
}
@Singleton
@Component(
modules = {
CredentialModule.class,
ConfigModule.class,
CloudTasksUtilsModule.class,
UtilsModule.class
})
interface LatestDatastoreSnapshotFinderFinderComponent {
LatestDatastoreSnapshotFinder datastoreSnapshotInfoFinder();
}
}

View file

@ -0,0 +1,384 @@
// Copyright 2021 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.beam.comparedb;
import static google.registry.beam.comparedb.ValidateSqlUtils.createSqlEntityTupleTag;
import static google.registry.beam.comparedb.ValidateSqlUtils.getMedianIdForHistoryTable;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSetMultimap;
import com.google.common.collect.Streams;
import google.registry.beam.common.RegistryJpaIO;
import google.registry.model.billing.BillingEvent;
import google.registry.model.bulkquery.BulkQueryEntities;
import google.registry.model.bulkquery.DomainBaseLite;
import google.registry.model.bulkquery.DomainHistoryHost;
import google.registry.model.bulkquery.DomainHistoryLite;
import google.registry.model.bulkquery.DomainHost;
import google.registry.model.common.Cursor;
import google.registry.model.contact.ContactHistory;
import google.registry.model.contact.ContactResource;
import google.registry.model.domain.DomainBase;
import google.registry.model.domain.DomainHistory;
import google.registry.model.domain.DomainHistory.DomainHistoryId;
import google.registry.model.domain.GracePeriod;
import google.registry.model.domain.GracePeriod.GracePeriodHistory;
import google.registry.model.domain.secdns.DelegationSignerData;
import google.registry.model.domain.secdns.DomainDsDataHistory;
import google.registry.model.domain.token.AllocationToken;
import google.registry.model.host.HostHistory;
import google.registry.model.host.HostResource;
import google.registry.model.poll.PollMessage;
import google.registry.model.registrar.Registrar;
import google.registry.model.registrar.RegistrarContact;
import google.registry.model.replay.SqlEntity;
import google.registry.model.reporting.DomainTransactionRecord;
import google.registry.model.tld.Registry;
import google.registry.persistence.transaction.CriteriaQueryBuilder;
import java.io.Serializable;
import java.util.Optional;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
/**
* Utilities for loading SQL snapshots.
*
* <p>For {@link DomainBase} and {@link DomainHistory}, this class assumes the presence of the
* {@link google.registry.persistence.PersistenceModule.JpaTransactionManagerType#BULK_QUERY
* bulk-query-capable JpaTransactionManager}, and takes advantage of it for higher throughput.
*
* <p>For now this class is meant for use during the database migration period only. Therefore, it
* contains optimizations specifically for the production database at the current size, e.g.,
* parallel queries for select tables.
*/
public final class SqlSnapshots {
private SqlSnapshots() {}
/**
* SQL entity types that are eligible for validation. This set must be consistent with {@link
* DatastoreSnapshots#ALL_DATASTORE_KINDS}.
*/
static final ImmutableSet<Class<? extends SqlEntity>> ALL_SQL_ENTITIES =
ImmutableSet.of(
Registry.class,
Cursor.class,
Registrar.class,
ContactResource.class,
RegistrarContact.class,
HostResource.class,
AllocationToken.class,
BillingEvent.Recurring.class,
BillingEvent.OneTime.class,
BillingEvent.Cancellation.class,
PollMessage.class,
DomainBase.class,
ContactHistory.class,
HostHistory.class,
DomainHistory.class);
/**
* Loads a SQL snapshot for the given {@code sqlEntityTypes}.
*
* <p>If {@code snapshotId} is present, all queries use the specified database snapshot,
* guaranteeing a consistent result.
*/
public static PCollectionTuple loadCloudSqlSnapshotByType(
Pipeline pipeline,
ImmutableSet<Class<? extends SqlEntity>> sqlEntityTypes,
Optional<String> snapshotId) {
PCollectionTuple perTypeSnapshots = PCollectionTuple.empty(pipeline);
for (Class<? extends SqlEntity> clazz : sqlEntityTypes) {
if (clazz == DomainBase.class) {
perTypeSnapshots =
perTypeSnapshots.and(
createSqlEntityTupleTag(DomainBase.class),
loadAndAssembleDomainBase(pipeline, snapshotId));
continue;
}
if (clazz == DomainHistory.class) {
perTypeSnapshots =
perTypeSnapshots.and(
createSqlEntityTupleTag(DomainHistory.class),
loadAndAssembleDomainHistory(pipeline, snapshotId));
continue;
}
if (clazz == ContactHistory.class) {
perTypeSnapshots =
perTypeSnapshots.and(
createSqlEntityTupleTag(ContactHistory.class),
loadContactHistory(pipeline, snapshotId));
continue;
}
perTypeSnapshots =
perTypeSnapshots.and(
createSqlEntityTupleTag(clazz),
pipeline.apply(
"SQL Load " + clazz.getSimpleName(),
RegistryJpaIO.read(
() -> CriteriaQueryBuilder.create(clazz).build(), SqlEntity.class::cast)
.withSnapshot(snapshotId.orElse(null))));
}
return perTypeSnapshots;
}
/**
* Bulk-loads parts of {@link DomainBase} and assembles them in the pipeline.
*
* @see BulkQueryEntities
*/
public static PCollection<SqlEntity> loadAndAssembleDomainBase(
Pipeline pipeline, Optional<String> snapshotId) {
PCollection<KV<String, Serializable>> baseObjects =
readAllAndAssignKey(pipeline, DomainBaseLite.class, DomainBaseLite::getRepoId, snapshotId);
PCollection<KV<String, Serializable>> gracePeriods =
readAllAndAssignKey(pipeline, GracePeriod.class, GracePeriod::getDomainRepoId, snapshotId);
PCollection<KV<String, Serializable>> delegationSigners =
readAllAndAssignKey(
pipeline,
DelegationSignerData.class,
DelegationSignerData::getDomainRepoId,
snapshotId);
PCollection<KV<String, Serializable>> domainHosts =
readAllAndAssignKey(pipeline, DomainHost.class, DomainHost::getDomainRepoId, snapshotId);
return PCollectionList.of(
ImmutableList.of(baseObjects, gracePeriods, delegationSigners, domainHosts))
.apply("SQL Merge DomainBase parts", Flatten.pCollections())
.apply("Group by Domain Parts by RepoId", GroupByKey.create())
.apply(
"Assemble DomainBase",
ParDo.of(
new DoFn<KV<String, Iterable<Serializable>>, SqlEntity>() {
@ProcessElement
public void processElement(
@Element KV<String, Iterable<Serializable>> kv,
OutputReceiver<SqlEntity> outputReceiver) {
TypedClassifier partsByType = new TypedClassifier(kv.getValue());
ImmutableSet<DomainBaseLite> baseObjects =
partsByType.getAllOf(DomainBaseLite.class);
Verify.verify(
baseObjects.size() == 1,
"Expecting one DomainBaseLite object per repoId: " + kv.getKey());
outputReceiver.output(
BulkQueryEntities.assembleDomainBase(
baseObjects.iterator().next(),
partsByType.getAllOf(GracePeriod.class),
partsByType.getAllOf(DelegationSignerData.class),
partsByType.getAllOf(DomainHost.class).stream()
.map(DomainHost::getHostVKey)
.collect(ImmutableSet.toImmutableSet())));
}
}));
}
/**
* Loads all {@link ContactHistory} entities from the database.
*
* <p>This method uses two queries to load data in parallel. This is a performance optimization
* specifically for the production database.
*/
static PCollection<SqlEntity> loadContactHistory(Pipeline pipeline, Optional<String> snapshotId) {
long medianId =
getMedianIdForHistoryTable("ContactHistory")
.orElseThrow(
() -> new IllegalStateException("Not a valid database: no ContactHistory."));
PCollection<SqlEntity> part1 =
pipeline.apply(
"SQL Load ContactHistory first half",
RegistryJpaIO.read(
String.format("select c from ContactHistory c where id <= %s", medianId),
false,
SqlEntity.class::cast)
.withSnapshot(snapshotId.orElse(null)));
PCollection<SqlEntity> part2 =
pipeline.apply(
"SQL Load ContactHistory second half",
RegistryJpaIO.read(
String.format("select c from ContactHistory c where id > %s", medianId),
false,
SqlEntity.class::cast)
.withSnapshot(snapshotId.orElse(null)));
return PCollectionList.of(part1)
.and(part2)
.apply("Combine ContactHistory parts", Flatten.pCollections());
}
/**
* Bulk-loads all parts of {@link DomainHistory} and assembles them in the pipeline.
*
* <p>This method uses two queries to load {@link DomainBaseLite} in parallel. This is a
* performance optimization specifically for the production database.
*
* @see BulkQueryEntities
*/
static PCollection<SqlEntity> loadAndAssembleDomainHistory(
Pipeline pipeline, Optional<String> snapshotId) {
long medianId =
getMedianIdForHistoryTable("DomainHistory")
.orElseThrow(
() -> new IllegalStateException("Not a valid database: no DomainHistory."));
PCollection<KV<String, Serializable>> baseObjectsPart1 =
queryAndAssignKey(
pipeline,
"first half",
String.format("select c from DomainHistory c where id <= %s", medianId),
DomainHistoryLite.class,
compose(DomainHistoryLite::getDomainHistoryId, DomainHistoryId::toString),
snapshotId);
PCollection<KV<String, Serializable>> baseObjectsPart2 =
queryAndAssignKey(
pipeline,
"second half",
String.format("select c from DomainHistory c where id > %s", medianId),
DomainHistoryLite.class,
compose(DomainHistoryLite::getDomainHistoryId, DomainHistoryId::toString),
snapshotId);
PCollection<KV<String, Serializable>> gracePeriods =
readAllAndAssignKey(
pipeline,
GracePeriodHistory.class,
compose(GracePeriodHistory::getDomainHistoryId, DomainHistoryId::toString),
snapshotId);
PCollection<KV<String, Serializable>> delegationSigners =
readAllAndAssignKey(
pipeline,
DomainDsDataHistory.class,
compose(DomainDsDataHistory::getDomainHistoryId, DomainHistoryId::toString),
snapshotId);
PCollection<KV<String, Serializable>> domainHosts =
readAllAndAssignKey(
pipeline,
DomainHistoryHost.class,
compose(DomainHistoryHost::getDomainHistoryId, DomainHistoryId::toString),
snapshotId);
PCollection<KV<String, Serializable>> transactionRecords =
readAllAndAssignKey(
pipeline,
DomainTransactionRecord.class,
compose(DomainTransactionRecord::getDomainHistoryId, DomainHistoryId::toString),
snapshotId);
return PCollectionList.of(
ImmutableList.of(
baseObjectsPart1,
baseObjectsPart2,
gracePeriods,
delegationSigners,
domainHosts,
transactionRecords))
.apply("Merge DomainHistory parts", Flatten.pCollections())
.apply("Group by DomainHistory Parts by DomainHistoryId string", GroupByKey.create())
.apply(
"Assemble DomainHistory",
ParDo.of(
new DoFn<KV<String, Iterable<Serializable>>, SqlEntity>() {
@ProcessElement
public void processElement(
@Element KV<String, Iterable<Serializable>> kv,
OutputReceiver<SqlEntity> outputReceiver) {
TypedClassifier partsByType = new TypedClassifier(kv.getValue());
ImmutableSet<DomainHistoryLite> baseObjects =
partsByType.getAllOf(DomainHistoryLite.class);
Verify.verify(
baseObjects.size() == 1,
"Expecting one DomainHistoryLite object per domainHistoryId: "
+ kv.getKey());
outputReceiver.output(
BulkQueryEntities.assembleDomainHistory(
baseObjects.iterator().next(),
partsByType.getAllOf(DomainDsDataHistory.class),
partsByType.getAllOf(DomainHistoryHost.class).stream()
.map(DomainHistoryHost::getHostVKey)
.collect(ImmutableSet.toImmutableSet()),
partsByType.getAllOf(GracePeriodHistory.class),
partsByType.getAllOf(DomainTransactionRecord.class)));
}
}));
}
static <R, T> PCollection<KV<String, Serializable>> readAllAndAssignKey(
Pipeline pipeline,
Class<R> type,
SerializableFunction<R, String> keyFunction,
Optional<String> snapshotId) {
return pipeline
.apply(
"SQL Load " + type.getSimpleName(),
RegistryJpaIO.read(() -> CriteriaQueryBuilder.create(type).build())
.withSnapshot(snapshotId.orElse(null)))
.apply(
"Assign Key to " + type.getSimpleName(),
MapElements.into(
TypeDescriptors.kvs(
TypeDescriptors.strings(), TypeDescriptor.of(Serializable.class)))
.via(obj -> KV.of(keyFunction.apply(obj), (Serializable) obj)));
}
static <R, T> PCollection<KV<String, Serializable>> queryAndAssignKey(
Pipeline pipeline,
String diffrentiator,
String jplQuery,
Class<R> type,
SerializableFunction<R, String> keyFunction,
Optional<String> snapshotId) {
return pipeline
.apply(
"SQL Load " + type.getSimpleName() + " " + diffrentiator,
RegistryJpaIO.read(jplQuery, false, type::cast).withSnapshot(snapshotId.orElse(null)))
.apply(
"Assign Key to " + type.getSimpleName() + " " + diffrentiator,
MapElements.into(
TypeDescriptors.kvs(
TypeDescriptors.strings(), TypeDescriptor.of(Serializable.class)))
.via(obj -> KV.of(keyFunction.apply(obj), (Serializable) obj)));
}
// TODO(b/205988530): don't use beam serializablefunction, make one that extends Java's Function.
private static <R, I, T> SerializableFunction<R, T> compose(
SerializableFunction<R, I> f1, SerializableFunction<I, T> f2) {
return r -> f2.apply(f1.apply(r));
}
/** Container that receives mixed-typed data and groups them by {@link Class}. */
static class TypedClassifier {
private final ImmutableSetMultimap<Class<?>, Object> classifiedEntities;
TypedClassifier(Iterable<Serializable> inputs) {
this.classifiedEntities =
Streams.stream(inputs)
.collect(ImmutableSetMultimap.toImmutableSetMultimap(Object::getClass, x -> x));
}
<T> ImmutableSet<T> getAllOf(Class<T> clazz) {
return classifiedEntities.get(clazz).stream()
.map(clazz::cast)
.collect(ImmutableSet.toImmutableSet());
}
}
}

View file

@ -0,0 +1,159 @@
// Copyright 2021 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.beam.comparedb;
import static com.google.common.base.Verify.verify;
import static org.apache.beam.sdk.values.TypeDescriptors.strings;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import google.registry.beam.common.DatabaseSnapshot;
import google.registry.beam.common.RegistryPipelineWorkerInitializer;
import google.registry.beam.comparedb.LatestDatastoreSnapshotFinder.DatastoreSnapshotInfo;
import google.registry.beam.comparedb.ValidateSqlUtils.CompareSqlEntity;
import google.registry.model.domain.DomainBase;
import google.registry.model.domain.DomainHistory;
import google.registry.model.replay.SqlEntity;
import google.registry.model.replay.SqlReplayCheckpoint;
import google.registry.persistence.PersistenceModule.JpaTransactionManagerType;
import google.registry.persistence.PersistenceModule.TransactionIsolationLevel;
import google.registry.persistence.transaction.TransactionManagerFactory;
import java.io.Serializable;
import java.util.Optional;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.DateTime;
/**
* Validates the asynchronous data replication process from Datastore (primary storage) to Cloud SQL
* (secondary storage).
*/
public class ValidateSqlPipeline {
/** Specifies the extra CommitLogs to load before the start of a Database export. */
private static final int COMMIT_LOG_MARGIN_MINUTES = 10;
private final ValidateSqlPipelineOptions options;
private final DatastoreSnapshotInfo mostRecentExport;
public ValidateSqlPipeline(
ValidateSqlPipelineOptions options, DatastoreSnapshotInfo mostRecentExport) {
this.options = options;
this.mostRecentExport = mostRecentExport;
}
void run() {
run(Pipeline.create(options));
}
@VisibleForTesting
void run(Pipeline pipeline) {
// TODO(weiminyu): Acquire the commit log replay lock when the lock release bug is fixed.
DateTime latestCommitLogTime =
TransactionManagerFactory.jpaTm().transact(() -> SqlReplayCheckpoint.get());
Preconditions.checkState(
latestCommitLogTime.isAfter(mostRecentExport.exportInterval().getEnd()),
"Cannot recreate Datastore snapshot since target time is in the middle of an export.");
try (DatabaseSnapshot databaseSnapshot = DatabaseSnapshot.createSnapshot()) {
setupPipeline(pipeline, Optional.of(databaseSnapshot.getSnapshotId()), latestCommitLogTime);
State state = pipeline.run().waitUntilFinish();
if (!State.DONE.equals(state)) {
throw new IllegalStateException("Unexpected pipeline state: " + state);
}
}
}
void setupPipeline(
Pipeline pipeline, Optional<String> sqlSnapshotId, DateTime latestCommitLogTime) {
pipeline
.getCoderRegistry()
.registerCoderForClass(SqlEntity.class, SerializableCoder.of(Serializable.class));
PCollectionTuple datastoreSnapshot =
DatastoreSnapshots.loadDatastoreSnapshotByKind(
pipeline,
mostRecentExport.exportDir(),
mostRecentExport.commitLogDir(),
mostRecentExport.exportInterval().getStart().minusMinutes(COMMIT_LOG_MARGIN_MINUTES),
// Increase by 1ms since we want to include commitLogs latestCommitLogTime but
// this parameter is exclusive.
latestCommitLogTime.plusMillis(1),
DatastoreSnapshots.ALL_DATASTORE_KINDS);
PCollectionTuple cloudSqlSnapshot =
SqlSnapshots.loadCloudSqlSnapshotByType(
pipeline, SqlSnapshots.ALL_SQL_ENTITIES, sqlSnapshotId);
verify(
datastoreSnapshot.getAll().keySet().equals(cloudSqlSnapshot.getAll().keySet()),
"Expecting the same set of types in both snapshots.");
for (Class<? extends SqlEntity> clazz : SqlSnapshots.ALL_SQL_ENTITIES) {
TupleTag<SqlEntity> tag = ValidateSqlUtils.createSqlEntityTupleTag(clazz);
verify(
datastoreSnapshot.has(tag), "Missing %s in Datastore snapshot.", clazz.getSimpleName());
verify(cloudSqlSnapshot.has(tag), "Missing %s in Cloud SQL snapshot.", clazz.getSimpleName());
PCollectionList.of(datastoreSnapshot.get(tag))
.and(cloudSqlSnapshot.get(tag))
.apply("Combine from both snapshots: " + clazz.getSimpleName(), Flatten.pCollections())
.apply(
"Assign primary key to merged " + clazz.getSimpleName(),
WithKeys.of(ValidateSqlPipeline::getPrimaryKeyString).withKeyType(strings()))
.apply("Group by primary key " + clazz.getSimpleName(), GroupByKey.create())
.apply("Compare " + clazz.getSimpleName(), ParDo.of(new CompareSqlEntity()));
}
}
private static String getPrimaryKeyString(SqlEntity sqlEntity) {
// SqlEntity.getPrimaryKeyString only works with entities registered with Hibernate.
// We are using the BulkQueryJpaTransactionManager, which does not recognize DomainBase and
// DomainHistory. See BulkQueryEntities.java for more information.
if (sqlEntity instanceof DomainBase) {
return "DomainBase_" + ((DomainBase) sqlEntity).getRepoId();
}
if (sqlEntity instanceof DomainHistory) {
return "DomainHistory_" + ((DomainHistory) sqlEntity).getDomainHistoryId().toString();
}
return sqlEntity.getPrimaryKeyString();
}
public static void main(String[] args) {
ValidateSqlPipelineOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(ValidateSqlPipelineOptions.class);
// Defensively set important options.
options.setIsolationOverride(TransactionIsolationLevel.TRANSACTION_REPEATABLE_READ);
options.setJpaTransactionManagerType(JpaTransactionManagerType.BULK_QUERY);
// Reuse Dataflow worker initialization code to set up JPA in the pipeline harness.
new RegistryPipelineWorkerInitializer().beforeProcessing(options);
DatastoreSnapshotInfo mostRecentExport =
DaggerLatestDatastoreSnapshotFinder_LatestDatastoreSnapshotFinderFinderComponent.create()
.datastoreSnapshotInfoFinder()
.getSnapshotInfo();
new ValidateSqlPipeline(options, mostRecentExport).run(Pipeline.create(options));
}
}

View file

@ -0,0 +1,20 @@
// Copyright 2021 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.beam.comparedb;
import google.registry.beam.common.RegistryPipelineOptions;
/** BEAM pipeline options for {@link ValidateSqlPipeline}. */
public interface ValidateSqlPipelineOptions extends RegistryPipelineOptions {}

View file

@ -0,0 +1,321 @@
// Copyright 2021 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.beam.comparedb;
import static com.google.common.base.Verify.verify;
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.collect.Maps;
import com.google.common.flogger.FluentLogger;
import google.registry.model.EppResource;
import google.registry.model.ImmutableObject;
import google.registry.model.billing.BillingEvent.OneTime;
import google.registry.model.contact.ContactBase;
import google.registry.model.contact.ContactHistory;
import google.registry.model.domain.DomainContent;
import google.registry.model.domain.DomainHistory;
import google.registry.model.eppcommon.AuthInfo;
import google.registry.model.host.HostHistory;
import google.registry.model.poll.PollMessage;
import google.registry.model.replay.SqlEntity;
import google.registry.model.reporting.HistoryEntry;
import google.registry.model.tld.Registry;
import java.lang.reflect.Field;
import java.math.BigInteger;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.money.Money;
/** Helpers for use by {@link ValidateSqlPipeline}. */
final class ValidateSqlUtils {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private ValidateSqlUtils() {}
/**
* Query template for finding the median value of the {@code history_revision_id} column in one of
* the History tables.
*
* <p>The {@link ValidateSqlPipeline} uses this query to parallelize the query to some of the
* history tables. Although the {@code repo_id} column is the leading column in the primary keys
* of these tables, in practice and with production data, division by {@code history_revision_id}
* works slightly faster for unknown reasons.
*/
private static final String MEDIAN_ID_QUERY_TEMPLATE =
"SELECT history_revision_id FROM ( "
+ " SELECT"
+ " ROW_NUMBER() OVER (ORDER BY history_revision_id ASC) AS rownumber,"
+ " history_revision_id"
+ " FROM \"%TABLE%\""
+ ") AS foo\n"
+ "WHERE rownumber in (select count(*) / 2 + 1 from \"%TABLE%\")";
static Optional<Long> getMedianIdForHistoryTable(String tableName) {
Preconditions.checkArgument(
tableName.endsWith("History"), "Table must be one of the History tables.");
String sqlText = MEDIAN_ID_QUERY_TEMPLATE.replace("%TABLE%", tableName);
List results =
jpaTm()
.transact(() -> jpaTm().getEntityManager().createNativeQuery(sqlText).getResultList());
verify(results.size() < 2, "MidPoint query should have at most one result.");
if (results.isEmpty()) {
return Optional.empty();
}
return Optional.of(((BigInteger) results.get(0)).longValue());
}
static TupleTag<SqlEntity> createSqlEntityTupleTag(Class<? extends SqlEntity> actualType) {
return new TupleTag<SqlEntity>(actualType.getSimpleName()) {};
}
static class CompareSqlEntity extends DoFn<KV<String, Iterable<SqlEntity>>, Void> {
private final HashMap<String, Counter> totalCounters = new HashMap<>();
private final HashMap<String, Counter> missingCounters = new HashMap<>();
private final HashMap<String, Counter> unequalCounters = new HashMap<>();
private final HashMap<String, Counter> badEntityCounters = new HashMap<>();
private volatile boolean logPrinted = false;
private String getCounterKey(Class<?> clazz) {
return PollMessage.class.isAssignableFrom(clazz) ? "PollMessage" : clazz.getSimpleName();
}
private synchronized void ensureCounterExists(String counterKey) {
if (totalCounters.containsKey(counterKey)) {
return;
}
totalCounters.put(counterKey, Metrics.counter("CompareDB", "Total Compared: " + counterKey));
missingCounters.put(
counterKey, Metrics.counter("CompareDB", "Missing In One DB: " + counterKey));
unequalCounters.put(counterKey, Metrics.counter("CompareDB", "Not Equal:" + counterKey));
badEntityCounters.put(counterKey, Metrics.counter("CompareDB", "Bad Entities:" + counterKey));
}
/**
* A rudimentary debugging helper that prints the first pair of unequal entities in each worker.
* This will be removed when we start exporting such entities to GCS.
*/
void logDiff(String key, Object entry0, Object entry1) {
if (logPrinted) {
return;
}
logPrinted = true;
Map<String, Object> fields0 = ((ImmutableObject) entry0).toDiffableFieldMap();
Map<String, Object> fields1 = ((ImmutableObject) entry1).toDiffableFieldMap();
StringBuilder sb = new StringBuilder();
fields0.forEach(
(field, value) -> {
if (fields1.containsKey(field)) {
if (!Objects.equals(value, fields1.get(field))) {
sb.append(field + " not match: " + value + " -> " + fields1.get(field) + "\n");
}
} else {
sb.append(field + "Not found in entity 2\n");
}
});
fields1.forEach(
(field, value) -> {
if (!fields0.containsKey(field)) {
sb.append(field + "Not found in entity 1\n");
}
});
logger.atWarning().log(key + " " + sb.toString());
}
@ProcessElement
public void processElement(@Element KV<String, Iterable<SqlEntity>> kv) {
ImmutableList<SqlEntity> entities = ImmutableList.copyOf(kv.getValue());
verify(!entities.isEmpty(), "Can't happen: no value for key %s.", kv.getKey());
verify(entities.size() <= 2, "Unexpected duplicates for key %s", kv.getKey());
String counterKey = getCounterKey(entities.get(0).getClass());
ensureCounterExists(counterKey);
totalCounters.get(counterKey).inc();
if (entities.size() == 1) {
missingCounters.get(counterKey).inc();
// Temporary debugging help. See logDiff() above.
if (!logPrinted) {
logPrinted = true;
logger.atWarning().log("Unexpected single entity: %s", kv.getKey());
}
return;
}
SqlEntity entity0;
SqlEntity entity1;
try {
entity0 = normalizeEntity(entities.get(0));
entity1 = normalizeEntity(entities.get(1));
} catch (Exception e) {
// Temporary debugging help. See logDiff() above.
if (!logPrinted) {
logPrinted = true;
badEntityCounters.get(counterKey).inc();
}
return;
}
if (!Objects.equals(entity0, entity1)) {
unequalCounters.get(counterKey).inc();
logDiff(kv.getKey(), entities.get(0), entities.get(1));
}
}
}
static SqlEntity normalizeEntity(SqlEntity sqlEntity) {
if (sqlEntity instanceof EppResource) {
return normalizeEppResource(sqlEntity);
}
if (sqlEntity instanceof HistoryEntry) {
return (SqlEntity) normalizeHistoryEntry((HistoryEntry) sqlEntity);
}
if (sqlEntity instanceof Registry) {
return normalizeRegistry((Registry) sqlEntity);
}
if (sqlEntity instanceof OneTime) {
return normalizeOnetime((OneTime) sqlEntity);
}
return sqlEntity;
}
/**
* Normalizes an {@link EppResource} instance for comparison.
*
* <p>This method may modify the input object using reflection instead of making a copy with
* {@code eppResource.asBuilder().build()}, because when {@code eppResource} is a {@link
* google.registry.model.domain.DomainBase}, the {@code build} method accesses the Database, which
* we want to avoid.
*/
static SqlEntity normalizeEppResource(SqlEntity eppResource) {
try {
Field authField =
eppResource instanceof DomainContent
? DomainContent.class.getDeclaredField("authInfo")
: eppResource instanceof ContactBase
? ContactBase.class.getDeclaredField("authInfo")
: null;
if (authField != null) {
authField.setAccessible(true);
AuthInfo authInfo = (AuthInfo) authField.get(eppResource);
// When AuthInfo is missing, the authInfo field is null if the object is loaded from
// Datastore, or a PasswordAuth with null properties if loaded from SQL. In the second case
// we set the authInfo field to null.
if (authInfo != null
&& authInfo.getPw() != null
&& authInfo.getPw().getRepoId() == null
&& authInfo.getPw().getValue() == null) {
authField.set(eppResource, null);
}
}
Field field = EppResource.class.getDeclaredField("revisions");
field.setAccessible(true);
field.set(eppResource, null);
return eppResource;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* Normalizes a {@link HistoryEntry} for comparison.
*
* <p>This method modifies the input using reflection because relevant builder methods performs
* unwanted checks and changes.
*/
static HistoryEntry normalizeHistoryEntry(HistoryEntry historyEntry) {
// History objects from Datastore do not have details of their EppResource objects
// (domainContent, contactBase, hostBase).
try {
if (historyEntry instanceof DomainHistory) {
Field domainContent = DomainHistory.class.getDeclaredField("domainContent");
domainContent.setAccessible(true);
domainContent.set(historyEntry, null);
Field domainTransactionRecords =
HistoryEntry.class.getDeclaredField("domainTransactionRecords");
domainTransactionRecords.setAccessible(true);
Set<?> domainTransactionRecordsValue = (Set<?>) domainTransactionRecords.get(historyEntry);
if (domainTransactionRecordsValue != null && domainTransactionRecordsValue.isEmpty()) {
domainTransactionRecords.set(historyEntry, null);
}
} else if (historyEntry instanceof ContactHistory) {
Field contactBase = ContactHistory.class.getDeclaredField("contactBase");
contactBase.setAccessible(true);
contactBase.set(historyEntry, null);
} else if (historyEntry instanceof HostHistory) {
Field hostBase = HostHistory.class.getDeclaredField("hostBase");
hostBase.setAccessible(true);
hostBase.set(historyEntry, null);
}
return historyEntry;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
static Registry normalizeRegistry(Registry registry) {
if (registry.getStandardCreateCost().getAmount().scale() == 0) {
return registry;
}
return registry
.asBuilder()
.setCreateBillingCost(normalizeMoney(registry.getStandardCreateCost()))
.setRestoreBillingCost(normalizeMoney(registry.getStandardRestoreCost()))
.setServerStatusChangeBillingCost(normalizeMoney(registry.getServerStatusChangeCost()))
.setRegistryLockOrUnlockBillingCost(
normalizeMoney(registry.getRegistryLockOrUnlockBillingCost()))
.setRenewBillingCostTransitions(
ImmutableSortedMap.copyOf(
Maps.transformValues(
registry.getRenewBillingCostTransitions(), ValidateSqlUtils::normalizeMoney)))
.setEapFeeSchedule(
ImmutableSortedMap.copyOf(
Maps.transformValues(
registry.getEapFeeScheduleAsMap(), ValidateSqlUtils::normalizeMoney)))
.build();
}
/** Normalizes an {@link OneTime} instance for comparison. */
static OneTime normalizeOnetime(OneTime oneTime) {
Money cost = oneTime.getCost();
if (cost.getAmount().scale() == 0) {
return oneTime;
}
try {
return oneTime.asBuilder().setCost(normalizeMoney(oneTime.getCost())).build();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
static Money normalizeMoney(Money original) {
// Strips ".00" from the amount.
return Money.of(original.getCurrencyUnit(), original.getAmount().stripTrailingZeros());
}
}

View file

@ -101,8 +101,9 @@ public final class Transforms {
} }
/** /**
* Composite {@link PTransform transform} that loads the Datastore snapshot at {@code * Composite {@link PTransform transform} that loads the Datastore snapshot right before {@code
* commitLogToTime} for caller specified {@code kinds}. * commitLogToTime} for caller specified {@code kinds}. The resulting snapshot has all changes
* that happened before {@code commitLogToTime}, and none at or after {@code commitLogToTime}.
* *
* <p>Caller must provide the location of a Datastore export that started AFTER {@code * <p>Caller must provide the location of a Datastore export that started AFTER {@code
* commitLogFromTime} and completed BEFORE {@code commitLogToTime}, as well as the root directory * commitLogFromTime} and completed BEFORE {@code commitLogToTime}, as well as the root directory
@ -363,7 +364,7 @@ public final class Transforms {
* to make Optional work with BEAM) * to make Optional work with BEAM)
*/ */
@Nullable @Nullable
public static Object convertVersionedEntityToSqlEntity(VersionedEntity dsEntity) { public static SqlEntity convertVersionedEntityToSqlEntity(VersionedEntity dsEntity) {
return dsEntity return dsEntity
.getEntity() .getEntity()
.filter(Transforms::isMigratable) .filter(Transforms::isMigratable)

View file

@ -156,6 +156,10 @@ public abstract class ImmutableObject implements Cloneable {
* } * }
* } * }
* </pre> * </pre>
*
* <p>This method makes use of {@link #toStringHelper}, which embeds {@link
* System#identityHashCode} in the output string. Subclasses that require deterministic string
* representations <em>across</em> JVM instances should override this method.
*/ */
@Override @Override
public String toString() { public String toString() {

View file

@ -107,6 +107,10 @@ public class DomainHistoryLite extends HistoryEntry implements SqlOnlyEntity {
return VKey.create(DomainBase.class, getDomainRepoId()); return VKey.create(DomainBase.class, getDomainRepoId());
} }
public DomainHistoryId getDomainHistoryId() {
return new DomainHistoryId(getDomainRepoId(), getId());
}
@PostLoad @PostLoad
void postLoad() { void postLoad() {
if (domainContent == null) { if (domainContent == null) {

View file

@ -19,7 +19,9 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static google.registry.model.common.EntityGroupRoot.getCrossTldKey; import static google.registry.model.common.EntityGroupRoot.getCrossTldKey;
import static google.registry.util.DateTimeUtils.START_OF_TIME; import static google.registry.util.DateTimeUtils.START_OF_TIME;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter; import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableSortedMap;
import com.googlecode.objectify.Key; import com.googlecode.objectify.Key;
import com.googlecode.objectify.annotation.Entity; import com.googlecode.objectify.annotation.Entity;
import com.googlecode.objectify.annotation.Id; import com.googlecode.objectify.annotation.Id;
@ -294,5 +296,22 @@ public class Cursor extends ImmutableObject implements DatastoreAndSqlEntity, Un
this.type = type; this.type = type;
this.scope = scope; this.scope = scope;
} }
/**
* A deterministic string representation of a {@link CursorId}. See {@link
* ImmutableObject#toString} for more information.
*/
@Override
public String toString() {
return String.format(
"%s: {\n%s",
getClass().getSimpleName(),
Joiner.on('\n')
.join(
ImmutableSortedMap.<String, Object>of("scope", scope, "type", type)
.entrySet()))
.replaceAll("\n", "\n ")
+ "\n}";
}
} }
} }

View file

@ -16,6 +16,8 @@ package google.registry.model.contact;
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableSortedMap;
import com.googlecode.objectify.Key; import com.googlecode.objectify.Key;
import com.googlecode.objectify.annotation.EntitySubclass; import com.googlecode.objectify.annotation.EntitySubclass;
import google.registry.model.EppResource; import google.registry.model.EppResource;
@ -200,6 +202,24 @@ public class ContactHistory extends HistoryEntry implements SqlEntity, UnsafeSer
private void setId(long id) { private void setId(long id) {
this.id = id; this.id = id;
} }
/**
* A deterministic string representation of a {@link ContactHistoryId}. See {@link
* ImmutableObject#toString} for more information.
*/
@Override
public String toString() {
return String.format(
"%s: {\n%s",
getClass().getSimpleName(),
Joiner.on('\n')
.join(
ImmutableSortedMap.<String, Object>of(
"contactRepoId", getContactRepoId(), "id", getId())
.entrySet()))
.replaceAll("\n", "\n ")
+ "\n}";
}
} }
@Override @Override

View file

@ -18,7 +18,9 @@ import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
import static google.registry.util.CollectionUtils.nullToEmptyImmutableCopy; import static google.registry.util.CollectionUtils.nullToEmptyImmutableCopy;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedMap;
import com.googlecode.objectify.Key; import com.googlecode.objectify.Key;
import com.googlecode.objectify.annotation.EntitySubclass; import com.googlecode.objectify.annotation.EntitySubclass;
import google.registry.model.EppResource; import google.registry.model.EppResource;
@ -382,6 +384,24 @@ public class DomainHistory extends HistoryEntry implements SqlEntity {
private void setId(long id) { private void setId(long id) {
this.id = id; this.id = id;
} }
/**
* A deterministic string representation of a {@link DomainHistoryId}. See {@link
* ImmutableObject#toString} for more information.
*/
@Override
public String toString() {
return String.format(
"%s: {\n%s",
getClass().getSimpleName(),
Joiner.on('\n')
.join(
ImmutableSortedMap.<String, Object>of(
"domainRepoId", getDomainRepoId(), "id", getId())
.entrySet()))
.replaceAll("\n", "\n ")
+ "\n}";
}
} }
@Override @Override

View file

@ -16,6 +16,8 @@ package google.registry.model.host;
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableSortedMap;
import com.googlecode.objectify.Key; import com.googlecode.objectify.Key;
import com.googlecode.objectify.annotation.EntitySubclass; import com.googlecode.objectify.annotation.EntitySubclass;
import google.registry.model.EppResource; import google.registry.model.EppResource;
@ -203,6 +205,24 @@ public class HostHistory extends HistoryEntry implements SqlEntity, UnsafeSerial
private void setId(long id) { private void setId(long id) {
this.id = id; this.id = id;
} }
/**
* A deterministic string representation of a {@link HostHistoryId}. See {@link
* ImmutableObject#toString} for more information.
*/
@Override
public String toString() {
return String.format(
"%s: {\n%s",
getClass().getSimpleName(),
Joiner.on('\n')
.join(
ImmutableSortedMap.<String, Object>of(
"hostRepoId", getHostRepoId(), "id", getId())
.entrySet()))
.replaceAll("\n", "\n ")
+ "\n}";
}
} }
@Override @Override

View file

@ -32,7 +32,9 @@ import static java.util.stream.Collectors.joining;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Enums; import com.google.common.base.Enums;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.collect.ImmutableSortedSet; import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Streams; import com.google.common.collect.Streams;
import com.googlecode.objectify.Key; import com.googlecode.objectify.Key;
@ -413,6 +415,24 @@ public class RegistrarContact extends ImmutableObject
this.emailAddress = emailAddress; this.emailAddress = emailAddress;
this.registrarId = registrarId; this.registrarId = registrarId;
} }
/**
* A deterministic string representation of a {@link RegistrarPocId}. See {@link
* ImmutableObject#toString} for more information.
*/
@Override
public String toString() {
return String.format(
"%s: {\n%s",
getClass().getSimpleName(),
Joiner.on('\n')
.join(
ImmutableSortedMap.<String, Object>of(
"emailAddress", emailAddress, "registrarId", registrarId)
.entrySet()))
.replaceAll("\n", "\n ")
+ "\n}";
}
} }
/** A builder for constructing a {@link RegistrarContact}, since it is immutable. */ /** A builder for constructing a {@link RegistrarContact}, since it is immutable. */

View file

@ -55,7 +55,8 @@ class RegistryJpaWriteTest implements Serializable {
@RegisterExtension @RegisterExtension
@Order(Order.DEFAULT - 1) @Order(Order.DEFAULT - 1)
final transient DatastoreEntityExtension datastore = new DatastoreEntityExtension(); final transient DatastoreEntityExtension datastore =
new DatastoreEntityExtension().allThreads(true);
@RegisterExtension final transient InjectExtension injectExtension = new InjectExtension(); @RegisterExtension final transient InjectExtension injectExtension = new InjectExtension();

View file

@ -0,0 +1,93 @@
// Copyright 2021 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.beam.comparedb;
import com.google.common.collect.ImmutableSet;
import google.registry.beam.TestPipelineExtension;
import google.registry.beam.initsql.DatastoreSetupHelper;
import google.registry.model.domain.DomainHistory;
import google.registry.model.ofy.Ofy;
import google.registry.model.registrar.Registrar;
import google.registry.model.replay.SqlEntity;
import google.registry.persistence.transaction.JpaTestExtensions;
import google.registry.persistence.transaction.JpaTestExtensions.JpaIntegrationTestExtension;
import google.registry.testing.DatastoreEntityExtension;
import google.registry.testing.FakeClock;
import google.registry.testing.InjectExtension;
import java.io.Serializable;
import java.nio.file.Path;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.joda.time.DateTime;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;
/** Unit tests for {@link DatastoreSnapshots}. */
class DatastoreSnapshotsTest {
static final DateTime START_TIME = DateTime.parse("2000-01-01T00:00:00.0Z");
private FakeClock fakeClock = new FakeClock(START_TIME);
@RegisterExtension
@Order(Order.DEFAULT - 1)
final transient DatastoreEntityExtension datastore =
new DatastoreEntityExtension().allThreads(true);
@RegisterExtension final transient InjectExtension injectExtension = new InjectExtension();
@SuppressWarnings("WeakerAccess")
@TempDir
transient Path tmpDir;
@RegisterExtension
final transient TestPipelineExtension testPipeline =
TestPipelineExtension.create().enableAbandonedNodeEnforcement(true);
@RegisterExtension
final transient JpaIntegrationTestExtension database =
new JpaTestExtensions.Builder().withClock(fakeClock).buildIntegrationTestExtension();
DatastoreSetupHelper setupHelper;
@BeforeEach
void beforeEach() throws Exception {
injectExtension.setStaticField(Ofy.class, "clock", fakeClock);
setupHelper = new DatastoreSetupHelper(tmpDir, fakeClock).initializeData();
testPipeline
.getCoderRegistry()
.registerCoderForClass(SqlEntity.class, SerializableCoder.of(Serializable.class));
}
@Test
void loadDatastoreSnapshotByKind() {
PCollectionTuple tuple =
DatastoreSnapshots.loadDatastoreSnapshotByKind(
testPipeline,
setupHelper.exportDir.getAbsolutePath(),
setupHelper.commitLogDir.getAbsolutePath(),
START_TIME,
fakeClock.nowUtc().plusMillis(1),
ImmutableSet.copyOf(DatastoreSetupHelper.ALL_KINDS));
PAssert.that(tuple.get(ValidateSqlUtils.createSqlEntityTupleTag(Registrar.class)))
.containsInAnyOrder(setupHelper.registrar1, setupHelper.registrar2);
PAssert.that(tuple.get(ValidateSqlUtils.createSqlEntityTupleTag(DomainHistory.class)))
.containsInAnyOrder(setupHelper.historyEntry);
testPipeline.run();
}
}

View file

@ -0,0 +1,100 @@
// Copyright 2021 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.beam.comparedb;
import static google.registry.beam.comparedb.ValidateSqlUtils.createSqlEntityTupleTag;
import static org.joda.time.DateTimeZone.UTC;
import com.google.common.collect.ImmutableSet;
import google.registry.beam.TestPipelineExtension;
import google.registry.model.bulkquery.TestSetupHelper;
import google.registry.model.contact.ContactResource;
import google.registry.model.domain.DomainBase;
import google.registry.model.domain.DomainHistory;
import google.registry.model.host.HostResource;
import google.registry.model.registrar.Registrar;
import google.registry.model.replay.SqlEntity;
import google.registry.model.tld.Registry;
import google.registry.testing.AppEngineExtension;
import google.registry.testing.DatastoreEntityExtension;
import google.registry.testing.FakeClock;
import java.io.Serializable;
import java.util.Optional;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.joda.time.DateTime;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
/** Unit tests for {@link SqlSnapshots}. */
class SqlSnapshotsTest {
protected FakeClock fakeClock = new FakeClock(DateTime.now(UTC));
@RegisterExtension
@Order(Order.DEFAULT - 1)
final transient DatastoreEntityExtension datastore =
new DatastoreEntityExtension().allThreads(true);
@RegisterExtension
public final AppEngineExtension appEngine =
AppEngineExtension.builder().withDatastoreAndCloudSql().withClock(fakeClock).build();
@RegisterExtension
final transient TestPipelineExtension testPipeline =
TestPipelineExtension.create().enableAbandonedNodeEnforcement(true);
private final TestSetupHelper setupHelper = new TestSetupHelper(fakeClock);
@BeforeEach
void setUp() {
testPipeline
.getCoderRegistry()
.registerCoderForClass(SqlEntity.class, SerializableCoder.of(Serializable.class));
setupHelper.initializeAllEntities();
setupHelper.setupBulkQueryJpaTm(appEngine);
}
@AfterEach
void afterEach() {
setupHelper.tearDownBulkQueryJpaTm();
}
@Test
void loadCloudSqlSnapshotByType() {
PCollectionTuple sqlSnapshot =
SqlSnapshots.loadCloudSqlSnapshotByType(
testPipeline,
ImmutableSet.of(
Registry.class,
Registrar.class,
DomainBase.class,
DomainHistory.class,
ContactResource.class,
HostResource.class),
Optional.empty());
PAssert.that(sqlSnapshot.get(createSqlEntityTupleTag(Registry.class)))
.containsInAnyOrder(setupHelper.registry);
PAssert.that(sqlSnapshot.get(createSqlEntityTupleTag(DomainBase.class)))
.containsInAnyOrder(setupHelper.domain);
PAssert.that(sqlSnapshot.get(createSqlEntityTupleTag(DomainHistory.class)))
.containsInAnyOrder(setupHelper.domainHistory);
testPipeline.run();
}
}

View file

@ -0,0 +1,63 @@
// Copyright 2021 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.beam.comparedb;
import static com.google.common.truth.Truth8.assertThat;
import static google.registry.beam.comparedb.ValidateSqlUtils.getMedianIdForHistoryTable;
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
import static org.joda.time.DateTimeZone.UTC;
import com.google.common.truth.Truth;
import google.registry.model.bulkquery.TestSetupHelper;
import google.registry.model.domain.DomainHistory;
import google.registry.testing.AppEngineExtension;
import google.registry.testing.FakeClock;
import org.joda.time.DateTime;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
/** Unit tests for {@link ValidateSqlUtils}. */
class ValidateSqlUtilsTest {
private final FakeClock fakeClock = new FakeClock(DateTime.now(UTC));
private final TestSetupHelper setupHelper = new TestSetupHelper(fakeClock);
@RegisterExtension
public final AppEngineExtension appEngine =
AppEngineExtension.builder().withDatastoreAndCloudSql().withClock(fakeClock).build();
@Test
void getMedianIdForHistoryTable_emptyTable() {
assertThat(getMedianIdForHistoryTable("DomainHistory")).isEmpty();
}
@Test
void getMedianIdForHistoryTable_oneRow() {
setupHelper.initializeAllEntities();
Truth.assertThat(jpaTm().transact(() -> jpaTm().loadAllOf(DomainHistory.class))).hasSize(1);
assertThat(getMedianIdForHistoryTable("DomainHistory"))
.hasValue(setupHelper.domainHistory.getId());
}
@Test
void getMedianIdForHistoryTable_twoRows() {
setupHelper.initializeAllEntities();
setupHelper.applyChangeToDomainAndHistory();
Truth.assertThat(jpaTm().transact(() -> jpaTm().loadAllOf(DomainHistory.class))).hasSize(2);
assertThat(getMedianIdForHistoryTable("DomainHistory"))
.hasValue(setupHelper.domainHistory.getId());
}
}

View file

@ -0,0 +1,295 @@
// Copyright 2021 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.beam.initsql;
import static google.registry.model.common.Cursor.CursorType.BRDA;
import static google.registry.model.common.Cursor.CursorType.RECURRING_BILLING;
import static google.registry.model.domain.token.AllocationToken.TokenType.SINGLE_USE;
import static google.registry.testing.DatabaseHelper.newRegistry;
import static google.registry.testing.DatabaseHelper.persistResource;
import static google.registry.testing.DatabaseHelper.persistSimpleResource;
import static google.registry.util.DateTimeUtils.END_OF_TIME;
import static google.registry.util.DateTimeUtils.START_OF_TIME;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.googlecode.objectify.Key;
import google.registry.flows.domain.DomainFlowUtils;
import google.registry.model.billing.BillingEvent;
import google.registry.model.billing.BillingEvent.Flag;
import google.registry.model.billing.BillingEvent.Reason;
import google.registry.model.common.Cursor;
import google.registry.model.contact.ContactResource;
import google.registry.model.domain.DesignatedContact;
import google.registry.model.domain.DomainAuthInfo;
import google.registry.model.domain.DomainBase;
import google.registry.model.domain.DomainHistory;
import google.registry.model.domain.GracePeriod;
import google.registry.model.domain.launch.LaunchNotice;
import google.registry.model.domain.rgp.GracePeriodStatus;
import google.registry.model.domain.secdns.DelegationSignerData;
import google.registry.model.domain.token.AllocationToken;
import google.registry.model.eppcommon.AuthInfo.PasswordAuth;
import google.registry.model.eppcommon.StatusValue;
import google.registry.model.eppcommon.Trid;
import google.registry.model.host.HostResource;
import google.registry.model.poll.PollMessage;
import google.registry.model.registrar.Registrar;
import google.registry.model.registrar.RegistrarContact;
import google.registry.model.reporting.HistoryEntry;
import google.registry.model.tld.Registry;
import google.registry.model.transfer.DomainTransferData;
import google.registry.model.transfer.TransferStatus;
import google.registry.persistence.VKey;
import google.registry.testing.AppEngineExtension;
import google.registry.testing.FakeClock;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import org.joda.money.Money;
/**
* Sets up a test scenario in Datastore.
*
* <p>The {@link #initializeData} populates Datastore with test data, including {@link DomainBase},
* {@link DomainHistory}, and commit logs. The up-to-date version of the relevant entities are saved
* in public instance variables (e.g., {@link #domain} for easy access.
*/
public class DatastoreSetupHelper {
/**
* All kinds of entities to be set up in the Datastore. Must contain all kinds known to {@link
* InitSqlPipeline}.
*/
public static final ImmutableList<Class<?>> ALL_KINDS =
ImmutableList.of(
Registry.class,
Cursor.class,
Registrar.class,
ContactResource.class,
RegistrarContact.class,
DomainBase.class,
HostResource.class,
HistoryEntry.class,
AllocationToken.class,
BillingEvent.Recurring.class,
BillingEvent.OneTime.class,
BillingEvent.Cancellation.class,
PollMessage.class);
private final Path tmpDir;
private final FakeClock fakeClock;
public File exportRootDir;
public File exportDir;
public File commitLogDir;
public Registrar registrar1;
public Registrar registrar2;
public DomainBase domain;
public ContactResource contact1;
public ContactResource contact2;
public HostResource hostResource;
public DomainHistory historyEntry;
public Cursor globalCursor;
public Cursor tldCursor;
public DatastoreSetupHelper(Path tempDir, FakeClock fakeClock) {
this.tmpDir = tempDir;
this.fakeClock = fakeClock;
}
public DatastoreSetupHelper initializeData() throws Exception {
try (BackupTestStore store = new BackupTestStore(fakeClock)) {
exportRootDir = Files.createDirectory(tmpDir.resolve("exports")).toFile();
persistResource(newRegistry("com", "COM"));
registrar1 = persistResource(AppEngineExtension.makeRegistrar1());
registrar2 = persistResource(AppEngineExtension.makeRegistrar2());
Key<DomainBase> domainKey = Key.create(null, DomainBase.class, "4-COM");
hostResource =
persistResource(
new HostResource.Builder()
.setHostName("ns1.example.com")
.setSuperordinateDomain(VKey.from(domainKey))
.setRepoId("1-COM")
.setCreationRegistrarId(registrar1.getRegistrarId())
.setPersistedCurrentSponsorRegistrarId(registrar2.getRegistrarId())
.build());
contact1 =
persistResource(
new ContactResource.Builder()
.setContactId("contact_id1")
.setRepoId("2-COM")
.setCreationRegistrarId(registrar1.getRegistrarId())
.setPersistedCurrentSponsorRegistrarId(registrar2.getRegistrarId())
.build());
contact2 =
persistResource(
new ContactResource.Builder()
.setContactId("contact_id2")
.setRepoId("3-COM")
.setCreationRegistrarId(registrar1.getRegistrarId())
.setPersistedCurrentSponsorRegistrarId(registrar1.getRegistrarId())
.build());
persistSimpleResource(
new RegistrarContact.Builder()
.setParent(registrar1)
.setName("John Abused")
.setEmailAddress("johnabuse@example.com")
.setVisibleInWhoisAsAdmin(true)
.setVisibleInWhoisAsTech(false)
.setPhoneNumber("+1.2125551213")
.setFaxNumber("+1.2125551213")
.setTypes(ImmutableSet.of(RegistrarContact.Type.ABUSE, RegistrarContact.Type.ADMIN))
.build());
historyEntry =
persistResource(
new DomainHistory.Builder()
.setDomainRepoId(domainKey.getName())
.setModificationTime(fakeClock.nowUtc())
.setRegistrarId(registrar1.getRegistrarId())
.setType(HistoryEntry.Type.DOMAIN_CREATE)
.build());
persistResource(
new AllocationToken.Builder().setToken("abc123").setTokenType(SINGLE_USE).build());
Key<DomainHistory> historyEntryKey = Key.create(historyEntry);
BillingEvent.OneTime onetimeBillEvent =
new BillingEvent.OneTime.Builder()
.setId(1)
.setReason(Reason.RENEW)
.setTargetId("example.com")
.setRegistrarId("TheRegistrar")
.setCost(Money.parse("USD 44.00"))
.setPeriodYears(4)
.setEventTime(fakeClock.nowUtc())
.setBillingTime(fakeClock.nowUtc())
.setParent(historyEntryKey)
.build();
persistResource(onetimeBillEvent);
Key<BillingEvent.OneTime> oneTimeBillKey = Key.create(onetimeBillEvent);
BillingEvent.Recurring recurringBillEvent =
new BillingEvent.Recurring.Builder()
.setId(2)
.setReason(Reason.RENEW)
.setFlags(ImmutableSet.of(Flag.AUTO_RENEW))
.setTargetId("example.com")
.setRegistrarId("TheRegistrar")
.setEventTime(fakeClock.nowUtc())
.setRecurrenceEndTime(END_OF_TIME)
.setParent(historyEntryKey)
.build();
persistResource(recurringBillEvent);
VKey<BillingEvent.Recurring> recurringBillKey = recurringBillEvent.createVKey();
PollMessage.Autorenew autorenewPollMessage =
new PollMessage.Autorenew.Builder()
.setId(3L)
.setTargetId("example.com")
.setRegistrarId("TheRegistrar")
.setEventTime(fakeClock.nowUtc())
.setMsg("Domain was auto-renewed.")
.setParent(historyEntry)
.build();
persistResource(autorenewPollMessage);
VKey<PollMessage.Autorenew> autorenewPollKey = autorenewPollMessage.createVKey();
PollMessage.OneTime oneTimePollMessage =
new PollMessage.OneTime.Builder()
.setId(1L)
.setParent(historyEntry)
.setEventTime(fakeClock.nowUtc())
.setRegistrarId("TheRegistrar")
.setMsg(DomainFlowUtils.COLLISION_MESSAGE)
.build();
persistResource(oneTimePollMessage);
VKey<PollMessage.OneTime> onetimePollKey = oneTimePollMessage.createVKey();
domain =
persistResource(
new DomainBase.Builder()
.setDomainName("example.com")
.setRepoId("4-COM")
.setCreationRegistrarId(registrar1.getRegistrarId())
.setLastEppUpdateTime(fakeClock.nowUtc())
.setLastEppUpdateRegistrarId(registrar2.getRegistrarId())
.setLastTransferTime(fakeClock.nowUtc())
.setStatusValues(
ImmutableSet.of(
StatusValue.CLIENT_DELETE_PROHIBITED,
StatusValue.SERVER_DELETE_PROHIBITED,
StatusValue.SERVER_TRANSFER_PROHIBITED,
StatusValue.SERVER_UPDATE_PROHIBITED,
StatusValue.SERVER_RENEW_PROHIBITED,
StatusValue.SERVER_HOLD))
.setRegistrant(contact1.createVKey())
.setContacts(
ImmutableSet.of(
DesignatedContact.create(
DesignatedContact.Type.ADMIN, contact2.createVKey())))
.setNameservers(ImmutableSet.of(hostResource.createVKey()))
.setSubordinateHosts(ImmutableSet.of("ns1.example.com"))
.setPersistedCurrentSponsorRegistrarId(registrar2.getRegistrarId())
.setRegistrationExpirationTime(fakeClock.nowUtc().plusYears(1))
.setAuthInfo(DomainAuthInfo.create(PasswordAuth.create("password")))
.setDsData(
ImmutableSet.of(DelegationSignerData.create(1, 2, 3, new byte[] {0, 1, 2})))
.setLaunchNotice(
LaunchNotice.create("tcnid", "validatorId", START_OF_TIME, START_OF_TIME))
.setTransferData(
new DomainTransferData.Builder()
.setGainingRegistrarId(registrar1.getRegistrarId())
.setLosingRegistrarId(registrar2.getRegistrarId())
.setPendingTransferExpirationTime(fakeClock.nowUtc())
.setServerApproveEntities(
ImmutableSet.of(
VKey.from(oneTimeBillKey), recurringBillKey, autorenewPollKey))
.setServerApproveBillingEvent(VKey.from(oneTimeBillKey))
.setServerApproveAutorenewEvent(recurringBillKey)
.setServerApproveAutorenewPollMessage(autorenewPollKey)
.setTransferRequestTime(fakeClock.nowUtc().plusDays(1))
.setTransferStatus(TransferStatus.SERVER_APPROVED)
.setTransferRequestTrid(Trid.create("client-trid", "server-trid"))
.build())
.setDeletePollMessage(onetimePollKey)
.setAutorenewBillingEvent(recurringBillKey)
.setAutorenewPollMessage(autorenewPollKey)
.setSmdId("smdid")
.addGracePeriod(
GracePeriod.create(
GracePeriodStatus.ADD,
"4-COM",
fakeClock.nowUtc().plusDays(1),
"TheRegistrar",
null))
.build());
persistResource(
new BillingEvent.Cancellation.Builder()
.setReason(Reason.RENEW)
.setTargetId(domain.getDomainName())
.setRegistrarId(domain.getCurrentSponsorRegistrarId())
.setEventTime(fakeClock.nowUtc())
.setBillingTime(fakeClock.nowUtc())
.setRecurringEventKey(recurringBillEvent.createVKey())
.setParent(historyEntryKey)
.build());
globalCursor = persistResource(Cursor.createGlobal(RECURRING_BILLING, fakeClock.nowUtc()));
tldCursor = persistResource(Cursor.create(BRDA, fakeClock.nowUtc(), Registry.get("com")));
exportDir = store.export(exportRootDir.getAbsolutePath(), ALL_KINDS, ImmutableSet.of());
commitLogDir = Files.createDirectory(tmpDir.resolve("commits")).toFile();
fakeClock.advanceOneMilli();
}
return this;
}
}

View file

@ -79,7 +79,7 @@ class ExportLoadingTransformsTest implements Serializable {
@RegisterExtension @RegisterExtension
@Order(value = 1) @Order(value = 1)
final transient DatastoreEntityExtension datastoreEntityExtension = final transient DatastoreEntityExtension datastoreEntityExtension =
new DatastoreEntityExtension(); new DatastoreEntityExtension().allThreads(true);
@RegisterExtension @RegisterExtension
final transient TestPipelineExtension testPipeline = final transient TestPipelineExtension testPipeline =

View file

@ -17,59 +17,22 @@ package google.registry.beam.initsql;
import static com.google.common.truth.Truth.assertThat; import static com.google.common.truth.Truth.assertThat;
import static google.registry.model.ImmutableObjectSubject.assertAboutImmutableObjects; import static google.registry.model.ImmutableObjectSubject.assertAboutImmutableObjects;
import static google.registry.model.ImmutableObjectSubject.immutableObjectCorrespondence; import static google.registry.model.ImmutableObjectSubject.immutableObjectCorrespondence;
import static google.registry.model.common.Cursor.CursorType.BRDA;
import static google.registry.model.common.Cursor.CursorType.RECURRING_BILLING;
import static google.registry.model.domain.token.AllocationToken.TokenType.SINGLE_USE;
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
import static google.registry.testing.DatabaseHelper.newRegistry;
import static google.registry.testing.DatabaseHelper.persistResource;
import static google.registry.testing.DatabaseHelper.persistSimpleResource;
import static google.registry.util.DateTimeUtils.END_OF_TIME;
import static google.registry.util.DateTimeUtils.START_OF_TIME;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.googlecode.objectify.Key;
import google.registry.beam.TestPipelineExtension; import google.registry.beam.TestPipelineExtension;
import google.registry.flows.domain.DomainFlowUtils;
import google.registry.model.billing.BillingEvent;
import google.registry.model.billing.BillingEvent.Flag;
import google.registry.model.billing.BillingEvent.Reason;
import google.registry.model.common.Cursor; import google.registry.model.common.Cursor;
import google.registry.model.contact.ContactResource; import google.registry.model.contact.ContactResource;
import google.registry.model.domain.DesignatedContact;
import google.registry.model.domain.DomainAuthInfo;
import google.registry.model.domain.DomainBase; import google.registry.model.domain.DomainBase;
import google.registry.model.domain.DomainHistory;
import google.registry.model.domain.GracePeriod;
import google.registry.model.domain.launch.LaunchNotice;
import google.registry.model.domain.rgp.GracePeriodStatus;
import google.registry.model.domain.secdns.DelegationSignerData;
import google.registry.model.domain.token.AllocationToken;
import google.registry.model.eppcommon.AuthInfo.PasswordAuth;
import google.registry.model.eppcommon.StatusValue;
import google.registry.model.eppcommon.Trid;
import google.registry.model.host.HostResource; import google.registry.model.host.HostResource;
import google.registry.model.ofy.Ofy; import google.registry.model.ofy.Ofy;
import google.registry.model.poll.PollMessage;
import google.registry.model.registrar.Registrar; import google.registry.model.registrar.Registrar;
import google.registry.model.registrar.RegistrarContact;
import google.registry.model.reporting.HistoryEntry;
import google.registry.model.tld.Registry;
import google.registry.model.transfer.DomainTransferData;
import google.registry.model.transfer.TransferStatus;
import google.registry.persistence.VKey;
import google.registry.persistence.transaction.JpaTestExtensions; import google.registry.persistence.transaction.JpaTestExtensions;
import google.registry.persistence.transaction.JpaTestExtensions.JpaIntegrationTestExtension; import google.registry.persistence.transaction.JpaTestExtensions.JpaIntegrationTestExtension;
import google.registry.testing.AppEngineExtension;
import google.registry.testing.DatastoreEntityExtension; import google.registry.testing.DatastoreEntityExtension;
import google.registry.testing.FakeClock; import google.registry.testing.FakeClock;
import google.registry.testing.InjectExtension; import google.registry.testing.InjectExtension;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.joda.money.Money;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Order;
@ -81,27 +44,7 @@ import org.junit.jupiter.api.io.TempDir;
class InitSqlPipelineTest { class InitSqlPipelineTest {
private static final DateTime START_TIME = DateTime.parse("2000-01-01T00:00:00.0Z"); private static final DateTime START_TIME = DateTime.parse("2000-01-01T00:00:00.0Z");
/** private FakeClock fakeClock = new FakeClock(START_TIME);
* All kinds of entities to be set up in the Datastore. Must contain all kinds known to {@link
* InitSqlPipeline}.
*/
private static final ImmutableList<Class<?>> ALL_KINDS =
ImmutableList.of(
Registry.class,
Cursor.class,
Registrar.class,
ContactResource.class,
RegistrarContact.class,
DomainBase.class,
HostResource.class,
HistoryEntry.class,
AllocationToken.class,
BillingEvent.Recurring.class,
BillingEvent.OneTime.class,
BillingEvent.Cancellation.class,
PollMessage.class);
private transient FakeClock fakeClock = new FakeClock(START_TIME);
@RegisterExtension @RegisterExtension
@Order(Order.DEFAULT - 1) @Order(Order.DEFAULT - 1)
@ -122,201 +65,12 @@ class InitSqlPipelineTest {
final transient JpaIntegrationTestExtension database = final transient JpaIntegrationTestExtension database =
new JpaTestExtensions.Builder().withClock(fakeClock).buildIntegrationTestExtension(); new JpaTestExtensions.Builder().withClock(fakeClock).buildIntegrationTestExtension();
private File exportRootDir; DatastoreSetupHelper setupHelper;
private File exportDir;
private File commitLogDir;
private transient Registrar registrar1;
private transient Registrar registrar2;
private transient DomainBase domain;
private transient ContactResource contact1;
private transient ContactResource contact2;
private transient HostResource hostResource;
private transient DomainHistory historyEntry;
private transient Cursor globalCursor;
private transient Cursor tldCursor;
@BeforeEach @BeforeEach
void beforeEach() throws Exception { void beforeEach() throws Exception {
try (BackupTestStore store = new BackupTestStore(fakeClock)) { injectExtension.setStaticField(Ofy.class, "clock", fakeClock);
injectExtension.setStaticField(Ofy.class, "clock", fakeClock); setupHelper = new DatastoreSetupHelper(tmpDir, fakeClock).initializeData();
exportRootDir = Files.createDirectory(tmpDir.resolve("exports")).toFile();
persistResource(newRegistry("com", "COM"));
registrar1 = persistResource(AppEngineExtension.makeRegistrar1());
registrar2 = persistResource(AppEngineExtension.makeRegistrar2());
Key<DomainBase> domainKey = Key.create(null, DomainBase.class, "4-COM");
hostResource =
persistResource(
new HostResource.Builder()
.setHostName("ns1.example.com")
.setSuperordinateDomain(VKey.from(domainKey))
.setRepoId("1-COM")
.setCreationRegistrarId(registrar1.getRegistrarId())
.setPersistedCurrentSponsorRegistrarId(registrar2.getRegistrarId())
.build());
contact1 =
persistResource(
new ContactResource.Builder()
.setContactId("contact_id1")
.setRepoId("2-COM")
.setCreationRegistrarId(registrar1.getRegistrarId())
.setPersistedCurrentSponsorRegistrarId(registrar2.getRegistrarId())
.build());
contact2 =
persistResource(
new ContactResource.Builder()
.setContactId("contact_id2")
.setRepoId("3-COM")
.setCreationRegistrarId(registrar1.getRegistrarId())
.setPersistedCurrentSponsorRegistrarId(registrar1.getRegistrarId())
.build());
persistSimpleResource(
new RegistrarContact.Builder()
.setParent(registrar1)
.setName("John Abused")
.setEmailAddress("johnabuse@example.com")
.setVisibleInWhoisAsAdmin(true)
.setVisibleInWhoisAsTech(false)
.setPhoneNumber("+1.2125551213")
.setFaxNumber("+1.2125551213")
.setTypes(ImmutableSet.of(RegistrarContact.Type.ABUSE, RegistrarContact.Type.ADMIN))
.build());
historyEntry =
persistResource(
new DomainHistory.Builder()
.setDomainRepoId(domainKey.getName())
.setModificationTime(fakeClock.nowUtc())
.setRegistrarId(registrar1.getRegistrarId())
.setType(HistoryEntry.Type.DOMAIN_CREATE)
.build());
persistResource(
new AllocationToken.Builder().setToken("abc123").setTokenType(SINGLE_USE).build());
Key<DomainHistory> historyEntryKey = Key.create(historyEntry);
BillingEvent.OneTime onetimeBillEvent =
new BillingEvent.OneTime.Builder()
.setId(1)
.setReason(Reason.RENEW)
.setTargetId("example.com")
.setRegistrarId("TheRegistrar")
.setCost(Money.parse("USD 44.00"))
.setPeriodYears(4)
.setEventTime(fakeClock.nowUtc())
.setBillingTime(fakeClock.nowUtc())
.setParent(historyEntryKey)
.build();
persistResource(onetimeBillEvent);
Key<BillingEvent.OneTime> oneTimeBillKey = Key.create(onetimeBillEvent);
BillingEvent.Recurring recurringBillEvent =
new BillingEvent.Recurring.Builder()
.setId(2)
.setReason(Reason.RENEW)
.setFlags(ImmutableSet.of(Flag.AUTO_RENEW))
.setTargetId("example.com")
.setRegistrarId("TheRegistrar")
.setEventTime(fakeClock.nowUtc())
.setRecurrenceEndTime(END_OF_TIME)
.setParent(historyEntryKey)
.build();
persistResource(recurringBillEvent);
VKey<BillingEvent.Recurring> recurringBillKey = recurringBillEvent.createVKey();
PollMessage.Autorenew autorenewPollMessage =
new PollMessage.Autorenew.Builder()
.setId(3L)
.setTargetId("example.com")
.setRegistrarId("TheRegistrar")
.setEventTime(fakeClock.nowUtc())
.setMsg("Domain was auto-renewed.")
.setParent(historyEntry)
.build();
persistResource(autorenewPollMessage);
VKey<PollMessage.Autorenew> autorenewPollKey = autorenewPollMessage.createVKey();
PollMessage.OneTime oneTimePollMessage =
new PollMessage.OneTime.Builder()
.setId(1L)
.setParent(historyEntry)
.setEventTime(fakeClock.nowUtc())
.setRegistrarId("TheRegistrar")
.setMsg(DomainFlowUtils.COLLISION_MESSAGE)
.build();
persistResource(oneTimePollMessage);
VKey<PollMessage.OneTime> onetimePollKey = oneTimePollMessage.createVKey();
domain =
persistResource(
new DomainBase.Builder()
.setDomainName("example.com")
.setRepoId("4-COM")
.setCreationRegistrarId(registrar1.getRegistrarId())
.setLastEppUpdateTime(fakeClock.nowUtc())
.setLastEppUpdateRegistrarId(registrar2.getRegistrarId())
.setLastTransferTime(fakeClock.nowUtc())
.setStatusValues(
ImmutableSet.of(
StatusValue.CLIENT_DELETE_PROHIBITED,
StatusValue.SERVER_DELETE_PROHIBITED,
StatusValue.SERVER_TRANSFER_PROHIBITED,
StatusValue.SERVER_UPDATE_PROHIBITED,
StatusValue.SERVER_RENEW_PROHIBITED,
StatusValue.SERVER_HOLD))
.setRegistrant(contact1.createVKey())
.setContacts(
ImmutableSet.of(
DesignatedContact.create(
DesignatedContact.Type.ADMIN, contact2.createVKey())))
.setNameservers(ImmutableSet.of(hostResource.createVKey()))
.setSubordinateHosts(ImmutableSet.of("ns1.example.com"))
.setPersistedCurrentSponsorRegistrarId(registrar2.getRegistrarId())
.setRegistrationExpirationTime(fakeClock.nowUtc().plusYears(1))
.setAuthInfo(DomainAuthInfo.create(PasswordAuth.create("password")))
.setDsData(
ImmutableSet.of(DelegationSignerData.create(1, 2, 3, new byte[] {0, 1, 2})))
.setLaunchNotice(
LaunchNotice.create("tcnid", "validatorId", START_OF_TIME, START_OF_TIME))
.setTransferData(
new DomainTransferData.Builder()
.setGainingRegistrarId(registrar1.getRegistrarId())
.setLosingRegistrarId(registrar2.getRegistrarId())
.setPendingTransferExpirationTime(fakeClock.nowUtc())
.setServerApproveEntities(
ImmutableSet.of(
VKey.from(oneTimeBillKey), recurringBillKey, autorenewPollKey))
.setServerApproveBillingEvent(VKey.from(oneTimeBillKey))
.setServerApproveAutorenewEvent(recurringBillKey)
.setServerApproveAutorenewPollMessage(autorenewPollKey)
.setTransferRequestTime(fakeClock.nowUtc().plusDays(1))
.setTransferStatus(TransferStatus.SERVER_APPROVED)
.setTransferRequestTrid(Trid.create("client-trid", "server-trid"))
.build())
.setDeletePollMessage(onetimePollKey)
.setAutorenewBillingEvent(recurringBillKey)
.setAutorenewPollMessage(autorenewPollKey)
.setSmdId("smdid")
.addGracePeriod(
GracePeriod.create(
GracePeriodStatus.ADD,
"4-COM",
fakeClock.nowUtc().plusDays(1),
"TheRegistrar",
null))
.build());
persistResource(
new BillingEvent.Cancellation.Builder()
.setReason(Reason.RENEW)
.setTargetId(domain.getDomainName())
.setRegistrarId(domain.getCurrentSponsorRegistrarId())
.setEventTime(fakeClock.nowUtc())
.setBillingTime(fakeClock.nowUtc())
.setRecurringEventKey(recurringBillEvent.createVKey())
.setParent(historyEntryKey)
.build());
globalCursor = persistResource(Cursor.createGlobal(RECURRING_BILLING, fakeClock.nowUtc()));
tldCursor = persistResource(Cursor.create(BRDA, fakeClock.nowUtc(), Registry.get("com")));
exportDir = store.export(exportRootDir.getAbsolutePath(), ALL_KINDS, ImmutableSet.of());
commitLogDir = Files.createDirectory(tmpDir.resolve("commits")).toFile();
fakeClock.advanceOneMilli();
}
} }
@Test @Test
@ -325,24 +79,27 @@ class InitSqlPipelineTest {
PipelineOptionsFactory.fromArgs( PipelineOptionsFactory.fromArgs(
"--commitLogStartTimestamp=" + START_TIME, "--commitLogStartTimestamp=" + START_TIME,
"--commitLogEndTimestamp=" + fakeClock.nowUtc().plusMillis(1), "--commitLogEndTimestamp=" + fakeClock.nowUtc().plusMillis(1),
"--datastoreExportDir=" + exportDir.getAbsolutePath(), "--datastoreExportDir=" + setupHelper.exportDir.getAbsolutePath(),
"--commitLogDir=" + commitLogDir.getAbsolutePath()) "--commitLogDir=" + setupHelper.commitLogDir.getAbsolutePath())
.withValidation() .withValidation()
.as(InitSqlPipelineOptions.class); .as(InitSqlPipelineOptions.class);
InitSqlPipeline initSqlPipeline = new InitSqlPipeline(options); InitSqlPipeline initSqlPipeline = new InitSqlPipeline(options);
initSqlPipeline.run(testPipeline).waitUntilFinish(); initSqlPipeline.run(testPipeline).waitUntilFinish();
assertHostResourceEquals( assertHostResourceEquals(
jpaTm().transact(() -> jpaTm().loadByKey(hostResource.createVKey())), hostResource); jpaTm().transact(() -> jpaTm().loadByKey(setupHelper.hostResource.createVKey())),
setupHelper.hostResource);
assertThat(jpaTm().transact(() -> jpaTm().loadAllOf(Registrar.class))) assertThat(jpaTm().transact(() -> jpaTm().loadAllOf(Registrar.class)))
.comparingElementsUsing(immutableObjectCorrespondence("lastUpdateTime")) .comparingElementsUsing(immutableObjectCorrespondence("lastUpdateTime"))
.containsExactly(registrar1, registrar2); .containsExactly(setupHelper.registrar1, setupHelper.registrar2);
assertThat(jpaTm().transact(() -> jpaTm().loadAllOf(ContactResource.class))) assertThat(jpaTm().transact(() -> jpaTm().loadAllOf(ContactResource.class)))
.comparingElementsUsing(immutableObjectCorrespondence("revisions", "updateTimestamp")) .comparingElementsUsing(immutableObjectCorrespondence("revisions", "updateTimestamp"))
.containsExactly(contact1, contact2); .containsExactly(setupHelper.contact1, setupHelper.contact2);
assertDomainEquals(jpaTm().transact(() -> jpaTm().loadByKey(domain.createVKey())), domain); assertDomainEquals(
jpaTm().transact(() -> jpaTm().loadByKey(setupHelper.domain.createVKey())),
setupHelper.domain);
assertThat(jpaTm().transact(() -> jpaTm().loadAllOf(Cursor.class))) assertThat(jpaTm().transact(() -> jpaTm().loadAllOf(Cursor.class)))
.comparingElementsUsing(immutableObjectCorrespondence()) .comparingElementsUsing(immutableObjectCorrespondence())
.containsExactly(globalCursor, tldCursor); .containsExactly(setupHelper.globalCursor, setupHelper.tldCursor);
} }
private static void assertHostResourceEquals(HostResource actual, HostResource expected) { private static void assertHostResourceEquals(HostResource actual, HostResource expected) {

View file

@ -50,8 +50,18 @@ import google.registry.testing.AppEngineExtension;
import google.registry.testing.DatabaseHelper; import google.registry.testing.DatabaseHelper;
import google.registry.testing.FakeClock; import google.registry.testing.FakeClock;
/** Entity creation utilities for domain-related tests. */ /**
class TestSetupHelper { * Utilities for managing domain-related SQL test scenarios.
*
* <p>The {@link #initializeAllEntities} method initializes the database, and the {@link
* #applyChangeToDomainAndHistory} makes one change to the domain, generating an additional history
* event. The most up-to-date values of the relevant entities are saved in public instance
* variables, {@link #domain} etc.
*
* <p>This class makes use of {@link DatabaseHelper}, which requires Datastore. Tests that use this
* class should use {@link AppEngineExtension}.
*/
public final class TestSetupHelper {
public static final String TLD = "tld"; public static final String TLD = "tld";
public static final String DOMAIN_REPO_ID = "4-TLD"; public static final String DOMAIN_REPO_ID = "4-TLD";
@ -60,21 +70,21 @@ class TestSetupHelper {
private final FakeClock fakeClock; private final FakeClock fakeClock;
Registry registry; public Registry registry;
Registrar registrar; public Registrar registrar;
ContactResource contact; public ContactResource contact;
DomainBase domain; public DomainBase domain;
DomainHistory domainHistory; public DomainHistory domainHistory;
HostResource host; public HostResource host;
private JpaTransactionManager originalJpaTm; private JpaTransactionManager originalJpaTm;
private JpaTransactionManager bulkQueryJpaTm; private JpaTransactionManager bulkQueryJpaTm;
TestSetupHelper(FakeClock fakeClock) { public TestSetupHelper(FakeClock fakeClock) {
this.fakeClock = fakeClock; this.fakeClock = fakeClock;
} }
void initializeAllEntities() { public void initializeAllEntities() {
registry = putInDb(DatabaseHelper.newRegistry(TLD, Ascii.toUpperCase(TLD))); registry = putInDb(DatabaseHelper.newRegistry(TLD, Ascii.toUpperCase(TLD)));
registrar = saveRegistrar(REGISTRAR_ID); registrar = saveRegistrar(REGISTRAR_ID);
contact = putInDb(createContact(DOMAIN_REPO_ID, REGISTRAR_ID)); contact = putInDb(createContact(DOMAIN_REPO_ID, REGISTRAR_ID));
@ -83,12 +93,12 @@ class TestSetupHelper {
host = putInDb(createHost()); host = putInDb(createHost());
} }
void applyChangeToDomainAndHistory() { public void applyChangeToDomainAndHistory() {
domain = putInDb(createFullDomain(contact, host, fakeClock)); domain = putInDb(createFullDomain(contact, host, fakeClock));
domainHistory = putInDb(createFullHistory(domain, fakeClock)); domainHistory = putInDb(createFullHistory(domain, fakeClock));
} }
void setupBulkQueryJpaTm(AppEngineExtension appEngineExtension) { public void setupBulkQueryJpaTm(AppEngineExtension appEngineExtension) {
bulkQueryJpaTm = bulkQueryJpaTm =
BulkQueryJpaFactory.createBulkQueryJpaTransactionManager( BulkQueryJpaFactory.createBulkQueryJpaTransactionManager(
appEngineExtension appEngineExtension
@ -101,7 +111,7 @@ class TestSetupHelper {
TransactionManagerFactory.setJpaTm(() -> bulkQueryJpaTm); TransactionManagerFactory.setJpaTm(() -> bulkQueryJpaTm);
} }
void tearDownBulkQueryJpaTm() { public void tearDownBulkQueryJpaTm() {
if (bulkQueryJpaTm != null) { if (bulkQueryJpaTm != null) {
bulkQueryJpaTm.teardown(); bulkQueryJpaTm.teardown();
TransactionManagerFactory.setJpaTm(() -> originalJpaTm); TransactionManagerFactory.setJpaTm(() -> originalJpaTm);