diff --git a/java/google/registry/backup/DeleteOldCommitLogsAction.java b/java/google/registry/backup/DeleteOldCommitLogsAction.java index 76913de07..e1d250ec6 100644 --- a/java/google/registry/backup/DeleteOldCommitLogsAction.java +++ b/java/google/registry/backup/DeleteOldCommitLogsAction.java @@ -140,9 +140,7 @@ public final class DeleteOldCommitLogsAction implements Runnable { // If it isn't a Key then it should be an EppResource, which we need to // load to emit the revisions. // - // We want to make sure we retry any load individually to reduce the chance of the entire - // shard failing, hence we wrap it in a transactNew. - Object object = ofy().transactNew(() -> ofy().load().key(key).now()); + Object object = ofy().load().key(key).now(); checkNotNull(object, "Received a key to a missing object. key: %s", key); checkState( object instanceof EppResource, diff --git a/java/google/registry/mapreduce/inputs/ChildEntityReader.java b/java/google/registry/mapreduce/inputs/ChildEntityReader.java index 3d4eb6008..2bbab5cf2 100644 --- a/java/google/registry/mapreduce/inputs/ChildEntityReader.java +++ b/java/google/registry/mapreduce/inputs/ChildEntityReader.java @@ -25,7 +25,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.googlecode.objectify.Key; import com.googlecode.objectify.annotation.Entity; -import com.googlecode.objectify.cmd.Query; import google.registry.model.EppResource; import google.registry.model.ImmutableObject; import google.registry.model.index.EppResourceIndex; @@ -33,6 +32,7 @@ import google.registry.model.index.EppResourceIndexBucket; import google.registry.util.FormattingLogger; import java.io.IOException; import java.util.NoSuchElementException; +import javax.annotation.Nullable; /** * Reader that maps over {@link EppResourceIndex} and returns resources that are children of @@ -40,24 +40,21 @@ import java.util.NoSuchElementException; */ class ChildEntityReader extends InputReader { - private static final long serialVersionUID = -7430731417793849164L; + private static final long serialVersionUID = 7481761146349663848L; static final FormattingLogger logger = FormattingLogger.getLoggerForCallerClass(); /** This reader uses an EppResourceEntityReader under the covers to iterate over EPP resources. */ private final EppResourceEntityReader eppResourceEntityReader; - /** The current EPP resource being referenced for child entity queries. */ - private Key currentEppResource; /** The child resource classes to postfilter for. */ private final ImmutableList> childResourceClasses; + /** The index within the list above for the next ofy query. */ private int childResourceClassIndex; - /** An iterator over queries for child entities of EppResources. */ - private transient QueryResultIterator childQueryIterator; - /** A cursor for queries for child entities of EppResources. */ - private Cursor childCursor; + /** A reader used to go over children of the current eppResourceEntity and childResourceClass. */ + @Nullable private ChildReader childReader; public ChildEntityReader( Key bucketKey, @@ -92,28 +89,58 @@ class ChildEntityReader extend * @throws NoSuchElementException if there are no more EPP resources to iterate over. */ I nextChild() throws NoSuchElementException { - try { - while (true) { - if (currentEppResource == null) { - currentEppResource = Key.create(eppResourceEntityReader.next()); + // This code implements a single iteration over a triple-nested loop. It returns the next + // innermost item of that 3-nested loop. The entire loop would look like this: + // + // NOTE: I'm treating eppResourceEntityReader and childReader as if they were iterables for + // brevity, although they aren't - they are Readers + // + // I'm also using the python 'yield' command to show we're returning this item one by one. + // + // for (eppResourceEntity : eppResourceEntityReader) { + // for (childResourceClass : childResourceClasses) { + // for (I child : ChildReader.create(childResourceClass, Key.create(eppResourceEntity)) { + // yield child; // returns the 'child's one by one. + // } + // } + // } + + // First, set all the variables if they aren't set yet. This should only happen on the first + // time in the function. + // + // This can be merged with the calls in the "catch" below to avoid code duplication, but it + // makes the code harder to read. + if (childReader == null) { + childResourceClassIndex = 0; + childReader = + ChildReader.create( + childResourceClasses.get(childResourceClassIndex), + Key.create(eppResourceEntityReader.next())); + } + // Then continue advancing the 3-nested loop until we find a value + while (true) { + try { + // Advance the inner loop and return the next value. + return childReader.next(); + } catch (NoSuchElementException e) { + // If we got here it means the inner loop (childQueryIterator) is done - we need to advance + // the middle loop by one, and then reset the inner loop. + childResourceClassIndex++; + // Check if the middle loop is done as well + if (childResourceClassIndex < childResourceClasses.size()) { + // The middle loop is not done. Reset the inner loop. + childReader = childReader.withType(childResourceClasses.get(childResourceClassIndex)); + } else { + // We're done with the middle loop as well! Advance the outer loop, and reset the middle + // loop and inner loops childResourceClassIndex = 0; - childQueryIterator = null; - } - if (childQueryIterator == null) { - childQueryIterator = childQuery().iterator(); - } - try { - return childQueryIterator.next(); - } catch (NoSuchElementException e) { - childQueryIterator = null; - childResourceClassIndex++; - if (childResourceClassIndex >= childResourceClasses.size()) { - currentEppResource = null; - } + childReader = + ChildReader.create( + childResourceClasses.get(childResourceClassIndex), + Key.create(eppResourceEntityReader.next())); } + // Loop back up the while, to try reading reading a value again } - } finally { - ofy().clearSessionCache(); // Try not to leak memory. } } @@ -132,30 +159,20 @@ class ChildEntityReader extend } } - /** Query for children of the current resource and of the current child class. */ - private Query childQuery() { - @SuppressWarnings("unchecked") - Query query = (Query) ofy().load() - .type(childResourceClasses.get(childResourceClassIndex)) - .ancestor(currentEppResource); - return query; - } - @Override public void beginSlice() { eppResourceEntityReader.beginSlice(); - if (childCursor != null) { - Query query = childQuery().startAt(childCursor); - childQueryIterator = query.iterator(); + if (childReader != null) { + childReader.beginSlice(); } } @Override public void endSlice() { - if (childQueryIterator != null) { - childCursor = childQueryIterator.getCursor(); - } eppResourceEntityReader.endSlice(); + if (childReader != null) { + childReader.endSlice(); + } } @Override @@ -187,4 +204,53 @@ class ChildEntityReader extend public void endShard() throws IOException { eppResourceEntityReader.endShard(); } + + private static class ChildReader extends RetryingInputReader { + + private static final long serialVersionUID = -8443132445119657998L; + + private final Class type; + + private final Key ancestor; + + /** Create a reader that goes over all the children of a given type to the given ancestor. */ + public ChildReader(Class type, Key ancestor) { + this.type = type; + this.ancestor = ancestor; + // This reader isn't initialized by mapreduce, so we need to initialize it ourselves + beginShard(); + beginSlice(); + } + + /** + * Create a reader that goes over all the children of a given type to the given ancestor. + * + *

We need this function in addition to the constructor so that we can create a ChildReader. + */ + public static ChildReader create(Class type, Key ancestor) { + return new ChildReader(type, ancestor); + } + + /** Query for children of the current resource and of the current child class. */ + @Override + public QueryResultIterator getQueryIterator(Cursor cursor) { + return startQueryAt(ofy().load().type(type).ancestor(ancestor), cursor).iterator(); + } + + @Override + public int getTotal() { + return 0; + } + + @Override + public I next() { + return nextQueryResult(); + } + + /** Retruns a new ChildReader of the same ancestor for the given type. */ + public ChildReader withType(Class type) { + return create(type, ancestor); + } + } } diff --git a/java/google/registry/mapreduce/inputs/CommitLogManifestReader.java b/java/google/registry/mapreduce/inputs/CommitLogManifestReader.java index 67a4c5577..eb8d3aaa0 100644 --- a/java/google/registry/mapreduce/inputs/CommitLogManifestReader.java +++ b/java/google/registry/mapreduce/inputs/CommitLogManifestReader.java @@ -14,11 +14,9 @@ package google.registry.mapreduce.inputs; -import static com.google.common.base.Preconditions.checkNotNull; import static google.registry.model.ofy.ObjectifyService.ofy; import com.google.appengine.api.datastore.Cursor; -import com.google.appengine.api.datastore.DatastoreTimeoutException; import com.google.appengine.api.datastore.QueryResultIterator; import com.google.appengine.tools.mapreduce.InputReader; import com.googlecode.objectify.Key; @@ -26,14 +24,13 @@ import com.googlecode.objectify.cmd.Query; import google.registry.model.ofy.CommitLogBucket; import google.registry.model.ofy.CommitLogManifest; import google.registry.util.FormattingLogger; -import google.registry.util.Retrier; -import google.registry.util.SystemSleeper; import java.util.NoSuchElementException; import javax.annotation.Nullable; import org.joda.time.DateTime; /** {@link InputReader} that maps over {@link CommitLogManifest}. */ -class CommitLogManifestReader extends InputReader> { +class CommitLogManifestReader + extends RetryingInputReader, Key> { static final FormattingLogger logger = FormattingLogger.getLoggerForCallerClass(); @@ -45,8 +42,7 @@ class CommitLogManifestReader extends InputReader> { */ private static final long MEMORY_ESTIMATE = 100 * 1024; - private static final Retrier retrier = new Retrier(new SystemSleeper(), 3); - private static final long serialVersionUID = 2553537421598284748L; + private static final long serialVersionUID = 6215490573108252100L; private final Key bucketKey; @@ -58,55 +54,19 @@ class CommitLogManifestReader extends InputReader> { @Nullable private final DateTime olderThan; - private Cursor cursor; - private int total; - private int loaded; - - private transient QueryResultIterator> queryIterator; - CommitLogManifestReader(Key bucketKey, @Nullable DateTime olderThan) { this.bucketKey = bucketKey; this.olderThan = olderThan; } - /** Called once at start. Cache the expected size. */ @Override - public void beginShard() { - total = query().count(); + public QueryResultIterator> getQueryIterator(@Nullable Cursor cursor) { + return startQueryAt(query(), cursor).keys().iterator(); } - /** Called every time we are deserialized. Create a new query or resume an existing one. */ @Override - public void beginSlice() { - Query query = query(); - if (cursor != null) { - // The underlying query is strongly consistent, and according to the documentation at - // https://cloud.google.com/appengine/docs/java/datastore/queries#Java_Data_consistency - // "strongly consistent queries are always transactionally consistent". However, each time - // we restart the query at a cursor we have a new effective query, and "if the results for a - // query change between uses of a cursor, the query notices only changes that occur in - // results after the cursor. If a new result appears before the cursor's position for the - // query, it will not be returned when the results after the cursor are fetched." - // What this means in practice is that entities that are created after the initial query - // begins may or may not be seen by this reader, depending on whether the query was - // paused and restarted with a cursor before it would have reached the new entity. - query = query.startAt(cursor); - } - queryIterator = query.keys().iterator(); - } - - /** Called occasionally alongside {@link #next}. */ - @Override - public Double getProgress() { - // Cap progress at 1.0, since the query's count() can increase during the run of the mapreduce - // if more entities are written, but we've cached the value once in "total". - return Math.min(1.0, ((double) loaded) / total); - } - - /** Called before we are serialized. Save a serializable cursor for this query. */ - @Override - public void endSlice() { - cursor = queryIterator.getCursor(); + public int getTotal() { + return query().count(); } /** Query for children of this bucket. */ @@ -133,19 +93,6 @@ class CommitLogManifestReader extends InputReader> { */ @Override public Key next() { - loaded++; - final Cursor currentCursor = queryIterator.getCursor(); - try { - return retrier.callWithRetry( - () -> queryIterator.next(), - (thrown, failures, maxAttempts) -> { - checkNotNull(currentCursor, "Can't retry because cursor is null. Giving up."); - queryIterator = query().startAt(currentCursor).keys().iterator(); - }, - DatastoreTimeoutException.class); - } finally { - ofy().clearSessionCache(); // Try not to leak memory. - } + return nextQueryResult(); } } - diff --git a/java/google/registry/mapreduce/inputs/EppResourceBaseReader.java b/java/google/registry/mapreduce/inputs/EppResourceBaseReader.java index 9948e19b1..2b843f16f 100644 --- a/java/google/registry/mapreduce/inputs/EppResourceBaseReader.java +++ b/java/google/registry/mapreduce/inputs/EppResourceBaseReader.java @@ -27,17 +27,17 @@ import google.registry.model.EppResource; import google.registry.model.index.EppResourceIndex; import google.registry.model.index.EppResourceIndexBucket; import google.registry.util.FormattingLogger; -import java.util.NoSuchElementException; +import javax.annotation.Nullable; /** Base class for {@link InputReader} classes that map over {@link EppResourceIndex}. */ -abstract class EppResourceBaseReader extends InputReader { +abstract class EppResourceBaseReader extends RetryingInputReader { static final FormattingLogger logger = FormattingLogger.getLoggerForCallerClass(); /** Number of bytes in 1MB of memory, used for memory estimates. */ static final long ONE_MB = 1024 * 1024; - private static final long serialVersionUID = -2970253037856017147L; + private static final long serialVersionUID = 7942584269402339168L; /** * The resource kinds to filter for. @@ -51,12 +51,6 @@ abstract class EppResourceBaseReader extends InputReader { private final Key bucketKey; private final long memoryEstimate; - private Cursor cursor; - private int total; - private int loaded; - - private transient QueryResultIterator queryIterator; - EppResourceBaseReader( Key bucketKey, long memoryEstimate, @@ -66,44 +60,14 @@ abstract class EppResourceBaseReader extends InputReader { this.filterKinds = filterKinds; } - /** Called once at start. Cache the expected size. */ @Override - public void beginShard() { - total = query().count(); + public QueryResultIterator getQueryIterator(@Nullable Cursor cursor) { + return startQueryAt(query(), cursor).iterator(); } - /** Called every time we are deserialized. Create a new query or resume an existing one. */ @Override - public void beginSlice() { - Query query = query(); - if (cursor != null) { - // The underlying query is strongly consistent, and according to the documentation at - // https://cloud.google.com/appengine/docs/java/datastore/queries#Java_Data_consistency - // "strongly consistent queries are always transactionally consistent". However, each time - // we restart the query at a cursor we have a new effective query, and "if the results for a - // query change between uses of a cursor, the query notices only changes that occur in - // results after the cursor. If a new result appears before the cursor's position for the - // query, it will not be returned when the results after the cursor are fetched." - // What this means in practice is that entities that are created after the initial query - // begins may or may not be seen by this reader, depending on whether the query was - // paused and restarted with a cursor before it would have reached the new entity. - query = query.startAt(cursor); - } - queryIterator = query.iterator(); - } - - /** Called occasionally alongside {@link #next}. */ - @Override - public Double getProgress() { - // Cap progress at 1.0, since the query's count() can increase during the run of the mapreduce - // if more entities are written, but we've cached the value once in "total". - return Math.min(1.0, ((double) loaded) / total); - } - - /** Called before we are serialized. Save a serializable cursor for this query. */ - @Override - public void endSlice() { - cursor = queryIterator.getCursor(); + public int getTotal() { + return query().count(); } /** Query for children of this bucket. */ @@ -118,20 +82,6 @@ abstract class EppResourceBaseReader extends InputReader { return memoryEstimate; } - /** - * Get the next {@link EppResourceIndex} from the query. - * - * @throws NoSuchElementException if there are no more elements. - */ - EppResourceIndex nextEri() { - loaded++; - try { - return queryIterator.next(); - } finally { - ofy().clearSessionCache(); // Try not to leak memory. - } - } - static ImmutableSet varargsToKinds( ImmutableSet> resourceClasses) { // Ignore EppResource when finding kinds, since it doesn't have one and doesn't imply filtering. diff --git a/java/google/registry/mapreduce/inputs/EppResourceEntityReader.java b/java/google/registry/mapreduce/inputs/EppResourceEntityReader.java index fdb577c47..52d7bdf76 100644 --- a/java/google/registry/mapreduce/inputs/EppResourceEntityReader.java +++ b/java/google/registry/mapreduce/inputs/EppResourceEntityReader.java @@ -56,9 +56,9 @@ class EppResourceEntityReader extends EppResourceBaseRead */ @Override public R next() throws NoSuchElementException { - // Loop until we find a value, or nextEri() throws a NoSuchElementException. + // Loop until we find a value, or nextQueryResult() throws a NoSuchElementException. while (true) { - Key key = nextEri().getKey(); + Key key = nextQueryResult().getKey(); EppResource resource = ofy().load().key(key).now(); if (resource == null) { logger.severefmt("EppResourceIndex key %s points at a missing resource", key); diff --git a/java/google/registry/mapreduce/inputs/EppResourceIndexReader.java b/java/google/registry/mapreduce/inputs/EppResourceIndexReader.java index c37f0542b..618214174 100644 --- a/java/google/registry/mapreduce/inputs/EppResourceIndexReader.java +++ b/java/google/registry/mapreduce/inputs/EppResourceIndexReader.java @@ -40,6 +40,6 @@ class EppResourceIndexReader extends EppResourceBaseReader { */ @Override public EppResourceIndex next() throws NoSuchElementException { - return nextEri(); + return nextQueryResult(); } } diff --git a/java/google/registry/mapreduce/inputs/EppResourceKeyReader.java b/java/google/registry/mapreduce/inputs/EppResourceKeyReader.java index afc1d58b7..d06e4ccdf 100644 --- a/java/google/registry/mapreduce/inputs/EppResourceKeyReader.java +++ b/java/google/registry/mapreduce/inputs/EppResourceKeyReader.java @@ -49,6 +49,6 @@ class EppResourceKeyReader extends EppResourceBaseReader< @SuppressWarnings("unchecked") public Key next() throws NoSuchElementException { // This is a safe cast because we filtered on kind inside the query. - return (Key) nextEri().getKey(); + return (Key) nextQueryResult().getKey(); } } diff --git a/java/google/registry/mapreduce/inputs/RetryingInputReader.java b/java/google/registry/mapreduce/inputs/RetryingInputReader.java new file mode 100644 index 000000000..8812295d1 --- /dev/null +++ b/java/google/registry/mapreduce/inputs/RetryingInputReader.java @@ -0,0 +1,165 @@ +// Copyright 2018 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.mapreduce.inputs; + +import static com.google.common.base.Preconditions.checkNotNull; +import static google.registry.model.ofy.ObjectifyService.ofy; + +import com.google.appengine.api.datastore.Cursor; +import com.google.appengine.api.datastore.DatastoreTimeoutException; +import com.google.appengine.api.datastore.QueryResultIterator; +import com.google.appengine.tools.mapreduce.InputReader; +import com.googlecode.objectify.cmd.Query; +import google.registry.util.FormattingLogger; +import google.registry.util.Retrier; +import google.registry.util.SystemSleeper; +import java.util.NoSuchElementException; +import javax.annotation.Nullable; + +/** + * A reader over objectify query that retries reads on failure. + * + *

When doing a mapreduce over a large number of elements from Datastore, the random + * DatastoreTimeoutExceptions that happen sometimes can eventually add up and cause the entire + * mapreduce to fail. + * + *

This base RetryingInputReader will automatically retry any DatastoreTimeoutException to + * minimize the failures. + * + *

I is the internal Objectify read type, while T is the InputReader return type. + */ +abstract class RetryingInputReader extends InputReader { + + private static final long serialVersionUID = -4897677478541818899L; + private static final Retrier retrier = new Retrier(new SystemSleeper(), 5); + private static final FormattingLogger logger = FormattingLogger.getLoggerForCallerClass(); + + @Nullable private Cursor cursor; + private int total; + private int loaded; + + private transient QueryResultIterator queryIterator; + + /** + * Return the iterator over Query results, starting at the cursor location. + * + *

Must always return an iterator over the same query. + * + *

The underlying {@link Query} must have an ancestor filter, so that it is strongly + * consistent. According to the documentation at + * https://cloud.google.com/appengine/docs/java/datastore/queries#Java_Data_consistency + * + *

"strongly consistent queries are always transactionally consistent". However, each time we + * restart the query at a cursor we have a new effective query, and "if the results for a query + * change between uses of a cursor, the query notices only changes that occur in results after the + * cursor. If a new result appears before the cursor's position for the query, it will not be + * returned when the results after the cursor are fetched." + * + *

What this means in practice is that entities that are created after the initial query begins + * may or may not be seen by this reader, depending on whether the query was paused and restarted + * with a cursor before it would have reached the new entity. + * + * @param cursor the initial location for the iterator to start from. If null - start from + * beginning. + */ + public abstract QueryResultIterator getQueryIterator(@Nullable Cursor cursor); + + /** + * Return the total number of elements the iterator goes over. + * + *

The results are cached - this function will only be called once on the start of the shard, + * or when the iterator is reset. + * + *

The results are only used for debugging / progress display. It is safe to return 0. + */ + public abstract int getTotal(); + + /** + * Return the next item of this InputReader. + * + *

You probably want to use {@link #nextQueryResult} internally when preparing the next item. + * It is OK to call {@link #nextQueryResult} multiple times. + */ + @Override + public abstract T next(); + + /** Called once at start. Cache the expected size. */ + @Override + public void beginShard() { + total = getTotal(); + } + + /** Called every time we are deserialized. Create a new query or resume an existing one. */ + @Override + public void beginSlice() { + queryIterator = getQueryIterator(cursor); + } + + /** Called occasionally alongside {@link #next}. */ + @Override + public Double getProgress() { + // Cap progress at 1.0, since the query's count() can increase during the run of the mapreduce + // if more entities are written, but we've cached the value once in "total". + return Math.min(1.0, ((double) loaded) / Math.max(1, total)); + } + + /** Called before we are serialized. Save a serializable cursor for this query. */ + @Override + public void endSlice() { + cursor = queryIterator.getCursor(); + } + + /** + * Get the next item from the query results. + * + *

Use this to create the next() function. + * + * @throws NoSuchElementException if there are no more elements. + */ + protected final I nextQueryResult() { + cursor = queryIterator.getCursor(); + loaded++; + try { + return retrier.callWithRetry( + () -> queryIterator.next(), + (thrown, failures, maxAttempts) -> { + checkNotNull(cursor, "Can't retry because cursor is null. Giving up."); + logger.infofmt( + "Retriable failure while reading item %d/%d - attempt %d/%d: %s", + loaded, total, failures, maxAttempts, thrown); + queryIterator = getQueryIterator(cursor); + }, + DatastoreTimeoutException.class); + } catch (NoSuchElementException e) { + // We expect NoSuchElementException to be thrown, and it isn't an error. Just rethrow. + throw e; + } catch (Throwable e) { + logger.warningfmt(e, "Got an unrecoverable failure while reading item %d/%d.", loaded, total); + throw e; + } finally { + ofy().clearSessionCache(); + } + } + + /** + * Utility function to start a query from a given nullable cursor. + * + * @param query the query to work on + * @param cursor the location to start from. If null - starts from the beginning. + */ + public static Query startQueryAt(Query query, @Nullable Cursor cursor) { + return (cursor == null) ? query : query.startAt(cursor); + } +}