Clean up ReplicateToDatastoreAction and tests (#1299)

* Clean up ReplicateToDatastoreAction and tests

1. applyTransaction should throw an error if it fails; this allows us to
have more information in the caller (and it shouldn't usually happen)
2. Set a response code + payload now, since this is an action that is
called by cron
3. Add a method to the test log subject that allows us to check if a
severe log with a particular Throwable cause was logged (since the cause
isn't contained in the log message itself directly)
This commit is contained in:
gbrodman 2021-08-25 14:45:05 -06:00 committed by GitHub
parent 37283ecff7
commit 6349bad49b
4 changed files with 61 additions and 27 deletions

View file

@ -87,15 +87,14 @@ public class ReplicateToDatastoreAction implements Runnable {
* Apply a transaction to Datastore, returns true if there was a fatal error and the batch should * Apply a transaction to Datastore, returns true if there was a fatal error and the batch should
* be aborted. * be aborted.
* *
* <p>TODO(gbrodman): this should throw an exception on error instead since it gives more * <p>Throws an exception if a fatal error occurred and the batch should be aborted
* information and we can't rely on the caller checking the boolean result.
*/ */
@VisibleForTesting @VisibleForTesting
public boolean applyTransaction(TransactionEntity txnEntity) { public void applyTransaction(TransactionEntity txnEntity) {
logger.atInfo().log("Applying a single transaction Cloud SQL -> Cloud Datastore"); logger.atInfo().log("Applying a single transaction Cloud SQL -> Cloud Datastore");
try (UpdateAutoTimestamp.DisableAutoUpdateResource disabler = try (UpdateAutoTimestamp.DisableAutoUpdateResource disabler =
UpdateAutoTimestamp.disableAutoUpdate()) { UpdateAutoTimestamp.disableAutoUpdate()) {
return ofyTm() ofyTm()
.transact( .transact(
() -> { () -> {
// Reload the last transaction id, which could possibly have changed. // Reload the last transaction id, which could possibly have changed.
@ -105,11 +104,10 @@ public class ReplicateToDatastoreAction implements Runnable {
// We're missing a transaction. This is bad. Transaction ids are supposed to // We're missing a transaction. This is bad. Transaction ids are supposed to
// increase monotonically, so we abort rather than applying anything out of // increase monotonically, so we abort rather than applying anything out of
// order. // order.
logger.atSevere().log( throw new IllegalStateException(
"Missing transaction: last transaction id = %s, next available transaction " String.format(
+ "= %s", "Missing transaction: last txn id = %s, next available txn = %s",
nextTxnId - 1, txnEntity.getId()); nextTxnId - 1, txnEntity.getId()));
return true;
} else if (nextTxnId > txnEntity.getId()) { } else if (nextTxnId > txnEntity.getId()) {
// We've already replayed this transaction. This shouldn't happen, as GAE cron // We've already replayed this transaction. This shouldn't happen, as GAE cron
// is supposed to avoid overruns and this action shouldn't be executed from any // is supposed to avoid overruns and this action shouldn't be executed from any
@ -118,7 +116,7 @@ public class ReplicateToDatastoreAction implements Runnable {
logger.atWarning().log( logger.atWarning().log(
"Ignoring transaction %s, which appears to have already been applied.", "Ignoring transaction %s, which appears to have already been applied.",
txnEntity.getId()); txnEntity.getId());
return false; return;
} }
logger.atInfo().log( logger.atInfo().log(
@ -137,7 +135,6 @@ public class ReplicateToDatastoreAction implements Runnable {
auditedOfy().save().entity(lastSqlTxn.cloneWithNewTransactionId(nextTxnId)); auditedOfy().save().entity(lastSqlTxn.cloneWithNewTransactionId(nextTxnId));
logger.atInfo().log( logger.atInfo().log(
"Finished applying single transaction Cloud SQL -> Cloud Datastore"); "Finished applying single transaction Cloud SQL -> Cloud Datastore");
return false;
}); });
} }
} }
@ -147,18 +144,23 @@ public class ReplicateToDatastoreAction implements Runnable {
MigrationState state = DatabaseMigrationStateSchedule.getValueAtTime(clock.nowUtc()); MigrationState state = DatabaseMigrationStateSchedule.getValueAtTime(clock.nowUtc());
if (!state.getReplayDirection().equals(ReplayDirection.SQL_TO_DATASTORE)) { if (!state.getReplayDirection().equals(ReplayDirection.SQL_TO_DATASTORE)) {
logger.atInfo().log( logger.atInfo().log(
String.format( "Skipping ReplicateToDatastoreAction because we are in migration phase %s.", state);
"Skipping ReplicateToDatastoreAction because we are in migration phase %s.", state));
return; return;
} }
// TODO(b/181758163): Deal with objects that don't exist in Cloud SQL, e.g. ForeignKeyIndex, // TODO(b/181758163): Deal with objects that don't exist in Cloud SQL, e.g. ForeignKeyIndex,
// EppResourceIndex. // EppResourceIndex.
logger.atInfo().log("Processing transaction replay batch Cloud SQL -> Cloud Datastore"); logger.atInfo().log("Processing transaction replay batch Cloud SQL -> Cloud Datastore");
int numTransactionsReplayed = 0;
for (TransactionEntity txnEntity : getTransactionBatch()) { for (TransactionEntity txnEntity : getTransactionBatch()) {
if (applyTransaction(txnEntity)) { try {
break; applyTransaction(txnEntity);
} catch (Throwable t) {
logger.atSevere().withCause(t).log("Errored out replaying files");
return;
} }
numTransactionsReplayed++;
} }
logger.atInfo().log("Done processing transaction replay batch Cloud SQL -> Cloud Datastore"); logger.atInfo().log(
"Replayed %d transactions from Cloud SQL -> Datastore", numTransactionsReplayed);
} }
} }

View file

@ -19,6 +19,7 @@ import static google.registry.persistence.transaction.TransactionManagerFactory.
import static google.registry.persistence.transaction.TransactionManagerFactory.ofyTm; import static google.registry.persistence.transaction.TransactionManagerFactory.ofyTm;
import static google.registry.testing.LogsSubject.assertAboutLogs; import static google.registry.testing.LogsSubject.assertAboutLogs;
import static google.registry.util.DateTimeUtils.START_OF_TIME; import static google.registry.util.DateTimeUtils.START_OF_TIME;
import static org.junit.Assert.assertThrows;
import com.google.common.base.Suppliers; import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableSortedMap; import com.google.common.collect.ImmutableSortedMap;
@ -149,14 +150,14 @@ public class ReplicateToDatastoreActionTest {
assertThat(txns2).hasSize(2); assertThat(txns2).hasSize(2);
// Apply the first batch. // Apply the first batch.
assertThat(task.applyTransaction(txns1.get(0))).isFalse(); task.applyTransaction(txns1.get(0));
// Remove the foo record so we can ensure that this transaction doesn't get doublle-played. // Remove the foo record so we can ensure that this transaction doesn't get doublle-played.
ofyTm().transact(() -> ofyTm().delete(foo.key())); ofyTm().transact(() -> ofyTm().delete(foo.key()));
// Apply the second batch. // Apply the second batch.
for (TransactionEntity txn : txns2) { for (TransactionEntity txn : txns2) {
assertThat(task.applyTransaction(txn)).isFalse(); task.applyTransaction(txn);
} }
// Verify that the first transaction didn't get replayed but the second one did. // Verify that the first transaction didn't get replayed but the second one did.
@ -179,12 +180,25 @@ public class ReplicateToDatastoreActionTest {
List<TransactionEntity> txns = task.getTransactionBatch(); List<TransactionEntity> txns = task.getTransactionBatch();
assertThat(txns).hasSize(1); assertThat(txns).hasSize(1);
assertThat(task.applyTransaction(txns.get(0))).isTrue(); assertThat(assertThrows(IllegalStateException.class, () -> task.applyTransaction(txns.get(0))))
.hasMessageThat()
.isEqualTo("Missing transaction: last txn id = -1, next available txn = 1");
}
@Test
void testMissingTransactions_fullTask() {
// Write a transaction (should have a transaction id of 1).
TestEntity foo = new TestEntity("foo");
jpaTm().transact(() -> jpaTm().insert(foo));
// Force the last transaction id back to -1 so that we look for transaction 0.
ofyTm().transact(() -> ofyTm().insert(new LastSqlTransaction(-1)));
task.run();
assertAboutLogs() assertAboutLogs()
.that(logHandler) .that(logHandler)
.hasLogAtLevelWithMessage( .hasSevereLogWithCause(
Level.SEVERE, new IllegalStateException(
"Missing transaction: last transaction id = -1, next available transaction = 1"); "Missing transaction: last txn id = -1, next available txn = 1"));
} }
@Test @Test

View file

@ -14,6 +14,7 @@
package google.registry.testing; package google.registry.testing;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.truth.Truth.assertAbout; import static com.google.common.truth.Truth.assertAbout;
import static com.google.common.truth.Truth.assertWithMessage; import static com.google.common.truth.Truth.assertWithMessage;
@ -26,6 +27,7 @@ import com.google.common.truth.StringSubject;
import com.google.common.truth.Subject; import com.google.common.truth.Subject;
import google.registry.testing.TruthChainer.Which; import google.registry.testing.TruthChainer.Which;
import java.util.List; import java.util.List;
import java.util.Objects;
import java.util.logging.Handler; import java.util.logging.Handler;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.LogRecord; import java.util.logging.LogRecord;
@ -41,7 +43,13 @@ public class LogsSubject extends Subject {
} }
private static final Correspondence<String, String> CONTAINS_CORRESPONDENCE = private static final Correspondence<String, String> CONTAINS_CORRESPONDENCE =
Correspondence.from((actual, expected) -> actual.contains(expected), "contains"); Correspondence.from(String::contains, "contains");
private static final Correspondence<Throwable, Throwable> THROWABLE_CORRESPONDENCE =
Correspondence.from(
(t1, t2) ->
t1.getClass().equals(t2.getClass()) && t1.getMessage().equals(t2.getMessage()),
"throwableEquivalent");
private List<String> getMessagesAtLevel(Level level) { private List<String> getMessagesAtLevel(Level level) {
ImmutableList.Builder<String> builder = new ImmutableList.Builder<>(); ImmutableList.Builder<String> builder = new ImmutableList.Builder<>();
@ -57,6 +65,19 @@ public class LogsSubject extends Subject {
check("atLevel(%s)", level).that(getMessagesAtLevel(level)).isEmpty(); check("atLevel(%s)", level).that(getMessagesAtLevel(level)).isEmpty();
} }
public void hasSevereLogWithCause(Throwable throwable) {
ImmutableList<Throwable> actualThrowables =
actual.getStoredLogRecords().stream()
.filter(record -> record.getLevel().equals(Level.SEVERE))
.map(LogRecord::getThrown)
.filter(Objects::nonNull)
.collect(toImmutableList());
check("atSevere")
.that(actualThrowables)
.comparingElementsUsing(THROWABLE_CORRESPONDENCE)
.contains(throwable);
}
public Which<StringSubject> hasLogAtLevelWithMessage(Level level, String message) { public Which<StringSubject> hasLogAtLevelWithMessage(Level level, String message) {
List<String> messagesAtLevel = getMessagesAtLevel(level); List<String> messagesAtLevel = getMessagesAtLevel(level);
check("atLevel(%s)", level) check("atLevel(%s)", level)

View file

@ -199,10 +199,7 @@ public class ReplayExtension implements BeforeEachCallback, AfterEachCallback {
do { do {
transactionBatch = sqlToDsReplicator.getTransactionBatch(); transactionBatch = sqlToDsReplicator.getTransactionBatch();
for (TransactionEntity txn : transactionBatch) { for (TransactionEntity txn : transactionBatch) {
if (sqlToDsReplicator.applyTransaction(txn)) { sqlToDsReplicator.applyTransaction(txn);
throw new RuntimeException(
"Error when replaying to Datastore in tests; see logs for more details");
}
if (compare) { if (compare) {
ofyTm().transact(() -> compareSqlTransaction(txn)); ofyTm().transact(() -> compareSqlTransaction(txn));
} }