Support testing SQL -> DS replication in ReplayExt (#1216)

* Support testing SQL -> DS replication in ReplayExt

Support testing of Postgres -> Datastore replication in the ReplayExtension
when running in SQL mode in a DualDatabaseTest.

This is currently only enabled for one test (HostInfoFlowTest) since this form
of replication is likely to be problematic in many cases.

As part of this change:

- Add a thread-local flag so that we don't attempt to do certain data
  transformations when serializing entities for storage in a Transaction
  record. (These typically need to be called in a datastore transaction).
- Replace tm() in datastore translators with ofyTm() (these should only be
  called from within an ofy transaction) and also in the replay system itself.
- Add a transactWithoutBackup() method for use within the replay itself.
- Prevent replication of entities that are not intended to be replicated.
- Make some of the ReplicateToDatastoreAction methods public so we can invoke
  them from ReplayExtension.
- Change the way that the test type is stored in the extension context in a
  DualDatabaseTest so that we can check for it from the ReplayExtension.

* Limit number of tests and show output

Trying to debug why these are failing in kokoro.

* Move HostInfoFlowTest to fragile for now

The test now manipulates a globel variable that causes problems for other
tests.  There's likely a better fix for this, but for purposes of this PR we
can just move it to "fragile."

* Fix a few more problems

-   "replay" flag should have been initialized to false -- as it stands,
    replay wasn't happening.
-   disable "always save with backup" in the datastore helper, we were
    apparently getting some unwanted commit log entries that were causing
    timestamp inversions in other tests.  Also clear out the replay queue
    just for good hygiene.
-   Check for a null replicator in replayToOfy before proceeding.
-   Use a local inOfyContext flag to track whether we're in ofy context, as
    the tm() function is less reliable in dual-database tests.
This commit is contained in:
Michael Muller 2021-06-29 10:00:39 -04:00 committed by GitHub
parent 33221cc9f8
commit 6d26f3cc0e
12 changed files with 196 additions and 46 deletions

View file

@ -79,6 +79,9 @@ def fragileTestPatterns = [
// Changes cache timeouts and for some reason appears to have contention
// with other tests.
"google/registry/whois/WhoisCommandFactoryTest.*",
// Currently changes a global configuration parameter that for some reason
// results in timestamp inversions for other tests. TODO(mmuller): fix.
"google/registry/flows/host/HostInfoFlowTest.*",
] + dockerIncompatibleTestPatterns
sourceSets {

View file

@ -17,13 +17,14 @@ package google.registry.model.translators;
import static com.google.common.base.MoreObjects.firstNonNull;
import static google.registry.config.RegistryConfig.getCommitLogDatastoreRetention;
import static google.registry.model.ofy.ObjectifyService.auditedOfy;
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
import static google.registry.persistence.transaction.TransactionManagerFactory.ofyTm;
import static google.registry.util.DateTimeUtils.START_OF_TIME;
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.collect.Ordering;
import com.googlecode.objectify.Key;
import google.registry.model.ofy.CommitLogManifest;
import google.registry.persistence.transaction.Transaction;
import org.joda.time.DateTime;
/**
@ -58,12 +59,20 @@ public final class CommitLogRevisionsTranslatorFactory
* <p>We store a maximum of one entry per day. It will be the last transaction that happened on
* that day.
*
* <p>In serialization mode, this method just returns "revisions" without modification.
*
* @see google.registry.config.RegistryConfig#getCommitLogDatastoreRetention()
*/
@Override
ImmutableSortedMap<DateTime, Key<CommitLogManifest>> transformBeforeSave(
ImmutableSortedMap<DateTime, Key<CommitLogManifest>> revisions) {
DateTime now = tm().getTransactionTime();
// Don't do anything if we're just doing object serialization.
if (Transaction.inSerializationMode()) {
return revisions;
}
DateTime now = ofyTm().getTransactionTime();
DateTime threshold = now.minus(getCommitLogDatastoreRetention());
DateTime preThresholdTime = firstNonNull(revisions.floorKey(threshold), START_OF_TIME);
return new ImmutableSortedMap.Builder<DateTime, Key<CommitLogManifest>>(Ordering.natural())

View file

@ -15,10 +15,11 @@
package google.registry.model.translators;
import static com.google.common.base.MoreObjects.firstNonNull;
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
import static google.registry.persistence.transaction.TransactionManagerFactory.ofyTm;
import static org.joda.time.DateTimeZone.UTC;
import google.registry.model.CreateAutoTimestamp;
import google.registry.persistence.transaction.Transaction;
import java.util.Date;
import org.joda.time.DateTime;
@ -46,7 +47,14 @@ public class CreateAutoTimestampTranslatorFactory
/** Save a timestamp, setting it to the current time if it did not have a previous value. */
@Override
public Date saveValue(CreateAutoTimestamp pojoValue) {
return firstNonNull(pojoValue.getTimestamp(), tm().getTransactionTime()).toDate();
}};
// Don't do this if we're in the course of transaction serialization.
if (Transaction.inSerializationMode()) {
return pojoValue.getTimestamp() == null ? null : pojoValue.getTimestamp().toDate();
}
return firstNonNull(pojoValue.getTimestamp(), ofyTm().getTransactionTime()).toDate();
}
};
}
}

