mirror of
https://github.com/google/nomulus.git
synced 2025-07-22 18:55:58 +02:00
Use multiple transactions in IcannReportingUploadAction (#1386)
Relevant error log message: https://pantheon.corp.google.com/logs/viewer?project=domain-registry&minLogLevel=0&expandAll=false×tamp=2021-10-11T15:28:01.047783000Z&customFacets=&limitCustomFacetWidth=true&dateRangeEnd=2021-10-11T20:51:40.591Z&interval=PT1H&resource=gae_app&logName=projects%2Fdomain-registry%2Flogs%2Fappengine.googleapis.com%252Frequest_log&scrollTimestamp=2021-10-11T15:10:23.174336000Z&filters=text:icannReportingUpload&dateRangeUnbound=backwardInTime&advancedFilter=resource.type%3D%22gae_app%22%0AlogName%3D%22projects%2Fdomain-registry%2Flogs%2Fappengine.googleapis.com%252Frequest_log%22%0A%22icannReportingUpload%22%0Aoperation.id%3D%22616453df00ff02a873d26cedb40001737e646f6d61696e2d726567697374727900016261636b656e643a6e6f6d756c75732d76303233000100%22 note the "invalid handle" bit From https://cloud.google.com/datastore/docs/concepts/transactions: "Transactions expire after 270 seconds or if idle for 60 seconds." From b/202309933: "There is a 60 second timeout on Datastore operations after which they will automatically rollback and the handles become invalid." From the logs we can see that the action is lasting significantly longer than 270 seconds -- roughly 480 seconds in the linked log (more or less). My running theory is that ICANN is, for some reason, now being significantly more slow to respond than they used to be. Some uploads in the log linked above are taking upwards of 10 seconds, especially when they have to retry. Because we have >=45 TLDs, it's not surprising that the action is taking >400 seconds to run. The fix here is to perform each per-TLD operation in its own transaction. The only reason why we need the transactions is for the cursors anyway, and we can just grab and store those at the beginning of the transaction.
This commit is contained in:
parent
d0c8f29a3b
commit
1fd179d041
2 changed files with 52 additions and 62 deletions
|
@ -14,7 +14,6 @@
|
|||
|
||||
package google.registry.reporting.icann;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkArgument;
|
||||
import static com.google.common.net.MediaType.PLAIN_TEXT_UTF_8;
|
||||
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
|
||||
import static google.registry.request.Action.Method.POST;
|
||||
|
@ -46,7 +45,6 @@ import google.registry.util.SendEmailService;
|
|||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
import javax.inject.Inject;
|
||||
|
@ -78,6 +76,7 @@ public final class IcannReportingUploadAction implements Runnable {
|
|||
static final String PATH = "/_dr/task/icannReportingUpload";
|
||||
|
||||
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
|
||||
private static final String LOCK_NAME = "IcannReportingUploadAction";
|
||||
|
||||
@Inject
|
||||
@Config("reportingBucket")
|
||||
|
@ -98,48 +97,33 @@ public final class IcannReportingUploadAction implements Runnable {
|
|||
|
||||
@Override
|
||||
public void run() {
|
||||
Runnable transactional =
|
||||
() -> {
|
||||
ImmutableMap.Builder<String, Boolean> reportSummaryBuilder = new ImmutableMap.Builder<>();
|
||||
|
||||
ImmutableMap<Cursor, String> cursors = loadCursors();
|
||||
|
||||
// If cursor time is before now, upload the corresponding report
|
||||
cursors.entrySet().stream()
|
||||
.filter(entry -> entry.getKey().getCursorTime().isBefore(clock.nowUtc()))
|
||||
.forEach(
|
||||
entry -> {
|
||||
DateTime cursorTime = entry.getKey().getCursorTime();
|
||||
uploadReport(
|
||||
cursorTime,
|
||||
entry.getKey().getType(),
|
||||
entry.getValue(),
|
||||
reportSummaryBuilder);
|
||||
});
|
||||
// Send email of which reports were uploaded
|
||||
emailUploadResults(reportSummaryBuilder.build());
|
||||
response.setStatus(SC_OK);
|
||||
response.setContentType(PLAIN_TEXT_UTF_8);
|
||||
};
|
||||
|
||||
Callable<Void> lockRunner =
|
||||
() -> {
|
||||
tm().transact(transactional);
|
||||
return null;
|
||||
};
|
||||
|
||||
String lockname = "IcannReportingUploadAction";
|
||||
if (!lockHandler.executeWithLocks(lockRunner, null, Duration.standardHours(2), lockname)) {
|
||||
throw new ServiceUnavailableException("Lock for IcannReportingUploadAction already in use");
|
||||
if (!lockHandler.executeWithLocks(
|
||||
this::runWithLock, null, Duration.standardHours(2), LOCK_NAME)) {
|
||||
throw new ServiceUnavailableException(String.format("Lock for %s already in use", LOCK_NAME));
|
||||
}
|
||||
}
|
||||
|
||||
private Void runWithLock() {
|
||||
ImmutableMap.Builder<String, Boolean> reportSummaryBuilder = new ImmutableMap.Builder<>();
|
||||
|
||||
ImmutableMap<Cursor, String> cursors = tm().transact(this::loadCursors);
|
||||
|
||||
// If cursor time is before now, upload the corresponding report
|
||||
cursors.entrySet().stream()
|
||||
.filter(entry -> entry.getKey().getCursorTime().isBefore(clock.nowUtc()))
|
||||
.forEach(entry -> uploadReport(entry.getKey(), entry.getValue(), reportSummaryBuilder));
|
||||
// Send email of which reports were uploaded
|
||||
emailUploadResults(reportSummaryBuilder.build());
|
||||
response.setStatus(SC_OK);
|
||||
response.setContentType(PLAIN_TEXT_UTF_8);
|
||||
return null;
|
||||
}
|
||||
|
||||
/** Uploads the report and rolls forward the cursor for that report. */
|
||||
private void uploadReport(
|
||||
DateTime cursorTime,
|
||||
CursorType cursorType,
|
||||
String tldStr,
|
||||
ImmutableMap.Builder<String, Boolean> reportSummaryBuilder) {
|
||||
Cursor cursor, String tldStr, ImmutableMap.Builder<String, Boolean> reportSummaryBuilder) {
|
||||
DateTime cursorTime = cursor.getCursorTime();
|
||||
CursorType cursorType = cursor.getType();
|
||||
DateTime cursorTimeMinusMonth = cursorTime.withDayOfMonth(1).minusMonths(1);
|
||||
String reportSubdir =
|
||||
String.format(
|
||||
|
@ -150,17 +134,16 @@ public final class IcannReportingUploadAction implements Runnable {
|
|||
BlobId.of(reportingBucket, String.format("%s/%s", reportSubdir, filename));
|
||||
logger.atInfo().log("Reading ICANN report %s from bucket '%s'.", filename, reportingBucket);
|
||||
// Check that the report exists
|
||||
try {
|
||||
verifyFileExists(gcsFilename);
|
||||
} catch (IllegalArgumentException e) {
|
||||
if (!gcsUtils.existsAndNotEmpty(gcsFilename)) {
|
||||
String logMessage =
|
||||
String.format(
|
||||
"Could not upload %s report for %s because file %s did not exist.",
|
||||
cursorType, tldStr, filename);
|
||||
"Could not upload %s report for %s because file %s (object %s in bucket %s) did not"
|
||||
+ " exist.",
|
||||
cursorType, tldStr, filename, gcsFilename.getName(), gcsFilename.getBucket());
|
||||
if (clock.nowUtc().dayOfMonth().get() == 1) {
|
||||
logger.atInfo().withCause(e).log(logMessage + " This report may not have been staged yet.");
|
||||
logger.atInfo().log(logMessage + " This report may not have been staged yet.");
|
||||
} else {
|
||||
logger.atSevere().withCause(e).log(logMessage);
|
||||
logger.atSevere().log(logMessage);
|
||||
}
|
||||
reportSummaryBuilder.put(filename, false);
|
||||
return;
|
||||
|
@ -179,7 +162,6 @@ public final class IcannReportingUploadAction implements Runnable {
|
|||
} catch (RuntimeException e) {
|
||||
logger.atWarning().withCause(e).log("Upload to %s failed.", gcsFilename);
|
||||
}
|
||||
reportSummaryBuilder.put(filename, success);
|
||||
|
||||
// Set cursor to first day of next month if the upload succeeded
|
||||
if (success) {
|
||||
|
@ -188,8 +170,24 @@ public final class IcannReportingUploadAction implements Runnable {
|
|||
cursorType,
|
||||
cursorTime.withTimeAtStartOfDay().withDayOfMonth(1).plusMonths(1),
|
||||
Registry.get(tldStr));
|
||||
tm().put(newCursor);
|
||||
// In order to keep the transactions short-lived, we load all of the cursors in a single
|
||||
// transaction then later use per-cursor transactions when checking + saving the cursors. We
|
||||
// run behind a lock so the cursors shouldn't be changed, but double check to be sure.
|
||||
success =
|
||||
tm().transact(
|
||||
() -> {
|
||||
Cursor fromDb = tm().transact(() -> tm().loadByEntity(cursor));
|
||||
if (!cursor.equals(fromDb)) {
|
||||
logger.atSevere().log(
|
||||
"Expected previously-loaded cursor %s to equal current cursor %s",
|
||||
cursor, fromDb);
|
||||
return false;
|
||||
}
|
||||
tm().put(newCursor);
|
||||
return true;
|
||||
});
|
||||
}
|
||||
reportSummaryBuilder.put(filename, success);
|
||||
}
|
||||
|
||||
private String getFileName(CursorType cursorType, DateTime cursorTime, String tld) {
|
||||
|
@ -303,13 +301,4 @@ public final class IcannReportingUploadAction implements Runnable {
|
|||
return ByteStreams.toByteArray(gcsInput);
|
||||
}
|
||||
}
|
||||
|
||||
private void verifyFileExists(BlobId gcsFilename) {
|
||||
checkArgument(
|
||||
gcsUtils.existsAndNotEmpty(gcsFilename),
|
||||
"Object %s in bucket %s not found",
|
||||
gcsFilename.getName(),
|
||||
gcsFilename.getBucket());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -294,8 +294,9 @@ class IcannReportingUploadActionTest {
|
|||
.that(logHandler)
|
||||
.hasLogAtLevelWithMessage(
|
||||
Level.SEVERE,
|
||||
"Could not upload ICANN_UPLOAD_ACTIVITY report for tld because file"
|
||||
+ " tld-activity-200512.csv did not exist");
|
||||
"Could not upload ICANN_UPLOAD_ACTIVITY report for tld because file "
|
||||
+ "tld-activity-200512.csv (object icann/monthly/2005-12/tld-activity-200512.csv in"
|
||||
+ " bucket basin) did not exist.");
|
||||
}
|
||||
|
||||
@TestOfyAndSql
|
||||
|
@ -310,9 +311,9 @@ class IcannReportingUploadActionTest {
|
|||
.that(logHandler)
|
||||
.hasLogAtLevelWithMessage(
|
||||
Level.INFO,
|
||||
"Could not upload ICANN_UPLOAD_ACTIVITY report for foo because file"
|
||||
+ " foo-activity-200607.csv did not exist. This report may not have been staged"
|
||||
+ " yet.");
|
||||
"Could not upload ICANN_UPLOAD_ACTIVITY report for foo because file "
|
||||
+ "foo-activity-200607.csv (object icann/monthly/2006-07/foo-activity-200607.csv in"
|
||||
+ " bucket basin) did not exist. This report may not have been staged yet.");
|
||||
}
|
||||
|
||||
@TestOfyAndSql
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue