Retry any Datastore reads in EppResource map-reduce input

Datastore has a non-zero chance of failing on reads. A map-reduce with too many
failures will eventually give up. As a result, any map-reduce that goes over a
large number of datastore entities is almost guaranteed to fail.

Since we expect to have a large number of EppResources, we make sure to wrap
all datastore reads with some retrying mechanism to reduce the number of
transient failures that propagate to Map-Reduce.

This feature already existed for CommitLogManifestReader; we refactor the code so that the EppResource readers use the same retrying mechanism.

Also removed the transactNew around the reads because, looking at the source, it doesn't actually do anything we need (it doesn't retry on any failure other than a concurrency failure).

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=190633281
This commit is contained in:
guyben 2018-03-27 10:21:21 -07:00 committed by jianglai
parent 422ec9b97a
commit 2bbde9d9a9
8 changed files with 293 additions and 167 deletions

View file

@ -140,9 +140,7 @@ public final class DeleteOldCommitLogsAction implements Runnable {
// If it isn't a Key<CommitLogManifest> then it should be an EppResource, which we need to
// load to emit the revisions.
//
// We want to make sure we retry any load individually to reduce the chance of the entire
// shard failing, hence we wrap it in a transactNew.
Object object = ofy().transactNew(() -> ofy().load().key(key).now());
Object object = ofy().load().key(key).now();
checkNotNull(object, "Received a key to a missing object. key: %s", key);
checkState(
object instanceof EppResource,

View file

@ -25,7 +25,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.googlecode.objectify.Key;
import com.googlecode.objectify.annotation.Entity;
import com.googlecode.objectify.cmd.Query;
import google.registry.model.EppResource;
import google.registry.model.ImmutableObject;
import google.registry.model.index.EppResourceIndex;
@ -33,6 +32,7 @@ import google.registry.model.index.EppResourceIndexBucket;
import google.registry.util.FormattingLogger;
import java.io.IOException;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;
/**
* Reader that maps over {@link EppResourceIndex} and returns resources that are children of
@ -40,24 +40,21 @@ import java.util.NoSuchElementException;
*/
class ChildEntityReader<R extends EppResource, I extends ImmutableObject> extends InputReader<I> {
private static final long serialVersionUID = -7430731417793849164L;
private static final long serialVersionUID = 7481761146349663848L;
static final FormattingLogger logger = FormattingLogger.getLoggerForCallerClass();
/** This reader uses an EppResourceEntityReader under the covers to iterate over EPP resources. */
private final EppResourceEntityReader<? extends R> eppResourceEntityReader;
/** The current EPP resource being referenced for child entity queries. */
private Key<? extends R> currentEppResource;
/** The child resource classes to postfilter for. */
private final ImmutableList<Class<? extends I>> childResourceClasses;
/** The index within the list above for the next ofy query. */
private int childResourceClassIndex;
/** An iterator over queries for child entities of EppResources. */
private transient QueryResultIterator<I> childQueryIterator;
/** A cursor for queries for child entities of EppResources. */
private Cursor childCursor;
/** A reader used to go over children of the current eppResourceEntity and childResourceClass. */
@Nullable private ChildReader<? extends I> childReader;
public ChildEntityReader(
Key<EppResourceIndexBucket> bucketKey,
@ -92,28 +89,58 @@ class ChildEntityReader<R extends EppResource, I extends ImmutableObject> extend
* @throws NoSuchElementException if there are no more EPP resources to iterate over.
*/
I nextChild() throws NoSuchElementException {
try {
while (true) {
if (currentEppResource == null) {
currentEppResource = Key.create(eppResourceEntityReader.next());
// This code implements a single iteration over a triple-nested loop. It returns the next
// innermost item of that 3-nested loop. The entire loop would look like this:
//
// NOTE: I'm treating eppResourceEntityReader and childReader as if they were iterables for
// brevity, although they aren't - they are Readers
//
// I'm also using the python 'yield' command to show we're returning this item one by one.
//
// for (eppResourceEntity : eppResourceEntityReader) {
// for (childResourceClass : childResourceClasses) {
// for (I child : ChildReader.create(childResourceClass, Key.create(eppResourceEntity))) {
// yield child; // returns the 'child's one by one.
// }
// }
// }
// First, set all the variables if they aren't set yet. This should only happen on the first
// time in the function.
//
// This can be merged with the calls in the "catch" below to avoid code duplication, but it
// makes the code harder to read.
if (childReader == null) {
childResourceClassIndex = 0;
childReader =
ChildReader.create(
childResourceClasses.get(childResourceClassIndex),
Key.create(eppResourceEntityReader.next()));
}
// Then continue advancing the 3-nested loop until we find a value
while (true) {
try {
// Advance the inner loop and return the next value.
return childReader.next();
} catch (NoSuchElementException e) {
// If we got here it means the inner loop (childReader) is done - we need to advance
// the middle loop by one, and then reset the inner loop.
childResourceClassIndex++;
// Check if the middle loop is done as well
if (childResourceClassIndex < childResourceClasses.size()) {
// The middle loop is not done. Reset the inner loop.
childReader = childReader.withType(childResourceClasses.get(childResourceClassIndex));
} else {
// We're done with the middle loop as well! Advance the outer loop, and reset the middle
// loop and inner loops
childResourceClassIndex = 0;
childQueryIterator = null;
}
if (childQueryIterator == null) {
childQueryIterator = childQuery().iterator();
}
try {
return childQueryIterator.next();
} catch (NoSuchElementException e) {
childQueryIterator = null;
childResourceClassIndex++;
if (childResourceClassIndex >= childResourceClasses.size()) {
currentEppResource = null;
}
childReader =
ChildReader.create(
childResourceClasses.get(childResourceClassIndex),
Key.create(eppResourceEntityReader.next()));
}
// Loop back up the while, to try reading a value again
}
} finally {
ofy().clearSessionCache(); // Try not to leak memory.
}
}
@ -132,30 +159,20 @@ class ChildEntityReader<R extends EppResource, I extends ImmutableObject> extend
}
}
/** Query for children of the current resource and of the current child class. */
private Query<I> childQuery() {
@SuppressWarnings("unchecked")
Query<I> query = (Query<I>) ofy().load()
.type(childResourceClasses.get(childResourceClassIndex))
.ancestor(currentEppResource);
return query;
}
@Override
public void beginSlice() {
eppResourceEntityReader.beginSlice();
if (childCursor != null) {
Query<I> query = childQuery().startAt(childCursor);
childQueryIterator = query.iterator();
if (childReader != null) {
childReader.beginSlice();
}
}
@Override
public void endSlice() {
if (childQueryIterator != null) {
childCursor = childQueryIterator.getCursor();
}
eppResourceEntityReader.endSlice();
if (childReader != null) {
childReader.endSlice();
}
}
@Override
@ -187,4 +204,53 @@ class ChildEntityReader<R extends EppResource, I extends ImmutableObject> extend
public void endShard() throws IOException {
eppResourceEntityReader.endShard();
}
private static class ChildReader<I> extends RetryingInputReader<I, I> {

  private static final long serialVersionUID = -8443132445119657998L;

  /** The child entity class this reader queries for. */
  private final Class<I> type;

  /** The key used as the ancestor filter of the query. */
  private final Key<?> ancestor;

  /**
   * Builds a reader over every child of {@code type} that lives under {@code ancestor}.
   *
   * <p>Mapreduce never initializes this reader itself, so the shard and slice start-up calls are
   * issued here, once both fields have been assigned.
   */
  public ChildReader(Class<I> type, Key<?> ancestor) {
    this.type = type;
    this.ancestor = ancestor;
    beginShard();
    beginSlice();
  }

  /**
   * Static-factory equivalent of the constructor.
   *
   * <p>It is needed in addition to the constructor so that a {@code ChildReader<? extends I>} can
   * be created.
   */
  public static <I> ChildReader<I> create(Class<I> type, Key<?> ancestor) {
    return new ChildReader<>(type, ancestor);
  }

  /**
   * Returns an iterator over the entities of {@code type} under {@code ancestor}, resuming at
   * {@code cursor} when one is given.
   */
  @Override
  public QueryResultIterator<I> getQueryIterator(Cursor cursor) {
    return startQueryAt(
            ofy().load().type(this.type).ancestor(this.ancestor),
            cursor)
        .iterator();
  }

  /** Always reports a total of 0; no count query is issued for child entities. */
  @Override
  public int getTotal() {
    return 0;
  }

  /** Hands back the next query result unchanged. */
  @Override
  public I next() {
    return nextQueryResult();
  }

  /** Returns a new ChildReader over the same ancestor, but for the given type. */
  public <J> ChildReader<J> withType(Class<J> type) {
    return new ChildReader<>(type, this.ancestor);
  }
}
}

View file

@ -14,11 +14,9 @@
package google.registry.mapreduce.inputs;
import static com.google.common.base.Preconditions.checkNotNull;
import static google.registry.model.ofy.ObjectifyService.ofy;
import com.google.appengine.api.datastore.Cursor;
import com.google.appengine.api.datastore.DatastoreTimeoutException;
import com.google.appengine.api.datastore.QueryResultIterator;
import com.google.appengine.tools.mapreduce.InputReader;
import com.googlecode.objectify.Key;
@ -26,14 +24,13 @@ import com.googlecode.objectify.cmd.Query;
import google.registry.model.ofy.CommitLogBucket;
import google.registry.model.ofy.CommitLogManifest;
import google.registry.util.FormattingLogger;
import google.registry.util.Retrier;
import google.registry.util.SystemSleeper;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;
import org.joda.time.DateTime;
/** {@link InputReader} that maps over {@link CommitLogManifest}. */
class CommitLogManifestReader extends InputReader<Key<CommitLogManifest>> {
class CommitLogManifestReader
extends RetryingInputReader<Key<CommitLogManifest>, Key<CommitLogManifest>> {
static final FormattingLogger logger = FormattingLogger.getLoggerForCallerClass();
@ -45,8 +42,7 @@ class CommitLogManifestReader extends InputReader<Key<CommitLogManifest>> {
*/
private static final long MEMORY_ESTIMATE = 100 * 1024;
private static final Retrier retrier = new Retrier(new SystemSleeper(), 3);
private static final long serialVersionUID = 2553537421598284748L;
private static final long serialVersionUID = 6215490573108252100L;
private final Key<CommitLogBucket> bucketKey;
@ -58,55 +54,19 @@ class CommitLogManifestReader extends InputReader<Key<CommitLogManifest>> {
@Nullable
private final DateTime olderThan;
private Cursor cursor;
private int total;
private int loaded;
private transient QueryResultIterator<Key<CommitLogManifest>> queryIterator;
CommitLogManifestReader(Key<CommitLogBucket> bucketKey, @Nullable DateTime olderThan) {
this.bucketKey = bucketKey;
this.olderThan = olderThan;
}
/** Called once at start. Cache the expected size. */
@Override
public void beginShard() {
total = query().count();
public QueryResultIterator<Key<CommitLogManifest>> getQueryIterator(@Nullable Cursor cursor) {
return startQueryAt(query(), cursor).keys().iterator();
}
/** Called every time we are deserialized. Create a new query or resume an existing one. */
@Override
public void beginSlice() {
Query<CommitLogManifest> query = query();
if (cursor != null) {
// The underlying query is strongly consistent, and according to the documentation at
// https://cloud.google.com/appengine/docs/java/datastore/queries#Java_Data_consistency
// "strongly consistent queries are always transactionally consistent". However, each time
// we restart the query at a cursor we have a new effective query, and "if the results for a
// query change between uses of a cursor, the query notices only changes that occur in
// results after the cursor. If a new result appears before the cursor's position for the
// query, it will not be returned when the results after the cursor are fetched."
// What this means in practice is that entities that are created after the initial query
// begins may or may not be seen by this reader, depending on whether the query was
// paused and restarted with a cursor before it would have reached the new entity.
query = query.startAt(cursor);
}
queryIterator = query.keys().iterator();
}
/** Called occasionally alongside {@link #next}. */
@Override
public Double getProgress() {
// Cap progress at 1.0, since the query's count() can increase during the run of the mapreduce
// if more entities are written, but we've cached the value once in "total".
return Math.min(1.0, ((double) loaded) / total);
}
/** Called before we are serialized. Save a serializable cursor for this query. */
@Override
public void endSlice() {
cursor = queryIterator.getCursor();
public int getTotal() {
return query().count();
}
/** Query for children of this bucket. */
@ -133,19 +93,6 @@ class CommitLogManifestReader extends InputReader<Key<CommitLogManifest>> {
*/
@Override
public Key<CommitLogManifest> next() {
loaded++;
final Cursor currentCursor = queryIterator.getCursor();
try {
return retrier.callWithRetry(
() -> queryIterator.next(),
(thrown, failures, maxAttempts) -> {
checkNotNull(currentCursor, "Can't retry because cursor is null. Giving up.");
queryIterator = query().startAt(currentCursor).keys().iterator();
},
DatastoreTimeoutException.class);
} finally {
ofy().clearSessionCache(); // Try not to leak memory.
}
return nextQueryResult();
}
}

View file

@ -27,17 +27,17 @@ import google.registry.model.EppResource;
import google.registry.model.index.EppResourceIndex;
import google.registry.model.index.EppResourceIndexBucket;
import google.registry.util.FormattingLogger;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;
/** Base class for {@link InputReader} classes that map over {@link EppResourceIndex}. */
abstract class EppResourceBaseReader<T> extends InputReader<T> {
abstract class EppResourceBaseReader<T> extends RetryingInputReader<EppResourceIndex, T> {
static final FormattingLogger logger = FormattingLogger.getLoggerForCallerClass();
/** Number of bytes in 1MB of memory, used for memory estimates. */
static final long ONE_MB = 1024 * 1024;
private static final long serialVersionUID = -2970253037856017147L;
private static final long serialVersionUID = 7942584269402339168L;
/**
* The resource kinds to filter for.
@ -51,12 +51,6 @@ abstract class EppResourceBaseReader<T> extends InputReader<T> {
private final Key<EppResourceIndexBucket> bucketKey;
private final long memoryEstimate;
private Cursor cursor;
private int total;
private int loaded;
private transient QueryResultIterator<EppResourceIndex> queryIterator;
EppResourceBaseReader(
Key<EppResourceIndexBucket> bucketKey,
long memoryEstimate,
@ -66,44 +60,14 @@ abstract class EppResourceBaseReader<T> extends InputReader<T> {
this.filterKinds = filterKinds;
}
/** Called once at start. Cache the expected size. */
@Override
public void beginShard() {
total = query().count();
public QueryResultIterator<EppResourceIndex> getQueryIterator(@Nullable Cursor cursor) {
return startQueryAt(query(), cursor).iterator();
}
/** Called every time we are deserialized. Create a new query or resume an existing one. */
@Override
public void beginSlice() {
Query<EppResourceIndex> query = query();
if (cursor != null) {
// The underlying query is strongly consistent, and according to the documentation at
// https://cloud.google.com/appengine/docs/java/datastore/queries#Java_Data_consistency
// "strongly consistent queries are always transactionally consistent". However, each time
// we restart the query at a cursor we have a new effective query, and "if the results for a
// query change between uses of a cursor, the query notices only changes that occur in
// results after the cursor. If a new result appears before the cursor's position for the
// query, it will not be returned when the results after the cursor are fetched."
// What this means in practice is that entities that are created after the initial query
// begins may or may not be seen by this reader, depending on whether the query was
// paused and restarted with a cursor before it would have reached the new entity.
query = query.startAt(cursor);
}
queryIterator = query.iterator();
}
/** Called occasionally alongside {@link #next}. */
@Override
public Double getProgress() {
// Cap progress at 1.0, since the query's count() can increase during the run of the mapreduce
// if more entities are written, but we've cached the value once in "total".
return Math.min(1.0, ((double) loaded) / total);
}
/** Called before we are serialized. Save a serializable cursor for this query. */
@Override
public void endSlice() {
cursor = queryIterator.getCursor();
public int getTotal() {
return query().count();
}
/** Query for children of this bucket. */
@ -118,20 +82,6 @@ abstract class EppResourceBaseReader<T> extends InputReader<T> {
return memoryEstimate;
}
/**
* Get the next {@link EppResourceIndex} from the query.
*
* @throws NoSuchElementException if there are no more elements.
*/
EppResourceIndex nextEri() {
loaded++;
try {
return queryIterator.next();
} finally {
ofy().clearSessionCache(); // Try not to leak memory.
}
}
static <R extends EppResource> ImmutableSet<String> varargsToKinds(
ImmutableSet<Class<? extends R>> resourceClasses) {
// Ignore EppResource when finding kinds, since it doesn't have one and doesn't imply filtering.

View file

@ -56,9 +56,9 @@ class EppResourceEntityReader<R extends EppResource> extends EppResourceBaseRead
*/
@Override
public R next() throws NoSuchElementException {
// Loop until we find a value, or nextEri() throws a NoSuchElementException.
// Loop until we find a value, or nextQueryResult() throws a NoSuchElementException.
while (true) {
Key<? extends EppResource> key = nextEri().getKey();
Key<? extends EppResource> key = nextQueryResult().getKey();
EppResource resource = ofy().load().key(key).now();
if (resource == null) {
logger.severefmt("EppResourceIndex key %s points at a missing resource", key);

View file

@ -40,6 +40,6 @@ class EppResourceIndexReader extends EppResourceBaseReader<EppResourceIndex> {
*/
@Override
public EppResourceIndex next() throws NoSuchElementException {
return nextEri();
return nextQueryResult();
}
}

View file

@ -49,6 +49,6 @@ class EppResourceKeyReader<R extends EppResource> extends EppResourceBaseReader<
@SuppressWarnings("unchecked")
public Key<R> next() throws NoSuchElementException {
// This is a safe cast because we filtered on kind inside the query.
return (Key<R>) nextEri().getKey();
return (Key<R>) nextQueryResult().getKey();
}
}

View file

@ -0,0 +1,165 @@
// Copyright 2018 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.mapreduce.inputs;
import static com.google.common.base.Preconditions.checkNotNull;
import static google.registry.model.ofy.ObjectifyService.ofy;
import com.google.appengine.api.datastore.Cursor;
import com.google.appengine.api.datastore.DatastoreTimeoutException;
import com.google.appengine.api.datastore.QueryResultIterator;
import com.google.appengine.tools.mapreduce.InputReader;
import com.googlecode.objectify.cmd.Query;
import google.registry.util.FormattingLogger;
import google.registry.util.Retrier;
import google.registry.util.SystemSleeper;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;
/**
 * Base {@link InputReader} that walks the results of an Objectify query and retries failed reads.
 *
 * <p>A mapreduce that goes over a large number of Datastore elements will occasionally hit a
 * {@link DatastoreTimeoutException}. Individually these are harmless, but enough of them add up
 * and make the whole mapreduce fail.
 *
 * <p>Readers built on this class get every {@link DatastoreTimeoutException} retried
 * automatically, which keeps such failures to a minimum.
 *
 * <p>Type parameters: {@code I} is the type the Objectify query yields internally, and {@code T}
 * is the type this InputReader returns.
 */
abstract class RetryingInputReader<I, T> extends InputReader<T> {

  private static final long serialVersionUID = -4897677478541818899L;

  private static final Retrier retrier = new Retrier(new SystemSleeper(), 5);

  private static final FormattingLogger logger = FormattingLogger.getLoggerForCallerClass();

  /** Last known position in the query; null until the iterator has reported one. */
  @Nullable private Cursor cursor;

  /** Cached value of {@link #getTotal}, refreshed in {@link #beginShard}. */
  private int total;

  /** Number of calls made to {@link #nextQueryResult} so far. */
  private int loaded;

  /** Live iterator over the query; transient, so it is rebuilt in {@link #beginSlice}. */
  private transient QueryResultIterator<I> queryIterator;

  /**
   * Builds the iterator over the query results, positioned at {@code cursor}.
   *
   * <p>Every call must return an iterator over the same query.
   *
   * <p>The underlying {@link Query} must have an ancestor filter, so that it is strongly
   * consistent. The documentation at
   * https://cloud.google.com/appengine/docs/java/datastore/queries#Java_Data_consistency states
   * that "strongly consistent queries are always transactionally consistent". However, every
   * restart of the query at a cursor is effectively a new query, and "if the results for a query
   * change between uses of a cursor, the query notices only changes that occur in results after
   * the cursor. If a new result appears before the cursor's position for the query, it will not be
   * returned when the results after the cursor are fetched."
   *
   * <p>In practice this means that an entity created after the initial query began may or may not
   * be seen by this reader, depending on whether the query was paused and restarted with a cursor
   * before it would have reached the new entity.
   *
   * @param cursor the position the iterator should start from, or null to start from the
   *     beginning.
   */
  public abstract QueryResultIterator<I> getQueryIterator(@Nullable Cursor cursor);

  /**
   * Reports how many elements the iterator goes over in total.
   *
   * <p>The value is cached: this function is only called once, at the start of the shard, or when
   * the iterator is reset.
   *
   * <p>The value is only used for debugging and progress display, so returning 0 is safe.
   */
  public abstract int getTotal();

  /**
   * Produces the next item of this InputReader.
   *
   * <p>Implementations will most likely build the item from {@link #nextQueryResult}; calling
   * {@link #nextQueryResult} more than once per item is fine.
   */
  @Override
  public abstract T next();

  /** Runs once at the start of the shard and caches the expected number of elements. */
  @Override
  public void beginShard() {
    this.total = getTotal();
  }

  /** Runs after every deserialization: starts a new query, or resumes one at the saved cursor. */
  @Override
  public void beginSlice() {
    this.queryIterator = getQueryIterator(this.cursor);
  }

  /** Called occasionally alongside {@link #next}. */
  @Override
  public Double getProgress() {
    // A total of 0 is allowed, so never divide by less than 1.
    int denominator = Math.max(1, total);
    double fraction = ((double) loaded) / denominator;
    // "total" is cached once while the query's count() can grow during the mapreduce if more
    // entities are written, so the fraction is capped at 1.0.
    return Math.min(1.0, fraction);
  }

  /** Runs before serialization and stores a serializable cursor for the query. */
  @Override
  public void endSlice() {
    this.cursor = queryIterator.getCursor();
  }

  /**
   * Reads the next item from the query results, retrying on {@link DatastoreTimeoutException}.
   *
   * <p>Intended as the building block for {@link #next}.
   *
   * @throws NoSuchElementException if there are no more elements.
   */
  protected final I nextQueryResult() {
    // Record where the iterator stands before reading, so that a retry can resume from here.
    cursor = queryIterator.getCursor();
    loaded++;
    try {
      return retrier.callWithRetry(
          // Must stay a lambda and not become a bound method reference: the failure handler
          // below replaces queryIterator, and each attempt has to read from the current one.
          () -> queryIterator.next(),
          (failure, attemptNumber, attemptLimit) -> {
            checkNotNull(cursor, "Can't retry because cursor is null. Giving up.");
            logger.infofmt(
                "Retriable failure while reading item %d/%d - attempt %d/%d: %s",
                loaded, total, attemptNumber, attemptLimit, failure);
            queryIterator = getQueryIterator(cursor);
          },
          DatastoreTimeoutException.class);
    } catch (Throwable e) {
      // Running out of elements is the expected way for iteration to end, not an error, so it is
      // rethrown without being logged. Everything else is logged first.
      if (!(e instanceof NoSuchElementException)) {
        logger.warningfmt(
            e, "Got an unrecoverable failure while reading item %d/%d.", loaded, total);
      }
      throw e;
    } finally {
      ofy().clearSessionCache();
    }
  }

  /**
   * Helper that positions a query at a cursor which may be null.
   *
   * @param query the query to work on
   * @param cursor the position to start from, or null to start from the beginning.
   */
  public static <T> Query<T> startQueryAt(Query<T> query, @Nullable Cursor cursor) {
    if (cursor != null) {
      return query.startAt(cursor);
    }
    return query;
  }
}