diff --git a/core/src/main/java/google/registry/backup/GcsDiffFileLister.java b/core/src/main/java/google/registry/backup/GcsDiffFileLister.java index b315f61cc..49e6a8647 100644 --- a/core/src/main/java/google/registry/backup/GcsDiffFileLister.java +++ b/core/src/main/java/google/registry/backup/GcsDiffFileLister.java @@ -15,6 +15,7 @@ package google.registry.backup; import static com.google.common.base.Preconditions.checkState; +import static com.google.common.collect.ImmutableList.toImmutableList; import static google.registry.backup.BackupUtils.GcsMetadataKeys.LOWER_BOUND_CHECKPOINT; import static google.registry.backup.ExportCommitLogDiffAction.DIFF_FILE_PREFIX; import static google.registry.util.DateTimeUtils.START_OF_TIME; @@ -23,13 +24,13 @@ import static google.registry.util.DateTimeUtils.latestOf; import com.google.cloud.storage.BlobId; import com.google.cloud.storage.BlobInfo; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.flogger.FluentLogger; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.UncheckedExecutionException; -import dagger.Lazy; import google.registry.backup.BackupModule.Backups; import google.registry.gcs.GcsUtils; import java.io.IOException; @@ -39,6 +40,7 @@ import java.util.TreeMap; import java.util.concurrent.ScheduledExecutorService; import javax.annotation.Nullable; import javax.inject.Inject; +import javax.inject.Provider; import org.joda.time.DateTime; /** Utility class to list commit logs diff files stored on GCS. 
*/ @@ -51,7 +53,7 @@ class GcsDiffFileLister { @Inject GcsUtils gcsUtils; - @Inject @Backups Lazy lazyExecutor; + @Inject @Backups Provider executorProvider; @Inject ScheduledExecutorService scheduledExecutorService; @Inject @@ -113,22 +115,27 @@ class GcsDiffFileLister { // (extracted from the filename) to its asynchronously-loaded metadata, keeping only files with // an upper checkpoint time > fromTime. TreeMap> upperBoundTimesToBlobInfo = new TreeMap<>(); - ImmutableList strippedFilenames; + String commitLogDiffPrefix = getCommitLogDiffPrefix(fromTime, toTime); + ImmutableList filenames; try { - strippedFilenames = gcsUtils.listFolderObjects(gcsBucket, DIFF_FILE_PREFIX); + filenames = + gcsUtils.listFolderObjects(gcsBucket, commitLogDiffPrefix).stream() + .map(s -> commitLogDiffPrefix + s) + .collect(toImmutableList()); } catch (IOException e) { throw new RuntimeException(e); } DateTime lastUpperBoundTime = START_OF_TIME; TreeMap sequence = new TreeMap<>(); + ListeningExecutorService executor = executorProvider.get(); try { - for (String strippedFilename : strippedFilenames) { - final String filename = DIFF_FILE_PREFIX + strippedFilename; + for (String filename : filenames) { + String strippedFilename = filename.replaceFirst(DIFF_FILE_PREFIX, ""); DateTime upperBoundTime = DateTime.parse(strippedFilename); if (isInRange(upperBoundTime, fromTime, toTime)) { upperBoundTimesToBlobInfo.put( - upperBoundTime, lazyExecutor.get().submit(() -> getBlobInfo(gcsBucket, filename))); + upperBoundTime, executor.submit(() -> getBlobInfo(gcsBucket, filename))); lastUpperBoundTime = latestOf(upperBoundTime, lastUpperBoundTime); } } @@ -173,7 +180,7 @@ class GcsDiffFileLister { "Unable to compute commit diff history, there are either gaps or forks in the history " + "file set. 
Check log for details."); } finally { - lazyExecutor.get().shutdown(); + executor.shutdown(); } logger.atInfo().log( @@ -198,4 +205,36 @@ class GcsDiffFileLister { private BlobInfo getBlobInfo(String gcsBucket, String filename) { return gcsUtils.getBlobInfo(BlobId.of(gcsBucket, filename)); } + + /** + * Returns a prefix guaranteed to cover all commit log diff files in the given range. + * + *

<p>The listObjects call can be fairly slow if we search over many thousands or tens of + * thousands of files, so we restrict the search space. The commit logs have a file format of + * "commit_diff_until_2021-05-11T06:48:00.070Z" so we can often filter down as far as the hour. + * + *
<p>
Here, we get the longest prefix possible based on which fields (year, month, day, hour) the + * times in question have in common. + */ + @VisibleForTesting + static String getCommitLogDiffPrefix(DateTime from, @Nullable DateTime to) { + StringBuilder result = new StringBuilder(DIFF_FILE_PREFIX); + if (to == null || from.getYear() != to.getYear()) { + return result.toString(); + } + result.append(from.getYear()).append('-'); + if (from.getMonthOfYear() != to.getMonthOfYear()) { + return result.toString(); + } + result.append(String.format("%02d-", from.getMonthOfYear())); + if (from.getDayOfMonth() != to.getDayOfMonth()) { + return result.toString(); + } + result.append(String.format("%02dT", from.getDayOfMonth())); + if (from.getHourOfDay() != to.getHourOfDay()) { + return result.toString(); + } + result.append(String.format("%02d:", from.getHourOfDay())); + return result.toString(); + } } diff --git a/core/src/main/java/google/registry/backup/ReplayCommitLogsToSqlAction.java b/core/src/main/java/google/registry/backup/ReplayCommitLogsToSqlAction.java index 8bcb1c0c3..245eda28b 100644 --- a/core/src/main/java/google/registry/backup/ReplayCommitLogsToSqlAction.java +++ b/core/src/main/java/google/registry/backup/ReplayCommitLogsToSqlAction.java @@ -20,13 +20,15 @@ import static google.registry.backup.RestoreCommitLogsAction.DRY_RUN_PARAM; import static google.registry.model.ofy.EntityWritePriorities.getEntityPriority; import static google.registry.model.ofy.ObjectifyService.auditedOfy; import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; +import static google.registry.util.DateTimeUtils.isAtOrAfter; +import static google.registry.util.DateTimeUtils.isBeforeOrAt; import static javax.servlet.http.HttpServletResponse.SC_NO_CONTENT; +import static javax.servlet.http.HttpServletResponse.SC_OK; import static org.joda.time.Duration.standardHours; import com.google.appengine.api.datastore.Entity; import 
com.google.appengine.api.datastore.Key; import com.google.cloud.storage.BlobInfo; -import com.google.common.base.Joiner; import com.google.common.collect.ImmutableList; import com.google.common.flogger.FluentLogger; import google.registry.config.RegistryConfig.Config; @@ -120,26 +122,15 @@ public class ReplayCommitLogsToSqlAction implements Runnable { } try { logger.atInfo().log("Beginning replay of commit logs."); - ImmutableList commitLogFiles = getFilesToReplay(); + String resultMessage; if (dryRun) { - response.setStatus(HttpServletResponse.SC_OK); - ImmutableList filenames = - commitLogFiles.stream() - .limit(10) - .map(file -> file.getName()) - .collect(toImmutableList()); - String dryRunMessage = - "Running in dry-run mode; would have processed %d files. They are (limit 10):\n" - + Joiner.on('\n').join(filenames); - response.setPayload(dryRunMessage); - logger.atInfo().log(dryRunMessage); + resultMessage = executeDryRun(); } else { - replayFiles(commitLogFiles); - response.setStatus(HttpServletResponse.SC_OK); - String message = "ReplayCommitLogsToSqlAction completed successfully."; - response.setPayload(message); - logger.atInfo().log(message); + resultMessage = replayFiles(); } + response.setStatus(SC_OK); + response.setPayload(resultMessage); + logger.atInfo().log(resultMessage); } catch (Throwable t) { String message = "Errored out replaying files."; logger.atSevere().withCause(t).log(message); @@ -150,52 +141,77 @@ public class ReplayCommitLogsToSqlAction implements Runnable { } } - private ImmutableList getFilesToReplay() { + private String executeDryRun() { // Start at the first millisecond we haven't seen yet - DateTime fromTime = jpaTm().transact(() -> SqlReplayCheckpoint.get().plusMillis(1)); - logger.atInfo().log("Starting replay from: %s.", fromTime); - // If there's an inconsistent file set, this will throw IllegalStateException and the job - // will try later -- this is likely because an export hasn't finished yet. 
- ImmutableList commitLogFiles = - diffLister.listDiffFiles(gcsBucket, fromTime, /* current time */ null); - logger.atInfo().log("Found %d new commit log files to process.", commitLogFiles.size()); - return commitLogFiles; + DateTime searchStartTime = jpaTm().transact(() -> SqlReplayCheckpoint.get().plusMillis(1)); + // Search through the end of the hour + DateTime searchEndTime = + searchStartTime.withMinuteOfHour(59).withSecondOfMinute(59).withMillisOfSecond(999); + ImmutableList fileBatch = + diffLister.listDiffFiles(gcsBucket, searchStartTime, searchEndTime).stream() + .map(BlobInfo::getName) + .collect(toImmutableList()); + return String.format( + "Running in dry-run mode, the first set of commit log files processed would be from " + + "searching from %s to %s and would contain %d file(s). They are (limit 10): \n%s", + searchStartTime, + searchEndTime, + fileBatch.size(), + fileBatch.stream().limit(10).collect(toImmutableList())); } - private void replayFiles(ImmutableList commitLogFiles) { + private String replayFiles() { DateTime replayTimeoutTime = clock.nowUtc().plus(REPLAY_TIMEOUT_DURATION); - int processedFiles = 0; - for (BlobInfo metadata : commitLogFiles) { - // One transaction per GCS file - jpaTm().transact(() -> processFile(metadata)); - processedFiles++; - if (clock.nowUtc().isAfter(replayTimeoutTime)) { - logger.atInfo().log( - "Reached max execution time after replaying %d files, leaving %d files for next run.", - processedFiles, commitLogFiles.size() - processedFiles); - return; + DateTime searchStartTime = jpaTm().transact(() -> SqlReplayCheckpoint.get().plusMillis(1)); + int filesProcessed = 0; + // Starting from one millisecond after the last file we processed, search for and import files + // one hour at a time until we catch up to the current time or we hit the replay timeout (in + // which case the next run will pick up from where we leave off). + // + // We use hour-long batches because GCS supports filename prefix-based searches. 
+ while (true) { + if (isAtOrAfter(clock.nowUtc(), replayTimeoutTime)) { + return String.format( + "Reached max execution time after replaying %d file(s).", filesProcessed); } + if (isBeforeOrAt(clock.nowUtc(), searchStartTime)) { + return String.format( + "Caught up to current time after replaying %d file(s).", filesProcessed); + } + // Search through the end of the hour + DateTime searchEndTime = + searchStartTime.withMinuteOfHour(59).withSecondOfMinute(59).withMillisOfSecond(999); + ImmutableList fileBatch = + diffLister.listDiffFiles(gcsBucket, searchStartTime, searchEndTime); + if (fileBatch.isEmpty()) { + logger.atInfo().log( + "No remaining files found in hour %s, continuing search in the next hour.", + searchStartTime.toString("yyyy-MM-dd HH")); + } + for (BlobInfo file : fileBatch) { + jpaTm().transact(() -> processFile(file)); + filesProcessed++; + if (clock.nowUtc().isAfter(replayTimeoutTime)) { + return String.format( + "Reached max execution time after replaying %d file(s).", filesProcessed); + } + } + searchStartTime = searchEndTime.plusMillis(1); } - logger.atInfo().log("Replayed %d commit log files to SQL successfully.", processedFiles); } private void processFile(BlobInfo metadata) { - logger.atInfo().log( - "Processing commit log file %s of size %d B.", metadata.getName(), metadata.getSize()); try (InputStream input = gcsUtils.openInputStream(metadata.getBlobId())) { // Load and process the Datastore transactions one at a time ImmutableList> allTransactions = CommitLogImports.loadEntitiesByTransaction(input); - logger.atInfo().log( - "Replaying %d transactions from commit log file %s.", - allTransactions.size(), metadata.getName()); allTransactions.forEach(this::replayTransaction); // if we succeeded, set the last-seen time DateTime checkpoint = DateTime.parse(metadata.getName().substring(DIFF_FILE_PREFIX.length())); SqlReplayCheckpoint.set(checkpoint); logger.atInfo().log( - "Replayed %d transactions from commit log file %s.", - 
allTransactions.size(), metadata.getName()); + "Replayed %d transactions from commit log file %s with size %d B.", + allTransactions.size(), metadata.getName(), metadata.getSize()); } catch (IOException e) { throw new RuntimeException( "Errored out while replaying commit log file " + metadata.getName(), e); diff --git a/core/src/test/java/google/registry/backup/GcsDiffFileListerTest.java b/core/src/test/java/google/registry/backup/GcsDiffFileListerTest.java index e35630c06..94ecf0a03 100644 --- a/core/src/test/java/google/registry/backup/GcsDiffFileListerTest.java +++ b/core/src/test/java/google/registry/backup/GcsDiffFileListerTest.java @@ -18,6 +18,7 @@ import static com.google.common.collect.Iterables.transform; import static com.google.common.truth.Truth.assertThat; import static google.registry.backup.BackupUtils.GcsMetadataKeys.LOWER_BOUND_CHECKPOINT; import static google.registry.backup.ExportCommitLogDiffAction.DIFF_FILE_PREFIX; +import static google.registry.backup.GcsDiffFileLister.getCommitLogDiffPrefix; import static org.joda.time.DateTimeZone.UTC; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.fail; @@ -61,7 +62,7 @@ public class GcsDiffFileListerTest { @BeforeEach void beforeEach() throws Exception { diffLister.gcsUtils = gcsUtils; - diffLister.lazyExecutor = MoreExecutors::newDirectExecutorService; + diffLister.executorProvider = MoreExecutors::newDirectExecutorService; diffLister.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); for (int i = 0; i < 5; i++) { addGcsFile(i, i + 1); @@ -189,12 +190,23 @@ public class GcsDiffFileListerTest { @Test void testList_toTimeSpecified() { - assertThat(listDiffFiles( - now.minusMinutes(4).minusSeconds(1), now.minusMinutes(2).plusSeconds(1))) - .containsExactly( - now.minusMinutes(4), - now.minusMinutes(3), - now.minusMinutes(2)) + assertThat( + listDiffFiles(now.minusMinutes(4).minusSeconds(1), 
now.minusMinutes(2).plusSeconds(1))) + .containsExactly(now.minusMinutes(4), now.minusMinutes(3), now.minusMinutes(2)) .inOrder(); } + + @Test + void testPrefix_lengthened() { + DateTime from = DateTime.parse("2021-05-11T06:48:00.070Z"); + assertThat(getCommitLogDiffPrefix(from, null)).isEqualTo("commit_diff_until_"); + assertThat(getCommitLogDiffPrefix(from, DateTime.parse("2021-07-01"))) + .isEqualTo("commit_diff_until_2021-"); + assertThat(getCommitLogDiffPrefix(from, DateTime.parse("2021-05-21"))) + .isEqualTo("commit_diff_until_2021-05-"); + assertThat(getCommitLogDiffPrefix(from, DateTime.parse("2021-05-11T09:48:00.070Z"))) + .isEqualTo("commit_diff_until_2021-05-11T"); + assertThat(getCommitLogDiffPrefix(from, DateTime.parse("2021-05-11T06:59:00.070Z"))) + .isEqualTo("commit_diff_until_2021-05-11T06:"); + } } diff --git a/core/src/test/java/google/registry/backup/ReplayCommitLogsToSqlActionTest.java b/core/src/test/java/google/registry/backup/ReplayCommitLogsToSqlActionTest.java index da8857665..24c980084 100644 --- a/core/src/test/java/google/registry/backup/ReplayCommitLogsToSqlActionTest.java +++ b/core/src/test/java/google/registry/backup/ReplayCommitLogsToSqlActionTest.java @@ -132,7 +132,7 @@ public class ReplayCommitLogsToSqlActionTest { action.gcsBucket = "gcs bucket"; action.diffLister = new GcsDiffFileLister(); action.diffLister.gcsUtils = gcsUtils; - action.diffLister.lazyExecutor = MoreExecutors::newDirectExecutorService; + action.diffLister.executorProvider = MoreExecutors::newDirectExecutorService; action.diffLister.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); ofyTm() .transact( @@ -200,7 +200,7 @@ public class ReplayCommitLogsToSqlActionTest { CommitLogMutation.create(manifest2Key, TestObject.create("f"))); jpaTm().transact(() -> SqlReplayCheckpoint.set(now.minusMinutes(1).minusMillis(1))); fakeClock.advanceOneMilli(); - runAndAssertSuccess(now); + runAndAssertSuccess(now, 2); assertExpectedIds("previous to keep", 
"b", "d", "e", "f"); } @@ -211,7 +211,7 @@ public class ReplayCommitLogsToSqlActionTest { saveDiffFileNotToRestore(gcsUtils, now.minusMinutes(1)); saveDiffFile(gcsUtils, createCheckpoint(now.minusMillis(2))); jpaTm().transact(() -> SqlReplayCheckpoint.set(now.minusMillis(1))); - runAndAssertSuccess(now.minusMillis(1)); + runAndAssertSuccess(now.minusMillis(1), 0); assertExpectedIds("previous to keep"); } @@ -235,8 +235,10 @@ public class ReplayCommitLogsToSqlActionTest { assertThat(response.getStatus()).isEqualTo(SC_OK); assertThat(response.getPayload()) .isEqualTo( - "Running in dry-run mode; would have processed %d files. They are (limit 10):\n" - + "commit_diff_until_1999-12-31T23:59:00.000Z"); + "Running in dry-run mode, the first set of commit log files processed would be from " + + "searching from 1999-12-31T23:59:00.000Z to 1999-12-31T23:59:59.999Z and would " + + "contain 1 file(s). They are (limit 10): \n" + + "[commit_diff_until_1999-12-31T23:59:00.000Z]"); } @Test @@ -253,7 +255,7 @@ public class ReplayCommitLogsToSqlActionTest { CommitLogManifest.create(bucketKey, now, null), CommitLogMutation.create(manifestKey, TestObject.create("a")), CommitLogMutation.create(manifestKey, TestObject.create("b"))); - runAndAssertSuccess(now.minusMinutes(1)); + runAndAssertSuccess(now.minusMinutes(1), 1); assertExpectedIds("previous to keep", "a", "b"); } @@ -275,7 +277,7 @@ public class ReplayCommitLogsToSqlActionTest { getBucketKey(1), now, ImmutableSet.of(Key.create(TestObject.create("previous to delete"))))); - runAndAssertSuccess(now.minusMinutes(1)); + runAndAssertSuccess(now.minusMinutes(1), 1); assertExpectedIds("previous to keep"); } @@ -347,7 +349,7 @@ public class ReplayCommitLogsToSqlActionTest { domainMutation, contactMutation); - runAndAssertSuccess(now.minusMinutes(1)); + runAndAssertSuccess(now.minusMinutes(1), 1); // Verify two things: // 1. 
that the contact insert occurred before the domain insert (necessary for FK ordering) // even though the domain came first in the file @@ -390,7 +392,7 @@ public class ReplayCommitLogsToSqlActionTest { CommitLogManifest.create( getBucketKey(1), now.minusMinutes(1).plusMillis(1), ImmutableSet.of()), contactMutation); - runAndAssertSuccess(now.minusMinutes(1).plusMillis(1)); + runAndAssertSuccess(now.minusMinutes(1).plusMillis(1), 1); // Verify that the delete occurred first (because it was in the first transaction) even though // deletes have higher weight ArgumentCaptor putCaptor = ArgumentCaptor.forClass(Object.class); @@ -435,7 +437,7 @@ public class ReplayCommitLogsToSqlActionTest { throw new RuntimeException(e); } }); - runAndAssertSuccess(now.minusMinutes(1)); + runAndAssertSuccess(now.minusMinutes(1), 1); // jpaTm()::put should only have been called with the checkpoint verify(spy, times(2)).put(any(SqlReplayCheckpoint.class)); verify(spy, times(2)).put(any()); @@ -460,7 +462,7 @@ public class ReplayCommitLogsToSqlActionTest { // one object only exists in Datastore, one is dually-written (so isn't replicated) ImmutableSet.of(getCrossTldKey(), claimsListKey))); - runAndAssertSuccess(now.minusMinutes(1)); + runAndAssertSuccess(now.minusMinutes(1), 1); verify(spy, times(0)).delete(any(VKey.class)); } @@ -503,7 +505,7 @@ public class ReplayCommitLogsToSqlActionTest { createCheckpoint(now.minusMinutes(1)), CommitLogManifest.create(bucketKey, now, null), CommitLogMutation.create(manifestKey, TestObject.create("a"))); - runAndAssertSuccess(now.minusMinutes(1)); + runAndAssertSuccess(now.minusMinutes(1), 1); assertThat(TestObject.beforeSqlSaveCallCount).isEqualTo(1); } @@ -522,11 +524,12 @@ public class ReplayCommitLogsToSqlActionTest { assertThat(TestObject.beforeSqlDeleteCallCount).isEqualTo(1); } - private void runAndAssertSuccess(DateTime expectedCheckpointTime) { + private void runAndAssertSuccess(DateTime expectedCheckpointTime, int numFiles) { action.run(); 
assertThat(response.getStatus()).isEqualTo(SC_OK); assertThat(response.getPayload()) - .isEqualTo("ReplayCommitLogsToSqlAction completed successfully."); + .isEqualTo( + String.format("Caught up to current time after replaying %d file(s).", numFiles)); assertThat(jpaTm().transact(SqlReplayCheckpoint::get)).isEqualTo(expectedCheckpointTime); } diff --git a/core/src/test/java/google/registry/backup/RestoreCommitLogsActionTest.java b/core/src/test/java/google/registry/backup/RestoreCommitLogsActionTest.java index 6a0aa15c8..0e2733f5d 100644 --- a/core/src/test/java/google/registry/backup/RestoreCommitLogsActionTest.java +++ b/core/src/test/java/google/registry/backup/RestoreCommitLogsActionTest.java @@ -88,7 +88,7 @@ public class RestoreCommitLogsActionTest { action.gcsBucketOverride = Optional.empty(); action.diffLister = new GcsDiffFileLister(); action.diffLister.gcsUtils = gcsUtils; - action.diffLister.lazyExecutor = MoreExecutors::newDirectExecutorService; + action.diffLister.executorProvider = MoreExecutors::newDirectExecutorService; action.diffLister.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); }