Add tx/s instrumentation to replay action and re-enable it on sandbox (#1276)

Ben McIlwain 2021-08-12 18:33:47 -04:00 committed by GitHub
parent fbdd278abd
commit aae06a25fc
3 changed files with 49 additions and 27 deletions


@@ -58,6 +58,7 @@ import javax.inject.Inject;
 import javax.servlet.http.HttpServletResponse;
 import org.joda.time.DateTime;
 import org.joda.time.Duration;
+import org.joda.time.Seconds;
 
 /** Action that replays commit logs to Cloud SQL to keep it up to date. */
 @Action(
@@ -97,7 +98,8 @@ public class ReplayCommitLogsToSqlAction implements Runnable {
   @Override
   public void run() {
-    MigrationState state = DatabaseMigrationStateSchedule.getValueAtTime(clock.nowUtc());
+    DateTime startTime = clock.nowUtc();
+    MigrationState state = DatabaseMigrationStateSchedule.getValueAtTime(startTime);
     if (!state.getReplayDirection().equals(ReplayDirection.DATASTORE_TO_SQL)) {
       String message =
           String.format(
@@ -126,7 +128,7 @@ public class ReplayCommitLogsToSqlAction implements Runnable {
     if (dryRun) {
       resultMessage = executeDryRun();
     } else {
-      resultMessage = replayFiles();
+      resultMessage = replayFiles(startTime);
     }
     response.setStatus(SC_OK);
     response.setPayload(resultMessage);
@@ -160,10 +162,11 @@ public class ReplayCommitLogsToSqlAction implements Runnable {
         fileBatch.stream().limit(10).collect(toImmutableList()));
   }
 
-  private String replayFiles() {
-    DateTime replayTimeoutTime = clock.nowUtc().plus(REPLAY_TIMEOUT_DURATION);
+  private String replayFiles(DateTime startTime) {
+    DateTime replayTimeoutTime = startTime.plus(REPLAY_TIMEOUT_DURATION);
     DateTime searchStartTime = jpaTm().transact(() -> SqlReplayCheckpoint.get().plusMillis(1));
     int filesProcessed = 0;
+    int transactionsProcessed = 0;
     // Starting from one millisecond after the last file we processed, search for and import files
     // one hour at a time until we catch up to the current time or we hit the replay timeout (in
     // which case the next run will pick up from where we leave off).
@@ -171,12 +174,12 @@ public class ReplayCommitLogsToSqlAction implements Runnable {
     // We use hour-long batches because GCS supports filename prefix-based searches.
     while (true) {
       if (isAtOrAfter(clock.nowUtc(), replayTimeoutTime)) {
-        return String.format(
-            "Reached max execution time after replaying %d file(s).", filesProcessed);
+        return createResponseString(
+            "Reached max execution time", startTime, filesProcessed, transactionsProcessed);
       }
       if (isBeforeOrAt(clock.nowUtc(), searchStartTime)) {
-        return String.format(
-            "Caught up to current time after replaying %d file(s).", filesProcessed);
+        return createResponseString(
+            "Caught up to current time", startTime, filesProcessed, transactionsProcessed);
       }
       // Search through the end of the hour
       DateTime searchEndTime =
@@ -189,18 +192,32 @@ public class ReplayCommitLogsToSqlAction implements Runnable {
             searchStartTime.toString("yyyy-MM-dd HH"));
       }
       for (BlobInfo file : fileBatch) {
-        processFile(file);
+        transactionsProcessed += processFile(file);
         filesProcessed++;
         if (clock.nowUtc().isAfter(replayTimeoutTime)) {
-          return String.format(
-              "Reached max execution time after replaying %d file(s).", filesProcessed);
+          return createResponseString(
+              "Reached max execution time", startTime, filesProcessed, transactionsProcessed);
         }
       }
       searchStartTime = searchEndTime.plusMillis(1);
     }
   }
 
-  private void processFile(BlobInfo metadata) {
+  private String createResponseString(
+      String msg, DateTime startTime, int filesProcessed, int transactionsProcessed) {
+    double tps =
+        (double) transactionsProcessed
+            / (double) Seconds.secondsBetween(startTime, clock.nowUtc()).getSeconds();
+    return String.format(
+        "%s after replaying %d file(s) containing %d total transaction(s) (%.2f tx/s).",
+        msg, filesProcessed, transactionsProcessed, tps);
+  }
+
+  /**
+   * Replays the commit logs in the given commit log file and returns the number of transactions
+   * committed.
+   */
+  private int processFile(BlobInfo metadata) {
     try (InputStream input = gcsUtils.openInputStream(metadata.getBlobId())) {
       // Load and process the Datastore transactions one at a time
       ImmutableList<ImmutableList<VersionedEntity>> allTransactions =
@@ -213,6 +230,7 @@ public class ReplayCommitLogsToSqlAction implements Runnable {
       logger.atInfo().log(
           "Replayed %d transactions from commit log file %s with size %d B.",
           allTransactions.size(), metadata.getName(), metadata.getSize());
+      return allTransactions.size();
     } catch (IOException e) {
       throw new RuntimeException(
           "Errored out while replaying commit log file " + metadata.getName(), e);


@@ -230,7 +230,7 @@
   </cron>
 
   <cron>
-    <url><![CDATA[/_dr/cron/fanout?queue=replay-commit-logs-to-sql&endpoint=/_dr/task/replayCommitLogsToSql&runInEmpty&dryRun=true]]></url>
+    <url><![CDATA[/_dr/cron/fanout?queue=replay-commit-logs-to-sql&endpoint=/_dr/task/replayCommitLogsToSql&runInEmpty]]></url>
     <description>
       Replays recent commit logs from Datastore to the SQL secondary backend.
     </description>


@@ -201,7 +201,7 @@ public class ReplayCommitLogsToSqlActionTest {
         CommitLogMutation.create(manifest2Key, TestObject.create("f")));
     jpaTm().transact(() -> SqlReplayCheckpoint.set(now.minusMinutes(1).minusMillis(1)));
     fakeClock.advanceOneMilli();
-    runAndAssertSuccess(now, 2);
+    runAndAssertSuccess(now, 2, 3);
     assertExpectedIds("previous to keep", "b", "d", "e", "f");
   }
@@ -212,7 +212,7 @@ public class ReplayCommitLogsToSqlActionTest {
     saveDiffFileNotToRestore(gcsUtils, now.minusMinutes(1));
     saveDiffFile(gcsUtils, createCheckpoint(now.minusMillis(2)));
     jpaTm().transact(() -> SqlReplayCheckpoint.set(now.minusMillis(1)));
-    runAndAssertSuccess(now.minusMillis(1), 0);
+    runAndAssertSuccess(now.minusMillis(1), 0, 0);
     assertExpectedIds("previous to keep");
   }
@@ -256,7 +256,7 @@ public class ReplayCommitLogsToSqlActionTest {
         CommitLogManifest.create(bucketKey, now, null),
         CommitLogMutation.create(manifestKey, TestObject.create("a")),
         CommitLogMutation.create(manifestKey, TestObject.create("b")));
-    runAndAssertSuccess(now.minusMinutes(1), 1);
+    runAndAssertSuccess(now.minusMinutes(1), 1, 1);
     assertExpectedIds("previous to keep", "a", "b");
   }
@@ -278,7 +278,7 @@ public class ReplayCommitLogsToSqlActionTest {
             getBucketKey(1),
             now,
             ImmutableSet.of(Key.create(TestObject.create("previous to delete")))));
-    runAndAssertSuccess(now.minusMinutes(1), 1);
+    runAndAssertSuccess(now.minusMinutes(1), 1, 1);
     assertExpectedIds("previous to keep");
   }
@@ -350,7 +350,7 @@ public class ReplayCommitLogsToSqlActionTest {
         domainMutation,
         contactMutation);
-    runAndAssertSuccess(now.minusMinutes(1), 1);
+    runAndAssertSuccess(now.minusMinutes(1), 1, 1);
     // Verify two things:
     // 1. that the contact insert occurred before the domain insert (necessary for FK ordering)
     //    even though the domain came first in the file
@@ -393,7 +393,7 @@ public class ReplayCommitLogsToSqlActionTest {
         CommitLogManifest.create(
             getBucketKey(1), now.minusMinutes(1).plusMillis(1), ImmutableSet.of()),
         contactMutation);
-    runAndAssertSuccess(now.minusMinutes(1).plusMillis(1), 1);
+    runAndAssertSuccess(now.minusMinutes(1).plusMillis(1), 1, 2);
     // Verify that the delete occurred first (because it was in the first transaction) even though
     // deletes have higher weight
     ArgumentCaptor<Object> putCaptor = ArgumentCaptor.forClass(Object.class);
@@ -438,7 +438,7 @@ public class ReplayCommitLogsToSqlActionTest {
                 throw new RuntimeException(e);
               }
             });
-    runAndAssertSuccess(now.minusMinutes(1), 1);
+    runAndAssertSuccess(now.minusMinutes(1), 1, 1);
     // jpaTm()::put should only have been called with the checkpoint
     verify(spy, times(2)).put(any(SqlReplayCheckpoint.class));
     verify(spy, times(2)).put(any());
@@ -463,7 +463,7 @@ public class ReplayCommitLogsToSqlActionTest {
             // one object only exists in Datastore, one is dually-written (so isn't replicated)
             ImmutableSet.of(getCrossTldKey(), claimsListKey)));
-    runAndAssertSuccess(now.minusMinutes(1), 1);
+    runAndAssertSuccess(now.minusMinutes(1), 1, 1);
     verify(spy, times(0)).delete(any(VKey.class));
   }
@@ -506,7 +506,7 @@ public class ReplayCommitLogsToSqlActionTest {
         createCheckpoint(now.minusMinutes(1)),
         CommitLogManifest.create(bucketKey, now, null),
         CommitLogMutation.create(manifestKey, TestObject.create("a")));
-    runAndAssertSuccess(now.minusMinutes(1), 1);
+    runAndAssertSuccess(now.minusMinutes(1), 1, 1);
     assertThat(TestObject.beforeSqlSaveCallCount).isEqualTo(1);
   }
@@ -544,7 +544,7 @@ public class ReplayCommitLogsToSqlActionTest {
         createCheckpoint(now.minusMinutes(1)),
         CommitLogManifest.create(
             getBucketKey(1), now.minusMinutes(3), ImmutableSet.of(Key.create(domain))));
-    runAndAssertSuccess(now.minusMinutes(1), 1);
+    runAndAssertSuccess(now.minusMinutes(1), 1, 1);
     jpaTm()
         .transact(
@@ -596,15 +596,19 @@ public class ReplayCommitLogsToSqlActionTest {
         domainWithoutDsDataMutation,
         CommitLogManifest.create(getBucketKey(1), now.minusMinutes(2), ImmutableSet.of()),
         domainWithOriginalDsDataMutation);
-    runAndAssertSuccess(now.minusMinutes(1), 1);
+    runAndAssertSuccess(now.minusMinutes(1), 1, 2);
   }
 
-  private void runAndAssertSuccess(DateTime expectedCheckpointTime, int numFiles) {
+  private void runAndAssertSuccess(
+      DateTime expectedCheckpointTime, int numFiles, int numTransactions) {
     action.run();
     assertThat(response.getStatus()).isEqualTo(SC_OK);
     assertThat(response.getPayload())
-        .isEqualTo(
-            String.format("Caught up to current time after replaying %d file(s).", numFiles));
+        .startsWith(
+            String.format(
+                "Caught up to current time after replaying %d file(s) containing %d total"
+                    + " transaction(s)",
+                numFiles, numTransactions));
     assertThat(jpaTm().transact(SqlReplayCheckpoint::get)).isEqualTo(expectedCheckpointTime);
   }
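Because the trailing tx/s figure in the response payload depends on wall-clock timing, the test assertion above switches from an exact isEqualTo match to a startsWith prefix match. A self-contained sketch of that pattern with Google Truth; the payload string and class name below are made up for illustration.

import static com.google.common.truth.Truth.assertThat;

class PayloadPrefixSketch {
  public static void main(String[] args) {
    // Hypothetical payload: the trailing rate varies from run to run, so only the prefix is asserted.
    String payload =
        "Caught up to current time after replaying 2 file(s) containing 3 total transaction(s) (1.50 tx/s).";
    assertThat(payload)
        .startsWith("Caught up to current time after replaying 2 file(s) containing 3 total transaction(s)");
    System.out.println("Prefix assertion passed.");
  }
}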