diff --git a/core/build.gradle b/core/build.gradle index e9ea4c2f2..9cb919016 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -79,6 +79,9 @@ def fragileTestPatterns = [ // Changes cache timeouts and for some reason appears to have contention // with other tests. "google/registry/whois/WhoisCommandFactoryTest.*", + // Currently changes a global configuration parameter that for some reason + // results in timestamp inversions for other tests. TODO(mmuller): fix. + "google/registry/flows/host/HostInfoFlowTest.*", ] + dockerIncompatibleTestPatterns sourceSets { diff --git a/core/src/main/java/google/registry/model/translators/CommitLogRevisionsTranslatorFactory.java b/core/src/main/java/google/registry/model/translators/CommitLogRevisionsTranslatorFactory.java index 266cd9f7c..e25a67fc8 100644 --- a/core/src/main/java/google/registry/model/translators/CommitLogRevisionsTranslatorFactory.java +++ b/core/src/main/java/google/registry/model/translators/CommitLogRevisionsTranslatorFactory.java @@ -17,13 +17,14 @@ package google.registry.model.translators; import static com.google.common.base.MoreObjects.firstNonNull; import static google.registry.config.RegistryConfig.getCommitLogDatastoreRetention; import static google.registry.model.ofy.ObjectifyService.auditedOfy; -import static google.registry.persistence.transaction.TransactionManagerFactory.tm; +import static google.registry.persistence.transaction.TransactionManagerFactory.ofyTm; import static google.registry.util.DateTimeUtils.START_OF_TIME; import com.google.common.collect.ImmutableSortedMap; import com.google.common.collect.Ordering; import com.googlecode.objectify.Key; import google.registry.model.ofy.CommitLogManifest; +import google.registry.persistence.transaction.Transaction; import org.joda.time.DateTime; /** @@ -58,12 +59,20 @@ public final class CommitLogRevisionsTranslatorFactory *

We store a maximum of one entry per day. It will be the last transaction that happened on * that day. * + *

In serialization mode, this method just returns "revisions" without modification. + * * @see google.registry.config.RegistryConfig#getCommitLogDatastoreRetention() */ @Override ImmutableSortedMap> transformBeforeSave( ImmutableSortedMap> revisions) { - DateTime now = tm().getTransactionTime(); + + // Don't do anything if we're just doing object serialization. + if (Transaction.inSerializationMode()) { + return revisions; + } + + DateTime now = ofyTm().getTransactionTime(); DateTime threshold = now.minus(getCommitLogDatastoreRetention()); DateTime preThresholdTime = firstNonNull(revisions.floorKey(threshold), START_OF_TIME); return new ImmutableSortedMap.Builder>(Ordering.natural()) diff --git a/core/src/main/java/google/registry/model/translators/CreateAutoTimestampTranslatorFactory.java b/core/src/main/java/google/registry/model/translators/CreateAutoTimestampTranslatorFactory.java index ff9e05eef..a7c28061e 100644 --- a/core/src/main/java/google/registry/model/translators/CreateAutoTimestampTranslatorFactory.java +++ b/core/src/main/java/google/registry/model/translators/CreateAutoTimestampTranslatorFactory.java @@ -15,10 +15,11 @@ package google.registry.model.translators; import static com.google.common.base.MoreObjects.firstNonNull; -import static google.registry.persistence.transaction.TransactionManagerFactory.tm; +import static google.registry.persistence.transaction.TransactionManagerFactory.ofyTm; import static org.joda.time.DateTimeZone.UTC; import google.registry.model.CreateAutoTimestamp; +import google.registry.persistence.transaction.Transaction; import java.util.Date; import org.joda.time.DateTime; @@ -46,7 +47,14 @@ public class CreateAutoTimestampTranslatorFactory /** Save a timestamp, setting it to the current time if it did not have a previous value. */ @Override public Date saveValue(CreateAutoTimestamp pojoValue) { - return firstNonNull(pojoValue.getTimestamp(), tm().getTransactionTime()).toDate(); - }}; + + // Don't do this if we're in the course of transaction serialization. + if (Transaction.inSerializationMode()) { + return pojoValue.getTimestamp() == null ? null : pojoValue.getTimestamp().toDate(); + } + + return firstNonNull(pojoValue.getTimestamp(), ofyTm().getTransactionTime()).toDate(); + } + }; } } diff --git a/core/src/main/java/google/registry/model/translators/UpdateAutoTimestampTranslatorFactory.java b/core/src/main/java/google/registry/model/translators/UpdateAutoTimestampTranslatorFactory.java index 21b461339..7c99c5a58 100644 --- a/core/src/main/java/google/registry/model/translators/UpdateAutoTimestampTranslatorFactory.java +++ b/core/src/main/java/google/registry/model/translators/UpdateAutoTimestampTranslatorFactory.java @@ -14,10 +14,11 @@ package google.registry.model.translators; -import static google.registry.persistence.transaction.TransactionManagerFactory.tm; +import static google.registry.persistence.transaction.TransactionManagerFactory.ofyTm; import static org.joda.time.DateTimeZone.UTC; import google.registry.model.UpdateAutoTimestamp; +import google.registry.persistence.transaction.Transaction; import java.util.Date; import org.joda.time.DateTime; @@ -46,8 +47,14 @@ public class UpdateAutoTimestampTranslatorFactory /** Save a timestamp, setting it to the current time. */ @Override public Date saveValue(UpdateAutoTimestamp pojoValue) { + + // Don't do this if we're in the course of transaction serialization. + if (Transaction.inSerializationMode()) { + return pojoValue.getTimestamp() == null ? null : pojoValue.getTimestamp().toDate(); + } + return UpdateAutoTimestamp.autoUpdateEnabled() - ? tm().getTransactionTime().toDate() + ? ofyTm().getTransactionTime().toDate() : pojoValue.getTimestamp().toDate(); } }; diff --git a/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManager.java b/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManager.java index 4e0db4e7d..ee6a60169 100644 --- a/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManager.java +++ b/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManager.java @@ -50,6 +50,11 @@ public interface JpaTransactionManager extends TransactionManager { */ Query query(String sqlString); + /** + * Execute the work in a transaction without recording the transaction for replay to datastore. + */ + T transactWithoutBackup(Supplier work); + /** Executes the work in a transaction with no retries and returns the result. */ T transactNoRetry(Supplier work); diff --git a/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManagerImpl.java b/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManagerImpl.java index fa4a9a285..a998eae27 100644 --- a/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManagerImpl.java +++ b/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManagerImpl.java @@ -41,6 +41,8 @@ import google.registry.model.server.KmsSecret; import google.registry.model.tmch.ClaimsList.ClaimsListSingleton; import google.registry.persistence.JpaRetries; import google.registry.persistence.VKey; +import google.registry.schema.replay.NonReplicatedEntity; +import google.registry.schema.replay.SqlOnlyEntity; import google.registry.util.Clock; import google.registry.util.Retrier; import google.registry.util.SystemSleeper; @@ -152,6 +154,15 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager { @Override public T transact(Supplier work) { + return transact(work, true); + } + + @Override + public T transactWithoutBackup(Supplier work) { + return transact(work, false); + } + + private T transact(Supplier work, boolean withBackup) { return retrier.callWithRetry( () -> { if (inTransaction()) { @@ -162,7 +173,7 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager { EntityTransaction txn = txnInfo.entityManager.getTransaction(); try { txn.begin(); - txnInfo.start(clock); + txnInfo.start(clock, withBackup); T result = work.get(); txnInfo.recordTransaction(); txn.commit(); @@ -194,7 +205,7 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager { EntityTransaction txn = txnInfo.entityManager.getTransaction(); try { txn.begin(); - txnInfo.start(clock); + txnInfo.start(clock, true); T result = work.get(); txnInfo.recordTransaction(); txn.commit(); @@ -740,11 +751,11 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager { Set objectsToSave = Collections.newSetFromMap(new IdentityHashMap()); /** Start a new transaction. */ - private void start(Clock clock) { + private void start(Clock clock, boolean withBackup) { checkArgumentNotNull(clock); inTransaction = true; transactionTime = clock.nowUtc(); - if (RegistryConfig.getCloudSqlReplicateTransactions()) { + if (withBackup && RegistryConfig.getCloudSqlReplicateTransactions()) { contentsBuilder = new Transaction.Builder(); } } @@ -763,17 +774,23 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager { } private void addUpdate(Object entity) { - if (contentsBuilder != null) { + if (contentsBuilder != null && shouldReplicate(entity.getClass())) { contentsBuilder.addUpdate(entity); } } private void addDelete(VKey key) { - if (contentsBuilder != null) { + if (contentsBuilder != null && shouldReplicate(key.getKind())) { contentsBuilder.addDelete(key); } } + /** Returns true if the entity class should be replicated from SQL to datastore. */ + private boolean shouldReplicate(Class entityClass) { + return !NonReplicatedEntity.class.isAssignableFrom(entityClass) + && !SqlOnlyEntity.class.isAssignableFrom(entityClass); + } + private void recordTransaction() { if (contentsBuilder != null) { Transaction persistedTxn = contentsBuilder.build(); diff --git a/core/src/main/java/google/registry/persistence/transaction/Transaction.java b/core/src/main/java/google/registry/persistence/transaction/Transaction.java index b93d1638b..efad3050d 100644 --- a/core/src/main/java/google/registry/persistence/transaction/Transaction.java +++ b/core/src/main/java/google/registry/persistence/transaction/Transaction.java @@ -18,7 +18,6 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static google.registry.model.ofy.ObjectifyService.auditedOfy; import static google.registry.persistence.transaction.TransactionManagerFactory.ofyTm; -import static google.registry.persistence.transaction.TransactionManagerFactory.tm; import com.google.appengine.api.datastore.Entity; import com.google.appengine.api.datastore.EntityTranslator; @@ -51,11 +50,17 @@ public class Transaction extends ImmutableObject implements Buildable { // unique and inherently informative. private static final int VERSION_ID = 20200604; + // Keep a per-thread flag to keep track of whether we're serializing an entity for a transaction. + // This is used by internal translators to avoid doing things that are dependent on being in a + // datastore transaction and alter the persisted representation of the entity. + private static ThreadLocal inSerializationMode = ThreadLocal.withInitial(() -> false); + private transient ImmutableList mutations; /** Write the entire transaction to the datastore in a datastore transaction. */ public void writeToDatastore() { - tm().transact( + ofyTm() + .transact( () -> { for (Mutation mutation : mutations) { mutation.writeToDatastore(); @@ -75,8 +80,13 @@ public class Transaction extends ImmutableObject implements Buildable { // Write all of the mutations, preceded by their count. out.writeInt(mutations.size()); - for (Mutation mutation : mutations) { - mutation.serializeTo(out); + try { + inSerializationMode.set(true); + for (Mutation mutation : mutations) { + mutation.serializeTo(out); + } + } finally { + inSerializationMode.set(false); } out.close(); @@ -114,6 +124,17 @@ public class Transaction extends ImmutableObject implements Buildable { return mutations.isEmpty(); } + /** + * Returns true if we are serializing a transaction in the current thread. + * + *

This should be checked by any Ofy translators prior to making any changes to an entity's + * state representation based on the assumption that we are currently pseristing the entity to + * datastore. + */ + public static boolean inSerializationMode() { + return inSerializationMode.get(); + } + @Override public Builder asBuilder() { return new Builder(clone(this)); diff --git a/core/src/main/java/google/registry/schema/replay/ReplicateToDatastoreAction.java b/core/src/main/java/google/registry/schema/replay/ReplicateToDatastoreAction.java index fa829b53c..5ec8a4866 100644 --- a/core/src/main/java/google/registry/schema/replay/ReplicateToDatastoreAction.java +++ b/core/src/main/java/google/registry/schema/replay/ReplicateToDatastoreAction.java @@ -42,7 +42,8 @@ import javax.persistence.NoResultException; method = GET, automaticallyPrintOk = true, auth = Auth.AUTH_INTERNAL_OR_ADMIN) -class ReplicateToDatastoreAction implements Runnable { +@VisibleForTesting +public class ReplicateToDatastoreAction implements Runnable { public static final String PATH = "/_dr/cron/replicateToDatastore"; private static final FluentLogger logger = FluentLogger.forEnclosingClass(); @@ -61,12 +62,12 @@ class ReplicateToDatastoreAction implements Runnable { } @VisibleForTesting - List getTransactionBatch() { + public List getTransactionBatch() { // Get the next batch of transactions that we haven't replicated. LastSqlTransaction lastSqlTxnBeforeBatch = ofyTm().transact(() -> LastSqlTransaction.load()); try { return jpaTm() - .transact( + .transactWithoutBackup( () -> jpaTm() .query( @@ -86,7 +87,7 @@ class ReplicateToDatastoreAction implements Runnable { * be aborted. */ @VisibleForTesting - boolean applyTransaction(TransactionEntity txnEntity) { + public boolean applyTransaction(TransactionEntity txnEntity) { logger.atInfo().log("Applying a single transaction Cloud SQL -> Cloud Datastore"); return ofyTm() .transact( diff --git a/core/src/test/java/google/registry/flows/host/HostInfoFlowTest.java b/core/src/test/java/google/registry/flows/host/HostInfoFlowTest.java index 7a312b647..a74c83c4b 100644 --- a/core/src/test/java/google/registry/flows/host/HostInfoFlowTest.java +++ b/core/src/test/java/google/registry/flows/host/HostInfoFlowTest.java @@ -51,7 +51,7 @@ class HostInfoFlowTest extends ResourceFlowTestCase @Order(value = Order.DEFAULT - 2) @RegisterExtension - final ReplayExtension replayExtension = ReplayExtension.createWithCompare(clock); + final ReplayExtension replayExtension = ReplayExtension.createWithDoubleReplay(clock); HostInfoFlowTest() { setEppInput("host_info.xml", ImmutableMap.of("HOSTNAME", "ns1.example.tld")); diff --git a/core/src/test/java/google/registry/testing/AppEngineExtension.java b/core/src/test/java/google/registry/testing/AppEngineExtension.java index 3ce7cb0ba..67dab888f 100644 --- a/core/src/test/java/google/registry/testing/AppEngineExtension.java +++ b/core/src/test/java/google/registry/testing/AppEngineExtension.java @@ -477,7 +477,7 @@ public final class AppEngineExtension implements BeforeEachCallback, AfterEachCa public void afterEach(ExtensionContext context) throws Exception { checkArgumentNotNull(context, "The ExtensionContext must not be null"); try { - // If there is a replay extension, we'll want to call its replayToSql() method. + // If there is a replay extension, we'll want to call its replay() method. // // We have to provide this hook here for ReplayExtension instead of relying on // ReplayExtension's afterEach() method because of ordering and the conflation of environment @@ -492,7 +492,7 @@ public final class AppEngineExtension implements BeforeEachCallback, AfterEachCa (ReplayExtension) context.getStore(ExtensionContext.Namespace.GLOBAL).get(ReplayExtension.class); if (replayer != null) { - replayer.replayToSql(); + replayer.replay(); } if (withCloudSql) { diff --git a/core/src/test/java/google/registry/testing/DualDatabaseTestInvocationContextProvider.java b/core/src/test/java/google/registry/testing/DualDatabaseTestInvocationContextProvider.java index 124b93658..05a3b4296 100644 --- a/core/src/test/java/google/registry/testing/DualDatabaseTestInvocationContextProvider.java +++ b/core/src/test/java/google/registry/testing/DualDatabaseTestInvocationContextProvider.java @@ -51,15 +51,23 @@ class DualDatabaseTestInvocationContextProvider implements TestTemplateInvocatio return true; } + /** + * Returns true if "context" is an objectify unit test. + * + *

Provided to allow ReplayExtension to make this determination. + */ + static boolean inOfyContext(ExtensionContext context) { + return (DatabaseType) context.getStore(NAMESPACE).get(INJECTED_TM_SUPPLIER_KEY) + == DatabaseType.OFY; + } + @Override public Stream provideTestTemplateInvocationContexts( ExtensionContext context) { TestTemplateInvocationContext ofyContext = - createInvocationContext( - context.getDisplayName() + " with Datastore", TransactionManagerFactory::ofyTm); + createInvocationContext(context.getDisplayName() + " with Datastore", DatabaseType.OFY); TestTemplateInvocationContext sqlContext = - createInvocationContext( - context.getDisplayName() + " with PostgreSQL", TransactionManagerFactory::jpaTm); + createInvocationContext(context.getDisplayName() + " with PostgreSQL", DatabaseType.JPA); Method testMethod = context.getTestMethod().orElseThrow(IllegalStateException::new); if (testMethod.isAnnotationPresent(TestOfyAndSql.class)) { return Stream.of(ofyContext, sqlContext); @@ -74,7 +82,7 @@ class DualDatabaseTestInvocationContextProvider implements TestTemplateInvocatio } private TestTemplateInvocationContext createInvocationContext( - String name, Supplier tmSupplier) { + String name, DatabaseType databaseType) { return new TestTemplateInvocationContext() { @Override public String getDisplayName(int invocationIndex) { @@ -83,17 +91,17 @@ class DualDatabaseTestInvocationContextProvider implements TestTemplateInvocatio @Override public List getAdditionalExtensions() { - return ImmutableList.of(new DatabaseSwitchInvocationContext(tmSupplier)); + return ImmutableList.of(new DatabaseSwitchInvocationContext(databaseType)); } }; } private static class DatabaseSwitchInvocationContext implements TestInstancePostProcessor { - private Supplier tmSupplier; + private DatabaseType databaseType; - private DatabaseSwitchInvocationContext(Supplier tmSupplier) { - this.tmSupplier = tmSupplier; + private DatabaseSwitchInvocationContext(DatabaseType databaseType) { + this.databaseType = databaseType; } @Override @@ -113,7 +121,7 @@ class DualDatabaseTestInvocationContextProvider implements TestTemplateInvocatio throw new IllegalStateException( "AppEngineExtension in @DualDatabaseTest test must set withDatastoreAndCloudSql()"); } - context.getStore(NAMESPACE).put(INJECTED_TM_SUPPLIER_KEY, tmSupplier); + context.getStore(NAMESPACE).put(INJECTED_TM_SUPPLIER_KEY, databaseType); } private static ImmutableList getAppEngineExtensionFields(Class clazz) { @@ -144,10 +152,9 @@ class DualDatabaseTestInvocationContextProvider implements TestTemplateInvocatio } }); context.getStore(NAMESPACE).put(ORIGINAL_TM_KEY, tm()); - Supplier tmSupplier = - (Supplier) - context.getStore(NAMESPACE).get(INJECTED_TM_SUPPLIER_KEY); - TransactionManagerFactory.setTm(tmSupplier.get()); + DatabaseType databaseType = + (DatabaseType) context.getStore(NAMESPACE).get(INJECTED_TM_SUPPLIER_KEY); + TransactionManagerFactory.setTm(databaseType.getTm()); } } @@ -171,4 +178,20 @@ class DualDatabaseTestInvocationContextProvider implements TestTemplateInvocatio return testInstance.getClass().isAnnotationPresent(DualDatabaseTest.class) && isDeclaredTestMethod; } + + private enum DatabaseType { + JPA(TransactionManagerFactory::jpaTm), + OFY(TransactionManagerFactory::ofyTm); + + @SuppressWarnings("Immutable") // Supplier is immutable, but not annotated as such. + private final Supplier supplier; + + DatabaseType(Supplier supplier) { + this.supplier = supplier; + } + + TransactionManager getTm() { + return supplier.get(); + } + } } diff --git a/core/src/test/java/google/registry/testing/ReplayExtension.java b/core/src/test/java/google/registry/testing/ReplayExtension.java index 1b015dc8d..659d3e841 100644 --- a/core/src/test/java/google/registry/testing/ReplayExtension.java +++ b/core/src/test/java/google/registry/testing/ReplayExtension.java @@ -22,13 +22,17 @@ import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.googlecode.objectify.Key; +import google.registry.config.RegistryConfig; import google.registry.model.ImmutableObject; import google.registry.model.ofy.CommitLogBucket; import google.registry.model.ofy.ReplayQueue; import google.registry.model.ofy.TransactionInfo; import google.registry.persistence.VKey; +import google.registry.persistence.transaction.TransactionEntity; import google.registry.schema.replay.DatastoreEntity; +import google.registry.schema.replay.ReplicateToDatastoreAction; import java.util.Optional; +import javax.annotation.Nullable; import org.junit.jupiter.api.extension.AfterEachCallback; import org.junit.jupiter.api.extension.BeforeEachCallback; import org.junit.jupiter.api.extension.ExtensionContext; @@ -48,19 +52,27 @@ public class ReplayExtension implements BeforeEachCallback, AfterEachCallback { FakeClock clock; boolean compare; + boolean replayed = false; + boolean inOfyContext; InjectExtension injectExtension = new InjectExtension(); + @Nullable ReplicateToDatastoreAction sqlToDsReplicator; - private ReplayExtension(FakeClock clock, boolean compare) { + private ReplayExtension( + FakeClock clock, boolean compare, @Nullable ReplicateToDatastoreAction sqlToDsReplicator) { this.clock = clock; this.compare = compare; + this.sqlToDsReplicator = sqlToDsReplicator; } public static ReplayExtension createWithCompare(FakeClock clock) { - return new ReplayExtension(clock, true); + return new ReplayExtension(clock, true, null); } - public static ReplayExtension createWithoutCompare(FakeClock clock) { - return new ReplayExtension(clock, false); + /** + * Create a replay extension that replays from SQL to cloud datastore when running in SQL mode. + */ + public static ReplayExtension createWithDoubleReplay(FakeClock clock) { + return new ReplayExtension(clock, true, new ReplicateToDatastoreAction(clock)); } @Override @@ -74,16 +86,27 @@ public class ReplayExtension implements BeforeEachCallback, AfterEachCallback { DatabaseHelper.setClock(clock); DatabaseHelper.setAlwaysSaveWithBackup(true); ReplayQueue.clear(); + + // When running in JPA mode with double replay enabled, enable JPA transaction replication. + // Note that we can't just use isOfy() here because this extension gets run before the dual-test + // transaction manager gets injected. + inOfyContext = DualDatabaseTestInvocationContextProvider.inOfyContext(context); + if (sqlToDsReplicator != null && !inOfyContext) { + RegistryConfig.overrideCloudSqlReplicateTransactions(true); + } + context.getStore(ExtensionContext.Namespace.GLOBAL).put(ReplayExtension.class, this); } @Override public void afterEach(ExtensionContext context) { // This ensures that we do the replay even if we're not called from AppEngineExtension. It - // should be safe to call replayToSql() twice, as the replay queue should be empty the second - // time. - replayToSql(); + // is safe to call replay() twice, as the method ensures idempotence. + replay(); injectExtension.afterEach(context); + if (sqlToDsReplicator != null) { + RegistryConfig.overrideCloudSqlReplicateTransactions(false); + } } private static ImmutableSet NON_REPLICATED_TYPES = @@ -104,7 +127,26 @@ public class ReplayExtension implements BeforeEachCallback, AfterEachCallback { "ForeignKeyContactIndex", "ForeignKeyDomainIndex"); - public void replayToSql() { + public void replay() { + if (!replayed) { + if (inOfyContext) { + replayToSql(); + } else { + // Disable database backups. For unknown reason, if we don't do this we get residual commit + // log entries that cause timestamp inversions in other tests. + DatabaseHelper.setAlwaysSaveWithBackup(false); + + // Do the ofy replay. + replayToOfy(); + + // Clean out anything that ends up in the replay queue. + ReplayQueue.clear(); + } + replayed = true; + } + } + + private void replayToSql() { DatabaseHelper.setAlwaysSaveWithBackup(false); ImmutableMap, Object> changes = ReplayQueue.replay(); @@ -139,4 +181,18 @@ public class ReplayExtension implements BeforeEachCallback, AfterEachCallback { } } } + + private void replayToOfy() { + if (sqlToDsReplicator == null) { + return; + } + + // TODO(mmuller): Verify that all entities are the same across both databases. + for (TransactionEntity txn : sqlToDsReplicator.getTransactionBatch()) { + if (sqlToDsReplicator.applyTransaction(txn)) { + break; + } + clock.advanceOneMilli(); + } + } }