diff --git a/core/src/main/java/google/registry/backup/ReplayCommitLogsToSqlAction.java b/core/src/main/java/google/registry/backup/ReplayCommitLogsToSqlAction.java index b2870d6bc..96c57949a 100644 --- a/core/src/main/java/google/registry/backup/ReplayCommitLogsToSqlAction.java +++ b/core/src/main/java/google/registry/backup/ReplayCommitLogsToSqlAction.java @@ -58,6 +58,7 @@ import javax.inject.Inject; import javax.servlet.http.HttpServletResponse; import org.joda.time.DateTime; import org.joda.time.Duration; +import org.joda.time.Seconds; /** Action that replays commit logs to Cloud SQL to keep it up to date. */ @Action( @@ -97,7 +98,8 @@ public class ReplayCommitLogsToSqlAction implements Runnable { @Override public void run() { - MigrationState state = DatabaseMigrationStateSchedule.getValueAtTime(clock.nowUtc()); + DateTime startTime = clock.nowUtc(); + MigrationState state = DatabaseMigrationStateSchedule.getValueAtTime(startTime); if (!state.getReplayDirection().equals(ReplayDirection.DATASTORE_TO_SQL)) { String message = String.format( @@ -126,7 +128,7 @@ public class ReplayCommitLogsToSqlAction implements Runnable { if (dryRun) { resultMessage = executeDryRun(); } else { - resultMessage = replayFiles(); + resultMessage = replayFiles(startTime); } response.setStatus(SC_OK); response.setPayload(resultMessage); @@ -160,10 +162,11 @@ public class ReplayCommitLogsToSqlAction implements Runnable { fileBatch.stream().limit(10).collect(toImmutableList())); } - private String replayFiles() { - DateTime replayTimeoutTime = clock.nowUtc().plus(REPLAY_TIMEOUT_DURATION); + private String replayFiles(DateTime startTime) { + DateTime replayTimeoutTime = startTime.plus(REPLAY_TIMEOUT_DURATION); DateTime searchStartTime = jpaTm().transact(() -> SqlReplayCheckpoint.get().plusMillis(1)); int filesProcessed = 0; + int transactionsProcessed = 0; // Starting from one millisecond after the last file we processed, search for and import files // one hour at a time until we catch up to the current time or we hit the replay timeout (in // which case the next run will pick up from where we leave off). @@ -171,12 +174,12 @@ public class ReplayCommitLogsToSqlAction implements Runnable { // We use hour-long batches because GCS supports filename prefix-based searches. while (true) { if (isAtOrAfter(clock.nowUtc(), replayTimeoutTime)) { - return String.format( - "Reached max execution time after replaying %d file(s).", filesProcessed); + return createResponseString( + "Reached max execution time", startTime, filesProcessed, transactionsProcessed); } if (isBeforeOrAt(clock.nowUtc(), searchStartTime)) { - return String.format( - "Caught up to current time after replaying %d file(s).", filesProcessed); + return createResponseString( + "Caught up to current time", startTime, filesProcessed, transactionsProcessed); } // Search through the end of the hour DateTime searchEndTime = @@ -189,18 +192,32 @@ public class ReplayCommitLogsToSqlAction implements Runnable { searchStartTime.toString("yyyy-MM-dd HH")); } for (BlobInfo file : fileBatch) { - processFile(file); + transactionsProcessed += processFile(file); filesProcessed++; if (clock.nowUtc().isAfter(replayTimeoutTime)) { - return String.format( - "Reached max execution time after replaying %d file(s).", filesProcessed); + return createResponseString( + "Reached max execution time", startTime, filesProcessed, transactionsProcessed); } } searchStartTime = searchEndTime.plusMillis(1); } } - private void processFile(BlobInfo metadata) { + private String createResponseString( + String msg, DateTime startTime, int filesProcessed, int transactionsProcessed) { + double tps = + (double) transactionsProcessed + / (double) Seconds.secondsBetween(startTime, clock.nowUtc()).getSeconds(); + return String.format( + "%s after replaying %d file(s) containing %d total transaction(s) (%.2f tx/s).", + msg, filesProcessed, transactionsProcessed, tps); + } + + /** + * Replays the commit logs in the given commit log file and returns the number of transactions + * committed. + */ + private int processFile(BlobInfo metadata) { try (InputStream input = gcsUtils.openInputStream(metadata.getBlobId())) { // Load and process the Datastore transactions one at a time ImmutableList> allTransactions = @@ -213,6 +230,7 @@ public class ReplayCommitLogsToSqlAction implements Runnable { logger.atInfo().log( "Replayed %d transactions from commit log file %s with size %d B.", allTransactions.size(), metadata.getName(), metadata.getSize()); + return allTransactions.size(); } catch (IOException e) { throw new RuntimeException( "Errored out while replaying commit log file " + metadata.getName(), e); diff --git a/core/src/main/java/google/registry/env/sandbox/default/WEB-INF/cron.xml b/core/src/main/java/google/registry/env/sandbox/default/WEB-INF/cron.xml index 1ef5bd64e..0a3ba76b4 100644 --- a/core/src/main/java/google/registry/env/sandbox/default/WEB-INF/cron.xml +++ b/core/src/main/java/google/registry/env/sandbox/default/WEB-INF/cron.xml @@ -230,7 +230,7 @@ - + Replays recent commit logs from Datastore to the SQL secondary backend. diff --git a/core/src/test/java/google/registry/backup/ReplayCommitLogsToSqlActionTest.java b/core/src/test/java/google/registry/backup/ReplayCommitLogsToSqlActionTest.java index eb0ed1647..0bf691462 100644 --- a/core/src/test/java/google/registry/backup/ReplayCommitLogsToSqlActionTest.java +++ b/core/src/test/java/google/registry/backup/ReplayCommitLogsToSqlActionTest.java @@ -201,7 +201,7 @@ public class ReplayCommitLogsToSqlActionTest { CommitLogMutation.create(manifest2Key, TestObject.create("f"))); jpaTm().transact(() -> SqlReplayCheckpoint.set(now.minusMinutes(1).minusMillis(1))); fakeClock.advanceOneMilli(); - runAndAssertSuccess(now, 2); + runAndAssertSuccess(now, 2, 3); assertExpectedIds("previous to keep", "b", "d", "e", "f"); } @@ -212,7 +212,7 @@ public class ReplayCommitLogsToSqlActionTest { saveDiffFileNotToRestore(gcsUtils, now.minusMinutes(1)); saveDiffFile(gcsUtils, createCheckpoint(now.minusMillis(2))); jpaTm().transact(() -> SqlReplayCheckpoint.set(now.minusMillis(1))); - runAndAssertSuccess(now.minusMillis(1), 0); + runAndAssertSuccess(now.minusMillis(1), 0, 0); assertExpectedIds("previous to keep"); } @@ -256,7 +256,7 @@ public class ReplayCommitLogsToSqlActionTest { CommitLogManifest.create(bucketKey, now, null), CommitLogMutation.create(manifestKey, TestObject.create("a")), CommitLogMutation.create(manifestKey, TestObject.create("b"))); - runAndAssertSuccess(now.minusMinutes(1), 1); + runAndAssertSuccess(now.minusMinutes(1), 1, 1); assertExpectedIds("previous to keep", "a", "b"); } @@ -278,7 +278,7 @@ public class ReplayCommitLogsToSqlActionTest { getBucketKey(1), now, ImmutableSet.of(Key.create(TestObject.create("previous to delete"))))); - runAndAssertSuccess(now.minusMinutes(1), 1); + runAndAssertSuccess(now.minusMinutes(1), 1, 1); assertExpectedIds("previous to keep"); } @@ -350,7 +350,7 @@ public class ReplayCommitLogsToSqlActionTest { domainMutation, contactMutation); - runAndAssertSuccess(now.minusMinutes(1), 1); + runAndAssertSuccess(now.minusMinutes(1), 1, 1); // Verify two things: // 1. that the contact insert occurred before the domain insert (necessary for FK ordering) // even though the domain came first in the file @@ -393,7 +393,7 @@ public class ReplayCommitLogsToSqlActionTest { CommitLogManifest.create( getBucketKey(1), now.minusMinutes(1).plusMillis(1), ImmutableSet.of()), contactMutation); - runAndAssertSuccess(now.minusMinutes(1).plusMillis(1), 1); + runAndAssertSuccess(now.minusMinutes(1).plusMillis(1), 1, 2); // Verify that the delete occurred first (because it was in the first transaction) even though // deletes have higher weight ArgumentCaptor putCaptor = ArgumentCaptor.forClass(Object.class); @@ -438,7 +438,7 @@ public class ReplayCommitLogsToSqlActionTest { throw new RuntimeException(e); } }); - runAndAssertSuccess(now.minusMinutes(1), 1); + runAndAssertSuccess(now.minusMinutes(1), 1, 1); // jpaTm()::put should only have been called with the checkpoint verify(spy, times(2)).put(any(SqlReplayCheckpoint.class)); verify(spy, times(2)).put(any()); @@ -463,7 +463,7 @@ public class ReplayCommitLogsToSqlActionTest { // one object only exists in Datastore, one is dually-written (so isn't replicated) ImmutableSet.of(getCrossTldKey(), claimsListKey))); - runAndAssertSuccess(now.minusMinutes(1), 1); + runAndAssertSuccess(now.minusMinutes(1), 1, 1); verify(spy, times(0)).delete(any(VKey.class)); } @@ -506,7 +506,7 @@ public class ReplayCommitLogsToSqlActionTest { createCheckpoint(now.minusMinutes(1)), CommitLogManifest.create(bucketKey, now, null), CommitLogMutation.create(manifestKey, TestObject.create("a"))); - runAndAssertSuccess(now.minusMinutes(1), 1); + runAndAssertSuccess(now.minusMinutes(1), 1, 1); assertThat(TestObject.beforeSqlSaveCallCount).isEqualTo(1); } @@ -544,7 +544,7 @@ public class ReplayCommitLogsToSqlActionTest { createCheckpoint(now.minusMinutes(1)), CommitLogManifest.create( getBucketKey(1), now.minusMinutes(3), ImmutableSet.of(Key.create(domain)))); - runAndAssertSuccess(now.minusMinutes(1), 1); + runAndAssertSuccess(now.minusMinutes(1), 1, 1); jpaTm() .transact( @@ -596,15 +596,19 @@ public class ReplayCommitLogsToSqlActionTest { domainWithoutDsDataMutation, CommitLogManifest.create(getBucketKey(1), now.minusMinutes(2), ImmutableSet.of()), domainWithOriginalDsDataMutation); - runAndAssertSuccess(now.minusMinutes(1), 1); + runAndAssertSuccess(now.minusMinutes(1), 1, 2); } - private void runAndAssertSuccess(DateTime expectedCheckpointTime, int numFiles) { + private void runAndAssertSuccess( + DateTime expectedCheckpointTime, int numFiles, int numTransactions) { action.run(); assertThat(response.getStatus()).isEqualTo(SC_OK); assertThat(response.getPayload()) - .isEqualTo( - String.format("Caught up to current time after replaying %d file(s).", numFiles)); + .startsWith( + String.format( + "Caught up to current time after replaying %d file(s) containing %d total" + + " transaction(s)", + numFiles, numTransactions)); assertThat(jpaTm().transact(SqlReplayCheckpoint::get)).isEqualTo(expectedCheckpointTime); }