diff --git a/core/src/main/java/google/registry/reporting/icann/IcannReportingUploadAction.java b/core/src/main/java/google/registry/reporting/icann/IcannReportingUploadAction.java index aa67743bf..0db1b8345 100644 --- a/core/src/main/java/google/registry/reporting/icann/IcannReportingUploadAction.java +++ b/core/src/main/java/google/registry/reporting/icann/IcannReportingUploadAction.java @@ -14,7 +14,6 @@ package google.registry.reporting.icann; -import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.net.MediaType.PLAIN_TEXT_UTF_8; import static google.registry.persistence.transaction.TransactionManagerFactory.tm; import static google.registry.request.Action.Method.POST; @@ -46,7 +45,6 @@ import google.registry.util.SendEmailService; import java.io.IOException; import java.io.InputStream; import java.util.Map; -import java.util.concurrent.Callable; import java.util.regex.Pattern; import java.util.stream.Collectors; import javax.inject.Inject; @@ -78,6 +76,7 @@ public final class IcannReportingUploadAction implements Runnable { static final String PATH = "/_dr/task/icannReportingUpload"; private static final FluentLogger logger = FluentLogger.forEnclosingClass(); + private static final String LOCK_NAME = "IcannReportingUploadAction"; @Inject @Config("reportingBucket") @@ -98,48 +97,33 @@ public final class IcannReportingUploadAction implements Runnable { @Override public void run() { - Runnable transactional = - () -> { - ImmutableMap.Builder reportSummaryBuilder = new ImmutableMap.Builder<>(); - - ImmutableMap cursors = loadCursors(); - - // If cursor time is before now, upload the corresponding report - cursors.entrySet().stream() - .filter(entry -> entry.getKey().getCursorTime().isBefore(clock.nowUtc())) - .forEach( - entry -> { - DateTime cursorTime = entry.getKey().getCursorTime(); - uploadReport( - cursorTime, - entry.getKey().getType(), - entry.getValue(), - reportSummaryBuilder); - }); - // Send email of which reports were uploaded - emailUploadResults(reportSummaryBuilder.build()); - response.setStatus(SC_OK); - response.setContentType(PLAIN_TEXT_UTF_8); - }; - - Callable lockRunner = - () -> { - tm().transact(transactional); - return null; - }; - - String lockname = "IcannReportingUploadAction"; - if (!lockHandler.executeWithLocks(lockRunner, null, Duration.standardHours(2), lockname)) { - throw new ServiceUnavailableException("Lock for IcannReportingUploadAction already in use"); + if (!lockHandler.executeWithLocks( + this::runWithLock, null, Duration.standardHours(2), LOCK_NAME)) { + throw new ServiceUnavailableException(String.format("Lock for %s already in use", LOCK_NAME)); } } + private Void runWithLock() { + ImmutableMap.Builder reportSummaryBuilder = new ImmutableMap.Builder<>(); + + ImmutableMap cursors = tm().transact(this::loadCursors); + + // If cursor time is before now, upload the corresponding report + cursors.entrySet().stream() + .filter(entry -> entry.getKey().getCursorTime().isBefore(clock.nowUtc())) + .forEach(entry -> uploadReport(entry.getKey(), entry.getValue(), reportSummaryBuilder)); + // Send email of which reports were uploaded + emailUploadResults(reportSummaryBuilder.build()); + response.setStatus(SC_OK); + response.setContentType(PLAIN_TEXT_UTF_8); + return null; + } + /** Uploads the report and rolls forward the cursor for that report. */ private void uploadReport( - DateTime cursorTime, - CursorType cursorType, - String tldStr, - ImmutableMap.Builder reportSummaryBuilder) { + Cursor cursor, String tldStr, ImmutableMap.Builder reportSummaryBuilder) { + DateTime cursorTime = cursor.getCursorTime(); + CursorType cursorType = cursor.getType(); DateTime cursorTimeMinusMonth = cursorTime.withDayOfMonth(1).minusMonths(1); String reportSubdir = String.format( @@ -150,17 +134,16 @@ public final class IcannReportingUploadAction implements Runnable { BlobId.of(reportingBucket, String.format("%s/%s", reportSubdir, filename)); logger.atInfo().log("Reading ICANN report %s from bucket '%s'.", filename, reportingBucket); // Check that the report exists - try { - verifyFileExists(gcsFilename); - } catch (IllegalArgumentException e) { + if (!gcsUtils.existsAndNotEmpty(gcsFilename)) { String logMessage = String.format( - "Could not upload %s report for %s because file %s did not exist.", - cursorType, tldStr, filename); + "Could not upload %s report for %s because file %s (object %s in bucket %s) did not" + + " exist.", + cursorType, tldStr, filename, gcsFilename.getName(), gcsFilename.getBucket()); if (clock.nowUtc().dayOfMonth().get() == 1) { - logger.atInfo().withCause(e).log(logMessage + " This report may not have been staged yet."); + logger.atInfo().log(logMessage + " This report may not have been staged yet."); } else { - logger.atSevere().withCause(e).log(logMessage); + logger.atSevere().log(logMessage); } reportSummaryBuilder.put(filename, false); return; @@ -179,7 +162,6 @@ public final class IcannReportingUploadAction implements Runnable { } catch (RuntimeException e) { logger.atWarning().withCause(e).log("Upload to %s failed.", gcsFilename); } - reportSummaryBuilder.put(filename, success); // Set cursor to first day of next month if the upload succeeded if (success) { @@ -188,8 +170,24 @@ public final class IcannReportingUploadAction implements Runnable { cursorType, cursorTime.withTimeAtStartOfDay().withDayOfMonth(1).plusMonths(1), Registry.get(tldStr)); - tm().put(newCursor); + // In order to keep the transactions short-lived, we load all of the cursors in a single + // transaction then later use per-cursor transactions when checking + saving the cursors. We + // run behind a lock so the cursors shouldn't be changed, but double check to be sure. + success = + tm().transact( + () -> { + Cursor fromDb = tm().transact(() -> tm().loadByEntity(cursor)); + if (!cursor.equals(fromDb)) { + logger.atSevere().log( + "Expected previously-loaded cursor %s to equal current cursor %s", + cursor, fromDb); + return false; + } + tm().put(newCursor); + return true; + }); } + reportSummaryBuilder.put(filename, success); } private String getFileName(CursorType cursorType, DateTime cursorTime, String tld) { @@ -303,13 +301,4 @@ public final class IcannReportingUploadAction implements Runnable { return ByteStreams.toByteArray(gcsInput); } } - - private void verifyFileExists(BlobId gcsFilename) { - checkArgument( - gcsUtils.existsAndNotEmpty(gcsFilename), - "Object %s in bucket %s not found", - gcsFilename.getName(), - gcsFilename.getBucket()); - } - } diff --git a/core/src/test/java/google/registry/reporting/icann/IcannReportingUploadActionTest.java b/core/src/test/java/google/registry/reporting/icann/IcannReportingUploadActionTest.java index 44b3434fa..ccbb93c19 100644 --- a/core/src/test/java/google/registry/reporting/icann/IcannReportingUploadActionTest.java +++ b/core/src/test/java/google/registry/reporting/icann/IcannReportingUploadActionTest.java @@ -294,8 +294,9 @@ class IcannReportingUploadActionTest { .that(logHandler) .hasLogAtLevelWithMessage( Level.SEVERE, - "Could not upload ICANN_UPLOAD_ACTIVITY report for tld because file" - + " tld-activity-200512.csv did not exist"); + "Could not upload ICANN_UPLOAD_ACTIVITY report for tld because file " + + "tld-activity-200512.csv (object icann/monthly/2005-12/tld-activity-200512.csv in" + + " bucket basin) did not exist."); } @TestOfyAndSql @@ -310,9 +311,9 @@ class IcannReportingUploadActionTest { .that(logHandler) .hasLogAtLevelWithMessage( Level.INFO, - "Could not upload ICANN_UPLOAD_ACTIVITY report for foo because file" - + " foo-activity-200607.csv did not exist. This report may not have been staged" - + " yet."); + "Could not upload ICANN_UPLOAD_ACTIVITY report for foo because file " + + "foo-activity-200607.csv (object icann/monthly/2006-07/foo-activity-200607.csv in" + + " bucket basin) did not exist. This report may not have been staged yet."); } @TestOfyAndSql