mirror of
https://github.com/google/nomulus.git
synced 2025-05-13 16:07:15 +02:00
Retry Datastore errors in CommitLogManifestReader.next()
When trying to run the MapReduce for DeleteOldCommitLogsAction, we run into a lot of DatastoreTimeoutExceptions during CommitLogManifestReader.next(). Each such exception causes the entire shard to fail. Since we have a lot of keys (tens of millions), this is almost guaranteed to happen, dooming the entire MapReduce. Here is an attempt to recover from the DatastoreTimeoutException by saving the state (the query cursor) before the read, then on failure restoring that state and trying again. ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=165172222
This commit is contained in:
parent
8b0b54e997
commit
00f2662f33
3 changed files with 172 additions and 9 deletions
|
@ -14,9 +14,11 @@
|
||||||
|
|
||||||
package google.registry.mapreduce.inputs;
|
package google.registry.mapreduce.inputs;
|
||||||
|
|
||||||
|
import static com.google.common.base.Preconditions.checkNotNull;
|
||||||
import static google.registry.model.ofy.ObjectifyService.ofy;
|
import static google.registry.model.ofy.ObjectifyService.ofy;
|
||||||
|
|
||||||
import com.google.appengine.api.datastore.Cursor;
|
import com.google.appengine.api.datastore.Cursor;
|
||||||
|
import com.google.appengine.api.datastore.DatastoreTimeoutException;
|
||||||
import com.google.appengine.api.datastore.QueryResultIterator;
|
import com.google.appengine.api.datastore.QueryResultIterator;
|
||||||
import com.google.appengine.tools.mapreduce.InputReader;
|
import com.google.appengine.tools.mapreduce.InputReader;
|
||||||
import com.google.common.base.Optional;
|
import com.google.common.base.Optional;
|
||||||
|
@ -24,7 +26,11 @@ import com.googlecode.objectify.Key;
|
||||||
import com.googlecode.objectify.cmd.Query;
|
import com.googlecode.objectify.cmd.Query;
|
||||||
import google.registry.model.ofy.CommitLogBucket;
|
import google.registry.model.ofy.CommitLogBucket;
|
||||||
import google.registry.model.ofy.CommitLogManifest;
|
import google.registry.model.ofy.CommitLogManifest;
|
||||||
|
import google.registry.util.FormattingLogger;
|
||||||
|
import google.registry.util.Retrier;
|
||||||
|
import google.registry.util.SystemSleeper;
|
||||||
import java.util.NoSuchElementException;
|
import java.util.NoSuchElementException;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
import org.joda.time.DateTime;
|
import org.joda.time.DateTime;
|
||||||
|
|
||||||
/** {@link InputReader} that maps over {@link CommitLogManifest}. */
|
/** {@link InputReader} that maps over {@link CommitLogManifest}. */
|
||||||
|
@ -32,6 +38,8 @@ class CommitLogManifestReader extends InputReader<Key<CommitLogManifest>> {
|
||||||
|
|
||||||
private static final long serialVersionUID = 5117046535590539778L;
|
private static final long serialVersionUID = 5117046535590539778L;
|
||||||
|
|
||||||
|
static final FormattingLogger logger = FormattingLogger.getLoggerForCallerClass();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Memory estimation for this reader.
|
* Memory estimation for this reader.
|
||||||
*
|
*
|
||||||
|
@ -40,6 +48,8 @@ class CommitLogManifestReader extends InputReader<Key<CommitLogManifest>> {
|
||||||
*/
|
*/
|
||||||
private static final long MEMORY_ESTIMATE = 100 * 1024;
|
private static final long MEMORY_ESTIMATE = 100 * 1024;
|
||||||
|
|
||||||
|
private static final Retrier retrier = new Retrier(new SystemSleeper(), 3);
|
||||||
|
|
||||||
private final Key<CommitLogBucket> bucketKey;
|
private final Key<CommitLogBucket> bucketKey;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -125,8 +135,31 @@ class CommitLogManifestReader extends InputReader<Key<CommitLogManifest>> {
|
||||||
@Override
|
@Override
|
||||||
public Key<CommitLogManifest> next() {
|
public Key<CommitLogManifest> next() {
|
||||||
loaded++;
|
loaded++;
|
||||||
|
final Cursor currentCursor = queryIterator.getCursor();
|
||||||
try {
|
try {
|
||||||
|
return retrier.callWithRetry(
|
||||||
|
new Callable<Key<CommitLogManifest>>() {
|
||||||
|
@Override
|
||||||
|
public Key<CommitLogManifest> call() {
|
||||||
return queryIterator.next();
|
return queryIterator.next();
|
||||||
|
}
|
||||||
|
},
|
||||||
|
new Retrier.FailureReporter() {
|
||||||
|
@Override
|
||||||
|
public void beforeRetry(Throwable thrown, int failures, int maxAttempts) {
|
||||||
|
checkNotNull(currentCursor, "Can't retry because cursor is null. Giving up.");
|
||||||
|
queryIterator = query().startAt(currentCursor).keys().iterator();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void afterFinalFailure(Throwable thrown, int failures) {
|
||||||
|
logger.severefmt(
|
||||||
|
"Max retry attempts reached trying to read item %d/%d. Giving up.",
|
||||||
|
loaded,
|
||||||
|
total);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
DatastoreTimeoutException.class);
|
||||||
} finally {
|
} finally {
|
||||||
ofy().clearSessionCache(); // Try not to leak memory.
|
ofy().clearSessionCache(); // Try not to leak memory.
|
||||||
}
|
}
|
||||||
|
|
|
@ -39,6 +39,29 @@ public class Retrier implements Serializable {
|
||||||
private final Sleeper sleeper;
|
private final Sleeper sleeper;
|
||||||
private final int attempts;
|
private final int attempts;
|
||||||
|
|
||||||
|
/** Holds functions to call whenever the code being retried fails. */
|
||||||
|
public static interface FailureReporter {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called after a retriable failure happened.
|
||||||
|
*
|
||||||
|
* <p>Not called after the final failure, nor if the Throwable thrown isn't "a retriable error".
|
||||||
|
*
|
||||||
|
* <p>Not called at all if the retrier succeeded on its first attempt.
|
||||||
|
*/
|
||||||
|
public void beforeRetry(Throwable thrown, int failures, int maxAttempts);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called after a non-retriable error.
|
||||||
|
*
|
||||||
|
* <p>Called either after the final failure, or if the Throwable thrown isn't "a retriable
|
||||||
|
* error". The retrier throws right after calling this function.
|
||||||
|
*
|
||||||
|
* <p>Not called at all if the retrier succeeds.
|
||||||
|
*/
|
||||||
|
public void afterFinalFailure(Throwable thrown, int failures);
|
||||||
|
}
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public Retrier(Sleeper sleeper, @Named("transientFailureRetries") int transientFailureRetries) {
|
public Retrier(Sleeper sleeper, @Named("transientFailureRetries") int transientFailureRetries) {
|
||||||
this.sleeper = sleeper;
|
this.sleeper = sleeper;
|
||||||
|
@ -57,17 +80,21 @@ public class Retrier implements Serializable {
|
||||||
*
|
*
|
||||||
* @return <V> the value returned by the {@link Callable}.
|
* @return <V> the value returned by the {@link Callable}.
|
||||||
*/
|
*/
|
||||||
private <V> V callWithRetry(Callable<V> callable, Predicate<Throwable> isRetryable) {
|
private <V> V callWithRetry(
|
||||||
|
Callable<V> callable,
|
||||||
|
FailureReporter failureReporter,
|
||||||
|
Predicate<Throwable> isRetryable) {
|
||||||
int failures = 0;
|
int failures = 0;
|
||||||
while (true) {
|
while (true) {
|
||||||
try {
|
try {
|
||||||
return callable.call();
|
return callable.call();
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
if (++failures == attempts || !isRetryable.apply(e)) {
|
if (++failures == attempts || !isRetryable.apply(e)) {
|
||||||
|
failureReporter.afterFinalFailure(e, failures);
|
||||||
throwIfUnchecked(e);
|
throwIfUnchecked(e);
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
logger.info(e, "Retrying transient error, attempt " + failures);
|
failureReporter.beforeRetry(e, failures, attempts);
|
||||||
try {
|
try {
|
||||||
// Wait 100ms on the first attempt, doubling on each subsequent attempt.
|
// Wait 100ms on the first attempt, doubling on each subsequent attempt.
|
||||||
sleeper.sleep(Duration.millis(pow(2, failures) * 100));
|
sleeper.sleep(Duration.millis(pow(2, failures) * 100));
|
||||||
|
@ -82,6 +109,41 @@ public class Retrier implements Serializable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static final FailureReporter LOGGING_FAILURE_REPORTER = new FailureReporter() {
|
||||||
|
@Override
|
||||||
|
public void beforeRetry(Throwable thrown, int failures, int maxAttempts) {
|
||||||
|
logger.infofmt(thrown, "Retrying transient error, attempt %d", failures);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void afterFinalFailure(Throwable thrown, int failures) {}
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Retries a unit of work in the face of transient errors.
|
||||||
|
*
|
||||||
|
* <p>Retrying is done a fixed number of times, with exponential backoff, if the exception that is
|
||||||
|
* thrown is on a whitelist of retryable errors. If the error is not on the whitelist, or if the
|
||||||
|
* thread is interrupted, or if the allowable number of attempts has been exhausted, the original
|
||||||
|
* exception is propagated through to the caller. Checked exceptions are wrapped in a
|
||||||
|
* RuntimeException, while unchecked exceptions are propagated as-is.
|
||||||
|
*
|
||||||
|
* <p>Uses a default FailureReporter that logs before each retry.
|
||||||
|
*
|
||||||
|
* @return <V> the value returned by the {@link Callable}.
|
||||||
|
*/
|
||||||
|
@SafeVarargs
|
||||||
|
public final <V> V callWithRetry(
|
||||||
|
Callable<V> callable,
|
||||||
|
Class<? extends Throwable> retryableError,
|
||||||
|
Class<? extends Throwable>... moreRetryableErrors) {
|
||||||
|
return callWithRetry(
|
||||||
|
callable,
|
||||||
|
LOGGING_FAILURE_REPORTER,
|
||||||
|
retryableError,
|
||||||
|
moreRetryableErrors);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Retries a unit of work in the face of transient errors.
|
* Retries a unit of work in the face of transient errors.
|
||||||
*
|
*
|
||||||
|
@ -96,11 +158,12 @@ public class Retrier implements Serializable {
|
||||||
@SafeVarargs
|
@SafeVarargs
|
||||||
public final <V> V callWithRetry(
|
public final <V> V callWithRetry(
|
||||||
Callable<V> callable,
|
Callable<V> callable,
|
||||||
|
FailureReporter failureReporter,
|
||||||
Class<? extends Throwable> retryableError,
|
Class<? extends Throwable> retryableError,
|
||||||
Class<? extends Throwable>... moreRetryableErrors) {
|
Class<? extends Throwable>... moreRetryableErrors) {
|
||||||
final Set<Class<?>> retryables =
|
final Set<Class<?>> retryables =
|
||||||
new ImmutableSet.Builder<Class<?>>().add(retryableError).add(moreRetryableErrors).build();
|
new ImmutableSet.Builder<Class<?>>().add(retryableError).add(moreRetryableErrors).build();
|
||||||
return callWithRetry(callable, new Predicate<Throwable>() {
|
return callWithRetry(callable, failureReporter, new Predicate<Throwable>() {
|
||||||
@Override
|
@Override
|
||||||
public boolean apply(Throwable e) {
|
public boolean apply(Throwable e) {
|
||||||
return any(retryables, supertypeOf(e.getClass()));
|
return any(retryables, supertypeOf(e.getClass()));
|
||||||
|
|
|
@ -14,9 +14,12 @@
|
||||||
|
|
||||||
package google.registry.util;
|
package google.registry.util;
|
||||||
|
|
||||||
|
import static com.google.common.truth.Truth.assertThat;
|
||||||
|
|
||||||
import google.registry.testing.ExceptionRule;
|
import google.registry.testing.ExceptionRule;
|
||||||
import google.registry.testing.FakeClock;
|
import google.registry.testing.FakeClock;
|
||||||
import google.registry.testing.FakeSleeper;
|
import google.registry.testing.FakeSleeper;
|
||||||
|
import google.registry.util.Retrier.FailureReporter;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -33,33 +36,97 @@ public class RetrierTest {
|
||||||
Retrier retrier = new Retrier(new FakeSleeper(new FakeClock()), 3);
|
Retrier retrier = new Retrier(new FakeSleeper(new FakeClock()), 3);
|
||||||
|
|
||||||
/** An exception to throw from {@link CountingThrower}. */
|
/** An exception to throw from {@link CountingThrower}. */
|
||||||
class CountingException extends RuntimeException {
|
static class CountingException extends RuntimeException {
|
||||||
CountingException(int count) {
|
CountingException(int count) {
|
||||||
super("" + count);
|
super("" + count);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Test object that always throws an exception with the current count. */
|
/** Test object that always throws an exception with the current count. */
|
||||||
class CountingThrower implements Callable<Object> {
|
static class CountingThrower implements Callable<Integer> {
|
||||||
|
|
||||||
int count = 0;
|
int count = 0;
|
||||||
|
|
||||||
|
final int numThrows;
|
||||||
|
|
||||||
|
CountingThrower(int numThrows) {
|
||||||
|
this.numThrows = numThrows;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Object call() {
|
public Integer call() {
|
||||||
|
if (count == numThrows) {
|
||||||
|
return numThrows;
|
||||||
|
}
|
||||||
count++;
|
count++;
|
||||||
throw new CountingException(count);
|
throw new CountingException(count);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static class TestReporter implements FailureReporter {
|
||||||
|
int numBeforeRetry = 0;
|
||||||
|
int numOnFinalFailure = 0;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void beforeRetry(Throwable e, int failures, int maxAttempts) {
|
||||||
|
numBeforeRetry++;
|
||||||
|
assertThat(failures).isEqualTo(numBeforeRetry);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void afterFinalFailure(Throwable e, int failures) {
|
||||||
|
numOnFinalFailure++;
|
||||||
|
}
|
||||||
|
|
||||||
|
void assertNumbers(int expectedBeforeRetry, int expectedOnFinalFailure) {
|
||||||
|
assertThat(numBeforeRetry).isEqualTo(expectedBeforeRetry);
|
||||||
|
assertThat(numOnFinalFailure).isEqualTo(expectedOnFinalFailure);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRetryableException() throws Exception {
|
public void testRetryableException() throws Exception {
|
||||||
thrown.expect(CountingException.class, "3");
|
thrown.expect(CountingException.class, "3");
|
||||||
retrier.callWithRetry(new CountingThrower(), CountingException.class);
|
retrier.callWithRetry(new CountingThrower(3), CountingException.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testUnretryableException() throws Exception {
|
public void testUnretryableException() throws Exception {
|
||||||
thrown.expect(CountingException.class, "1");
|
thrown.expect(CountingException.class, "1");
|
||||||
retrier.callWithRetry(new CountingThrower(), IllegalArgumentException.class);
|
retrier.callWithRetry(new CountingThrower(5), IllegalArgumentException.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRetrySucceeded() throws Exception {
|
||||||
|
assertThat(retrier.callWithRetry(new CountingThrower(2), CountingException.class))
|
||||||
|
.isEqualTo(2);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRetryFailed_withReporter() throws Exception {
|
||||||
|
thrown.expect(CountingException.class, "3");
|
||||||
|
TestReporter reporter = new TestReporter();
|
||||||
|
try {
|
||||||
|
retrier.callWithRetry(new CountingThrower(3), reporter, CountingException.class);
|
||||||
|
} catch (CountingException expected) {
|
||||||
|
reporter.assertNumbers(2, 1);
|
||||||
|
throw expected;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRetrySucceeded_withReporter() throws Exception {
|
||||||
|
TestReporter reporter = new TestReporter();
|
||||||
|
assertThat(retrier.callWithRetry(new CountingThrower(2), reporter, CountingException.class))
|
||||||
|
.isEqualTo(2);
|
||||||
|
reporter.assertNumbers(2, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFirstTrySucceeded_withReporter() throws Exception {
|
||||||
|
TestReporter reporter = new TestReporter();
|
||||||
|
assertThat(retrier.callWithRetry(new CountingThrower(0), reporter, CountingException.class))
|
||||||
|
.isEqualTo(0);
|
||||||
|
reporter.assertNumbers(0, 0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue