From 6d26f3cc0e085b4ac32c66c228f7492fa66d161e Mon Sep 17 00:00:00 2001 From: Michael Muller Date: Tue, 29 Jun 2021 10:00:39 -0400 Subject: [PATCH] Support testing SQL -> DS replication in ReplayExt (#1216) * Support testing SQL -> DS replication in ReplayExt Support testing of Postgres -> Datastore replication in the ReplayExtension when running in SQL mode in a DualDatabaseTest. This is currently only enabled for one test (HostInfoFlowTest) since this form of replication is likely to be problematic in many cases. As part of this change: - Add a thread-local flag so that we don't attempt to do certain data transformations when serializing entities for storage in a Transaction record. (These typically need to be called in a datastore transaction). - Replace tm() in datastore translators with ofyTm() (these should only be called from within an ofy transaction) and also in the replay system itself. - Add a transactWithoutBackup() method for use within the replay itself. - Prevent replication of entities that are not intended to be replicated. - Make some of the ReplicateToDatastoreAction methods public so we can invoke them from ReplayExtension. - Change the way that the test type is stored in the extension context in a DualDatabaseTest so that we can check for it from the ReplayExtension. * Limit number of tests and show output Trying to debug why these are failing in kokoro. * Move HostInfoFlowTest to fragile for now The test now manipulates a globel variable that causes problems for other tests. There's likely a better fix for this, but for purposes of this PR we can just move it to "fragile." * Fix a few more problems - "replay" flag should have been initialized to false -- as it stands, replay wasn't happening. - disable "always save with backup" in the datastore helper, we were apparently getting some unwanted commit log entries that were causing timestamp inversions in other tests. Also clear out the replay queue just for good hygiene. - Check for a null replicator in replayToOfy before proceeding. - Use a local inOfyContext flag to track whether we're in ofy context, as the tm() function is less reliable in dual-database tests. --- core/build.gradle | 3 + .../CommitLogRevisionsTranslatorFactory.java | 13 +++- .../CreateAutoTimestampTranslatorFactory.java | 14 +++- .../UpdateAutoTimestampTranslatorFactory.java | 11 ++- .../transaction/JpaTransactionManager.java | 5 ++ .../JpaTransactionManagerImpl.java | 29 ++++++-- .../persistence/transaction/Transaction.java | 29 ++++++-- .../replay/ReplicateToDatastoreAction.java | 9 +-- .../registry/flows/host/HostInfoFlowTest.java | 2 +- .../registry/testing/AppEngineExtension.java | 4 +- ...DatabaseTestInvocationContextProvider.java | 51 +++++++++---- .../registry/testing/ReplayExtension.java | 72 ++++++++++++++++--- 12 files changed, 196 insertions(+), 46 deletions(-) diff --git a/core/build.gradle b/core/build.gradle index e9ea4c2f2..9cb919016 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -79,6 +79,9 @@ def fragileTestPatterns = [ // Changes cache timeouts and for some reason appears to have contention // with other tests. "google/registry/whois/WhoisCommandFactoryTest.*", + // Currently changes a global configuration parameter that for some reason + // results in timestamp inversions for other tests. TODO(mmuller): fix. + "google/registry/flows/host/HostInfoFlowTest.*", ] + dockerIncompatibleTestPatterns sourceSets { diff --git a/core/src/main/java/google/registry/model/translators/CommitLogRevisionsTranslatorFactory.java b/core/src/main/java/google/registry/model/translators/CommitLogRevisionsTranslatorFactory.java index 266cd9f7c..e25a67fc8 100644 --- a/core/src/main/java/google/registry/model/translators/CommitLogRevisionsTranslatorFactory.java +++ b/core/src/main/java/google/registry/model/translators/CommitLogRevisionsTranslatorFactory.java @@ -17,13 +17,14 @@ package google.registry.model.translators; import static com.google.common.base.MoreObjects.firstNonNull; import static google.registry.config.RegistryConfig.getCommitLogDatastoreRetention; import static google.registry.model.ofy.ObjectifyService.auditedOfy; -import static google.registry.persistence.transaction.TransactionManagerFactory.tm; +import static google.registry.persistence.transaction.TransactionManagerFactory.ofyTm; import static google.registry.util.DateTimeUtils.START_OF_TIME; import com.google.common.collect.ImmutableSortedMap; import com.google.common.collect.Ordering; import com.googlecode.objectify.Key; import google.registry.model.ofy.CommitLogManifest; +import google.registry.persistence.transaction.Transaction; import org.joda.time.DateTime; /** @@ -58,12 +59,20 @@ public final class CommitLogRevisionsTranslatorFactory *

We store a maximum of one entry per day. It will be the last transaction that happened on * that day. * + *

In serialization mode, this method just returns "revisions" without modification. + * * @see google.registry.config.RegistryConfig#getCommitLogDatastoreRetention() */ @Override ImmutableSortedMap> transformBeforeSave( ImmutableSortedMap> revisions) { - DateTime now = tm().getTransactionTime(); + + // Don't do anything if we're just doing object serialization. + if (Transaction.inSerializationMode()) { + return revisions; + } + + DateTime now = ofyTm().getTransactionTime(); DateTime threshold = now.minus(getCommitLogDatastoreRetention()); DateTime preThresholdTime = firstNonNull(revisions.floorKey(threshold), START_OF_TIME); return new ImmutableSortedMap.Builder>(Ordering.natural()) diff --git a/core/src/main/java/google/registry/model/translators/CreateAutoTimestampTranslatorFactory.java b/core/src/main/java/google/registry/model/translators/CreateAutoTimestampTranslatorFactory.java index ff9e05eef..a7c28061e 100644 --- a/core/src/main/java/google/registry/model/translators/CreateAutoTimestampTranslatorFactory.java +++ b/core/src/main/java/google/registry/model/translators/CreateAutoTimestampTranslatorFactory.java @@ -15,10 +15,11 @@ package google.registry.model.translators; import static com.google.common.base.MoreObjects.firstNonNull; -import static google.registry.persistence.transaction.TransactionManagerFactory.tm; +import static google.registry.persistence.transaction.TransactionManagerFactory.ofyTm; import static org.joda.time.DateTimeZone.UTC; import google.registry.model.CreateAutoTimestamp; +import google.registry.persistence.transaction.Transaction; import java.util.Date; import org.joda.time.DateTime; @@ -46,7 +47,14 @@ public class CreateAutoTimestampTranslatorFactory /** Save a timestamp, setting it to the current time if it did not have a previous value. */ @Override public Date saveValue(CreateAutoTimestamp pojoValue) { - return firstNonNull(pojoValue.getTimestamp(), tm().getTransactionTime()).toDate(); - }}; + + // Don't do this if we're in the course of transaction serialization. + if (Transaction.inSerializationMode()) { + return pojoValue.getTimestamp() == null ? null : pojoValue.getTimestamp().toDate(); + } + + return firstNonNull(pojoValue.getTimestamp(), ofyTm().getTransactionTime()).toDate(); + } + }; } } diff --git a/core/src/main/java/google/registry/model/translators/UpdateAutoTimestampTranslatorFactory.java b/core/src/main/java/google/registry/model/translators/UpdateAutoTimestampTranslatorFactory.java index 21b461339..7c99c5a58 100644 --- a/core/src/main/java/google/registry/model/translators/UpdateAutoTimestampTranslatorFactory.java +++ b/core/src/main/java/google/registry/model/translators/UpdateAutoTimestampTranslatorFactory.java @@ -14,10 +14,11 @@ package google.registry.model.translators; -import static google.registry.persistence.transaction.TransactionManagerFactory.tm; +import static google.registry.persistence.transaction.TransactionManagerFactory.ofyTm; import static org.joda.time.DateTimeZone.UTC; import google.registry.model.UpdateAutoTimestamp; +import google.registry.persistence.transaction.Transaction; import java.util.Date; import org.joda.time.DateTime; @@ -46,8 +47,14 @@ public class UpdateAutoTimestampTranslatorFactory /** Save a timestamp, setting it to the current time. */ @Override public Date saveValue(UpdateAutoTimestamp pojoValue) { + + // Don't do this if we're in the course of transaction serialization. + if (Transaction.inSerializationMode()) { + return pojoValue.getTimestamp() == null ? null : pojoValue.getTimestamp().toDate(); + } + return UpdateAutoTimestamp.autoUpdateEnabled() - ? tm().getTransactionTime().toDate() + ? ofyTm().getTransactionTime().toDate() : pojoValue.getTimestamp().toDate(); } }; diff --git a/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManager.java b/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManager.java index 4e0db4e7d..ee6a60169 100644 --- a/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManager.java +++ b/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManager.java @@ -50,6 +50,11 @@ public interface JpaTransactionManager extends TransactionManager { */ Query query(String sqlString); + /** + * Execute the work in a transaction without recording the transaction for replay to datastore. + */ + T transactWithoutBackup(Supplier work); + /** Executes the work in a transaction with no retries and returns the result. */ T transactNoRetry(Supplier work); diff --git a/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManagerImpl.java b/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManagerImpl.java index fa4a9a285..a998eae27 100644 --- a/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManagerImpl.java +++ b/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManagerImpl.java @@ -41,6 +41,8 @@ import google.registry.model.server.KmsSecret; import google.registry.model.tmch.ClaimsList.ClaimsListSingleton; import google.registry.persistence.JpaRetries; import google.registry.persistence.VKey; +import google.registry.schema.replay.NonReplicatedEntity; +import google.registry.schema.replay.SqlOnlyEntity; import google.registry.util.Clock; import google.registry.util.Retrier; import google.registry.util.SystemSleeper; @@ -152,6 +154,15 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager { @Override public T transact(Supplier work) { + return transact(work, true); + } + + @Override + public T transactWithoutBackup(Supplier work) { + return transact(work, false); + } + + private T transact(Supplier work, boolean withBackup) { return retrier.callWithRetry( () -> { if (inTransaction()) { @@ -162,7 +173,7 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager { EntityTransaction txn = txnInfo.entityManager.getTransaction(); try { txn.begin(); - txnInfo.start(clock); + txnInfo.start(clock, withBackup); T result = work.get(); txnInfo.recordTransaction(); txn.commit(); @@ -194,7 +205,7 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager { EntityTransaction txn = txnInfo.entityManager.getTransaction(); try { txn.begin(); - txnInfo.start(clock); + txnInfo.start(clock, true); T result = work.get(); txnInfo.recordTransaction(); txn.commit(); @@ -740,11 +751,11 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager { Set objectsToSave = Collections.newSetFromMap(new IdentityHashMap()); /** Start a new transaction. */ - private void start(Clock clock) { + private void start(Clock clock, boolean withBackup) { checkArgumentNotNull(clock); inTransaction = true; transactionTime = clock.nowUtc(); - if (RegistryConfig.getCloudSqlReplicateTransactions()) { + if (withBackup && RegistryConfig.getCloudSqlReplicateTransactions()) { contentsBuilder = new Transaction.Builder(); } } @@ -763,17 +774,23 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager { } private void addUpdate(Object entity) { - if (contentsBuilder != null) { + if (contentsBuilder != null && shouldReplicate(entity.getClass())) { contentsBuilder.addUpdate(entity); } } private void addDelete(VKey key) { - if (contentsBuilder != null) { + if (contentsBuilder != null && shouldReplicate(key.getKind())) { contentsBuilder.addDelete(key); } } + /** Returns true if the entity class should be replicated from SQL to datastore. */ + private boolean shouldReplicate(Class entityClass) { + return !NonReplicatedEntity.class.isAssignableFrom(entityClass) + && !SqlOnlyEntity.class.isAssignableFrom(entityClass); + } + private void recordTransaction() { if (contentsBuilder != null) { Transaction persistedTxn = contentsBuilder.build(); diff --git a/core/src/main/java/google/registry/persistence/transaction/Transaction.java b/core/src/main/java/google/registry/persistence/transaction/Transaction.java index b93d1638b..efad3050d 100644 --- a/core/src/main/java/google/registry/persistence/transaction/Transaction.java +++ b/core/src/main/java/google/registry/persistence/transaction/Transaction.java @@ -18,7 +18,6 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static google.registry.model.ofy.ObjectifyService.auditedOfy; import static google.registry.persistence.transaction.TransactionManagerFactory.ofyTm; -import static google.registry.persistence.transaction.TransactionManagerFactory.tm; import com.google.appengine.api.datastore.Entity; import com.google.appengine.api.datastore.EntityTranslator; @@ -51,11 +50,17 @@ public class Transaction extends ImmutableObject implements Buildable { // unique and inherently informative. private static final int VERSION_ID = 20200604; + // Keep a per-thread flag to keep track of whether we're serializing an entity for a transaction. + // This is used by internal translators to avoid doing things that are dependent on being in a + // datastore transaction and alter the persisted representation of the entity. + private static ThreadLocal inSerializationMode = ThreadLocal.withInitial(() -> false); + private transient ImmutableList mutations; /** Write the entire transaction to the datastore in a datastore transaction. */ public void writeToDatastore() { - tm().transact( + ofyTm() + .transact( () -> { for (Mutation mutation : mutations) { mutation.writeToDatastore(); @@ -75,8 +80,13 @@ public class Transaction extends ImmutableObject implements Buildable { // Write all of the mutations, preceded by their count. out.writeInt(mutations.size()); - for (Mutation mutation : mutations) { - mutation.serializeTo(out); + try { + inSerializationMode.set(true); + for (Mutation mutation : mutations) { + mutation.serializeTo(out); + } + } finally { + inSerializationMode.set(false); } out.close(); @@ -114,6 +124,17 @@ public class Transaction extends ImmutableObject implements Buildable { return mutations.isEmpty(); } + /** + * Returns true if we are serializing a transaction in the current thread. + * + *

This should be checked by any Ofy translators prior to making any changes to an entity's + * state representation based on the assumption that we are currently pseristing the entity to + * datastore. + */ + public static boolean inSerializationMode() { + return inSerializationMode.get(); + } + @Override public Builder asBuilder() { return new Builder(clone(this)); diff --git a/core/src/main/java/google/registry/schema/replay/ReplicateToDatastoreAction.java b/core/src/main/java/google/registry/schema/replay/ReplicateToDatastoreAction.java index fa829b53c..5ec8a4866 100644 --- a/core/src/main/java/google/registry/schema/replay/ReplicateToDatastoreAction.java +++ b/core/src/main/java/google/registry/schema/replay/ReplicateToDatastoreAction.java @@ -42,7 +42,8 @@ import javax.persistence.NoResultException; method = GET, automaticallyPrintOk = true, auth = Auth.AUTH_INTERNAL_OR_ADMIN) -class ReplicateToDatastoreAction implements Runnable { +@VisibleForTesting +public class ReplicateToDatastoreAction implements Runnable { public static final String PATH = "/_dr/cron/replicateToDatastore"; private static final FluentLogger logger = FluentLogger.forEnclosingClass(); @@ -61,12 +62,12 @@ class ReplicateToDatastoreAction implements Runnable { } @VisibleForTesting - List getTransactionBatch() { + public List getTransactionBatch() { // Get the next batch of transactions that we haven't replicated. LastSqlTransaction lastSqlTxnBeforeBatch = ofyTm().transact(() -> LastSqlTransaction.load()); try { return jpaTm() - .transact( + .transactWithoutBackup( () -> jpaTm() .query( @@ -86,7 +87,7 @@ class ReplicateToDatastoreAction implements Runnable { * be aborted. */ @VisibleForTesting - boolean applyTransaction(TransactionEntity txnEntity) { + public boolean applyTransaction(TransactionEntity txnEntity) { logger.atInfo().log("Applying a single transaction Cloud SQL -> Cloud Datastore"); return ofyTm() .transact( diff --git a/core/src/test/java/google/registry/flows/host/HostInfoFlowTest.java b/core/src/test/java/google/registry/flows/host/HostInfoFlowTest.java index 7a312b647..a74c83c4b 100644 --- a/core/src/test/java/google/registry/flows/host/HostInfoFlowTest.java +++ b/core/src/test/java/google/registry/flows/host/HostInfoFlowTest.java @@ -51,7 +51,7 @@ class HostInfoFlowTest extends ResourceFlowTestCase @Order(value = Order.DEFAULT - 2) @RegisterExtension - final ReplayExtension replayExtension = ReplayExtension.createWithCompare(clock); + final ReplayExtension replayExtension = ReplayExtension.createWithDoubleReplay(clock); HostInfoFlowTest() { setEppInput("host_info.xml", ImmutableMap.of("HOSTNAME", "ns1.example.tld")); diff --git a/core/src/test/java/google/registry/testing/AppEngineExtension.java b/core/src/test/java/google/registry/testing/AppEngineExtension.java index 3ce7cb0ba..67dab888f 100644 --- a/core/src/test/java/google/registry/testing/AppEngineExtension.java +++ b/core/src/test/java/google/registry/testing/AppEngineExtension.java @@ -477,7 +477,7 @@ public final class AppEngineExtension implements BeforeEachCallback, AfterEachCa public void afterEach(ExtensionContext context) throws Exception { checkArgumentNotNull(context, "The ExtensionContext must not be null"); try { - // If there is a replay extension, we'll want to call its replayToSql() method. + // If there is a replay extension, we'll want to call its replay() method. // // We have to provide this hook here for ReplayExtension instead of relying on // ReplayExtension's afterEach() method because of ordering and the conflation of environment @@ -492,7 +492,7 @@ public final class AppEngineExtension implements BeforeEachCallback, AfterEachCa (ReplayExtension) context.getStore(ExtensionContext.Namespace.GLOBAL).get(ReplayExtension.class); if (replayer != null) { - replayer.replayToSql(); + replayer.replay(); } if (withCloudSql) { diff --git a/core/src/test/java/google/registry/testing/DualDatabaseTestInvocationContextProvider.java b/core/src/test/java/google/registry/testing/DualDatabaseTestInvocationContextProvider.java index 124b93658..05a3b4296 100644 --- a/core/src/test/java/google/registry/testing/DualDatabaseTestInvocationContextProvider.java +++ b/core/src/test/java/google/registry/testing/DualDatabaseTestInvocationContextProvider.java @@ -51,15 +51,23 @@ class DualDatabaseTestInvocationContextProvider implements TestTemplateInvocatio return true; } + /** + * Returns true if "context" is an objectify unit test. + * + *

Provided to allow ReplayExtension to make this determination. + */ + static boolean inOfyContext(ExtensionContext context) { + return (DatabaseType) context.getStore(NAMESPACE).get(INJECTED_TM_SUPPLIER_KEY) + == DatabaseType.OFY; + } + @Override public Stream provideTestTemplateInvocationContexts( ExtensionContext context) { TestTemplateInvocationContext ofyContext = - createInvocationContext( - context.getDisplayName() + " with Datastore", TransactionManagerFactory::ofyTm); + createInvocationContext(context.getDisplayName() + " with Datastore", DatabaseType.OFY); TestTemplateInvocationContext sqlContext = - createInvocationContext( - context.getDisplayName() + " with PostgreSQL", TransactionManagerFactory::jpaTm); + createInvocationContext(context.getDisplayName() + " with PostgreSQL", DatabaseType.JPA); Method testMethod = context.getTestMethod().orElseThrow(IllegalStateException::new); if (testMethod.isAnnotationPresent(TestOfyAndSql.class)) { return Stream.of(ofyContext, sqlContext); @@ -74,7 +82,7 @@ class DualDatabaseTestInvocationContextProvider implements TestTemplateInvocatio } private TestTemplateInvocationContext createInvocationContext( - String name, Supplier tmSupplier) { + String name, DatabaseType databaseType) { return new TestTemplateInvocationContext() { @Override public String getDisplayName(int invocationIndex) { @@ -83,17 +91,17 @@ class DualDatabaseTestInvocationContextProvider implements TestTemplateInvocatio @Override public List getAdditionalExtensions() { - return ImmutableList.of(new DatabaseSwitchInvocationContext(tmSupplier)); + return ImmutableList.of(new DatabaseSwitchInvocationContext(databaseType)); } }; } private static class DatabaseSwitchInvocationContext implements TestInstancePostProcessor { - private Supplier tmSupplier; + private DatabaseType databaseType; - private DatabaseSwitchInvocationContext(Supplier tmSupplier) { - this.tmSupplier = tmSupplier; + private DatabaseSwitchInvocationContext(DatabaseType databaseType) { + this.databaseType = databaseType; } @Override @@ -113,7 +121,7 @@ class DualDatabaseTestInvocationContextProvider implements TestTemplateInvocatio throw new IllegalStateException( "AppEngineExtension in @DualDatabaseTest test must set withDatastoreAndCloudSql()"); } - context.getStore(NAMESPACE).put(INJECTED_TM_SUPPLIER_KEY, tmSupplier); + context.getStore(NAMESPACE).put(INJECTED_TM_SUPPLIER_KEY, databaseType); } private static ImmutableList getAppEngineExtensionFields(Class clazz) { @@ -144,10 +152,9 @@ class DualDatabaseTestInvocationContextProvider implements TestTemplateInvocatio } }); context.getStore(NAMESPACE).put(ORIGINAL_TM_KEY, tm()); - Supplier tmSupplier = - (Supplier) - context.getStore(NAMESPACE).get(INJECTED_TM_SUPPLIER_KEY); - TransactionManagerFactory.setTm(tmSupplier.get()); + DatabaseType databaseType = + (DatabaseType) context.getStore(NAMESPACE).get(INJECTED_TM_SUPPLIER_KEY); + TransactionManagerFactory.setTm(databaseType.getTm()); } } @@ -171,4 +178,20 @@ class DualDatabaseTestInvocationContextProvider implements TestTemplateInvocatio return testInstance.getClass().isAnnotationPresent(DualDatabaseTest.class) && isDeclaredTestMethod; } + + private enum DatabaseType { + JPA(TransactionManagerFactory::jpaTm), + OFY(TransactionManagerFactory::ofyTm); + + @SuppressWarnings("Immutable") // Supplier is immutable, but not annotated as such. + private final Supplier supplier; + + DatabaseType(Supplier supplier) { + this.supplier = supplier; + } + + TransactionManager getTm() { + return supplier.get(); + } + } } diff --git a/core/src/test/java/google/registry/testing/ReplayExtension.java b/core/src/test/java/google/registry/testing/ReplayExtension.java index 1b015dc8d..659d3e841 100644 --- a/core/src/test/java/google/registry/testing/ReplayExtension.java +++ b/core/src/test/java/google/registry/testing/ReplayExtension.java @@ -22,13 +22,17 @@ import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.googlecode.objectify.Key; +import google.registry.config.RegistryConfig; import google.registry.model.ImmutableObject; import google.registry.model.ofy.CommitLogBucket; import google.registry.model.ofy.ReplayQueue; import google.registry.model.ofy.TransactionInfo; import google.registry.persistence.VKey; +import google.registry.persistence.transaction.TransactionEntity; import google.registry.schema.replay.DatastoreEntity; +import google.registry.schema.replay.ReplicateToDatastoreAction; import java.util.Optional; +import javax.annotation.Nullable; import org.junit.jupiter.api.extension.AfterEachCallback; import org.junit.jupiter.api.extension.BeforeEachCallback; import org.junit.jupiter.api.extension.ExtensionContext; @@ -48,19 +52,27 @@ public class ReplayExtension implements BeforeEachCallback, AfterEachCallback { FakeClock clock; boolean compare; + boolean replayed = false; + boolean inOfyContext; InjectExtension injectExtension = new InjectExtension(); + @Nullable ReplicateToDatastoreAction sqlToDsReplicator; - private ReplayExtension(FakeClock clock, boolean compare) { + private ReplayExtension( + FakeClock clock, boolean compare, @Nullable ReplicateToDatastoreAction sqlToDsReplicator) { this.clock = clock; this.compare = compare; + this.sqlToDsReplicator = sqlToDsReplicator; } public static ReplayExtension createWithCompare(FakeClock clock) { - return new ReplayExtension(clock, true); + return new ReplayExtension(clock, true, null); } - public static ReplayExtension createWithoutCompare(FakeClock clock) { - return new ReplayExtension(clock, false); + /** + * Create a replay extension that replays from SQL to cloud datastore when running in SQL mode. + */ + public static ReplayExtension createWithDoubleReplay(FakeClock clock) { + return new ReplayExtension(clock, true, new ReplicateToDatastoreAction(clock)); } @Override @@ -74,16 +86,27 @@ public class ReplayExtension implements BeforeEachCallback, AfterEachCallback { DatabaseHelper.setClock(clock); DatabaseHelper.setAlwaysSaveWithBackup(true); ReplayQueue.clear(); + + // When running in JPA mode with double replay enabled, enable JPA transaction replication. + // Note that we can't just use isOfy() here because this extension gets run before the dual-test + // transaction manager gets injected. + inOfyContext = DualDatabaseTestInvocationContextProvider.inOfyContext(context); + if (sqlToDsReplicator != null && !inOfyContext) { + RegistryConfig.overrideCloudSqlReplicateTransactions(true); + } + context.getStore(ExtensionContext.Namespace.GLOBAL).put(ReplayExtension.class, this); } @Override public void afterEach(ExtensionContext context) { // This ensures that we do the replay even if we're not called from AppEngineExtension. It - // should be safe to call replayToSql() twice, as the replay queue should be empty the second - // time. - replayToSql(); + // is safe to call replay() twice, as the method ensures idempotence. + replay(); injectExtension.afterEach(context); + if (sqlToDsReplicator != null) { + RegistryConfig.overrideCloudSqlReplicateTransactions(false); + } } private static ImmutableSet NON_REPLICATED_TYPES = @@ -104,7 +127,26 @@ public class ReplayExtension implements BeforeEachCallback, AfterEachCallback { "ForeignKeyContactIndex", "ForeignKeyDomainIndex"); - public void replayToSql() { + public void replay() { + if (!replayed) { + if (inOfyContext) { + replayToSql(); + } else { + // Disable database backups. For unknown reason, if we don't do this we get residual commit + // log entries that cause timestamp inversions in other tests. + DatabaseHelper.setAlwaysSaveWithBackup(false); + + // Do the ofy replay. + replayToOfy(); + + // Clean out anything that ends up in the replay queue. + ReplayQueue.clear(); + } + replayed = true; + } + } + + private void replayToSql() { DatabaseHelper.setAlwaysSaveWithBackup(false); ImmutableMap, Object> changes = ReplayQueue.replay(); @@ -139,4 +181,18 @@ public class ReplayExtension implements BeforeEachCallback, AfterEachCallback { } } } + + private void replayToOfy() { + if (sqlToDsReplicator == null) { + return; + } + + // TODO(mmuller): Verify that all entities are the same across both databases. + for (TransactionEntity txn : sqlToDsReplicator.getTransactionBatch()) { + if (sqlToDsReplicator.applyTransaction(txn)) { + break; + } + clock.advanceOneMilli(); + } + } }