diff --git a/core/src/main/java/google/registry/model/replay/ReplicateToDatastoreAction.java b/core/src/main/java/google/registry/model/replay/ReplicateToDatastoreAction.java index 91b269e2b..deba20cd6 100644 --- a/core/src/main/java/google/registry/model/replay/ReplicateToDatastoreAction.java +++ b/core/src/main/java/google/registry/model/replay/ReplicateToDatastoreAction.java @@ -87,15 +87,14 @@ public class ReplicateToDatastoreAction implements Runnable { * Apply a transaction to Datastore, returns true if there was a fatal error and the batch should * be aborted. * - *

TODO(gbrodman): this should throw an exception on error instead since it gives more - * information and we can't rely on the caller checking the boolean result. + *

Throws an exception if a fatal error occurred and the batch should be aborted */ @VisibleForTesting - public boolean applyTransaction(TransactionEntity txnEntity) { + public void applyTransaction(TransactionEntity txnEntity) { logger.atInfo().log("Applying a single transaction Cloud SQL -> Cloud Datastore"); try (UpdateAutoTimestamp.DisableAutoUpdateResource disabler = UpdateAutoTimestamp.disableAutoUpdate()) { - return ofyTm() + ofyTm() .transact( () -> { // Reload the last transaction id, which could possibly have changed. @@ -105,11 +104,10 @@ public class ReplicateToDatastoreAction implements Runnable { // We're missing a transaction. This is bad. Transaction ids are supposed to // increase monotonically, so we abort rather than applying anything out of // order. - logger.atSevere().log( - "Missing transaction: last transaction id = %s, next available transaction " - + "= %s", - nextTxnId - 1, txnEntity.getId()); - return true; + throw new IllegalStateException( + String.format( + "Missing transaction: last txn id = %s, next available txn = %s", + nextTxnId - 1, txnEntity.getId())); } else if (nextTxnId > txnEntity.getId()) { // We've already replayed this transaction. This shouldn't happen, as GAE cron // is supposed to avoid overruns and this action shouldn't be executed from any @@ -118,7 +116,7 @@ public class ReplicateToDatastoreAction implements Runnable { logger.atWarning().log( "Ignoring transaction %s, which appears to have already been applied.", txnEntity.getId()); - return false; + return; } logger.atInfo().log( @@ -137,7 +135,6 @@ public class ReplicateToDatastoreAction implements Runnable { auditedOfy().save().entity(lastSqlTxn.cloneWithNewTransactionId(nextTxnId)); logger.atInfo().log( "Finished applying single transaction Cloud SQL -> Cloud Datastore"); - return false; }); } } @@ -147,18 +144,23 @@ public class ReplicateToDatastoreAction implements Runnable { MigrationState state = DatabaseMigrationStateSchedule.getValueAtTime(clock.nowUtc()); if (!state.getReplayDirection().equals(ReplayDirection.SQL_TO_DATASTORE)) { logger.atInfo().log( - String.format( - "Skipping ReplicateToDatastoreAction because we are in migration phase %s.", state)); + "Skipping ReplicateToDatastoreAction because we are in migration phase %s.", state); return; } // TODO(b/181758163): Deal with objects that don't exist in Cloud SQL, e.g. ForeignKeyIndex, // EppResourceIndex. logger.atInfo().log("Processing transaction replay batch Cloud SQL -> Cloud Datastore"); + int numTransactionsReplayed = 0; for (TransactionEntity txnEntity : getTransactionBatch()) { - if (applyTransaction(txnEntity)) { - break; + try { + applyTransaction(txnEntity); + } catch (Throwable t) { + logger.atSevere().withCause(t).log("Errored out replaying files"); + return; } + numTransactionsReplayed++; } - logger.atInfo().log("Done processing transaction replay batch Cloud SQL -> Cloud Datastore"); + logger.atInfo().log( + "Replayed %d transactions from Cloud SQL -> Datastore", numTransactionsReplayed); } } diff --git a/core/src/test/java/google/registry/model/replay/ReplicateToDatastoreActionTest.java b/core/src/test/java/google/registry/model/replay/ReplicateToDatastoreActionTest.java index 95f78314e..ccb8c7c1e 100644 --- a/core/src/test/java/google/registry/model/replay/ReplicateToDatastoreActionTest.java +++ b/core/src/test/java/google/registry/model/replay/ReplicateToDatastoreActionTest.java @@ -19,6 +19,7 @@ import static google.registry.persistence.transaction.TransactionManagerFactory. import static google.registry.persistence.transaction.TransactionManagerFactory.ofyTm; import static google.registry.testing.LogsSubject.assertAboutLogs; import static google.registry.util.DateTimeUtils.START_OF_TIME; +import static org.junit.Assert.assertThrows; import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableSortedMap; @@ -149,14 +150,14 @@ public class ReplicateToDatastoreActionTest { assertThat(txns2).hasSize(2); // Apply the first batch. - assertThat(task.applyTransaction(txns1.get(0))).isFalse(); + task.applyTransaction(txns1.get(0)); // Remove the foo record so we can ensure that this transaction doesn't get doublle-played. ofyTm().transact(() -> ofyTm().delete(foo.key())); // Apply the second batch. for (TransactionEntity txn : txns2) { - assertThat(task.applyTransaction(txn)).isFalse(); + task.applyTransaction(txn); } // Verify that the first transaction didn't get replayed but the second one did. @@ -179,12 +180,25 @@ public class ReplicateToDatastoreActionTest { List txns = task.getTransactionBatch(); assertThat(txns).hasSize(1); - assertThat(task.applyTransaction(txns.get(0))).isTrue(); + assertThat(assertThrows(IllegalStateException.class, () -> task.applyTransaction(txns.get(0)))) + .hasMessageThat() + .isEqualTo("Missing transaction: last txn id = -1, next available txn = 1"); + } + + @Test + void testMissingTransactions_fullTask() { + // Write a transaction (should have a transaction id of 1). + TestEntity foo = new TestEntity("foo"); + jpaTm().transact(() -> jpaTm().insert(foo)); + + // Force the last transaction id back to -1 so that we look for transaction 0. + ofyTm().transact(() -> ofyTm().insert(new LastSqlTransaction(-1))); + task.run(); assertAboutLogs() .that(logHandler) - .hasLogAtLevelWithMessage( - Level.SEVERE, - "Missing transaction: last transaction id = -1, next available transaction = 1"); + .hasSevereLogWithCause( + new IllegalStateException( + "Missing transaction: last txn id = -1, next available txn = 1")); } @Test diff --git a/core/src/test/java/google/registry/testing/LogsSubject.java b/core/src/test/java/google/registry/testing/LogsSubject.java index cfdf25644..e2c72f994 100644 --- a/core/src/test/java/google/registry/testing/LogsSubject.java +++ b/core/src/test/java/google/registry/testing/LogsSubject.java @@ -14,6 +14,7 @@ package google.registry.testing; +import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.truth.Truth.assertAbout; import static com.google.common.truth.Truth.assertWithMessage; @@ -26,6 +27,7 @@ import com.google.common.truth.StringSubject; import com.google.common.truth.Subject; import google.registry.testing.TruthChainer.Which; import java.util.List; +import java.util.Objects; import java.util.logging.Handler; import java.util.logging.Level; import java.util.logging.LogRecord; @@ -41,7 +43,13 @@ public class LogsSubject extends Subject { } private static final Correspondence CONTAINS_CORRESPONDENCE = - Correspondence.from((actual, expected) -> actual.contains(expected), "contains"); + Correspondence.from(String::contains, "contains"); + + private static final Correspondence THROWABLE_CORRESPONDENCE = + Correspondence.from( + (t1, t2) -> + t1.getClass().equals(t2.getClass()) && t1.getMessage().equals(t2.getMessage()), + "throwableEquivalent"); private List getMessagesAtLevel(Level level) { ImmutableList.Builder builder = new ImmutableList.Builder<>(); @@ -57,6 +65,19 @@ public class LogsSubject extends Subject { check("atLevel(%s)", level).that(getMessagesAtLevel(level)).isEmpty(); } + public void hasSevereLogWithCause(Throwable throwable) { + ImmutableList actualThrowables = + actual.getStoredLogRecords().stream() + .filter(record -> record.getLevel().equals(Level.SEVERE)) + .map(LogRecord::getThrown) + .filter(Objects::nonNull) + .collect(toImmutableList()); + check("atSevere") + .that(actualThrowables) + .comparingElementsUsing(THROWABLE_CORRESPONDENCE) + .contains(throwable); + } + public Which hasLogAtLevelWithMessage(Level level, String message) { List messagesAtLevel = getMessagesAtLevel(level); check("atLevel(%s)", level) diff --git a/core/src/test/java/google/registry/testing/ReplayExtension.java b/core/src/test/java/google/registry/testing/ReplayExtension.java index 234d7e9bb..425164b86 100644 --- a/core/src/test/java/google/registry/testing/ReplayExtension.java +++ b/core/src/test/java/google/registry/testing/ReplayExtension.java @@ -199,10 +199,7 @@ public class ReplayExtension implements BeforeEachCallback, AfterEachCallback { do { transactionBatch = sqlToDsReplicator.getTransactionBatch(); for (TransactionEntity txn : transactionBatch) { - if (sqlToDsReplicator.applyTransaction(txn)) { - throw new RuntimeException( - "Error when replaying to Datastore in tests; see logs for more details"); - } + sqlToDsReplicator.applyTransaction(txn); if (compare) { ofyTm().transact(() -> compareSqlTransaction(txn)); }