Don't retry RDE upload tasks that have failed dependencies

New upload tasks are created every 4 hours, so if we're waiting on a 2-hour SFTP cooldown or some other long-running dependency like generating the RDE report, just delete this task and let it re-run at the next 4-hour period. There is no need to let these tasks continue gumming up the queue.

Note that this technique of throwing NoContentException to abort the task without enqueuing it for retry is already being used by RdeReportAction for the same purpose.

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=201372808
This commit is contained in:
mcilwain 2018-06-20 10:18:29 -07:00 committed by Ben McIlwain
parent a5cc359813
commit c8925555d4
5 changed files with 98 additions and 57 deletions

View file

@ -94,15 +94,11 @@ class EscrowTaskRunner {
if (nextRequiredRun.isAfter(startOfToday)) {
throw new NoContentException("Already completed");
}
logger.atInfo().log("Cursor: %s", nextRequiredRun);
logger.atInfo().log("Current cursor is: %s", nextRequiredRun);
task.runWithLock(nextRequiredRun);
ofy()
.transact(
() ->
ofy()
.save()
.entity(
Cursor.create(cursorType, nextRequiredRun.plus(interval), registry)));
DateTime nextRun = nextRequiredRun.plus(interval);
logger.atInfo().log("Rolling cursor forward to %s.", nextRun);
ofy().transact(() -> ofy().save().entity(Cursor.create(cursorType, nextRun, registry)));
return null;
};
String lockName = String.format("EscrowTaskRunner %s", task.getClass().getSimpleName());

View file

@ -20,6 +20,7 @@ import static google.registry.model.common.Cursor.getCursorTimeOrStartOfTime;
import static google.registry.model.ofy.ObjectifyService.ofy;
import static google.registry.model.rde.RdeMode.FULL;
import static google.registry.request.Action.Method.POST;
import static google.registry.util.DateTimeUtils.isBeforeOrAt;
import com.google.appengine.tools.cloudstorage.GcsFilename;
import com.google.common.flogger.FluentLogger;
@ -82,9 +83,12 @@ public final class RdeReportAction implements Runnable, EscrowTask {
DateTime cursorTime =
getCursorTimeOrStartOfTime(
ofy().load().key(Cursor.createKey(CursorType.RDE_UPLOAD, Registry.get(tld))).now());
if (!cursorTime.isAfter(watermark)) {
logger.atInfo().log("tld=%s reportCursor=%s uploadCursor=%s", tld, watermark, cursorTime);
throw new NoContentException("Waiting for RdeUploadAction to complete");
if (isBeforeOrAt(cursorTime, watermark)) {
throw new NoContentException(
String.format(
"Waiting on RdeUploadAction for TLD %s to send %s report; "
+ "last upload completion was at %s",
tld, watermark, cursorTime));
}
String prefix = RdeNamingUtils.makeRydeFilename(tld, watermark, FULL, 1, 0);
GcsFilename reportFilename = new GcsFilename(bucket, prefix + "-report.xml.ghostryde");

View file

@ -23,6 +23,7 @@ import static google.registry.model.common.Cursor.getCursorTimeOrStartOfTime;
import static google.registry.model.ofy.ObjectifyService.ofy;
import static google.registry.model.rde.RdeMode.FULL;
import static google.registry.request.Action.Method.POST;
import static google.registry.util.DateTimeUtils.isBeforeOrAt;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Arrays.asList;
@ -46,7 +47,7 @@ import google.registry.model.registry.Registry;
import google.registry.rde.EscrowTaskRunner.EscrowTask;
import google.registry.rde.JSchSshSession.JSchSshSessionFactory;
import google.registry.request.Action;
import google.registry.request.HttpException.ServiceUnavailableException;
import google.registry.request.HttpException.NoContentException;
import google.registry.request.Parameter;
import google.registry.request.RequestParameters;
import google.registry.request.Response;
@ -137,17 +138,27 @@ public final class RdeUploadAction implements Runnable, EscrowTask {
logger.atInfo().log("Verifying readiness to upload the RDE deposit.");
DateTime stagingCursorTime = getCursorTimeOrStartOfTime(
ofy().load().key(Cursor.createKey(CursorType.RDE_STAGING, Registry.get(tld))).now());
if (!stagingCursorTime.isAfter(watermark)) {
logger.atInfo().log(
"tld=%s uploadCursor=%s stagingCursor=%s", tld, watermark, stagingCursorTime);
throw new ServiceUnavailableException("Waiting for RdeStagingAction to complete");
if (isBeforeOrAt(stagingCursorTime, watermark)) {
throw new NoContentException(
String.format(
"Waiting on RdeStagingAction for TLD %s to send %s upload; "
+ "last RDE staging completion was at %s",
tld, watermark, stagingCursorTime));
}
DateTime sftpCursorTime = getCursorTimeOrStartOfTime(
ofy().load().key(Cursor.createKey(RDE_UPLOAD_SFTP, Registry.get(tld))).now());
if (sftpCursorTime.plus(sftpCooldown).isAfter(clock.nowUtc())) {
// Fail the task good and hard so it retries until the cooldown passes.
logger.atInfo().log("tld=%s cursor=%s sftpCursor=%s", tld, watermark, sftpCursorTime);
throw new ServiceUnavailableException("SFTP cooldown has not yet passed");
DateTime sftpCursorTime =
getCursorTimeOrStartOfTime(
ofy().load().key(Cursor.createKey(RDE_UPLOAD_SFTP, Registry.get(tld))).now());
Duration timeSinceLastSftp = new Duration(sftpCursorTime, clock.nowUtc());
if (timeSinceLastSftp.isShorterThan(sftpCooldown)) {
throw new NoContentException(
String.format(
"Waiting on %d minute SFTP cooldown for TLD %s to send %s upload; "
+ "last upload attempt was at %s (%d minutes ago)",
sftpCooldown.getStandardMinutes(),
tld,
watermark,
sftpCursorTime,
timeSinceLastSftp.getStandardMinutes()));
}
int revision = RdeRevision.getNextRevision(tld, watermark, FULL) - 1;
verify(revision >= 0, "RdeRevision was not set on generated deposit");
@ -161,22 +172,16 @@ public final class RdeUploadAction implements Runnable, EscrowTask {
logger.atInfo().log("Commencing RDE upload for TLD '%s' to '%s'.", tld, uploadUrl);
final long xmlLength = readXmlLength(xmlLengthFilename);
retrier.callWithRetry(
() -> {
upload(xmlFilename, xmlLength, watermark, name);
return null;
},
JSchException.class);
() -> upload(xmlFilename, xmlLength, watermark, name), JSchException.class);
logger.atInfo().log(
"Updating RDE cursor '%s' for TLD '%s' following successful upload.", RDE_UPLOAD_SFTP, tld);
ofy()
.transact(
() ->
ofy()
.save()
.entity(
Cursor.create(
RDE_UPLOAD_SFTP, ofy().getTransactionTime(), Registry.get(tld)))
.now());
() -> {
Cursor updatedSftpCursor =
Cursor.create(RDE_UPLOAD_SFTP, ofy().getTransactionTime(), Registry.get(tld));
ofy().save().entity(updatedSftpCursor);
});
response.setContentType(PLAIN_TEXT_UTF_8);
response.setPayload(String.format("OK %s %s\n", tld, watermark));
}