diff --git a/java/google/registry/mapreduce/inputs/CommitLogManifestReader.java b/java/google/registry/mapreduce/inputs/CommitLogManifestReader.java index 29ec015cb..a968277e6 100644 --- a/java/google/registry/mapreduce/inputs/CommitLogManifestReader.java +++ b/java/google/registry/mapreduce/inputs/CommitLogManifestReader.java @@ -14,9 +14,11 @@ package google.registry.mapreduce.inputs; +import static com.google.common.base.Preconditions.checkNotNull; import static google.registry.model.ofy.ObjectifyService.ofy; import com.google.appengine.api.datastore.Cursor; +import com.google.appengine.api.datastore.DatastoreTimeoutException; import com.google.appengine.api.datastore.QueryResultIterator; import com.google.appengine.tools.mapreduce.InputReader; import com.google.common.base.Optional; @@ -24,7 +26,11 @@ import com.googlecode.objectify.Key; import com.googlecode.objectify.cmd.Query; import google.registry.model.ofy.CommitLogBucket; import google.registry.model.ofy.CommitLogManifest; +import google.registry.util.FormattingLogger; +import google.registry.util.Retrier; +import google.registry.util.SystemSleeper; import java.util.NoSuchElementException; +import java.util.concurrent.Callable; import org.joda.time.DateTime; /** {@link InputReader} that maps over {@link CommitLogManifest}. */ @@ -32,6 +38,8 @@ class CommitLogManifestReader extends InputReader> { private static final long serialVersionUID = 5117046535590539778L; + static final FormattingLogger logger = FormattingLogger.getLoggerForCallerClass(); + /** * Memory estimation for this reader. 
* @@ -40,6 +48,8 @@ class CommitLogManifestReader extends InputReader> { */ private static final long MEMORY_ESTIMATE = 100 * 1024; + private static final Retrier retrier = new Retrier(new SystemSleeper(), 3); + private final Key bucketKey; /** @@ -125,8 +135,31 @@ class CommitLogManifestReader extends InputReader> { @Override public Key next() { loaded++; + final Cursor currentCursor = queryIterator.getCursor(); try { - return queryIterator.next(); + return retrier.callWithRetry( + new Callable>() { + @Override + public Key call() { + return queryIterator.next(); + } + }, + new Retrier.FailureReporter() { + @Override + public void beforeRetry(Throwable thrown, int failures, int maxAttempts) { + checkNotNull(currentCursor, "Can't retry because cursor is null. Giving up."); + queryIterator = query().startAt(currentCursor).keys().iterator(); + } + + @Override + public void afterFinalFailure(Throwable thrown, int failures) { + logger.severefmt( + "Max retry attempts reached trying to read item %d/%d. Giving up.", + loaded, + total); + } + }, + DatastoreTimeoutException.class); } finally { ofy().clearSessionCache(); // Try not to leak memory. } diff --git a/java/google/registry/util/Retrier.java b/java/google/registry/util/Retrier.java index 2d596093d..5299f4cac 100644 --- a/java/google/registry/util/Retrier.java +++ b/java/google/registry/util/Retrier.java @@ -39,6 +39,29 @@ public class Retrier implements Serializable { private final Sleeper sleeper; private final int attempts; + /** Holds functions to call whenever the code being retried fails. */ + public static interface FailureReporter { + + /** + * Called after a retriable failure happened. + * + *

<p>Not called after the final failure, nor if the Throwable thrown isn't "a retriable error". + * + *

<p>Not called at all if the retrier succeeded on its first attempt. + */ + public void beforeRetry(Throwable thrown, int failures, int maxAttempts); + + /** + * Called after a non-retriable error. + * + *

<p>Called either after the final failure, or if the Throwable thrown isn't "a retriable + * error". The retrier throws right after calling this function. + * + *

Not called at all if the retrier succeeds. + */ + public void afterFinalFailure(Throwable thrown, int failures); + } + @Inject public Retrier(Sleeper sleeper, @Named("transientFailureRetries") int transientFailureRetries) { this.sleeper = sleeper; @@ -57,17 +80,21 @@ public class Retrier implements Serializable { * * @return the value returned by the {@link Callable}. */ - private V callWithRetry(Callable callable, Predicate isRetryable) { + private V callWithRetry( + Callable callable, + FailureReporter failureReporter, + Predicate isRetryable) { int failures = 0; while (true) { try { return callable.call(); } catch (Throwable e) { if (++failures == attempts || !isRetryable.apply(e)) { + failureReporter.afterFinalFailure(e, failures); throwIfUnchecked(e); throw new RuntimeException(e); } - logger.info(e, "Retrying transient error, attempt " + failures); + failureReporter.beforeRetry(e, failures, attempts); try { // Wait 100ms on the first attempt, doubling on each subsequent attempt. sleeper.sleep(Duration.millis(pow(2, failures) * 100)); @@ -82,6 +109,41 @@ public class Retrier implements Serializable { } } + private static final FailureReporter LOGGING_FAILURE_REPORTER = new FailureReporter() { + @Override + public void beforeRetry(Throwable thrown, int failures, int maxAttempts) { + logger.infofmt(thrown, "Retrying transient error, attempt %d", failures); + } + + @Override + public void afterFinalFailure(Throwable thrown, int failures) {} + }; + + /** + * Retries a unit of work in the face of transient errors. + * + *

<p>Retrying is done a fixed number of times, with exponential backoff, if the exception that is + * thrown is on a whitelist of retryable errors. If the error is not on the whitelist, or if the + * thread is interrupted, or if the allowable number of attempts has been exhausted, the original + * exception is propagated through to the caller. Checked exceptions are wrapped in a + * RuntimeException, while unchecked exceptions are propagated as-is. + * + *

Uses a default FailureReporter that logs before each retry. + * + * @return the value returned by the {@link Callable}. + */ + @SafeVarargs + public final V callWithRetry( + Callable callable, + Class retryableError, + Class... moreRetryableErrors) { + return callWithRetry( + callable, + LOGGING_FAILURE_REPORTER, + retryableError, + moreRetryableErrors); + } + /** * Retries a unit of work in the face of transient errors. * @@ -96,11 +158,12 @@ public class Retrier implements Serializable { @SafeVarargs public final V callWithRetry( Callable callable, + FailureReporter failureReporter, Class retryableError, Class... moreRetryableErrors) { final Set> retryables = new ImmutableSet.Builder>().add(retryableError).add(moreRetryableErrors).build(); - return callWithRetry(callable, new Predicate() { + return callWithRetry(callable, failureReporter, new Predicate() { @Override public boolean apply(Throwable e) { return any(retryables, supertypeOf(e.getClass())); diff --git a/javatests/google/registry/util/RetrierTest.java b/javatests/google/registry/util/RetrierTest.java index 52d2ac1d4..47eb7dff1 100644 --- a/javatests/google/registry/util/RetrierTest.java +++ b/javatests/google/registry/util/RetrierTest.java @@ -14,9 +14,12 @@ package google.registry.util; +import static com.google.common.truth.Truth.assertThat; + import google.registry.testing.ExceptionRule; import google.registry.testing.FakeClock; import google.registry.testing.FakeSleeper; +import google.registry.util.Retrier.FailureReporter; import java.util.concurrent.Callable; import org.junit.Rule; import org.junit.Test; @@ -33,33 +36,97 @@ public class RetrierTest { Retrier retrier = new Retrier(new FakeSleeper(new FakeClock()), 3); /** An exception to throw from {@link CountingThrower}. 
*/ - class CountingException extends RuntimeException { + static class CountingException extends RuntimeException { CountingException(int count) { super("" + count); } } /** Test object that always throws an exception with the current count. */ - class CountingThrower implements Callable { + static class CountingThrower implements Callable { int count = 0; + final int numThrows; + + CountingThrower(int numThrows) { + this.numThrows = numThrows; + } + @Override - public Object call() { + public Integer call() { + if (count == numThrows) { + return numThrows; + } count++; throw new CountingException(count); } } + static class TestReporter implements FailureReporter { + int numBeforeRetry = 0; + int numOnFinalFailure = 0; + + @Override + public void beforeRetry(Throwable e, int failures, int maxAttempts) { + numBeforeRetry++; + assertThat(failures).isEqualTo(numBeforeRetry); + } + + @Override + public void afterFinalFailure(Throwable e, int failures) { + numOnFinalFailure++; + } + + void assertNumbers(int expectedBeforeRetry, int expectedOnFinalFailure) { + assertThat(numBeforeRetry).isEqualTo(expectedBeforeRetry); + assertThat(numOnFinalFailure).isEqualTo(expectedOnFinalFailure); + } + } + @Test public void testRetryableException() throws Exception { thrown.expect(CountingException.class, "3"); - retrier.callWithRetry(new CountingThrower(), CountingException.class); + retrier.callWithRetry(new CountingThrower(3), CountingException.class); } @Test public void testUnretryableException() throws Exception { thrown.expect(CountingException.class, "1"); - retrier.callWithRetry(new CountingThrower(), IllegalArgumentException.class); + retrier.callWithRetry(new CountingThrower(5), IllegalArgumentException.class); + } + + @Test + public void testRetrySucceeded() throws Exception { + assertThat(retrier.callWithRetry(new CountingThrower(2), CountingException.class)) + .isEqualTo(2); + } + + @Test + public void testRetryFailed_withReporter() throws Exception { + 
thrown.expect(CountingException.class, "3"); + TestReporter reporter = new TestReporter(); + try { + retrier.callWithRetry(new CountingThrower(3), reporter, CountingException.class); + } catch (CountingException expected) { + reporter.assertNumbers(2, 1); + throw expected; + } + } + + @Test + public void testRetrySucceeded_withReporter() throws Exception { + TestReporter reporter = new TestReporter(); + assertThat(retrier.callWithRetry(new CountingThrower(2), reporter, CountingException.class)) + .isEqualTo(2); + reporter.assertNumbers(2, 0); + } + + @Test + public void testFirstTrySucceeded_withReporter() throws Exception { + TestReporter reporter = new TestReporter(); + assertThat(retrier.callWithRetry(new CountingThrower(0), reporter, CountingException.class)) + .isEqualTo(0); + reporter.assertNumbers(0, 0); } }