Fix runtime issues with commit-log-to-SQL replay (#1240)

* Fix runtime issues with commit-log-to-SQL replay

- We now use a more intelligent prefix to narrow the listObjects search
space in GCS (see the sketch below). Previously we were returning >30k
objects, which could take roughly 50 seconds; with the prefix, listObjects
takes 1-3 seconds.

- We now search hour by hour to make efficient use of the prefixing:
we keep searching for new files until we hit the current time or the
overall replay timeout.

- Dry-run mode now prints only the first hour's worth of files.
Authored by gbrodman on 2021-07-22 13:59:28 -04:00, committed by GitHub
parent 3beb207fcc
commit 38c8e81690
5 changed files with 146 additions and 76 deletions
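
Before the per-file diffs, a quick sketch of the prefix idea from the first bullet, written against the public GCS client API. This is illustrative only: the bucket name is hypothetical, and the production code below goes through GcsUtils.listFolderObjects rather than calling Storage directly.

import com.google.api.gax.paging.Page;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

/** Illustration: a longer filename prefix shrinks what a GCS list call must return. */
public class PrefixListDemo {
  public static void main(String[] args) {
    Storage storage = StorageOptions.getDefaultInstance().getService();
    String bucket = "my-commit-logs"; // hypothetical bucket name
    // Old behavior: list every diff file ever written (>30k objects, ~50 seconds).
    Page<Blob> all =
        storage.list(bucket, Storage.BlobListOption.prefix("commit_diff_until_"));
    // New behavior: when the search range fits in one hour, list only that hour (1-3 seconds).
    Page<Blob> oneHour =
        storage.list(bucket, Storage.BlobListOption.prefix("commit_diff_until_2021-05-11T06:"));
    oneHour.iterateAll().forEach(blob -> System.out.println(blob.getName()));
  }
}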

google/registry/backup/GcsDiffFileLister.java

@@ -15,6 +15,7 @@
 package google.registry.backup;
 import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.ImmutableList.toImmutableList;
 import static google.registry.backup.BackupUtils.GcsMetadataKeys.LOWER_BOUND_CHECKPOINT;
 import static google.registry.backup.ExportCommitLogDiffAction.DIFF_FILE_PREFIX;
 import static google.registry.util.DateTimeUtils.START_OF_TIME;
@@ -23,13 +24,13 @@ import static google.registry.util.DateTimeUtils.latestOf;
 import com.google.cloud.storage.BlobId;
 import com.google.cloud.storage.BlobInfo;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.common.flogger.FluentLogger;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.UncheckedExecutionException;
-import dagger.Lazy;
 import google.registry.backup.BackupModule.Backups;
 import google.registry.gcs.GcsUtils;
 import java.io.IOException;
@@ -39,6 +40,7 @@ import java.util.TreeMap;
 import java.util.concurrent.ScheduledExecutorService;
 import javax.annotation.Nullable;
 import javax.inject.Inject;
+import javax.inject.Provider;
 import org.joda.time.DateTime;

 /** Utility class to list commit logs diff files stored on GCS. */
@@ -51,7 +53,7 @@ class GcsDiffFileLister {
   @Inject GcsUtils gcsUtils;
-  @Inject @Backups Lazy<ListeningExecutorService> lazyExecutor;
+  @Inject @Backups Provider<ListeningExecutorService> executorProvider;
   @Inject ScheduledExecutorService scheduledExecutorService;
   @Inject
@@ -113,22 +115,27 @@
     // (extracted from the filename) to its asynchronously-loaded metadata, keeping only files with
     // an upper checkpoint time > fromTime.
     TreeMap<DateTime, ListenableFuture<BlobInfo>> upperBoundTimesToBlobInfo = new TreeMap<>();
-    ImmutableList<String> strippedFilenames;
+    String commitLogDiffPrefix = getCommitLogDiffPrefix(fromTime, toTime);
+    ImmutableList<String> filenames;
     try {
-      strippedFilenames = gcsUtils.listFolderObjects(gcsBucket, DIFF_FILE_PREFIX);
+      filenames =
+          gcsUtils.listFolderObjects(gcsBucket, commitLogDiffPrefix).stream()
+              .map(s -> commitLogDiffPrefix + s)
+              .collect(toImmutableList());
     } catch (IOException e) {
       throw new RuntimeException(e);
     }

     DateTime lastUpperBoundTime = START_OF_TIME;
     TreeMap<DateTime, BlobInfo> sequence = new TreeMap<>();
+    ListeningExecutorService executor = executorProvider.get();
     try {
-      for (String strippedFilename : strippedFilenames) {
-        final String filename = DIFF_FILE_PREFIX + strippedFilename;
+      for (String filename : filenames) {
+        String strippedFilename = filename.replaceFirst(DIFF_FILE_PREFIX, "");
         DateTime upperBoundTime = DateTime.parse(strippedFilename);
         if (isInRange(upperBoundTime, fromTime, toTime)) {
           upperBoundTimesToBlobInfo.put(
-              upperBoundTime, lazyExecutor.get().submit(() -> getBlobInfo(gcsBucket, filename)));
+              upperBoundTime, executor.submit(() -> getBlobInfo(gcsBucket, filename)));
           lastUpperBoundTime = latestOf(upperBoundTime, lastUpperBoundTime);
         }
       }
@@ -173,7 +180,7 @@
           "Unable to compute commit diff history, there are either gaps or forks in the history "
               + "file set. Check log for details.");
     } finally {
-      lazyExecutor.get().shutdown();
+      executor.shutdown();
     }
     logger.atInfo().log(
@@ -198,4 +205,36 @@
   private BlobInfo getBlobInfo(String gcsBucket, String filename) {
     return gcsUtils.getBlobInfo(BlobId.of(gcsBucket, filename));
   }
+
+  /**
+   * Returns a prefix guaranteed to cover all commit log diff files in the given range.
+   *
+   * <p>The listObjects call can be fairly slow if we search over many thousands or tens of
+   * thousands of files, so we restrict the search space. The commit logs have a file format of
+   * "commit_diff_until_2021-05-11T06:48:00.070Z" so we can often filter down as far as the hour.
+   *
+   * <p>Here, we get the longest prefix possible based on which fields (year, month, day, hour) the
+   * times in question have in common.
+   */
+  @VisibleForTesting
+  static String getCommitLogDiffPrefix(DateTime from, @Nullable DateTime to) {
+    StringBuilder result = new StringBuilder(DIFF_FILE_PREFIX);
+    if (to == null || from.getYear() != to.getYear()) {
+      return result.toString();
+    }
+    result.append(from.getYear()).append('-');
+    if (from.getMonthOfYear() != to.getMonthOfYear()) {
+      return result.toString();
+    }
+    result.append(String.format("%02d-", from.getMonthOfYear()));
+    if (from.getDayOfMonth() != to.getDayOfMonth()) {
+      return result.toString();
+    }
+    result.append(String.format("%02dT", from.getDayOfMonth()));
+    if (from.getHourOfDay() != to.getHourOfDay()) {
+      return result.toString();
+    }
+    result.append(String.format("%02d:", from.getHourOfDay()));
+    return result.toString();
+  }
 }
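
For a concrete feel of the helper above, here is how the prefix lengthens as the two endpoints agree on more of the timestamp. These values mirror the new testPrefix_lengthened unit test further down; this is a fragment, assuming access to getCommitLogDiffPrefix.

DateTime from = DateTime.parse("2021-05-11T06:48:00.070Z");
getCommitLogDiffPrefix(from, null);                                   // "commit_diff_until_"
getCommitLogDiffPrefix(from, DateTime.parse("2021-07-01"));           // "commit_diff_until_2021-"
getCommitLogDiffPrefix(from, DateTime.parse("2021-05-21"));           // "commit_diff_until_2021-05-"
getCommitLogDiffPrefix(from, DateTime.parse("2021-05-11T09:48:00Z")); // "commit_diff_until_2021-05-11T"
getCommitLogDiffPrefix(from, DateTime.parse("2021-05-11T06:59:00Z")); // "commit_diff_until_2021-05-11T06:"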

google/registry/backup/ReplayCommitLogsToSqlAction.java

@@ -20,13 +20,15 @@ import static google.registry.backup.RestoreCommitLogsAction.DRY_RUN_PARAM;
 import static google.registry.model.ofy.EntityWritePriorities.getEntityPriority;
 import static google.registry.model.ofy.ObjectifyService.auditedOfy;
 import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
+import static google.registry.util.DateTimeUtils.isAtOrAfter;
+import static google.registry.util.DateTimeUtils.isBeforeOrAt;
 import static javax.servlet.http.HttpServletResponse.SC_NO_CONTENT;
+import static javax.servlet.http.HttpServletResponse.SC_OK;
 import static org.joda.time.Duration.standardHours;

 import com.google.appengine.api.datastore.Entity;
 import com.google.appengine.api.datastore.Key;
 import com.google.cloud.storage.BlobInfo;
-import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
 import com.google.common.flogger.FluentLogger;
 import google.registry.config.RegistryConfig.Config;
@@ -120,26 +122,15 @@ public class ReplayCommitLogsToSqlAction implements Runnable {
     }
     try {
       logger.atInfo().log("Beginning replay of commit logs.");
-      ImmutableList<BlobInfo> commitLogFiles = getFilesToReplay();
+      String resultMessage;
       if (dryRun) {
-        response.setStatus(HttpServletResponse.SC_OK);
-        ImmutableList<String> filenames =
-            commitLogFiles.stream()
-                .limit(10)
-                .map(file -> file.getName())
-                .collect(toImmutableList());
-        String dryRunMessage =
-            "Running in dry-run mode; would have processed %d files. They are (limit 10):\n"
-                + Joiner.on('\n').join(filenames);
-        response.setPayload(dryRunMessage);
-        logger.atInfo().log(dryRunMessage);
+        resultMessage = executeDryRun();
       } else {
-        replayFiles(commitLogFiles);
-        response.setStatus(HttpServletResponse.SC_OK);
-        String message = "ReplayCommitLogsToSqlAction completed successfully.";
-        response.setPayload(message);
-        logger.atInfo().log(message);
+        resultMessage = replayFiles();
       }
+      response.setStatus(SC_OK);
+      response.setPayload(resultMessage);
+      logger.atInfo().log(resultMessage);
     } catch (Throwable t) {
       String message = "Errored out replaying files.";
       logger.atSevere().withCause(t).log(message);
@@ -150,52 +141,77 @@ public class ReplayCommitLogsToSqlAction implements Runnable {
     }
   }

-  private ImmutableList<BlobInfo> getFilesToReplay() {
+  private String executeDryRun() {
     // Start at the first millisecond we haven't seen yet
-    DateTime fromTime = jpaTm().transact(() -> SqlReplayCheckpoint.get().plusMillis(1));
-    logger.atInfo().log("Starting replay from: %s.", fromTime);
-    // If there's an inconsistent file set, this will throw IllegalStateException and the job
-    // will try later -- this is likely because an export hasn't finished yet.
-    ImmutableList<BlobInfo> commitLogFiles =
-        diffLister.listDiffFiles(gcsBucket, fromTime, /* current time */ null);
-    logger.atInfo().log("Found %d new commit log files to process.", commitLogFiles.size());
-    return commitLogFiles;
+    DateTime searchStartTime = jpaTm().transact(() -> SqlReplayCheckpoint.get().plusMillis(1));
+    // Search through the end of the hour
+    DateTime searchEndTime =
+        searchStartTime.withMinuteOfHour(59).withSecondOfMinute(59).withMillisOfSecond(999);
+    ImmutableList<String> fileBatch =
+        diffLister.listDiffFiles(gcsBucket, searchStartTime, searchEndTime).stream()
+            .map(BlobInfo::getName)
+            .collect(toImmutableList());
+    return String.format(
+        "Running in dry-run mode, the first set of commit log files processed would be from "
+            + "searching from %s to %s and would contain %d file(s). They are (limit 10): \n%s",
+        searchStartTime,
+        searchEndTime,
+        fileBatch.size(),
+        fileBatch.stream().limit(10).collect(toImmutableList()));
   }

-  private void replayFiles(ImmutableList<BlobInfo> commitLogFiles) {
+  private String replayFiles() {
     DateTime replayTimeoutTime = clock.nowUtc().plus(REPLAY_TIMEOUT_DURATION);
-    int processedFiles = 0;
-    for (BlobInfo metadata : commitLogFiles) {
-      // One transaction per GCS file
-      jpaTm().transact(() -> processFile(metadata));
-      processedFiles++;
-      if (clock.nowUtc().isAfter(replayTimeoutTime)) {
-        logger.atInfo().log(
-            "Reached max execution time after replaying %d files, leaving %d files for next run.",
-            processedFiles, commitLogFiles.size() - processedFiles);
-        return;
+    DateTime searchStartTime = jpaTm().transact(() -> SqlReplayCheckpoint.get().plusMillis(1));
+    int filesProcessed = 0;
+    // Starting from one millisecond after the last file we processed, search for and import files
+    // one hour at a time until we catch up to the current time or we hit the replay timeout (in
+    // which case the next run will pick up from where we leave off).
+    //
+    // We use hour-long batches because GCS supports filename prefix-based searches.
+    while (true) {
+      if (isAtOrAfter(clock.nowUtc(), replayTimeoutTime)) {
+        return String.format(
+            "Reached max execution time after replaying %d file(s).", filesProcessed);
       }
+      if (isBeforeOrAt(clock.nowUtc(), searchStartTime)) {
+        return String.format(
+            "Caught up to current time after replaying %d file(s).", filesProcessed);
+      }
+      // Search through the end of the hour
+      DateTime searchEndTime =
+          searchStartTime.withMinuteOfHour(59).withSecondOfMinute(59).withMillisOfSecond(999);
+      ImmutableList<BlobInfo> fileBatch =
+          diffLister.listDiffFiles(gcsBucket, searchStartTime, searchEndTime);
+      if (fileBatch.isEmpty()) {
+        logger.atInfo().log(
+            "No remaining files found in hour %s, continuing search in the next hour.",
+            searchStartTime.toString("yyyy-MM-dd HH"));
+      }
+      for (BlobInfo file : fileBatch) {
+        jpaTm().transact(() -> processFile(file));
+        filesProcessed++;
+        if (clock.nowUtc().isAfter(replayTimeoutTime)) {
+          return String.format(
+              "Reached max execution time after replaying %d file(s).", filesProcessed);
+        }
+      }
+      searchStartTime = searchEndTime.plusMillis(1);
     }
-    logger.atInfo().log("Replayed %d commit log files to SQL successfully.", processedFiles);
   }

   private void processFile(BlobInfo metadata) {
-    logger.atInfo().log(
-        "Processing commit log file %s of size %d B.", metadata.getName(), metadata.getSize());
     try (InputStream input = gcsUtils.openInputStream(metadata.getBlobId())) {
       // Load and process the Datastore transactions one at a time
       ImmutableList<ImmutableList<VersionedEntity>> allTransactions =
           CommitLogImports.loadEntitiesByTransaction(input);
-      logger.atInfo().log(
-          "Replaying %d transactions from commit log file %s.",
-          allTransactions.size(), metadata.getName());
       allTransactions.forEach(this::replayTransaction);
       // if we succeeded, set the last-seen time
       DateTime checkpoint = DateTime.parse(metadata.getName().substring(DIFF_FILE_PREFIX.length()));
       SqlReplayCheckpoint.set(checkpoint);
       logger.atInfo().log(
-          "Replayed %d transactions from commit log file %s.",
-          allTransactions.size(), metadata.getName());
+          "Replayed %d transactions from commit log file %s with size %d B.",
+          allTransactions.size(), metadata.getName(), metadata.getSize());
     } catch (IOException e) {
       throw new RuntimeException(
          "Errored out while replaying commit log file " + metadata.getName(), e);

google/registry/backup/GcsDiffFileListerTest.java

@@ -18,6 +18,7 @@ import static com.google.common.collect.Iterables.transform;
 import static com.google.common.truth.Truth.assertThat;
 import static google.registry.backup.BackupUtils.GcsMetadataKeys.LOWER_BOUND_CHECKPOINT;
 import static google.registry.backup.ExportCommitLogDiffAction.DIFF_FILE_PREFIX;
+import static google.registry.backup.GcsDiffFileLister.getCommitLogDiffPrefix;
 import static org.joda.time.DateTimeZone.UTC;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -61,7 +62,7 @@ public class GcsDiffFileListerTest {
   @BeforeEach
   void beforeEach() throws Exception {
     diffLister.gcsUtils = gcsUtils;
-    diffLister.lazyExecutor = MoreExecutors::newDirectExecutorService;
+    diffLister.executorProvider = MoreExecutors::newDirectExecutorService;
     diffLister.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
     for (int i = 0; i < 5; i++) {
       addGcsFile(i, i + 1);
@@ -189,12 +190,23 @@ public class GcsDiffFileListerTest {
   @Test
   void testList_toTimeSpecified() {
-    assertThat(listDiffFiles(
-            now.minusMinutes(4).minusSeconds(1), now.minusMinutes(2).plusSeconds(1)))
-        .containsExactly(
-            now.minusMinutes(4),
-            now.minusMinutes(3),
-            now.minusMinutes(2))
+    assertThat(
+            listDiffFiles(now.minusMinutes(4).minusSeconds(1), now.minusMinutes(2).plusSeconds(1)))
+        .containsExactly(now.minusMinutes(4), now.minusMinutes(3), now.minusMinutes(2))
         .inOrder();
   }
+
+  @Test
+  void testPrefix_lengthened() {
+    DateTime from = DateTime.parse("2021-05-11T06:48:00.070Z");
+    assertThat(getCommitLogDiffPrefix(from, null)).isEqualTo("commit_diff_until_");
+    assertThat(getCommitLogDiffPrefix(from, DateTime.parse("2021-07-01")))
+        .isEqualTo("commit_diff_until_2021-");
+    assertThat(getCommitLogDiffPrefix(from, DateTime.parse("2021-05-21")))
+        .isEqualTo("commit_diff_until_2021-05-");
+    assertThat(getCommitLogDiffPrefix(from, DateTime.parse("2021-05-11T09:48:00.070Z")))
+        .isEqualTo("commit_diff_until_2021-05-11T");
+    assertThat(getCommitLogDiffPrefix(from, DateTime.parse("2021-05-11T06:59:00.070Z")))
+        .isEqualTo("commit_diff_until_2021-05-11T06:");
+  }
 }

google/registry/backup/ReplayCommitLogsToSqlActionTest.java

@@ -132,7 +132,7 @@ public class ReplayCommitLogsToSqlActionTest {
     action.gcsBucket = "gcs bucket";
     action.diffLister = new GcsDiffFileLister();
     action.diffLister.gcsUtils = gcsUtils;
-    action.diffLister.lazyExecutor = MoreExecutors::newDirectExecutorService;
+    action.diffLister.executorProvider = MoreExecutors::newDirectExecutorService;
     action.diffLister.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
     ofyTm()
         .transact(
@@ -200,7 +200,7 @@ public class ReplayCommitLogsToSqlActionTest {
         CommitLogMutation.create(manifest2Key, TestObject.create("f")));
     jpaTm().transact(() -> SqlReplayCheckpoint.set(now.minusMinutes(1).minusMillis(1)));
     fakeClock.advanceOneMilli();
-    runAndAssertSuccess(now);
+    runAndAssertSuccess(now, 2);
     assertExpectedIds("previous to keep", "b", "d", "e", "f");
   }
@@ -211,7 +211,7 @@ public class ReplayCommitLogsToSqlActionTest {
     saveDiffFileNotToRestore(gcsUtils, now.minusMinutes(1));
     saveDiffFile(gcsUtils, createCheckpoint(now.minusMillis(2)));
     jpaTm().transact(() -> SqlReplayCheckpoint.set(now.minusMillis(1)));
-    runAndAssertSuccess(now.minusMillis(1));
+    runAndAssertSuccess(now.minusMillis(1), 0);
     assertExpectedIds("previous to keep");
   }
@@ -235,8 +235,10 @@ public class ReplayCommitLogsToSqlActionTest {
     assertThat(response.getStatus()).isEqualTo(SC_OK);
     assertThat(response.getPayload())
         .isEqualTo(
-            "Running in dry-run mode; would have processed %d files. They are (limit 10):\n"
-                + "commit_diff_until_1999-12-31T23:59:00.000Z");
+            "Running in dry-run mode, the first set of commit log files processed would be from "
+                + "searching from 1999-12-31T23:59:00.000Z to 1999-12-31T23:59:59.999Z and would "
+                + "contain 1 file(s). They are (limit 10): \n"
+                + "[commit_diff_until_1999-12-31T23:59:00.000Z]");
   }

   @Test
@@ -253,7 +255,7 @@ public class ReplayCommitLogsToSqlActionTest {
         CommitLogManifest.create(bucketKey, now, null),
         CommitLogMutation.create(manifestKey, TestObject.create("a")),
         CommitLogMutation.create(manifestKey, TestObject.create("b")));
-    runAndAssertSuccess(now.minusMinutes(1));
+    runAndAssertSuccess(now.minusMinutes(1), 1);
     assertExpectedIds("previous to keep", "a", "b");
   }
@@ -275,7 +277,7 @@
             getBucketKey(1),
             now,
             ImmutableSet.of(Key.create(TestObject.create("previous to delete")))));
-    runAndAssertSuccess(now.minusMinutes(1));
+    runAndAssertSuccess(now.minusMinutes(1), 1);
     assertExpectedIds("previous to keep");
   }
@@ -347,7 +349,7 @@
         domainMutation,
         contactMutation);
-    runAndAssertSuccess(now.minusMinutes(1));
+    runAndAssertSuccess(now.minusMinutes(1), 1);
     // Verify two things:
     // 1. that the contact insert occurred before the domain insert (necessary for FK ordering)
     //    even though the domain came first in the file
@@ -390,7 +392,7 @@
         CommitLogManifest.create(
             getBucketKey(1), now.minusMinutes(1).plusMillis(1), ImmutableSet.of()),
         contactMutation);
-    runAndAssertSuccess(now.minusMinutes(1).plusMillis(1));
+    runAndAssertSuccess(now.minusMinutes(1).plusMillis(1), 1);
     // Verify that the delete occurred first (because it was in the first transaction) even though
     // deletes have higher weight
     ArgumentCaptor<Object> putCaptor = ArgumentCaptor.forClass(Object.class);
@@ -435,7 +437,7 @@
             throw new RuntimeException(e);
           }
         });
-    runAndAssertSuccess(now.minusMinutes(1));
+    runAndAssertSuccess(now.minusMinutes(1), 1);
     // jpaTm()::put should only have been called with the checkpoint
     verify(spy, times(2)).put(any(SqlReplayCheckpoint.class));
     verify(spy, times(2)).put(any());
@@ -460,7 +462,7 @@
             // one object only exists in Datastore, one is dually-written (so isn't replicated)
             ImmutableSet.of(getCrossTldKey(), claimsListKey)));
-    runAndAssertSuccess(now.minusMinutes(1));
+    runAndAssertSuccess(now.minusMinutes(1), 1);
     verify(spy, times(0)).delete(any(VKey.class));
   }
@@ -503,7 +505,7 @@
         createCheckpoint(now.minusMinutes(1)),
         CommitLogManifest.create(bucketKey, now, null),
         CommitLogMutation.create(manifestKey, TestObject.create("a")));
-    runAndAssertSuccess(now.minusMinutes(1));
+    runAndAssertSuccess(now.minusMinutes(1), 1);
     assertThat(TestObject.beforeSqlSaveCallCount).isEqualTo(1);
   }
@@ -522,11 +524,12 @@
     assertThat(TestObject.beforeSqlDeleteCallCount).isEqualTo(1);
   }

-  private void runAndAssertSuccess(DateTime expectedCheckpointTime) {
+  private void runAndAssertSuccess(DateTime expectedCheckpointTime, int numFiles) {
     action.run();
     assertThat(response.getStatus()).isEqualTo(SC_OK);
     assertThat(response.getPayload())
-        .isEqualTo("ReplayCommitLogsToSqlAction completed successfully.");
+        .isEqualTo(
+            String.format("Caught up to current time after replaying %d file(s).", numFiles));
     assertThat(jpaTm().transact(SqlReplayCheckpoint::get)).isEqualTo(expectedCheckpointTime);
   }

google/registry/backup/RestoreCommitLogsActionTest.java

@@ -88,7 +88,7 @@ public class RestoreCommitLogsActionTest {
     action.gcsBucketOverride = Optional.empty();
     action.diffLister = new GcsDiffFileLister();
     action.diffLister.gcsUtils = gcsUtils;
-    action.diffLister.lazyExecutor = MoreExecutors::newDirectExecutorService;
+    action.diffLister.executorProvider = MoreExecutors::newDirectExecutorService;
     action.diffLister.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
   }
} }