From fe0c37eee6f581b2bd2dd952b24b05ddea6de6d8 Mon Sep 17 00:00:00 2001 From: Weimin Yu Date: Tue, 11 Jan 2022 14:17:32 -0500 Subject: [PATCH] Only compare recent changes in Datastore and SQL (#1485) * Only compare recent changes in Datastore and SQL When comparing Datastore and SQL, ignore older History and EPP resource objects. This cuts the run time in half compared with a full comparison. The intention is to run a full comparison before the switch-over from Datastore and SQL, and run this incremental comparison during the down time. The incremental comparison takes about 25 minutes in production. Performance can be improved further by filtering out older billing events (OneTime and Cancellation). However, we don't think further optimization is worth the effort (considering that Recurring events cannot be filtered since they are mutable but without lastUpdateTime). Verified in Sandbox and prod with and without time filter. --- .../beam/comparedb/DatastoreSnapshots.java | 32 ++- .../registry/beam/comparedb/SqlSnapshots.java | 219 +++++++++++++++--- .../beam/comparedb/ValidateSqlPipeline.java | 8 +- .../comparedb/ValidateSqlPipelineOptions.java | 14 +- .../comparedb/DatastoreSnapshotsTest.java | 4 +- .../beam/comparedb/SqlSnapshotsTest.java | 1 + 6 files changed, 236 insertions(+), 42 deletions(-) diff --git a/core/src/main/java/google/registry/beam/comparedb/DatastoreSnapshots.java b/core/src/main/java/google/registry/beam/comparedb/DatastoreSnapshots.java index b2a92e93d..c9277ce18 100644 --- a/core/src/main/java/google/registry/beam/comparedb/DatastoreSnapshots.java +++ b/core/src/main/java/google/registry/beam/comparedb/DatastoreSnapshots.java @@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableSet; import com.googlecode.objectify.Key; import google.registry.backup.VersionedEntity; import google.registry.beam.initsql.Transforms; +import google.registry.model.EppResource; import google.registry.model.annotations.DeleteAfterMigration; import google.registry.model.billing.BillingEvent; import google.registry.model.common.Cursor; @@ -42,6 +43,7 @@ import google.registry.model.tld.Registry; import java.util.Map; import java.util.Optional; import java.util.Set; +import javax.annotation.Nullable; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; @@ -93,7 +95,8 @@ public final class DatastoreSnapshots { String commitLogDir, DateTime commitLogFromTime, DateTime commitLogToTime, - Set> kinds) { + Set> kinds, + Optional compareStartTime) { PCollectionTuple snapshot = pipeline.apply( "Load Datastore snapshot.", @@ -112,11 +115,11 @@ public final class DatastoreSnapshots { perTypeSnapshots = perTypeSnapshots.and( createSqlEntityTupleTag((Class) kind), - datastoreEntityToPojo(perKindSnapshot, kind.getSimpleName())); + datastoreEntityToPojo(perKindSnapshot, kind.getSimpleName(), compareStartTime)); continue; } Verify.verify(kind == HistoryEntry.class, "Unexpected Non-SqlEntity class: %s", kind); - PCollectionTuple historyEntriesByType = splitHistoryEntry(perKindSnapshot); + PCollectionTuple historyEntriesByType = splitHistoryEntry(perKindSnapshot, compareStartTime); for (Map.Entry, PCollection> entry : historyEntriesByType.getAll().entrySet()) { perTypeSnapshots = perTypeSnapshots.and(entry.getKey().getId(), entry.getValue()); @@ -129,7 +132,9 @@ public final class DatastoreSnapshots { * Splits a {@link PCollection} of {@link HistoryEntry HistoryEntries} into three collections of * its child entities by type. */ - static PCollectionTuple splitHistoryEntry(PCollection historyEntries) { + static PCollectionTuple splitHistoryEntry( + PCollection historyEntries, Optional compareStartTime) { + DateTime nullableStartTime = compareStartTime.orElse(null); return historyEntries.apply( "Split HistoryEntry by Resource Type", ParDo.of( @@ -138,6 +143,7 @@ public final class DatastoreSnapshots { public void processElement( @Element VersionedEntity historyEntry, MultiOutputReceiver out) { Optional.ofNullable(Transforms.convertVersionedEntityToSqlEntity(historyEntry)) + .filter(e -> isEntityIncludedForComparison(e, nullableStartTime)) .ifPresent( sqlEntity -> out.get(createSqlEntityTupleTag(sqlEntity.getClass())) @@ -155,7 +161,8 @@ public final class DatastoreSnapshots { * objects. */ static PCollection datastoreEntityToPojo( - PCollection entities, String desc) { + PCollection entities, String desc, Optional compareStartTime) { + DateTime nullableStartTime = compareStartTime.orElse(null); return entities.apply( "Datastore Entity to Pojo " + desc, ParDo.of( @@ -164,8 +171,23 @@ public final class DatastoreSnapshots { public void processElement( @Element VersionedEntity entity, OutputReceiver out) { Optional.ofNullable(Transforms.convertVersionedEntityToSqlEntity(entity)) + .filter(e -> isEntityIncludedForComparison(e, nullableStartTime)) .ifPresent(out::output); } })); } + + static boolean isEntityIncludedForComparison( + SqlEntity entity, @Nullable DateTime compareStartTime) { + if (compareStartTime == null) { + return true; + } + if (entity instanceof HistoryEntry) { + return compareStartTime.isBefore(((HistoryEntry) entity).getModificationTime()); + } + if (entity instanceof EppResource) { + return compareStartTime.isBefore(((EppResource) entity).getUpdateTimestamp().getTimestamp()); + } + return true; + } } diff --git a/core/src/main/java/google/registry/beam/comparedb/SqlSnapshots.java b/core/src/main/java/google/registry/beam/comparedb/SqlSnapshots.java index da1ff1093..fa05b9582 100644 --- a/core/src/main/java/google/registry/beam/comparedb/SqlSnapshots.java +++ b/core/src/main/java/google/registry/beam/comparedb/SqlSnapshots.java @@ -14,15 +14,22 @@ package google.registry.beam.comparedb; +import static com.google.common.base.Preconditions.checkState; import static google.registry.beam.comparedb.ValidateSqlUtils.createSqlEntityTupleTag; import static google.registry.beam.comparedb.ValidateSqlUtils.getMedianIdForHistoryTable; +import com.google.auto.value.AutoValue; +import com.google.common.base.Strings; import com.google.common.base.Verify; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSetMultimap; import com.google.common.collect.Streams; import google.registry.beam.common.RegistryJpaIO; +import google.registry.beam.common.RegistryJpaIO.Read; +import google.registry.model.EppResource; +import google.registry.model.UpdateAutoTimestamp; import google.registry.model.annotations.DeleteAfterMigration; import google.registry.model.billing.BillingEvent; import google.registry.model.bulkquery.BulkQueryEntities; @@ -50,8 +57,10 @@ import google.registry.model.replay.SqlEntity; import google.registry.model.reporting.DomainTransactionRecord; import google.registry.model.tld.Registry; import google.registry.persistence.transaction.CriteriaQueryBuilder; +import google.registry.util.DateTimeUtils; import java.io.Serializable; import java.util.Optional; +import javax.persistence.Entity; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; @@ -65,6 +74,7 @@ import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.TypeDescriptors; +import org.joda.time.DateTime; /** * Utilities for loading SQL snapshots. @@ -113,28 +123,48 @@ public final class SqlSnapshots { public static PCollectionTuple loadCloudSqlSnapshotByType( Pipeline pipeline, ImmutableSet> sqlEntityTypes, - Optional snapshotId) { + Optional snapshotId, + Optional compareStartTime) { PCollectionTuple perTypeSnapshots = PCollectionTuple.empty(pipeline); for (Class clazz : sqlEntityTypes) { if (clazz == DomainBase.class) { perTypeSnapshots = perTypeSnapshots.and( createSqlEntityTupleTag(DomainBase.class), - loadAndAssembleDomainBase(pipeline, snapshotId)); + loadAndAssembleDomainBase(pipeline, snapshotId, compareStartTime)); continue; } if (clazz == DomainHistory.class) { perTypeSnapshots = perTypeSnapshots.and( createSqlEntityTupleTag(DomainHistory.class), - loadAndAssembleDomainHistory(pipeline, snapshotId)); + loadAndAssembleDomainHistory(pipeline, snapshotId, compareStartTime)); continue; } if (clazz == ContactHistory.class) { perTypeSnapshots = perTypeSnapshots.and( createSqlEntityTupleTag(ContactHistory.class), - loadContactHistory(pipeline, snapshotId)); + loadContactHistory(pipeline, snapshotId, compareStartTime)); + continue; + } + if (clazz == HostHistory.class) { + perTypeSnapshots = + perTypeSnapshots.and( + createSqlEntityTupleTag(HostHistory.class), + loadHostHistory( + pipeline, snapshotId, compareStartTime.orElse(DateTimeUtils.START_OF_TIME))); + continue; + } + if (EppResource.class.isAssignableFrom(clazz) && compareStartTime.isPresent()) { + perTypeSnapshots = + perTypeSnapshots.and( + createSqlEntityTupleTag(clazz), + pipeline.apply( + "SQL Load " + clazz.getSimpleName(), + buildEppResourceQueryWithTimeFilter( + clazz, SqlEntity.class, snapshotId, compareStartTime.get()) + .withSnapshot(snapshotId.orElse(null)))); continue; } perTypeSnapshots = @@ -155,20 +185,33 @@ public final class SqlSnapshots { * @see BulkQueryEntities */ public static PCollection loadAndAssembleDomainBase( - Pipeline pipeline, Optional snapshotId) { + Pipeline pipeline, Optional snapshotId, Optional compareStartTime) { PCollection> baseObjects = - readAllAndAssignKey(pipeline, DomainBaseLite.class, DomainBaseLite::getRepoId, snapshotId); + readAllAndAssignKey( + pipeline, + DomainBaseLite.class, + DomainBaseLite::getRepoId, + snapshotId, + compareStartTime); PCollection> gracePeriods = - readAllAndAssignKey(pipeline, GracePeriod.class, GracePeriod::getDomainRepoId, snapshotId); + readAllAndAssignKey( + pipeline, + GracePeriod.class, + GracePeriod::getDomainRepoId, + snapshotId, + compareStartTime); PCollection> delegationSigners = readAllAndAssignKey( pipeline, DelegationSignerData.class, DelegationSignerData::getDomainRepoId, - snapshotId); + snapshotId, + compareStartTime); PCollection> domainHosts = - readAllAndAssignKey(pipeline, DomainHost.class, DomainHost::getDomainRepoId, snapshotId); + readAllAndAssignKey( + pipeline, DomainHost.class, DomainHost::getDomainRepoId, snapshotId, compareStartTime); + DateTime nullableCompareStartTime = compareStartTime.orElse(null); return PCollectionList.of( ImmutableList.of(baseObjects, gracePeriods, delegationSigners, domainHosts)) .apply("SQL Merge DomainBase parts", Flatten.pCollections()) @@ -184,6 +227,14 @@ public final class SqlSnapshots { TypedClassifier partsByType = new TypedClassifier(kv.getValue()); ImmutableSet baseObjects = partsByType.getAllOf(DomainBaseLite.class); + if (nullableCompareStartTime != null) { + Verify.verify( + baseObjects.size() <= 1, + "Found duplicate DomainBaseLite object per repoId: " + kv.getKey()); + if (baseObjects.isEmpty()) { + return; + } + } Verify.verify( baseObjects.size() == 1, "Expecting one DomainBaseLite object per repoId: " + kv.getKey()); @@ -205,16 +256,16 @@ public final class SqlSnapshots { *

This method uses two queries to load data in parallel. This is a performance optimization * specifically for the production database. */ - static PCollection loadContactHistory(Pipeline pipeline, Optional snapshotId) { - long medianId = - getMedianIdForHistoryTable("ContactHistory") - .orElseThrow( - () -> new IllegalStateException("Not a valid database: no ContactHistory.")); + static PCollection loadContactHistory( + Pipeline pipeline, Optional snapshotId, Optional compareStartTime) { + PartitionedQuery partitionedQuery = + buildPartitonedHistoryQuery(ContactHistory.class, compareStartTime); PCollection part1 = pipeline.apply( "SQL Load ContactHistory first half", RegistryJpaIO.read( - String.format("select c from ContactHistory c where id <= %s", medianId), + partitionedQuery.firstHalfQuery(), + partitionedQuery.parameters(), false, SqlEntity.class::cast) .withSnapshot(snapshotId.orElse(null))); @@ -222,7 +273,8 @@ public final class SqlSnapshots { pipeline.apply( "SQL Load ContactHistory second half", RegistryJpaIO.read( - String.format("select c from ContactHistory c where id > %s", medianId), + partitionedQuery.secondHalfQuery(), + partitionedQuery.parameters(), false, SqlEntity.class::cast) .withSnapshot(snapshotId.orElse(null))); @@ -231,6 +283,19 @@ public final class SqlSnapshots { .apply("Combine ContactHistory parts", Flatten.pCollections()); } + /** Loads all {@link HostHistory} entities from the database. */ + static PCollection loadHostHistory( + Pipeline pipeline, Optional snapshotId, DateTime compareStartTime) { + return pipeline.apply( + "SQL Load HostHistory", + RegistryJpaIO.read( + "select c from HostHistory c where :compareStartTime < modificationTime", + ImmutableMap.of("compareStartTime", compareStartTime), + false, + SqlEntity.class::cast) + .withSnapshot(snapshotId.orElse(null))); + } + /** * Bulk-loads all parts of {@link DomainHistory} and assembles them in the pipeline. * @@ -240,16 +305,15 @@ public final class SqlSnapshots { * @see BulkQueryEntities */ static PCollection loadAndAssembleDomainHistory( - Pipeline pipeline, Optional snapshotId) { - long medianId = - getMedianIdForHistoryTable("DomainHistory") - .orElseThrow( - () -> new IllegalStateException("Not a valid database: no DomainHistory.")); + Pipeline pipeline, Optional snapshotId, Optional compareStartTime) { + PartitionedQuery partitionedQuery = + buildPartitonedHistoryQuery(DomainHistoryLite.class, compareStartTime); PCollection> baseObjectsPart1 = queryAndAssignKey( pipeline, "first half", - String.format("select c from DomainHistory c where id <= %s", medianId), + partitionedQuery.firstHalfQuery(), + partitionedQuery.parameters(), DomainHistoryLite.class, compose(DomainHistoryLite::getDomainHistoryId, DomainHistoryId::toString), snapshotId); @@ -257,7 +321,8 @@ public final class SqlSnapshots { queryAndAssignKey( pipeline, "second half", - String.format("select c from DomainHistory c where id > %s", medianId), + partitionedQuery.secondHalfQuery(), + partitionedQuery.parameters(), DomainHistoryLite.class, compose(DomainHistoryLite::getDomainHistoryId, DomainHistoryId::toString), snapshotId); @@ -266,26 +331,31 @@ public final class SqlSnapshots { pipeline, GracePeriodHistory.class, compose(GracePeriodHistory::getDomainHistoryId, DomainHistoryId::toString), - snapshotId); + snapshotId, + compareStartTime); PCollection> delegationSigners = readAllAndAssignKey( pipeline, DomainDsDataHistory.class, compose(DomainDsDataHistory::getDomainHistoryId, DomainHistoryId::toString), - snapshotId); + snapshotId, + compareStartTime); PCollection> domainHosts = readAllAndAssignKey( pipeline, DomainHistoryHost.class, compose(DomainHistoryHost::getDomainHistoryId, DomainHistoryId::toString), - snapshotId); + snapshotId, + compareStartTime); PCollection> transactionRecords = readAllAndAssignKey( pipeline, DomainTransactionRecord.class, compose(DomainTransactionRecord::getDomainHistoryId, DomainHistoryId::toString), - snapshotId); + snapshotId, + compareStartTime); + DateTime nullableCompareStartTime = compareStartTime.orElse(null); return PCollectionList.of( ImmutableList.of( baseObjectsPart1, @@ -307,6 +377,15 @@ public final class SqlSnapshots { TypedClassifier partsByType = new TypedClassifier(kv.getValue()); ImmutableSet baseObjects = partsByType.getAllOf(DomainHistoryLite.class); + if (nullableCompareStartTime != null) { + Verify.verify( + baseObjects.size() <= 1, + "Found duplicate DomainHistoryLite object per domainHistoryId: " + + kv.getKey()); + if (baseObjects.isEmpty()) { + return; + } + } Verify.verify( baseObjects.size() == 1, "Expecting one DomainHistoryLite object per domainHistoryId: " @@ -328,12 +407,19 @@ public final class SqlSnapshots { Pipeline pipeline, Class type, SerializableFunction keyFunction, - Optional snapshotId) { + Optional snapshotId, + Optional compareStartTime) { + Read queryObject; + if (compareStartTime.isPresent() && EppResource.class.isAssignableFrom(type)) { + queryObject = + buildEppResourceQueryWithTimeFilter(type, type, snapshotId, compareStartTime.get()); + } else { + queryObject = + RegistryJpaIO.read(() -> CriteriaQueryBuilder.create(type).build()) + .withSnapshot(snapshotId.orElse(null)); + } return pipeline - .apply( - "SQL Load " + type.getSimpleName(), - RegistryJpaIO.read(() -> CriteriaQueryBuilder.create(type).build()) - .withSnapshot(snapshotId.orElse(null))) + .apply("SQL Load " + type.getSimpleName(), queryObject) .apply( "Assign Key to " + type.getSimpleName(), MapElements.into( @@ -346,13 +432,15 @@ public final class SqlSnapshots { Pipeline pipeline, String diffrentiator, String jplQuery, + ImmutableMap queryParameters, Class type, SerializableFunction keyFunction, Optional snapshotId) { return pipeline .apply( "SQL Load " + type.getSimpleName() + " " + diffrentiator, - RegistryJpaIO.read(jplQuery, false, type::cast).withSnapshot(snapshotId.orElse(null))) + RegistryJpaIO.read(jplQuery, queryParameters, false, type::cast) + .withSnapshot(snapshotId.orElse(null))) .apply( "Assign Key to " + type.getSimpleName() + " " + diffrentiator, MapElements.into( @@ -367,6 +455,71 @@ public final class SqlSnapshots { return r -> f2.apply(f1.apply(r)); } + static Read buildEppResourceQueryWithTimeFilter( + Class entityType, + Class castOutputAsType, + Optional snapshotId, + DateTime compareStartTime) { + String tableName = getJpaEntityName(entityType); + String jpql = + String.format("select c from %s c where :compareStartTime < updateTimestamp", tableName); + return RegistryJpaIO.read( + jpql, + ImmutableMap.of("compareStartTime", UpdateAutoTimestamp.create(compareStartTime)), + false, + (R x) -> castOutputAsType.cast(x)) + .withSnapshot(snapshotId.orElse(null)); + } + + static PartitionedQuery buildPartitonedHistoryQuery( + Class entityType, Optional compareStartTime) { + String tableName = getJpaEntityName(entityType); + Verify.verify( + !Strings.isNullOrEmpty(tableName), "Invalid entity type %s", entityType.getSimpleName()); + long medianId = + getMedianIdForHistoryTable(tableName) + .orElseThrow(() -> new IllegalStateException("Not a valid database: no " + tableName)); + String firstHalfQuery = String.format("select c from %s c where id <= :historyId", tableName); + String secondHalfQuery = String.format("select c from %s c where id > :historyId", tableName); + if (compareStartTime.isPresent()) { + String timeFilter = " and :compareStartTime < modificationTime"; + firstHalfQuery += timeFilter; + secondHalfQuery += timeFilter; + return PartitionedQuery.createPartitionedQuery( + firstHalfQuery, + secondHalfQuery, + ImmutableMap.of("historyId", medianId, "compareStartTime", compareStartTime.get())); + } else { + return PartitionedQuery.createPartitionedQuery( + firstHalfQuery, secondHalfQuery, ImmutableMap.of("historyId", medianId)); + } + } + + private static String getJpaEntityName(Class entityType) { + Entity entityAnnotation = (Entity) entityType.getAnnotation(Entity.class); + checkState( + entityAnnotation != null, "Unexpected non-entity type %s", entityType.getSimpleName()); + return Strings.isNullOrEmpty(entityAnnotation.name()) + ? entityType.getSimpleName() + : entityAnnotation.name(); + } + + /** Contains two queries that partition the target table in two. */ + @AutoValue + abstract static class PartitionedQuery { + abstract String firstHalfQuery(); + + abstract String secondHalfQuery(); + + abstract ImmutableMap parameters(); + + public static PartitionedQuery createPartitionedQuery( + String firstHalfQuery, String secondHalfQuery, ImmutableMap parameters) { + return new AutoValue_SqlSnapshots_PartitionedQuery( + firstHalfQuery, secondHalfQuery, parameters); + } + } + /** Container that receives mixed-typed data and groups them by {@link Class}. */ static class TypedClassifier { private final ImmutableSetMultimap, Object> classifiedEntities; diff --git a/core/src/main/java/google/registry/beam/comparedb/ValidateSqlPipeline.java b/core/src/main/java/google/registry/beam/comparedb/ValidateSqlPipeline.java index 86ccfacec..b9aace11d 100644 --- a/core/src/main/java/google/registry/beam/comparedb/ValidateSqlPipeline.java +++ b/core/src/main/java/google/registry/beam/comparedb/ValidateSqlPipeline.java @@ -126,6 +126,9 @@ public class ValidateSqlPipeline { .getCoderRegistry() .registerCoderForClass(SqlEntity.class, SerializableCoder.of(Serializable.class)); + Optional compareStartTime = + Optional.ofNullable(options.getComparisonStartTimestamp()).map(DateTime::parse); + PCollectionTuple datastoreSnapshot = DatastoreSnapshots.loadDatastoreSnapshotByKind( pipeline, @@ -135,11 +138,12 @@ public class ValidateSqlPipeline { // Increase by 1ms since we want to include commitLogs latestCommitLogTime but // this parameter is exclusive. latestCommitLogTime.plusMillis(1), - DatastoreSnapshots.ALL_DATASTORE_KINDS); + DatastoreSnapshots.ALL_DATASTORE_KINDS, + compareStartTime); PCollectionTuple cloudSqlSnapshot = SqlSnapshots.loadCloudSqlSnapshotByType( - pipeline, SqlSnapshots.ALL_SQL_ENTITIES, sqlSnapshotId); + pipeline, SqlSnapshots.ALL_SQL_ENTITIES, sqlSnapshotId, compareStartTime); verify( datastoreSnapshot.getAll().keySet().equals(cloudSqlSnapshot.getAll().keySet()), diff --git a/core/src/main/java/google/registry/beam/comparedb/ValidateSqlPipelineOptions.java b/core/src/main/java/google/registry/beam/comparedb/ValidateSqlPipelineOptions.java index 88b2c822c..a0f3f46ee 100644 --- a/core/src/main/java/google/registry/beam/comparedb/ValidateSqlPipelineOptions.java +++ b/core/src/main/java/google/registry/beam/comparedb/ValidateSqlPipelineOptions.java @@ -16,7 +16,19 @@ package google.registry.beam.comparedb; import google.registry.beam.common.RegistryPipelineOptions; import google.registry.model.annotations.DeleteAfterMigration; +import javax.annotation.Nullable; +import org.apache.beam.sdk.options.Description; /** BEAM pipeline options for {@link ValidateSqlPipeline}. */ @DeleteAfterMigration -public interface ValidateSqlPipelineOptions extends RegistryPipelineOptions {} +public interface ValidateSqlPipelineOptions extends RegistryPipelineOptions { + + @Description( + "For history entries and EPP resources, only those modified strictly after this time are " + + "included in comparison. Value is in ISO8601 format. " + + "Other entity types are not affected.") + @Nullable + String getComparisonStartTimestamp(); + + void setComparisonStartTimestamp(String comparisonStartTimestamp); +} diff --git a/core/src/test/java/google/registry/beam/comparedb/DatastoreSnapshotsTest.java b/core/src/test/java/google/registry/beam/comparedb/DatastoreSnapshotsTest.java index 2fb62a5f7..194391e88 100644 --- a/core/src/test/java/google/registry/beam/comparedb/DatastoreSnapshotsTest.java +++ b/core/src/test/java/google/registry/beam/comparedb/DatastoreSnapshotsTest.java @@ -28,6 +28,7 @@ import google.registry.testing.FakeClock; import google.registry.testing.InjectExtension; import java.io.Serializable; import java.nio.file.Path; +import java.util.Optional; import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.values.PCollectionTuple; @@ -83,7 +84,8 @@ class DatastoreSnapshotsTest { setupHelper.commitLogDir.getAbsolutePath(), START_TIME, fakeClock.nowUtc().plusMillis(1), - ImmutableSet.copyOf(DatastoreSetupHelper.ALL_KINDS)); + ImmutableSet.copyOf(DatastoreSetupHelper.ALL_KINDS), + Optional.empty()); PAssert.that(tuple.get(ValidateSqlUtils.createSqlEntityTupleTag(Registrar.class))) .containsInAnyOrder(setupHelper.registrar1, setupHelper.registrar2); PAssert.that(tuple.get(ValidateSqlUtils.createSqlEntityTupleTag(DomainHistory.class))) diff --git a/core/src/test/java/google/registry/beam/comparedb/SqlSnapshotsTest.java b/core/src/test/java/google/registry/beam/comparedb/SqlSnapshotsTest.java index fcc168aa1..5c0d0823b 100644 --- a/core/src/test/java/google/registry/beam/comparedb/SqlSnapshotsTest.java +++ b/core/src/test/java/google/registry/beam/comparedb/SqlSnapshotsTest.java @@ -88,6 +88,7 @@ class SqlSnapshotsTest { DomainHistory.class, ContactResource.class, HostResource.class), + Optional.empty(), Optional.empty()); PAssert.that(sqlSnapshot.get(createSqlEntityTupleTag(Registry.class))) .containsInAnyOrder(setupHelper.registry);