View file

@ -14,10 +14,11 @@
package google.registry.model.translators;
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
import static google.registry.persistence.transaction.TransactionManagerFactory.ofyTm;
import static org.joda.time.DateTimeZone.UTC;
import google.registry.model.UpdateAutoTimestamp;
import google.registry.persistence.transaction.Transaction;
import java.util.Date;
import org.joda.time.DateTime;
@ -46,8 +47,14 @@ public class UpdateAutoTimestampTranslatorFactory
/** Save a timestamp, setting it to the current time. */
@Override
public Date saveValue(UpdateAutoTimestamp pojoValue) {
// Don't do this if we're in the course of transaction serialization.
if (Transaction.inSerializationMode()) {
return pojoValue.getTimestamp() == null ? null : pojoValue.getTimestamp().toDate();
}
return UpdateAutoTimestamp.autoUpdateEnabled()
? tm().getTransactionTime().toDate()
? ofyTm().getTransactionTime().toDate()
: pojoValue.getTimestamp().toDate();
}
};

View file

@ -50,6 +50,11 @@ public interface JpaTransactionManager extends TransactionManager {
*/
Query query(String sqlString);
/**
* Execute the work in a transaction without recording the transaction for replay to datastore.
*/
<T> T transactWithoutBackup(Supplier<T> work);
/** Executes the work in a transaction with no retries and returns the result. */
<T> T transactNoRetry(Supplier<T> work);

View file

@ -41,6 +41,8 @@ import google.registry.model.server.KmsSecret;
import google.registry.model.tmch.ClaimsList.ClaimsListSingleton;
import google.registry.persistence.JpaRetries;
import google.registry.persistence.VKey;
import google.registry.schema.replay.NonReplicatedEntity;
import google.registry.schema.replay.SqlOnlyEntity;
import google.registry.util.Clock;
import google.registry.util.Retrier;
import google.registry.util.SystemSleeper;
@ -152,6 +154,15 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager {
@Override
public <T> T transact(Supplier<T> work) {
return transact(work, true);
}
@Override
public <T> T transactWithoutBackup(Supplier<T> work) {
return transact(work, false);
}
private <T> T transact(Supplier<T> work, boolean withBackup) {
return retrier.callWithRetry(
() -> {
if (inTransaction()) {
@ -162,7 +173,7 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager {
EntityTransaction txn = txnInfo.entityManager.getTransaction();
try {
txn.begin();
txnInfo.start(clock);
txnInfo.start(clock, withBackup);
T result = work.get();
txnInfo.recordTransaction();
txn.commit();
@ -194,7 +205,7 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager {
EntityTransaction txn = txnInfo.entityManager.getTransaction();
try {
txn.begin();
txnInfo.start(clock);
txnInfo.start(clock, true);
T result = work.get();
txnInfo.recordTransaction();
txn.commit();
@ -740,11 +751,11 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager {
Set<Object> objectsToSave = Collections.newSetFromMap(new IdentityHashMap<Object, Boolean>());
/** Start a new transaction. */
private void start(Clock clock) {
private void start(Clock clock, boolean withBackup) {
checkArgumentNotNull(clock);
inTransaction = true;
transactionTime = clock.nowUtc();
if (RegistryConfig.getCloudSqlReplicateTransactions()) {
if (withBackup && RegistryConfig.getCloudSqlReplicateTransactions()) {
contentsBuilder = new Transaction.Builder();
}
}
@ -763,17 +774,23 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager {
}
private void addUpdate(Object entity) {
if (contentsBuilder != null) {
if (contentsBuilder != null && shouldReplicate(entity.getClass())) {
contentsBuilder.addUpdate(entity);
}
}
private void addDelete(VKey<?> key) {
if (contentsBuilder != null) {
if (contentsBuilder != null && shouldReplicate(key.getKind())) {
contentsBuilder.addDelete(key);
}
}
/** Returns true if the entity class should be replicated from SQL to datastore. */
private boolean shouldReplicate(Class<?> entityClass) {
return !NonReplicatedEntity.class.isAssignableFrom(entityClass)
&& !SqlOnlyEntity.class.isAssignableFrom(entityClass);
}
private void recordTransaction() {
if (contentsBuilder != null) {
Transaction persistedTxn = contentsBuilder.build();

View file

@ -18,7 +18,6 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static google.registry.model.ofy.ObjectifyService.auditedOfy;
import static google.registry.persistence.transaction.TransactionManagerFactory.ofyTm;
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
import com.google.appengine.api.datastore.Entity;
import com.google.appengine.api.datastore.EntityTranslator;
@ -51,11 +50,17 @@ public class Transaction extends ImmutableObject implements Buildable {
// unique and inherently informative.
private static final int VERSION_ID = 20200604;
// Keep a per-thread flag to keep track of whether we're serializing an entity for a transaction.
// This is used by internal translators to avoid doing things that are dependent on being in a
// datastore transaction and alter the persisted representation of the entity.
private static ThreadLocal<Boolean> inSerializationMode = ThreadLocal.withInitial(() -> false);
private transient ImmutableList<Mutation> mutations;
/** Write the entire transaction to the datastore in a datastore transaction. */
public void writeToDatastore() {
tm().transact(
ofyTm()
.transact(
() -> {
for (Mutation mutation : mutations) {
mutation.writeToDatastore();
@ -75,8 +80,13 @@ public class Transaction extends ImmutableObject implements Buildable {
// Write all of the mutations, preceded by their count.
out.writeInt(mutations.size());
for (Mutation mutation : mutations) {
mutation.serializeTo(out);
try {
inSerializationMode.set(true);
for (Mutation mutation : mutations) {
mutation.serializeTo(out);
}
} finally {
inSerializationMode.set(false);
}
out.close();
@ -114,6 +124,17 @@ public class Transaction extends ImmutableObject implements Buildable {
return mutations.isEmpty();
}
/**
* Returns true if we are serializing a transaction in the current thread.
*
* <p>This should be checked by any Ofy translators prior to making any changes to an entity's
* state representation based on the assumption that we are currently pseristing the entity to
* datastore.
*/
public static boolean inSerializationMode() {
return inSerializationMode.get();
}
@Override
public Builder asBuilder() {
return new Builder(clone(this));

View file

@ -42,7 +42,8 @@ import javax.persistence.NoResultException;
method = GET,
automaticallyPrintOk = true,
auth = Auth.AUTH_INTERNAL_OR_ADMIN)
class ReplicateToDatastoreAction implements Runnable {
@VisibleForTesting
public class ReplicateToDatastoreAction implements Runnable {
public static final String PATH = "/_dr/cron/replicateToDatastore";
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
@ -61,12 +62,12 @@ class ReplicateToDatastoreAction implements Runnable {
}
@VisibleForTesting
List<TransactionEntity> getTransactionBatch() {
public List<TransactionEntity> getTransactionBatch() {
// Get the next batch of transactions that we haven't replicated.
LastSqlTransaction lastSqlTxnBeforeBatch = ofyTm().transact(() -> LastSqlTransaction.load());
try {
return jpaTm()
.transact(
.transactWithoutBackup(
() ->
jpaTm()
.query(
@ -86,7 +87,7 @@ class ReplicateToDatastoreAction implements Runnable {
* be aborted.
*/
@VisibleForTesting
boolean applyTransaction(TransactionEntity txnEntity) {
public boolean applyTransaction(TransactionEntity txnEntity) {
logger.atInfo().log("Applying a single transaction Cloud SQL -> Cloud Datastore");
return ofyTm()
.transact(

View file

@ -51,7 +51,7 @@ class HostInfoFlowTest extends ResourceFlowTestCase<HostInfoFlow, HostResource>
@Order(value = Order.DEFAULT - 2)
@RegisterExtension
final ReplayExtension replayExtension = ReplayExtension.createWithCompare(clock);
final ReplayExtension replayExtension = ReplayExtension.createWithDoubleReplay(clock);
HostInfoFlowTest() {
setEppInput("host_info.xml", ImmutableMap.of("HOSTNAME", "ns1.example.tld"));

View file

@ -477,7 +477,7 @@ public final class AppEngineExtension implements BeforeEachCallback, AfterEachCa
public void afterEach(ExtensionContext context) throws Exception {
checkArgumentNotNull(context, "The ExtensionContext must not be null");
try {
// If there is a replay extension, we'll want to call its replayToSql() method.
// If there is a replay extension, we'll want to call its replay() method.
//
// We have to provide this hook here for ReplayExtension instead of relying on
// ReplayExtension's afterEach() method because of ordering and the conflation of environment
@ -492,7 +492,7 @@ public final class AppEngineExtension implements BeforeEachCallback, AfterEachCa
(ReplayExtension)
context.getStore(ExtensionContext.Namespace.GLOBAL).get(ReplayExtension.class);
if (replayer != null) {
replayer.replayToSql();
replayer.replay();
}
if (withCloudSql) {

View file

@ -51,15 +51,23 @@ class DualDatabaseTestInvocationContextProvider implements TestTemplateInvocatio
return true;
}
/**
* Returns true if "context" is an objectify unit test.
*
* <p>Provided to allow ReplayExtension to make this determination.
*/
static boolean inOfyContext(ExtensionContext context) {
return (DatabaseType) context.getStore(NAMESPACE).get(INJECTED_TM_SUPPLIER_KEY)
== DatabaseType.OFY;
}
@Override
public Stream<TestTemplateInvocationContext> provideTestTemplateInvocationContexts(
ExtensionContext context) {
TestTemplateInvocationContext ofyContext =
createInvocationContext(
context.getDisplayName() + " with Datastore", TransactionManagerFactory::ofyTm);
createInvocationContext(context.getDisplayName() + " with Datastore", DatabaseType.OFY);
TestTemplateInvocationContext sqlContext =
createInvocationContext(
context.getDisplayName() + " with PostgreSQL", TransactionManagerFactory::jpaTm);
createInvocationContext(context.getDisplayName() + " with PostgreSQL", DatabaseType.JPA);
Method testMethod = context.getTestMethod().orElseThrow(IllegalStateException::new);
if (testMethod.isAnnotationPresent(TestOfyAndSql.class)) {
return Stream.of(ofyContext, sqlContext);
@ -74,7 +82,7 @@ class DualDatabaseTestInvocationContextProvider implements TestTemplateInvocatio
}
private TestTemplateInvocationContext createInvocationContext(
String name, Supplier<? extends TransactionManager> tmSupplier) {
String name, DatabaseType databaseType) {
return new TestTemplateInvocationContext() {
@Override
public String getDisplayName(int invocationIndex) {
@ -83,17 +91,17 @@ class DualDatabaseTestInvocationContextProvider implements TestTemplateInvocatio
@Override
public List<Extension> getAdditionalExtensions() {
return ImmutableList.of(new DatabaseSwitchInvocationContext(tmSupplier));
return ImmutableList.of(new DatabaseSwitchInvocationContext(databaseType));
}
};
}
private static class DatabaseSwitchInvocationContext implements TestInstancePostProcessor {
private Supplier<? extends TransactionManager> tmSupplier;
private DatabaseType databaseType;
private DatabaseSwitchInvocationContext(Supplier<? extends TransactionManager> tmSupplier) {
this.tmSupplier = tmSupplier;
private DatabaseSwitchInvocationContext(DatabaseType databaseType) {
this.databaseType = databaseType;
}
@Override
@ -113,7 +121,7 @@ class DualDatabaseTestInvocationContextProvider implements TestTemplateInvocatio
throw new IllegalStateException(
"AppEngineExtension in @DualDatabaseTest test must set withDatastoreAndCloudSql()");
}
context.getStore(NAMESPACE).put(INJECTED_TM_SUPPLIER_KEY, tmSupplier);
context.getStore(NAMESPACE).put(INJECTED_TM_SUPPLIER_KEY, databaseType);
}
private static ImmutableList<Field> getAppEngineExtensionFields(Class<?> clazz) {
@ -144,10 +152,9 @@ class DualDatabaseTestInvocationContextProvider implements TestTemplateInvocatio
}
});
context.getStore(NAMESPACE).put(ORIGINAL_TM_KEY, tm());
Supplier<? extends TransactionManager> tmSupplier =
(Supplier<? extends TransactionManager>)
context.getStore(NAMESPACE).get(INJECTED_TM_SUPPLIER_KEY);
TransactionManagerFactory.setTm(tmSupplier.get());
DatabaseType databaseType =
(DatabaseType) context.getStore(NAMESPACE).get(INJECTED_TM_SUPPLIER_KEY);
TransactionManagerFactory.setTm(databaseType.getTm());
}
}
@ -171,4 +178,20 @@ class DualDatabaseTestInvocationContextProvider implements TestTemplateInvocatio
return testInstance.getClass().isAnnotationPresent(DualDatabaseTest.class)
&& isDeclaredTestMethod;
}
private enum DatabaseType {
JPA(TransactionManagerFactory::jpaTm),
OFY(TransactionManagerFactory::ofyTm);
@SuppressWarnings("Immutable") // Supplier is immutable, but not annotated as such.
private final Supplier<? extends TransactionManager> supplier;
DatabaseType(Supplier<? extends TransactionManager> supplier) {
this.supplier = supplier;
}
TransactionManager getTm() {
return supplier.get();
}
}
}

View file

@ -22,13 +22,17 @@ import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.googlecode.objectify.Key;
import google.registry.config.RegistryConfig;
import google.registry.model.ImmutableObject;
import google.registry.model.ofy.CommitLogBucket;
import google.registry.model.ofy.ReplayQueue;
import google.registry.model.ofy.TransactionInfo;
import google.registry.persistence.VKey;
import google.registry.persistence.transaction.TransactionEntity;
import google.registry.schema.replay.DatastoreEntity;
import google.registry.schema.replay.ReplicateToDatastoreAction;
import java.util.Optional;
import javax.annotation.Nullable;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
@ -48,19 +52,27 @@ public class ReplayExtension implements BeforeEachCallback, AfterEachCallback {
FakeClock clock;
boolean compare;
boolean replayed = false;
boolean inOfyContext;
InjectExtension injectExtension = new InjectExtension();
@Nullable ReplicateToDatastoreAction sqlToDsReplicator;
private ReplayExtension(FakeClock clock, boolean compare) {
private ReplayExtension(
FakeClock clock, boolean compare, @Nullable ReplicateToDatastoreAction sqlToDsReplicator) {
this.clock = clock;
this.compare = compare;
this.sqlToDsReplicator = sqlToDsReplicator;
}
public static ReplayExtension createWithCompare(FakeClock clock) {
return new ReplayExtension(clock, true);
return new ReplayExtension(clock, true, null);
}
public static ReplayExtension createWithoutCompare(FakeClock clock) {
return new ReplayExtension(clock, false);
/**
* Create a replay extension that replays from SQL to cloud datastore when running in SQL mode.
*/
public static ReplayExtension createWithDoubleReplay(FakeClock clock) {
return new ReplayExtension(clock, true, new ReplicateToDatastoreAction(clock));
}
@Override
@ -74,16 +86,27 @@ public class ReplayExtension implements BeforeEachCallback, AfterEachCallback {
DatabaseHelper.setClock(clock);
DatabaseHelper.setAlwaysSaveWithBackup(true);
ReplayQueue.clear();
// When running in JPA mode with double replay enabled, enable JPA transaction replication.
// Note that we can't just use isOfy() here because this extension gets run before the dual-test
// transaction manager gets injected.
inOfyContext = DualDatabaseTestInvocationContextProvider.inOfyContext(context);
if (sqlToDsReplicator != null && !inOfyContext) {
RegistryConfig.overrideCloudSqlReplicateTransactions(true);
}
context.getStore(ExtensionContext.Namespace.GLOBAL).put(ReplayExtension.class, this);
}
@Override
public void afterEach(ExtensionContext context) {
// This ensures that we do the replay even if we're not called from AppEngineExtension. It
// should be safe to call replayToSql() twice, as the replay queue should be empty the second
// time.
replayToSql();
// is safe to call replay() twice, as the method ensures idempotence.
replay();
injectExtension.afterEach(context);
if (sqlToDsReplicator != null) {
RegistryConfig.overrideCloudSqlReplicateTransactions(false);
}
}
private static ImmutableSet<String> NON_REPLICATED_TYPES =
@ -104,7 +127,26 @@ public class ReplayExtension implements BeforeEachCallback, AfterEachCallback {
"ForeignKeyContactIndex",
"ForeignKeyDomainIndex");
public void replayToSql() {
public void replay() {
if (!replayed) {
if (inOfyContext) {
replayToSql();
} else {
// Disable database backups. For unknown reason, if we don't do this we get residual commit
// log entries that cause timestamp inversions in other tests.
DatabaseHelper.setAlwaysSaveWithBackup(false);
// Do the ofy replay.
replayToOfy();
// Clean out anything that ends up in the replay queue.
ReplayQueue.clear();
}
replayed = true;
}
}
private void replayToSql() {
DatabaseHelper.setAlwaysSaveWithBackup(false);
ImmutableMap<Key<?>, Object> changes = ReplayQueue.replay();
@ -139,4 +181,18 @@ public class ReplayExtension implements BeforeEachCallback, AfterEachCallback {
}
}
}
private void replayToOfy() {
if (sqlToDsReplicator == null) {
return;
}
// TODO(mmuller): Verify that all entities are the same across both databases.
for (TransactionEntity txn : sqlToDsReplicator.getTransactionBatch()) {
if (sqlToDsReplicator.applyTransaction(txn)) {
break;
}
clock.advanceOneMilli();
}
}
}