Only compare recent changes in Datastore and SQL (#1485)

* Only compare recent changes in Datastore and SQL

When comparing Datastore and SQL, ignore older History and EPP resource
objects. This cuts the run time in half compared with a full comparison.
The intention is to run a full comparison before the switch-over from
Datastore and SQL, and run this incremental comparison during the down
time.

The incremental comparison takes about 25 minutes in production.
Performance can be improved further by filtering out older billing
events (OneTime and Cancellation). However, we don't think further
optimization is worth the effort (considering that Recurring events
cannot be filtered since they are mutable but without lastUpdateTime).

Verified in Sandbox and prod with and without time filter.
This commit is contained in:
Weimin Yu 2022-01-11 14:17:32 -05:00 committed by GitHub
parent 9df8d83963
commit fe0c37eee6
6 changed files with 236 additions and 42 deletions

View file

@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableSet;
import com.googlecode.objectify.Key; import com.googlecode.objectify.Key;
import google.registry.backup.VersionedEntity; import google.registry.backup.VersionedEntity;
import google.registry.beam.initsql.Transforms; import google.registry.beam.initsql.Transforms;
import google.registry.model.EppResource;
import google.registry.model.annotations.DeleteAfterMigration; import google.registry.model.annotations.DeleteAfterMigration;
import google.registry.model.billing.BillingEvent; import google.registry.model.billing.BillingEvent;
import google.registry.model.common.Cursor; import google.registry.model.common.Cursor;
@ -42,6 +43,7 @@ import google.registry.model.tld.Registry;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.ParDo;
@ -93,7 +95,8 @@ public final class DatastoreSnapshots {
String commitLogDir, String commitLogDir,
DateTime commitLogFromTime, DateTime commitLogFromTime,
DateTime commitLogToTime, DateTime commitLogToTime,
Set<Class<?>> kinds) { Set<Class<?>> kinds,
Optional<DateTime> compareStartTime) {
PCollectionTuple snapshot = PCollectionTuple snapshot =
pipeline.apply( pipeline.apply(
"Load Datastore snapshot.", "Load Datastore snapshot.",
@ -112,11 +115,11 @@ public final class DatastoreSnapshots {
perTypeSnapshots = perTypeSnapshots =
perTypeSnapshots.and( perTypeSnapshots.and(
createSqlEntityTupleTag((Class<? extends SqlEntity>) kind), createSqlEntityTupleTag((Class<? extends SqlEntity>) kind),
datastoreEntityToPojo(perKindSnapshot, kind.getSimpleName())); datastoreEntityToPojo(perKindSnapshot, kind.getSimpleName(), compareStartTime));
continue; continue;
} }
Verify.verify(kind == HistoryEntry.class, "Unexpected Non-SqlEntity class: %s", kind); Verify.verify(kind == HistoryEntry.class, "Unexpected Non-SqlEntity class: %s", kind);
PCollectionTuple historyEntriesByType = splitHistoryEntry(perKindSnapshot); PCollectionTuple historyEntriesByType = splitHistoryEntry(perKindSnapshot, compareStartTime);
for (Map.Entry<TupleTag<?>, PCollection<?>> entry : for (Map.Entry<TupleTag<?>, PCollection<?>> entry :
historyEntriesByType.getAll().entrySet()) { historyEntriesByType.getAll().entrySet()) {
perTypeSnapshots = perTypeSnapshots.and(entry.getKey().getId(), entry.getValue()); perTypeSnapshots = perTypeSnapshots.and(entry.getKey().getId(), entry.getValue());
@ -129,7 +132,9 @@ public final class DatastoreSnapshots {
* Splits a {@link PCollection} of {@link HistoryEntry HistoryEntries} into three collections of * Splits a {@link PCollection} of {@link HistoryEntry HistoryEntries} into three collections of
* its child entities by type. * its child entities by type.
*/ */
static PCollectionTuple splitHistoryEntry(PCollection<VersionedEntity> historyEntries) { static PCollectionTuple splitHistoryEntry(
PCollection<VersionedEntity> historyEntries, Optional<DateTime> compareStartTime) {
DateTime nullableStartTime = compareStartTime.orElse(null);
return historyEntries.apply( return historyEntries.apply(
"Split HistoryEntry by Resource Type", "Split HistoryEntry by Resource Type",
ParDo.of( ParDo.of(
@ -138,6 +143,7 @@ public final class DatastoreSnapshots {
public void processElement( public void processElement(
@Element VersionedEntity historyEntry, MultiOutputReceiver out) { @Element VersionedEntity historyEntry, MultiOutputReceiver out) {
Optional.ofNullable(Transforms.convertVersionedEntityToSqlEntity(historyEntry)) Optional.ofNullable(Transforms.convertVersionedEntityToSqlEntity(historyEntry))
.filter(e -> isEntityIncludedForComparison(e, nullableStartTime))
.ifPresent( .ifPresent(
sqlEntity -> sqlEntity ->
out.get(createSqlEntityTupleTag(sqlEntity.getClass())) out.get(createSqlEntityTupleTag(sqlEntity.getClass()))
@ -155,7 +161,8 @@ public final class DatastoreSnapshots {
* objects. * objects.
*/ */
static PCollection<SqlEntity> datastoreEntityToPojo( static PCollection<SqlEntity> datastoreEntityToPojo(
PCollection<VersionedEntity> entities, String desc) { PCollection<VersionedEntity> entities, String desc, Optional<DateTime> compareStartTime) {
DateTime nullableStartTime = compareStartTime.orElse(null);
return entities.apply( return entities.apply(
"Datastore Entity to Pojo " + desc, "Datastore Entity to Pojo " + desc,
ParDo.of( ParDo.of(
@ -164,8 +171,23 @@ public final class DatastoreSnapshots {
public void processElement( public void processElement(
@Element VersionedEntity entity, OutputReceiver<SqlEntity> out) { @Element VersionedEntity entity, OutputReceiver<SqlEntity> out) {
Optional.ofNullable(Transforms.convertVersionedEntityToSqlEntity(entity)) Optional.ofNullable(Transforms.convertVersionedEntityToSqlEntity(entity))
.filter(e -> isEntityIncludedForComparison(e, nullableStartTime))
.ifPresent(out::output); .ifPresent(out::output);
} }
})); }));
} }
static boolean isEntityIncludedForComparison(
SqlEntity entity, @Nullable DateTime compareStartTime) {
if (compareStartTime == null) {
return true;
}
if (entity instanceof HistoryEntry) {
return compareStartTime.isBefore(((HistoryEntry) entity).getModificationTime());
}
if (entity instanceof EppResource) {
return compareStartTime.isBefore(((EppResource) entity).getUpdateTimestamp().getTimestamp());
}
return true;
}
} }

View file

@ -14,15 +14,22 @@
package google.registry.beam.comparedb; package google.registry.beam.comparedb;
import static com.google.common.base.Preconditions.checkState;
import static google.registry.beam.comparedb.ValidateSqlUtils.createSqlEntityTupleTag; import static google.registry.beam.comparedb.ValidateSqlUtils.createSqlEntityTupleTag;
import static google.registry.beam.comparedb.ValidateSqlUtils.getMedianIdForHistoryTable; import static google.registry.beam.comparedb.ValidateSqlUtils.getMedianIdForHistoryTable;
import com.google.auto.value.AutoValue;
import com.google.common.base.Strings;
import com.google.common.base.Verify; import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSetMultimap; import com.google.common.collect.ImmutableSetMultimap;
import com.google.common.collect.Streams; import com.google.common.collect.Streams;
import google.registry.beam.common.RegistryJpaIO; import google.registry.beam.common.RegistryJpaIO;
import google.registry.beam.common.RegistryJpaIO.Read;
import google.registry.model.EppResource;
import google.registry.model.UpdateAutoTimestamp;
import google.registry.model.annotations.DeleteAfterMigration; import google.registry.model.annotations.DeleteAfterMigration;
import google.registry.model.billing.BillingEvent; import google.registry.model.billing.BillingEvent;
import google.registry.model.bulkquery.BulkQueryEntities; import google.registry.model.bulkquery.BulkQueryEntities;
@ -50,8 +57,10 @@ import google.registry.model.replay.SqlEntity;
import google.registry.model.reporting.DomainTransactionRecord; import google.registry.model.reporting.DomainTransactionRecord;
import google.registry.model.tld.Registry; import google.registry.model.tld.Registry;
import google.registry.persistence.transaction.CriteriaQueryBuilder; import google.registry.persistence.transaction.CriteriaQueryBuilder;
import google.registry.util.DateTimeUtils;
import java.io.Serializable; import java.io.Serializable;
import java.util.Optional; import java.util.Optional;
import javax.persistence.Entity;
import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.Flatten;
@ -65,6 +74,7 @@ import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors; import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.DateTime;
/** /**
* Utilities for loading SQL snapshots. * Utilities for loading SQL snapshots.
@ -113,28 +123,48 @@ public final class SqlSnapshots {
public static PCollectionTuple loadCloudSqlSnapshotByType( public static PCollectionTuple loadCloudSqlSnapshotByType(
Pipeline pipeline, Pipeline pipeline,
ImmutableSet<Class<? extends SqlEntity>> sqlEntityTypes, ImmutableSet<Class<? extends SqlEntity>> sqlEntityTypes,
Optional<String> snapshotId) { Optional<String> snapshotId,
Optional<DateTime> compareStartTime) {
PCollectionTuple perTypeSnapshots = PCollectionTuple.empty(pipeline); PCollectionTuple perTypeSnapshots = PCollectionTuple.empty(pipeline);
for (Class<? extends SqlEntity> clazz : sqlEntityTypes) { for (Class<? extends SqlEntity> clazz : sqlEntityTypes) {
if (clazz == DomainBase.class) { if (clazz == DomainBase.class) {
perTypeSnapshots = perTypeSnapshots =
perTypeSnapshots.and( perTypeSnapshots.and(
createSqlEntityTupleTag(DomainBase.class), createSqlEntityTupleTag(DomainBase.class),
loadAndAssembleDomainBase(pipeline, snapshotId)); loadAndAssembleDomainBase(pipeline, snapshotId, compareStartTime));
continue; continue;
} }
if (clazz == DomainHistory.class) { if (clazz == DomainHistory.class) {
perTypeSnapshots = perTypeSnapshots =
perTypeSnapshots.and( perTypeSnapshots.and(
createSqlEntityTupleTag(DomainHistory.class), createSqlEntityTupleTag(DomainHistory.class),
loadAndAssembleDomainHistory(pipeline, snapshotId)); loadAndAssembleDomainHistory(pipeline, snapshotId, compareStartTime));
continue; continue;
} }
if (clazz == ContactHistory.class) { if (clazz == ContactHistory.class) {
perTypeSnapshots = perTypeSnapshots =
perTypeSnapshots.and( perTypeSnapshots.and(
createSqlEntityTupleTag(ContactHistory.class), createSqlEntityTupleTag(ContactHistory.class),
loadContactHistory(pipeline, snapshotId)); loadContactHistory(pipeline, snapshotId, compareStartTime));
continue;
}
if (clazz == HostHistory.class) {
perTypeSnapshots =
perTypeSnapshots.and(
createSqlEntityTupleTag(HostHistory.class),
loadHostHistory(
pipeline, snapshotId, compareStartTime.orElse(DateTimeUtils.START_OF_TIME)));
continue;
}
if (EppResource.class.isAssignableFrom(clazz) && compareStartTime.isPresent()) {
perTypeSnapshots =
perTypeSnapshots.and(
createSqlEntityTupleTag(clazz),
pipeline.apply(
"SQL Load " + clazz.getSimpleName(),
buildEppResourceQueryWithTimeFilter(
clazz, SqlEntity.class, snapshotId, compareStartTime.get())
.withSnapshot(snapshotId.orElse(null))));
continue; continue;
} }
perTypeSnapshots = perTypeSnapshots =
@ -155,20 +185,33 @@ public final class SqlSnapshots {
* @see BulkQueryEntities * @see BulkQueryEntities
*/ */
public static PCollection<SqlEntity> loadAndAssembleDomainBase( public static PCollection<SqlEntity> loadAndAssembleDomainBase(
Pipeline pipeline, Optional<String> snapshotId) { Pipeline pipeline, Optional<String> snapshotId, Optional<DateTime> compareStartTime) {
PCollection<KV<String, Serializable>> baseObjects = PCollection<KV<String, Serializable>> baseObjects =
readAllAndAssignKey(pipeline, DomainBaseLite.class, DomainBaseLite::getRepoId, snapshotId); readAllAndAssignKey(
pipeline,
DomainBaseLite.class,
DomainBaseLite::getRepoId,
snapshotId,
compareStartTime);
PCollection<KV<String, Serializable>> gracePeriods = PCollection<KV<String, Serializable>> gracePeriods =
readAllAndAssignKey(pipeline, GracePeriod.class, GracePeriod::getDomainRepoId, snapshotId); readAllAndAssignKey(
pipeline,
GracePeriod.class,
GracePeriod::getDomainRepoId,
snapshotId,
compareStartTime);
PCollection<KV<String, Serializable>> delegationSigners = PCollection<KV<String, Serializable>> delegationSigners =
readAllAndAssignKey( readAllAndAssignKey(
pipeline, pipeline,
DelegationSignerData.class, DelegationSignerData.class,
DelegationSignerData::getDomainRepoId, DelegationSignerData::getDomainRepoId,
snapshotId); snapshotId,
compareStartTime);
PCollection<KV<String, Serializable>> domainHosts = PCollection<KV<String, Serializable>> domainHosts =
readAllAndAssignKey(pipeline, DomainHost.class, DomainHost::getDomainRepoId, snapshotId); readAllAndAssignKey(
pipeline, DomainHost.class, DomainHost::getDomainRepoId, snapshotId, compareStartTime);
DateTime nullableCompareStartTime = compareStartTime.orElse(null);
return PCollectionList.of( return PCollectionList.of(
ImmutableList.of(baseObjects, gracePeriods, delegationSigners, domainHosts)) ImmutableList.of(baseObjects, gracePeriods, delegationSigners, domainHosts))
.apply("SQL Merge DomainBase parts", Flatten.pCollections()) .apply("SQL Merge DomainBase parts", Flatten.pCollections())
@ -184,6 +227,14 @@ public final class SqlSnapshots {
TypedClassifier partsByType = new TypedClassifier(kv.getValue()); TypedClassifier partsByType = new TypedClassifier(kv.getValue());
ImmutableSet<DomainBaseLite> baseObjects = ImmutableSet<DomainBaseLite> baseObjects =
partsByType.getAllOf(DomainBaseLite.class); partsByType.getAllOf(DomainBaseLite.class);
if (nullableCompareStartTime != null) {
Verify.verify(
baseObjects.size() <= 1,
"Found duplicate DomainBaseLite object per repoId: " + kv.getKey());
if (baseObjects.isEmpty()) {
return;
}
}
Verify.verify( Verify.verify(
baseObjects.size() == 1, baseObjects.size() == 1,
"Expecting one DomainBaseLite object per repoId: " + kv.getKey()); "Expecting one DomainBaseLite object per repoId: " + kv.getKey());
@ -205,16 +256,16 @@ public final class SqlSnapshots {
* <p>This method uses two queries to load data in parallel. This is a performance optimization * <p>This method uses two queries to load data in parallel. This is a performance optimization
* specifically for the production database. * specifically for the production database.
*/ */
static PCollection<SqlEntity> loadContactHistory(Pipeline pipeline, Optional<String> snapshotId) { static PCollection<SqlEntity> loadContactHistory(
long medianId = Pipeline pipeline, Optional<String> snapshotId, Optional<DateTime> compareStartTime) {
getMedianIdForHistoryTable("ContactHistory") PartitionedQuery partitionedQuery =
.orElseThrow( buildPartitonedHistoryQuery(ContactHistory.class, compareStartTime);
() -> new IllegalStateException("Not a valid database: no ContactHistory."));
PCollection<SqlEntity> part1 = PCollection<SqlEntity> part1 =
pipeline.apply( pipeline.apply(
"SQL Load ContactHistory first half", "SQL Load ContactHistory first half",
RegistryJpaIO.read( RegistryJpaIO.read(
String.format("select c from ContactHistory c where id <= %s", medianId), partitionedQuery.firstHalfQuery(),
partitionedQuery.parameters(),
false, false,
SqlEntity.class::cast) SqlEntity.class::cast)
.withSnapshot(snapshotId.orElse(null))); .withSnapshot(snapshotId.orElse(null)));
@ -222,7 +273,8 @@ public final class SqlSnapshots {
pipeline.apply( pipeline.apply(
"SQL Load ContactHistory second half", "SQL Load ContactHistory second half",
RegistryJpaIO.read( RegistryJpaIO.read(
String.format("select c from ContactHistory c where id > %s", medianId), partitionedQuery.secondHalfQuery(),
partitionedQuery.parameters(),
false, false,
SqlEntity.class::cast) SqlEntity.class::cast)
.withSnapshot(snapshotId.orElse(null))); .withSnapshot(snapshotId.orElse(null)));
@ -231,6 +283,19 @@ public final class SqlSnapshots {
.apply("Combine ContactHistory parts", Flatten.pCollections()); .apply("Combine ContactHistory parts", Flatten.pCollections());
} }
/** Loads all {@link HostHistory} entities from the database. */
static PCollection<SqlEntity> loadHostHistory(
Pipeline pipeline, Optional<String> snapshotId, DateTime compareStartTime) {
return pipeline.apply(
"SQL Load HostHistory",
RegistryJpaIO.read(
"select c from HostHistory c where :compareStartTime < modificationTime",
ImmutableMap.of("compareStartTime", compareStartTime),
false,
SqlEntity.class::cast)
.withSnapshot(snapshotId.orElse(null)));
}
/** /**
* Bulk-loads all parts of {@link DomainHistory} and assembles them in the pipeline. * Bulk-loads all parts of {@link DomainHistory} and assembles them in the pipeline.
* *
@ -240,16 +305,15 @@ public final class SqlSnapshots {
* @see BulkQueryEntities * @see BulkQueryEntities
*/ */
static PCollection<SqlEntity> loadAndAssembleDomainHistory( static PCollection<SqlEntity> loadAndAssembleDomainHistory(
Pipeline pipeline, Optional<String> snapshotId) { Pipeline pipeline, Optional<String> snapshotId, Optional<DateTime> compareStartTime) {
long medianId = PartitionedQuery partitionedQuery =
getMedianIdForHistoryTable("DomainHistory") buildPartitonedHistoryQuery(DomainHistoryLite.class, compareStartTime);
.orElseThrow(
() -> new IllegalStateException("Not a valid database: no DomainHistory."));
PCollection<KV<String, Serializable>> baseObjectsPart1 = PCollection<KV<String, Serializable>> baseObjectsPart1 =
queryAndAssignKey( queryAndAssignKey(
pipeline, pipeline,
"first half", "first half",
String.format("select c from DomainHistory c where id <= %s", medianId), partitionedQuery.firstHalfQuery(),
partitionedQuery.parameters(),
DomainHistoryLite.class, DomainHistoryLite.class,
compose(DomainHistoryLite::getDomainHistoryId, DomainHistoryId::toString), compose(DomainHistoryLite::getDomainHistoryId, DomainHistoryId::toString),
snapshotId); snapshotId);
@ -257,7 +321,8 @@ public final class SqlSnapshots {
queryAndAssignKey( queryAndAssignKey(
pipeline, pipeline,
"second half", "second half",
String.format("select c from DomainHistory c where id > %s", medianId), partitionedQuery.secondHalfQuery(),
partitionedQuery.parameters(),
DomainHistoryLite.class, DomainHistoryLite.class,
compose(DomainHistoryLite::getDomainHistoryId, DomainHistoryId::toString), compose(DomainHistoryLite::getDomainHistoryId, DomainHistoryId::toString),
snapshotId); snapshotId);
@ -266,26 +331,31 @@ public final class SqlSnapshots {
pipeline, pipeline,
GracePeriodHistory.class, GracePeriodHistory.class,
compose(GracePeriodHistory::getDomainHistoryId, DomainHistoryId::toString), compose(GracePeriodHistory::getDomainHistoryId, DomainHistoryId::toString),
snapshotId); snapshotId,
compareStartTime);
PCollection<KV<String, Serializable>> delegationSigners = PCollection<KV<String, Serializable>> delegationSigners =
readAllAndAssignKey( readAllAndAssignKey(
pipeline, pipeline,
DomainDsDataHistory.class, DomainDsDataHistory.class,
compose(DomainDsDataHistory::getDomainHistoryId, DomainHistoryId::toString), compose(DomainDsDataHistory::getDomainHistoryId, DomainHistoryId::toString),
snapshotId); snapshotId,
compareStartTime);
PCollection<KV<String, Serializable>> domainHosts = PCollection<KV<String, Serializable>> domainHosts =
readAllAndAssignKey( readAllAndAssignKey(
pipeline, pipeline,
DomainHistoryHost.class, DomainHistoryHost.class,
compose(DomainHistoryHost::getDomainHistoryId, DomainHistoryId::toString), compose(DomainHistoryHost::getDomainHistoryId, DomainHistoryId::toString),
snapshotId); snapshotId,
compareStartTime);
PCollection<KV<String, Serializable>> transactionRecords = PCollection<KV<String, Serializable>> transactionRecords =
readAllAndAssignKey( readAllAndAssignKey(
pipeline, pipeline,
DomainTransactionRecord.class, DomainTransactionRecord.class,
compose(DomainTransactionRecord::getDomainHistoryId, DomainHistoryId::toString), compose(DomainTransactionRecord::getDomainHistoryId, DomainHistoryId::toString),
snapshotId); snapshotId,
compareStartTime);
DateTime nullableCompareStartTime = compareStartTime.orElse(null);
return PCollectionList.of( return PCollectionList.of(
ImmutableList.of( ImmutableList.of(
baseObjectsPart1, baseObjectsPart1,
@ -307,6 +377,15 @@ public final class SqlSnapshots {
TypedClassifier partsByType = new TypedClassifier(kv.getValue()); TypedClassifier partsByType = new TypedClassifier(kv.getValue());
ImmutableSet<DomainHistoryLite> baseObjects = ImmutableSet<DomainHistoryLite> baseObjects =
partsByType.getAllOf(DomainHistoryLite.class); partsByType.getAllOf(DomainHistoryLite.class);
if (nullableCompareStartTime != null) {
Verify.verify(
baseObjects.size() <= 1,
"Found duplicate DomainHistoryLite object per domainHistoryId: "
+ kv.getKey());
if (baseObjects.isEmpty()) {
return;
}
}
Verify.verify( Verify.verify(
baseObjects.size() == 1, baseObjects.size() == 1,
"Expecting one DomainHistoryLite object per domainHistoryId: " "Expecting one DomainHistoryLite object per domainHistoryId: "
@ -328,12 +407,19 @@ public final class SqlSnapshots {
Pipeline pipeline, Pipeline pipeline,
Class<R> type, Class<R> type,
SerializableFunction<R, String> keyFunction, SerializableFunction<R, String> keyFunction,
Optional<String> snapshotId) { Optional<String> snapshotId,
return pipeline Optional<DateTime> compareStartTime) {
.apply( Read<R, R> queryObject;
"SQL Load " + type.getSimpleName(), if (compareStartTime.isPresent() && EppResource.class.isAssignableFrom(type)) {
queryObject =
buildEppResourceQueryWithTimeFilter(type, type, snapshotId, compareStartTime.get());
} else {
queryObject =
RegistryJpaIO.read(() -> CriteriaQueryBuilder.create(type).build()) RegistryJpaIO.read(() -> CriteriaQueryBuilder.create(type).build())
.withSnapshot(snapshotId.orElse(null))) .withSnapshot(snapshotId.orElse(null));
}
return pipeline
.apply("SQL Load " + type.getSimpleName(), queryObject)
.apply( .apply(
"Assign Key to " + type.getSimpleName(), "Assign Key to " + type.getSimpleName(),
MapElements.into( MapElements.into(
@ -346,13 +432,15 @@ public final class SqlSnapshots {
Pipeline pipeline, Pipeline pipeline,
String diffrentiator, String diffrentiator,
String jplQuery, String jplQuery,
ImmutableMap<String, Object> queryParameters,
Class<R> type, Class<R> type,
SerializableFunction<R, String> keyFunction, SerializableFunction<R, String> keyFunction,
Optional<String> snapshotId) { Optional<String> snapshotId) {
return pipeline return pipeline
.apply( .apply(
"SQL Load " + type.getSimpleName() + " " + diffrentiator, "SQL Load " + type.getSimpleName() + " " + diffrentiator,
RegistryJpaIO.read(jplQuery, false, type::cast).withSnapshot(snapshotId.orElse(null))) RegistryJpaIO.read(jplQuery, queryParameters, false, type::cast)
.withSnapshot(snapshotId.orElse(null)))
.apply( .apply(
"Assign Key to " + type.getSimpleName() + " " + diffrentiator, "Assign Key to " + type.getSimpleName() + " " + diffrentiator,
MapElements.into( MapElements.into(
@ -367,6 +455,71 @@ public final class SqlSnapshots {
return r -> f2.apply(f1.apply(r)); return r -> f2.apply(f1.apply(r));
} }
static <R, T> Read<R, T> buildEppResourceQueryWithTimeFilter(
Class<R> entityType,
Class<T> castOutputAsType,
Optional<String> snapshotId,
DateTime compareStartTime) {
String tableName = getJpaEntityName(entityType);
String jpql =
String.format("select c from %s c where :compareStartTime < updateTimestamp", tableName);
return RegistryJpaIO.read(
jpql,
ImmutableMap.of("compareStartTime", UpdateAutoTimestamp.create(compareStartTime)),
false,
(R x) -> castOutputAsType.cast(x))
.withSnapshot(snapshotId.orElse(null));
}
static PartitionedQuery buildPartitonedHistoryQuery(
Class<?> entityType, Optional<DateTime> compareStartTime) {
String tableName = getJpaEntityName(entityType);
Verify.verify(
!Strings.isNullOrEmpty(tableName), "Invalid entity type %s", entityType.getSimpleName());
long medianId =
getMedianIdForHistoryTable(tableName)
.orElseThrow(() -> new IllegalStateException("Not a valid database: no " + tableName));
String firstHalfQuery = String.format("select c from %s c where id <= :historyId", tableName);
String secondHalfQuery = String.format("select c from %s c where id > :historyId", tableName);
if (compareStartTime.isPresent()) {
String timeFilter = " and :compareStartTime < modificationTime";
firstHalfQuery += timeFilter;
secondHalfQuery += timeFilter;
return PartitionedQuery.createPartitionedQuery(
firstHalfQuery,
secondHalfQuery,
ImmutableMap.of("historyId", medianId, "compareStartTime", compareStartTime.get()));
} else {
return PartitionedQuery.createPartitionedQuery(
firstHalfQuery, secondHalfQuery, ImmutableMap.of("historyId", medianId));
}
}
private static String getJpaEntityName(Class entityType) {
Entity entityAnnotation = (Entity) entityType.getAnnotation(Entity.class);
checkState(
entityAnnotation != null, "Unexpected non-entity type %s", entityType.getSimpleName());
return Strings.isNullOrEmpty(entityAnnotation.name())
? entityType.getSimpleName()
: entityAnnotation.name();
}
/** Contains two queries that partition the target table in two. */
@AutoValue
abstract static class PartitionedQuery {
abstract String firstHalfQuery();
abstract String secondHalfQuery();
abstract ImmutableMap<String, Object> parameters();
public static PartitionedQuery createPartitionedQuery(
String firstHalfQuery, String secondHalfQuery, ImmutableMap<String, Object> parameters) {
return new AutoValue_SqlSnapshots_PartitionedQuery(
firstHalfQuery, secondHalfQuery, parameters);
}
}
/** Container that receives mixed-typed data and groups them by {@link Class}. */ /** Container that receives mixed-typed data and groups them by {@link Class}. */
static class TypedClassifier { static class TypedClassifier {
private final ImmutableSetMultimap<Class<?>, Object> classifiedEntities; private final ImmutableSetMultimap<Class<?>, Object> classifiedEntities;

View file

@ -126,6 +126,9 @@ public class ValidateSqlPipeline {
.getCoderRegistry() .getCoderRegistry()
.registerCoderForClass(SqlEntity.class, SerializableCoder.of(Serializable.class)); .registerCoderForClass(SqlEntity.class, SerializableCoder.of(Serializable.class));
Optional<DateTime> compareStartTime =
Optional.ofNullable(options.getComparisonStartTimestamp()).map(DateTime::parse);
PCollectionTuple datastoreSnapshot = PCollectionTuple datastoreSnapshot =
DatastoreSnapshots.loadDatastoreSnapshotByKind( DatastoreSnapshots.loadDatastoreSnapshotByKind(
pipeline, pipeline,
@ -135,11 +138,12 @@ public class ValidateSqlPipeline {
// Increase by 1ms since we want to include commitLogs latestCommitLogTime but // Increase by 1ms since we want to include commitLogs latestCommitLogTime but
// this parameter is exclusive. // this parameter is exclusive.
latestCommitLogTime.plusMillis(1), latestCommitLogTime.plusMillis(1),
DatastoreSnapshots.ALL_DATASTORE_KINDS); DatastoreSnapshots.ALL_DATASTORE_KINDS,
compareStartTime);
PCollectionTuple cloudSqlSnapshot = PCollectionTuple cloudSqlSnapshot =
SqlSnapshots.loadCloudSqlSnapshotByType( SqlSnapshots.loadCloudSqlSnapshotByType(
pipeline, SqlSnapshots.ALL_SQL_ENTITIES, sqlSnapshotId); pipeline, SqlSnapshots.ALL_SQL_ENTITIES, sqlSnapshotId, compareStartTime);
verify( verify(
datastoreSnapshot.getAll().keySet().equals(cloudSqlSnapshot.getAll().keySet()), datastoreSnapshot.getAll().keySet().equals(cloudSqlSnapshot.getAll().keySet()),

View file

@ -16,7 +16,19 @@ package google.registry.beam.comparedb;
import google.registry.beam.common.RegistryPipelineOptions; import google.registry.beam.common.RegistryPipelineOptions;
import google.registry.model.annotations.DeleteAfterMigration; import google.registry.model.annotations.DeleteAfterMigration;
import javax.annotation.Nullable;
import org.apache.beam.sdk.options.Description;
/** BEAM pipeline options for {@link ValidateSqlPipeline}. */ /** BEAM pipeline options for {@link ValidateSqlPipeline}. */
@DeleteAfterMigration @DeleteAfterMigration
public interface ValidateSqlPipelineOptions extends RegistryPipelineOptions {} public interface ValidateSqlPipelineOptions extends RegistryPipelineOptions {
@Description(
"For history entries and EPP resources, only those modified strictly after this time are "
+ "included in comparison. Value is in ISO8601 format. "
+ "Other entity types are not affected.")
@Nullable
String getComparisonStartTimestamp();
void setComparisonStartTimestamp(String comparisonStartTimestamp);
}

View file

@ -28,6 +28,7 @@ import google.registry.testing.FakeClock;
import google.registry.testing.InjectExtension; import google.registry.testing.InjectExtension;
import java.io.Serializable; import java.io.Serializable;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.Optional;
import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PCollectionTuple;
@ -83,7 +84,8 @@ class DatastoreSnapshotsTest {
setupHelper.commitLogDir.getAbsolutePath(), setupHelper.commitLogDir.getAbsolutePath(),
START_TIME, START_TIME,
fakeClock.nowUtc().plusMillis(1), fakeClock.nowUtc().plusMillis(1),
ImmutableSet.copyOf(DatastoreSetupHelper.ALL_KINDS)); ImmutableSet.copyOf(DatastoreSetupHelper.ALL_KINDS),
Optional.empty());
PAssert.that(tuple.get(ValidateSqlUtils.createSqlEntityTupleTag(Registrar.class))) PAssert.that(tuple.get(ValidateSqlUtils.createSqlEntityTupleTag(Registrar.class)))
.containsInAnyOrder(setupHelper.registrar1, setupHelper.registrar2); .containsInAnyOrder(setupHelper.registrar1, setupHelper.registrar2);
PAssert.that(tuple.get(ValidateSqlUtils.createSqlEntityTupleTag(DomainHistory.class))) PAssert.that(tuple.get(ValidateSqlUtils.createSqlEntityTupleTag(DomainHistory.class)))

View file

@ -88,6 +88,7 @@ class SqlSnapshotsTest {
DomainHistory.class, DomainHistory.class,
ContactResource.class, ContactResource.class,
HostResource.class), HostResource.class),
Optional.empty(),
Optional.empty()); Optional.empty());
PAssert.that(sqlSnapshot.get(createSqlEntityTupleTag(Registry.class))) PAssert.that(sqlSnapshot.get(createSqlEntityTupleTag(Registry.class)))
.containsInAnyOrder(setupHelper.registry); .containsInAnyOrder(setupHelper.registry);