From 38c8e816900d70bca499ed2ed8353cfd9398967c Mon Sep 17 00:00:00 2001
From: gbrodman
Date: Thu, 22 Jul 2021 13:59:28 -0400
Subject: [PATCH] Fix runtime issues with commit-log-to-SQL replay (#1240)

* Fix runtime issues with commit-log-to-SQL replay

- We now use a more intelligent prefix to narrow the listObjects search space in GCS.
  Otherwise we return >30k objects, which can take roughly 50 seconds; with the prefix,
  the listObjects call takes 1-3 seconds.
- We now search hour by hour to make efficient use of the prefixing. We keep searching
  for new files until we hit the current time or until we hit the overall replay timeout.
- Dry-run mode only prints out the first hour's worth of files.
---
 .../registry/backup/GcsDiffFileLister.java    |  55 +++++++--
 .../backup/ReplayCommitLogsToSqlAction.java   | 108 ++++++++++--------
 .../backup/GcsDiffFileListerTest.java         |  26 +++--
 .../ReplayCommitLogsToSqlActionTest.java      |  31 ++---
 .../backup/RestoreCommitLogsActionTest.java   |   2 +-
 5 files changed, 146 insertions(+), 76 deletions(-)

diff --git a/core/src/main/java/google/registry/backup/GcsDiffFileLister.java b/core/src/main/java/google/registry/backup/GcsDiffFileLister.java
index b315f61cc..49e6a8647 100644
--- a/core/src/main/java/google/registry/backup/GcsDiffFileLister.java
+++ b/core/src/main/java/google/registry/backup/GcsDiffFileLister.java
@@ -15,6 +15,7 @@
 package google.registry.backup;
 
 import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.ImmutableList.toImmutableList;
 import static google.registry.backup.BackupUtils.GcsMetadataKeys.LOWER_BOUND_CHECKPOINT;
 import static google.registry.backup.ExportCommitLogDiffAction.DIFF_FILE_PREFIX;
 import static google.registry.util.DateTimeUtils.START_OF_TIME;
@@ -23,13 +24,13 @@ import static google.registry.util.DateTimeUtils.latestOf;
 
 import com.google.cloud.storage.BlobId;
 import com.google.cloud.storage.BlobInfo;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.common.flogger.FluentLogger;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.UncheckedExecutionException;
-import dagger.Lazy;
 import google.registry.backup.BackupModule.Backups;
 import google.registry.gcs.GcsUtils;
 import java.io.IOException;
@@ -39,6 +40,7 @@ import java.util.TreeMap;
 import java.util.concurrent.ScheduledExecutorService;
 import javax.annotation.Nullable;
 import javax.inject.Inject;
+import javax.inject.Provider;
 import org.joda.time.DateTime;
 
 /** Utility class to list commit logs diff files stored on GCS. */
@@ -51,7 +53,7 @@ class GcsDiffFileLister {
 
   @Inject GcsUtils gcsUtils;
 
-  @Inject @Backups Lazy<ListeningExecutorService> lazyExecutor;
+  @Inject @Backups Provider<ListeningExecutorService> executorProvider;
   @Inject ScheduledExecutorService scheduledExecutorService;
 
   @Inject
@@ -113,22 +115,27 @@ class GcsDiffFileLister {
     // (extracted from the filename) to its asynchronously-loaded metadata, keeping only files with
     // an upper checkpoint time > fromTime.
     TreeMap<DateTime, ListenableFuture<BlobInfo>> upperBoundTimesToBlobInfo = new TreeMap<>();
-    ImmutableList<String> strippedFilenames;
+    String commitLogDiffPrefix = getCommitLogDiffPrefix(fromTime, toTime);
+    ImmutableList<String> filenames;
     try {
-      strippedFilenames = gcsUtils.listFolderObjects(gcsBucket, DIFF_FILE_PREFIX);
+      filenames =
+          gcsUtils.listFolderObjects(gcsBucket, commitLogDiffPrefix).stream()
+              .map(s -> commitLogDiffPrefix + s)
+              .collect(toImmutableList());
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
     DateTime lastUpperBoundTime = START_OF_TIME;
     TreeMap<DateTime, BlobInfo> sequence = new TreeMap<>();
+    ListeningExecutorService executor = executorProvider.get();
     try {
-      for (String strippedFilename : strippedFilenames) {
-        final String filename = DIFF_FILE_PREFIX + strippedFilename;
+      for (String filename : filenames) {
+        String strippedFilename = filename.replaceFirst(DIFF_FILE_PREFIX, "");
         DateTime upperBoundTime = DateTime.parse(strippedFilename);
         if (isInRange(upperBoundTime, fromTime, toTime)) {
           upperBoundTimesToBlobInfo.put(
-              upperBoundTime, lazyExecutor.get().submit(() -> getBlobInfo(gcsBucket, filename)));
+              upperBoundTime, executor.submit(() -> getBlobInfo(gcsBucket, filename)));
           lastUpperBoundTime = latestOf(upperBoundTime, lastUpperBoundTime);
         }
       }
@@ -173,7 +180,7 @@ class GcsDiffFileLister {
           "Unable to compute commit diff history, there are either gaps or forks in the history "
               + "file set. Check log for details.");
     } finally {
-      lazyExecutor.get().shutdown();
+      executor.shutdown();
     }
 
     logger.atInfo().log(
@@ -198,4 +205,36 @@ class GcsDiffFileLister {
   private BlobInfo getBlobInfo(String gcsBucket, String filename) {
     return gcsUtils.getBlobInfo(BlobId.of(gcsBucket, filename));
   }
+
+  /**
+   * Returns a prefix guaranteed to cover all commit log diff files in the given range.
+   *
+   * <p>The listObjects call can be fairly slow if we search over many thousands or tens of
+   * thousands of files, so we restrict the search space. The commit logs have a file format of
+   * "commit_diff_until_2021-05-11T06:48:00.070Z" so we can often filter down as far as the hour.
+   *
+   * <p>Here, we get the longest prefix possible based on which fields (year, month, day, hour) the
+   * times in question have in common.
+   */
+  @VisibleForTesting
+  static String getCommitLogDiffPrefix(DateTime from, @Nullable DateTime to) {
+    StringBuilder result = new StringBuilder(DIFF_FILE_PREFIX);
+    if (to == null || from.getYear() != to.getYear()) {
+      return result.toString();
+    }
+    result.append(from.getYear()).append('-');
+    if (from.getMonthOfYear() != to.getMonthOfYear()) {
+      return result.toString();
+    }
+    result.append(String.format("%02d-", from.getMonthOfYear()));
+    if (from.getDayOfMonth() != to.getDayOfMonth()) {
+      return result.toString();
+    }
+    result.append(String.format("%02dT", from.getDayOfMonth()));
+    if (from.getHourOfDay() != to.getHourOfDay()) {
+      return result.toString();
+    }
+    result.append(String.format("%02d:", from.getHourOfDay()));
+    return result.toString();
+  }
 }
diff --git a/core/src/main/java/google/registry/backup/ReplayCommitLogsToSqlAction.java b/core/src/main/java/google/registry/backup/ReplayCommitLogsToSqlAction.java
index 8bcb1c0c3..245eda28b 100644
--- a/core/src/main/java/google/registry/backup/ReplayCommitLogsToSqlAction.java
+++ b/core/src/main/java/google/registry/backup/ReplayCommitLogsToSqlAction.java
@@ -20,13 +20,15 @@ import static google.registry.backup.RestoreCommitLogsAction.DRY_RUN_PARAM;
 import static google.registry.model.ofy.EntityWritePriorities.getEntityPriority;
 import static google.registry.model.ofy.ObjectifyService.auditedOfy;
 import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
+import static google.registry.util.DateTimeUtils.isAtOrAfter;
+import static google.registry.util.DateTimeUtils.isBeforeOrAt;
 import static javax.servlet.http.HttpServletResponse.SC_NO_CONTENT;
+import static javax.servlet.http.HttpServletResponse.SC_OK;
 import static org.joda.time.Duration.standardHours;
 
 import com.google.appengine.api.datastore.Entity;
 import com.google.appengine.api.datastore.Key;
 import com.google.cloud.storage.BlobInfo;
-import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
 import com.google.common.flogger.FluentLogger;
 import google.registry.config.RegistryConfig.Config;
@@ -120,26 +122,15 @@ public class ReplayCommitLogsToSqlAction implements Runnable {
     }
     try {
       logger.atInfo().log("Beginning replay of commit logs.");
-      ImmutableList<BlobInfo> commitLogFiles = getFilesToReplay();
+      String resultMessage;
       if (dryRun) {
-        response.setStatus(HttpServletResponse.SC_OK);
-        ImmutableList<String> filenames =
-            commitLogFiles.stream()
-                .limit(10)
-                .map(file -> file.getName())
-                .collect(toImmutableList());
-        String dryRunMessage =
-            "Running in dry-run mode; would have processed %d files. They are (limit 10):\n"
-                + Joiner.on('\n').join(filenames);
-        response.setPayload(dryRunMessage);
-        logger.atInfo().log(dryRunMessage);
+        resultMessage = executeDryRun();
       } else {
-        replayFiles(commitLogFiles);
-        response.setStatus(HttpServletResponse.SC_OK);
-        String message = "ReplayCommitLogsToSqlAction completed successfully.";
-        response.setPayload(message);
-        logger.atInfo().log(message);
+        resultMessage = replayFiles();
       }
+      response.setStatus(SC_OK);
+      response.setPayload(resultMessage);
+      logger.atInfo().log(resultMessage);
     } catch (Throwable t) {
       String message = "Errored out replaying files.";
       logger.atSevere().withCause(t).log(message);
@@ -150,52 +141,77 @@ public class ReplayCommitLogsToSqlAction implements Runnable {
     }
   }
 
-  private ImmutableList<BlobInfo> getFilesToReplay() {
+  private String executeDryRun() {
     // Start at the first millisecond we haven't seen yet
-    DateTime fromTime = jpaTm().transact(() -> SqlReplayCheckpoint.get().plusMillis(1));
-    logger.atInfo().log("Starting replay from: %s.", fromTime);
-    // If there's an inconsistent file set, this will throw IllegalStateException and the job
-    // will try later -- this is likely because an export hasn't finished yet.
-    ImmutableList<BlobInfo> commitLogFiles =
-        diffLister.listDiffFiles(gcsBucket, fromTime, /* current time */ null);
-    logger.atInfo().log("Found %d new commit log files to process.", commitLogFiles.size());
-    return commitLogFiles;
+    DateTime searchStartTime = jpaTm().transact(() -> SqlReplayCheckpoint.get().plusMillis(1));
+    // Search through the end of the hour
+    DateTime searchEndTime =
+        searchStartTime.withMinuteOfHour(59).withSecondOfMinute(59).withMillisOfSecond(999);
+    ImmutableList<String> fileBatch =
+        diffLister.listDiffFiles(gcsBucket, searchStartTime, searchEndTime).stream()
+            .map(BlobInfo::getName)
+            .collect(toImmutableList());
+    return String.format(
+        "Running in dry-run mode, the first set of commit log files processed would be from "
+            + "searching from %s to %s and would contain %d file(s). They are (limit 10): \n%s",
+        searchStartTime,
+        searchEndTime,
+        fileBatch.size(),
+        fileBatch.stream().limit(10).collect(toImmutableList()));
   }
 
-  private void replayFiles(ImmutableList<BlobInfo> commitLogFiles) {
+  private String replayFiles() {
     DateTime replayTimeoutTime = clock.nowUtc().plus(REPLAY_TIMEOUT_DURATION);
-    int processedFiles = 0;
-    for (BlobInfo metadata : commitLogFiles) {
-      // One transaction per GCS file
-      jpaTm().transact(() -> processFile(metadata));
-      processedFiles++;
-      if (clock.nowUtc().isAfter(replayTimeoutTime)) {
-        logger.atInfo().log(
-            "Reached max execution time after replaying %d files, leaving %d files for next run.",
-            processedFiles, commitLogFiles.size() - processedFiles);
-        return;
+    DateTime searchStartTime = jpaTm().transact(() -> SqlReplayCheckpoint.get().plusMillis(1));
+    int filesProcessed = 0;
+    // Starting from one millisecond after the last file we processed, search for and import files
+    // one hour at a time until we catch up to the current time or we hit the replay timeout (in
+    // which case the next run will pick up from where we leave off).
+    //
+    // We use hour-long batches because GCS supports filename prefix-based searches.
+    while (true) {
+      if (isAtOrAfter(clock.nowUtc(), replayTimeoutTime)) {
+        return String.format(
+            "Reached max execution time after replaying %d file(s).", filesProcessed);
       }
+      if (isBeforeOrAt(clock.nowUtc(), searchStartTime)) {
+        return String.format(
+            "Caught up to current time after replaying %d file(s).", filesProcessed);
+      }
+      // Search through the end of the hour
+      DateTime searchEndTime =
+          searchStartTime.withMinuteOfHour(59).withSecondOfMinute(59).withMillisOfSecond(999);
+      ImmutableList<BlobInfo> fileBatch =
+          diffLister.listDiffFiles(gcsBucket, searchStartTime, searchEndTime);
+      if (fileBatch.isEmpty()) {
+        logger.atInfo().log(
+            "No remaining files found in hour %s, continuing search in the next hour.",
+            searchStartTime.toString("yyyy-MM-dd HH"));
+      }
+      for (BlobInfo file : fileBatch) {
+        jpaTm().transact(() -> processFile(file));
+        filesProcessed++;
+        if (clock.nowUtc().isAfter(replayTimeoutTime)) {
+          return String.format(
+              "Reached max execution time after replaying %d file(s).", filesProcessed);
+        }
+      }
+      searchStartTime = searchEndTime.plusMillis(1);
     }
-    logger.atInfo().log("Replayed %d commit log files to SQL successfully.", processedFiles);
   }
 
   private void processFile(BlobInfo metadata) {
-    logger.atInfo().log(
-        "Processing commit log file %s of size %d B.", metadata.getName(), metadata.getSize());
     try (InputStream input = gcsUtils.openInputStream(metadata.getBlobId())) {
       // Load and process the Datastore transactions one at a time
       ImmutableList<ImmutableList<VersionedEntity>> allTransactions =
           CommitLogImports.loadEntitiesByTransaction(input);
-      logger.atInfo().log(
-          "Replaying %d transactions from commit log file %s.",
-          allTransactions.size(), metadata.getName());
       allTransactions.forEach(this::replayTransaction);
       // if we succeeded, set the last-seen time
       DateTime checkpoint = DateTime.parse(metadata.getName().substring(DIFF_FILE_PREFIX.length()));
       SqlReplayCheckpoint.set(checkpoint);
       logger.atInfo().log(
-          "Replayed %d transactions from commit log file %s.",
-          allTransactions.size(), metadata.getName());
+          "Replayed %d transactions from commit log file %s with size %d B.",
+          allTransactions.size(), metadata.getName(), metadata.getSize());
     } catch (IOException e) {
       throw new RuntimeException(
           "Errored out while replaying commit log file " + metadata.getName(), e);
diff --git a/core/src/test/java/google/registry/backup/GcsDiffFileListerTest.java b/core/src/test/java/google/registry/backup/GcsDiffFileListerTest.java
index e35630c06..94ecf0a03 100644
--- a/core/src/test/java/google/registry/backup/GcsDiffFileListerTest.java
+++ b/core/src/test/java/google/registry/backup/GcsDiffFileListerTest.java
@@ -18,6 +18,7 @@ import static com.google.common.collect.Iterables.transform;
 import static com.google.common.truth.Truth.assertThat;
 import static google.registry.backup.BackupUtils.GcsMetadataKeys.LOWER_BOUND_CHECKPOINT;
 import static google.registry.backup.ExportCommitLogDiffAction.DIFF_FILE_PREFIX;
+import static google.registry.backup.GcsDiffFileLister.getCommitLogDiffPrefix;
 import static org.joda.time.DateTimeZone.UTC;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -61,7 +62,7 @@ public class GcsDiffFileListerTest {
   @BeforeEach
   void beforeEach() throws Exception {
     diffLister.gcsUtils = gcsUtils;
-    diffLister.lazyExecutor = MoreExecutors::newDirectExecutorService;
+    diffLister.executorProvider = MoreExecutors::newDirectExecutorService;
     diffLister.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
     for (int i = 0; i < 5; i++) {
       addGcsFile(i, i + 1);
@@ -189,12 +190,23 @@ public class GcsDiffFileListerTest {
 
   @Test
   void testList_toTimeSpecified() {
-    assertThat(listDiffFiles(
-            now.minusMinutes(4).minusSeconds(1), now.minusMinutes(2).plusSeconds(1)))
-        .containsExactly(
-            now.minusMinutes(4),
-            now.minusMinutes(3),
-            now.minusMinutes(2))
+    assertThat(
+            listDiffFiles(now.minusMinutes(4).minusSeconds(1), now.minusMinutes(2).plusSeconds(1)))
+        .containsExactly(now.minusMinutes(4), now.minusMinutes(3), now.minusMinutes(2))
         .inOrder();
   }
+
+  @Test
+  void testPrefix_lengthened() {
+    DateTime from = DateTime.parse("2021-05-11T06:48:00.070Z");
+    assertThat(getCommitLogDiffPrefix(from, null)).isEqualTo("commit_diff_until_");
+    assertThat(getCommitLogDiffPrefix(from, DateTime.parse("2021-07-01")))
+        .isEqualTo("commit_diff_until_2021-");
+    assertThat(getCommitLogDiffPrefix(from, DateTime.parse("2021-05-21")))
+        .isEqualTo("commit_diff_until_2021-05-");
+    assertThat(getCommitLogDiffPrefix(from, DateTime.parse("2021-05-11T09:48:00.070Z")))
+        .isEqualTo("commit_diff_until_2021-05-11T");
+    assertThat(getCommitLogDiffPrefix(from, DateTime.parse("2021-05-11T06:59:00.070Z")))
+        .isEqualTo("commit_diff_until_2021-05-11T06:");
+  }
 }
diff --git a/core/src/test/java/google/registry/backup/ReplayCommitLogsToSqlActionTest.java b/core/src/test/java/google/registry/backup/ReplayCommitLogsToSqlActionTest.java
index da8857665..24c980084 100644
--- a/core/src/test/java/google/registry/backup/ReplayCommitLogsToSqlActionTest.java
+++ b/core/src/test/java/google/registry/backup/ReplayCommitLogsToSqlActionTest.java
@@ -132,7 +132,7 @@ public class ReplayCommitLogsToSqlActionTest {
     action.gcsBucket = "gcs bucket";
     action.diffLister = new GcsDiffFileLister();
     action.diffLister.gcsUtils = gcsUtils;
-    action.diffLister.lazyExecutor = MoreExecutors::newDirectExecutorService;
+    action.diffLister.executorProvider = MoreExecutors::newDirectExecutorService;
     action.diffLister.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
     ofyTm()
         .transact(
@@ -200,7 +200,7 @@ public class ReplayCommitLogsToSqlActionTest {
             CommitLogMutation.create(manifest2Key, TestObject.create("f")));
     jpaTm().transact(() -> SqlReplayCheckpoint.set(now.minusMinutes(1).minusMillis(1)));
     fakeClock.advanceOneMilli();
-    runAndAssertSuccess(now);
+    runAndAssertSuccess(now, 2);
     assertExpectedIds("previous to keep", "b", "d", "e", "f");
   }
 
@@ -211,7 +211,7 @@ public class ReplayCommitLogsToSqlActionTest {
     saveDiffFileNotToRestore(gcsUtils, now.minusMinutes(1));
     saveDiffFile(gcsUtils, createCheckpoint(now.minusMillis(2)));
     jpaTm().transact(() -> SqlReplayCheckpoint.set(now.minusMillis(1)));
-    runAndAssertSuccess(now.minusMillis(1));
+    runAndAssertSuccess(now.minusMillis(1), 0);
     assertExpectedIds("previous to keep");
   }
 
@@ -235,8 +235,10 @@ public class ReplayCommitLogsToSqlActionTest {
     assertThat(response.getStatus()).isEqualTo(SC_OK);
     assertThat(response.getPayload())
        .isEqualTo(
-            "Running in dry-run mode; would have processed %d files. They are (limit 10):\n"
-                + "commit_diff_until_1999-12-31T23:59:00.000Z");
+            "Running in dry-run mode, the first set of commit log files processed would be from "
+                + "searching from 1999-12-31T23:59:00.000Z to 1999-12-31T23:59:59.999Z and would "
+                + "contain 1 file(s). They are (limit 10): \n"
+                + "[commit_diff_until_1999-12-31T23:59:00.000Z]");
   }
 
   @Test
@@ -253,7 +255,7 @@ public class ReplayCommitLogsToSqlActionTest {
         CommitLogManifest.create(bucketKey, now, null),
         CommitLogMutation.create(manifestKey, TestObject.create("a")),
         CommitLogMutation.create(manifestKey, TestObject.create("b")));
-    runAndAssertSuccess(now.minusMinutes(1));
+    runAndAssertSuccess(now.minusMinutes(1), 1);
     assertExpectedIds("previous to keep", "a", "b");
   }
 
@@ -275,7 +277,7 @@ public class ReplayCommitLogsToSqlActionTest {
            getBucketKey(1),
            now,
            ImmutableSet.of(Key.create(TestObject.create("previous to delete")))));
-    runAndAssertSuccess(now.minusMinutes(1));
+    runAndAssertSuccess(now.minusMinutes(1), 1);
     assertExpectedIds("previous to keep");
   }
 
@@ -347,7 +349,7 @@ public class ReplayCommitLogsToSqlActionTest {
         domainMutation,
         contactMutation);
 
-    runAndAssertSuccess(now.minusMinutes(1));
+    runAndAssertSuccess(now.minusMinutes(1), 1);
     // Verify two things:
     // 1. that the contact insert occurred before the domain insert (necessary for FK ordering)
     //    even though the domain came first in the file
@@ -390,7 +392,7 @@ public class ReplayCommitLogsToSqlActionTest {
        CommitLogManifest.create(
            getBucketKey(1), now.minusMinutes(1).plusMillis(1), ImmutableSet.of()),
        contactMutation);
-    runAndAssertSuccess(now.minusMinutes(1).plusMillis(1));
+    runAndAssertSuccess(now.minusMinutes(1).plusMillis(1), 1);
     // Verify that the delete occurred first (because it was in the first transaction) even though
     // deletes have higher weight
     ArgumentCaptor<Object> putCaptor = ArgumentCaptor.forClass(Object.class);
@@ -435,7 +437,7 @@ public class ReplayCommitLogsToSqlActionTest {
                throw new RuntimeException(e);
              }
            });
-    runAndAssertSuccess(now.minusMinutes(1));
+    runAndAssertSuccess(now.minusMinutes(1), 1);
     // jpaTm()::put should only have been called with the checkpoint
     verify(spy, times(2)).put(any(SqlReplayCheckpoint.class));
     verify(spy, times(2)).put(any());
@@ -460,7 +462,7 @@ public class ReplayCommitLogsToSqlActionTest {
            // one object only exists in Datastore, one is dually-written (so isn't replicated)
            ImmutableSet.of(getCrossTldKey(), claimsListKey)));
 
-    runAndAssertSuccess(now.minusMinutes(1));
+    runAndAssertSuccess(now.minusMinutes(1), 1);
 
     verify(spy, times(0)).delete(any(VKey.class));
   }
@@ -503,7 +505,7 @@ public class ReplayCommitLogsToSqlActionTest {
        createCheckpoint(now.minusMinutes(1)),
        CommitLogManifest.create(bucketKey, now, null),
        CommitLogMutation.create(manifestKey, TestObject.create("a")));
-    runAndAssertSuccess(now.minusMinutes(1));
+    runAndAssertSuccess(now.minusMinutes(1), 1);
     assertThat(TestObject.beforeSqlSaveCallCount).isEqualTo(1);
   }
 
@@ -522,11 +524,12 @@ public class ReplayCommitLogsToSqlActionTest {
     assertThat(TestObject.beforeSqlDeleteCallCount).isEqualTo(1);
   }
 
-  private void runAndAssertSuccess(DateTime expectedCheckpointTime) {
+  private void runAndAssertSuccess(DateTime expectedCheckpointTime, int numFiles) {
     action.run();
     assertThat(response.getStatus()).isEqualTo(SC_OK);
     assertThat(response.getPayload())
-        .isEqualTo("ReplayCommitLogsToSqlAction completed successfully.");
+        .isEqualTo(
+            String.format("Caught up to current time after replaying %d file(s).", numFiles));
     assertThat(jpaTm().transact(SqlReplayCheckpoint::get)).isEqualTo(expectedCheckpointTime);
   }
 
diff --git a/core/src/test/java/google/registry/backup/RestoreCommitLogsActionTest.java b/core/src/test/java/google/registry/backup/RestoreCommitLogsActionTest.java
index 6a0aa15c8..0e2733f5d 100644
--- a/core/src/test/java/google/registry/backup/RestoreCommitLogsActionTest.java
+++ b/core/src/test/java/google/registry/backup/RestoreCommitLogsActionTest.java
@@ -88,7 +88,7 @@ public class RestoreCommitLogsActionTest {
     action.gcsBucketOverride = Optional.empty();
     action.diffLister = new GcsDiffFileLister();
     action.diffLister.gcsUtils = gcsUtils;
-    action.diffLister.lazyExecutor = MoreExecutors::newDirectExecutorService;
+    action.diffLister.executorProvider = MoreExecutors::newDirectExecutorService;
     action.diffLister.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
   }
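
Reviewer note (illustration only, not part of the patch): the self-contained Java sketch below mirrors the new GcsDiffFileLister.getCommitLogDiffPrefix logic to show how one of replayFiles()'s hour-long search windows collapses to an hour-level GCS filename prefix. The class name CommitLogPrefixDemo and the inlined DIFF_FILE_PREFIX constant are illustrative stand-ins (the real constant lives in ExportCommitLogDiffAction), so treat this as a sketch rather than project code.

import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

/** Illustrative sketch: hour-long search window -> commit log diff filename prefix. */
public class CommitLogPrefixDemo {

  // Copied here for the demo; the real value comes from ExportCommitLogDiffAction.DIFF_FILE_PREFIX.
  private static final String DIFF_FILE_PREFIX = "commit_diff_until_";

  // Extend the prefix field by field (year, month, day, hour) only while both ends of the
  // search window agree on that field, the same narrowing the patched method performs.
  static String commitLogDiffPrefix(DateTime from, DateTime to) {
    StringBuilder result = new StringBuilder(DIFF_FILE_PREFIX);
    if (to == null || from.getYear() != to.getYear()) {
      return result.toString();
    }
    result.append(from.getYear()).append('-');
    if (from.getMonthOfYear() != to.getMonthOfYear()) {
      return result.toString();
    }
    result.append(String.format("%02d-", from.getMonthOfYear()));
    if (from.getDayOfMonth() != to.getDayOfMonth()) {
      return result.toString();
    }
    result.append(String.format("%02dT", from.getDayOfMonth()));
    if (from.getHourOfDay() != to.getHourOfDay()) {
      return result.toString();
    }
    result.append(String.format("%02d:", from.getHourOfDay()));
    return result.toString();
  }

  public static void main(String[] args) {
    // One hour-long batch like the ones replayFiles() iterates over: start one millisecond
    // after the last checkpoint and search through the last millisecond of that hour.
    DateTime searchStartTime =
        DateTime.parse("2021-05-11T06:48:00.070Z").withZone(DateTimeZone.UTC);
    DateTime searchEndTime =
        searchStartTime.withMinuteOfHour(59).withSecondOfMinute(59).withMillisOfSecond(999);

    // Both ends share year/month/day/hour, so the prefix narrows all the way to the hour.
    System.out.println(commitLogDiffPrefix(searchStartTime, searchEndTime));
    // Prints: commit_diff_until_2021-05-11T06:
  }
}

Listing with a prefix like commit_diff_until_2021-05-11T06: is what narrows the listObjects call from the >30k objects mentioned in the commit message down to a single hour's worth of diff files.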