diff --git a/java/com/google/domain/registry/mapreduce/ChunkingKeyInput.java b/java/com/google/domain/registry/mapreduce/ChunkingKeyInput.java deleted file mode 100644 index f638848fd..000000000 --- a/java/com/google/domain/registry/mapreduce/ChunkingKeyInput.java +++ /dev/null @@ -1,113 +0,0 @@ -// Copyright 2016 Google Inc. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package com.google.domain.registry.mapreduce; - -import com.google.appengine.api.datastore.Key; -import com.google.appengine.tools.mapreduce.Input; -import com.google.appengine.tools.mapreduce.InputReader; -import com.google.common.collect.ImmutableList; - -import java.io.IOException; -import java.util.List; -import java.util.NoSuchElementException; - -/** A MapReduce {@link Input} adapter that chunks an input of keys into sublists of keys. */ -public class ChunkingKeyInput extends Input> { - - private static final long serialVersionUID = 1670202385246824694L; - - private final Input input; - private final int chunkSize; - - public ChunkingKeyInput(Input input, int chunkSize) { - this.input = input; - this.chunkSize = chunkSize; - } - - /** - * An input reader that wraps around another input reader and returns its contents in chunks of - * a given size. - */ - private static class ChunkingKeyInputReader extends InputReader> { - - private static final long serialVersionUID = 53502324675703263L; - - private final InputReader reader; - private final int chunkSize; - - ChunkingKeyInputReader(InputReader reader, int chunkSize) { - this.reader = reader; - this.chunkSize = chunkSize; - } - - @Override - public List next() throws IOException { - ImmutableList.Builder chunk = new ImmutableList.Builder<>(); - try { - for (int i = 0; i < chunkSize; i++) { - chunk.add(reader.next()); - } - } catch (NoSuchElementException e) { - // Amazingly this is the recommended (and only) way to test for hasNext(). - } - ImmutableList builtChunk = chunk.build(); - if (builtChunk.isEmpty()) { - throw new NoSuchElementException(); // Maintain the contract. - } - return builtChunk; - } - - @Override - public Double getProgress() { - return reader.getProgress(); - } - - @Override - public void beginShard() throws IOException { - reader.beginShard(); - } - - @Override - public void beginSlice() throws IOException { - reader.beginSlice(); - } - - @Override - public void endSlice() throws IOException { - reader.endSlice(); - } - - @Override - public void endShard() throws IOException { - reader.endShard(); - } - - @Override - public long estimateMemoryRequirement() { - // The reader's memory requirement plus the memory for this chunk's worth of buffered keys. - // 256 comes from DatastoreKeyInputReader.AVERAGE_KEY_SIZE. - return reader.estimateMemoryRequirement() + chunkSize * 256; - } - } - - @Override - public List>> createReaders() throws IOException { - ImmutableList.Builder>> readers = new ImmutableList.Builder<>(); - for (InputReader reader : input.createReaders()) { - readers.add(new ChunkingKeyInputReader(reader, chunkSize)); - } - return readers.build(); - } -} diff --git a/java/com/google/domain/registry/mapreduce/ConcatenatingInput.java b/java/com/google/domain/registry/mapreduce/ConcatenatingInput.java deleted file mode 100644 index c0822e0e7..000000000 --- a/java/com/google/domain/registry/mapreduce/ConcatenatingInput.java +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright 2016 Google Inc. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package com.google.domain.registry.mapreduce; - -import com.google.appengine.tools.mapreduce.Input; -import com.google.appengine.tools.mapreduce.InputReader; -import com.google.appengine.tools.mapreduce.inputs.ConcatenatingInputReader; -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.ListMultimap; - -import java.io.IOException; -import java.util.Collection; -import java.util.List; -import java.util.Set; - -/** - * A MapReduce {@link Input} adapter that joins multiple inputs. - * - * @param input type - */ -public class ConcatenatingInput extends Input { - - private static final long serialVersionUID = 1225981408139437077L; - - private final Set> inputs; - private final int numShards; - - public ConcatenatingInput(Iterable> inputs, int numShards) { - this.inputs = ImmutableSet.copyOf(inputs); - this.numShards = numShards; - } - - @Override - public List> createReaders() throws IOException { - ListMultimap> shards = ArrayListMultimap.create(); - int i = 0; - for (Input input : inputs) { - for (InputReader reader : input.createReaders()) { - // Covariant cast is safe because an InputReader only outputs I and never consumes it. - @SuppressWarnings("unchecked") - InputReader typedReader = (InputReader) reader; - shards.put(i % numShards, typedReader); - i++; - } - } - ImmutableList.Builder> concatenatingReaders = new ImmutableList.Builder<>(); - for (Collection> shard : shards.asMap().values()) { - concatenatingReaders.add(new ConcatenatingInputReader<>(ImmutableList.copyOf(shard))); - } - return concatenatingReaders.build(); - } -} diff --git a/java/com/google/domain/registry/mapreduce/EppResourceInputs.java b/java/com/google/domain/registry/mapreduce/EppResourceInputs.java deleted file mode 100644 index 90af7122b..000000000 --- a/java/com/google/domain/registry/mapreduce/EppResourceInputs.java +++ /dev/null @@ -1,396 +0,0 @@ -// Copyright 2016 Google Inc. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package com.google.domain.registry.mapreduce; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Predicates.not; -import static com.google.common.collect.Iterables.all; -import static com.google.common.collect.Lists.asList; -import static com.google.domain.registry.model.EntityClasses.CLASS_TO_KIND_FUNCTION; -import static com.google.domain.registry.model.ofy.ObjectifyService.ofy; -import static com.google.domain.registry.util.CollectionUtils.difference; -import static com.google.domain.registry.util.TypeUtils.hasAnnotation; - -import com.google.appengine.api.datastore.Cursor; -import com.google.appengine.api.datastore.QueryResultIterator; -import com.google.appengine.tools.mapreduce.Input; -import com.google.appengine.tools.mapreduce.InputReader; -import com.google.common.collect.FluentIterable; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import com.google.domain.registry.model.EppResource; -import com.google.domain.registry.model.index.EppResourceIndex; -import com.google.domain.registry.model.index.EppResourceIndexBucket; -import com.google.domain.registry.util.FormattingLogger; - -import com.googlecode.objectify.Key; -import com.googlecode.objectify.Ref; -import com.googlecode.objectify.annotation.EntitySubclass; -import com.googlecode.objectify.cmd.Query; - -import java.util.List; -import java.util.NoSuchElementException; - -/** - * Mapreduce {@link Input} types (and related helpers) for {@link EppResource} keys and objects. - * - *

The inputs provided by this class are not deletion-aware and do not project the resources - * forward in time. That is the responsibility of mappers that use these inputs. - */ -public class EppResourceInputs { - - private static final FormattingLogger logger = FormattingLogger.getLoggerForCallerClass(); - - /** Number of bytes in 1MB of memory, used for memory estimates. */ - private static final long ONE_MB = 1024 * 1024; - - /** Returns a MapReduce {@link Input} that loads all {@link EppResourceIndex} objects. */ - public static Input createIndexInput() { - return new IndexInput(); - } - - /** - * Returns a MapReduce {@link Input} that loads all {@link EppResource} objects of a given type, - * including deleted resources. - * - *

Note: Do not concatenate multiple EntityInputs together (this is inefficient as it iterates - * through all buckets multiple times). Specify the types in a single input, or load all types by - * specifying {@link EppResource} as the class. - */ - @SafeVarargs - public static Input createEntityInput( - Class resourceClass, - Class... moreResourceClasses) { - return new EntityInput(ImmutableSet.copyOf(asList(resourceClass, moreResourceClasses))); - } - - /** - * Returns a MapReduce {@link Input} that loads keys to all {@link EppResource} objects of a given - * type, including deleted resources. - * - *

Note: Do not concatenate multiple KeyInputs together (this is inefficient as it iterates - * through all buckets multiple times). Specify the types in a single input, or load all types by - * specifying {@link EppResource} as the class. - */ - @SafeVarargs - public static Input> createKeyInput( - Class resourceClass, - Class... moreResourceClasses) { - ImmutableSet> resourceClasses = - ImmutableSet.copyOf(asList(resourceClass, moreResourceClasses)); - checkArgument( - all(resourceClasses, not(hasAnnotation(EntitySubclass.class))), - "Mapping over keys requires a non-polymorphic Entity"); - return new KeyInput<>(resourceClasses); - } - - /** Base class for {@link Input} classes that map over {@link EppResourceIndex}. */ - private abstract static class BaseInput extends Input { - - private static final long serialVersionUID = -6681886718929462122L; - - @Override - public List> createReaders() { - ImmutableList.Builder> readers = new ImmutableList.Builder<>(); - for (Key bucketKey : EppResourceIndexBucket.getAllBuckets()) { - readers.add(bucketToReader(bucketKey)); - } - return readers.build(); - } - - /** Creates a reader that returns the resources under a bucket. */ - protected abstract InputReader bucketToReader(Key bucketKey); - } - - /** - * A MapReduce {@link Input} that loads all {@link EppResourceIndex} entities. - */ - private static class IndexInput extends BaseInput { - - private static final long serialVersionUID = -1231269296567279059L; - - @Override - protected InputReader bucketToReader(Key bucketKey) { - return new IndexReader(bucketKey); - } - } - - /** A MapReduce {@link Input} that loads all {@link EppResource} objects of a given type. */ - private static class EntityInput extends BaseInput { - - private static final long serialVersionUID = 8162607479124406226L; - - private final ImmutableSet> resourceClasses; - - public EntityInput(ImmutableSet> resourceClasses) { - this.resourceClasses = resourceClasses; - checkResourceClassesForInheritance(resourceClasses); - } - - @Override - protected InputReader bucketToReader(Key bucketKey) { - return new EntityReader(bucketKey, resourceClasses); - } - } - - /** - * A MapReduce {@link Input} that loads keys to all {@link EppResource} objects of a given type. - * - *

When mapping over keys we can't distinguish between Objectify polymorphic types. - */ - private static class KeyInput extends BaseInput> { - - private static final long serialVersionUID = -5426821384707653743L; - - private final ImmutableSet> resourceClasses; - - public KeyInput(ImmutableSet> resourceClasses) { - this.resourceClasses = resourceClasses; - checkResourceClassesForInheritance(resourceClasses); - } - - @Override - protected InputReader> bucketToReader(Key bucketKey) { - return new KeyReader<>(bucketKey, resourceClasses); - } - } - - /** Base class for {@link InputReader} classes that map over {@link EppResourceIndex}. */ - private abstract static class BaseReader extends InputReader { - - private static final long serialVersionUID = -2970253037856017147L; - - /** - * The resource kinds to filter for. - * - *

This can be empty, or any of {"ContactResource", "HostResource", "DomainBase"}. It will - * never contain "EppResource", "DomainResource" or "DomainApplication" since these aren't - * actual kinds in Datastore. - */ - private final ImmutableSet filterKinds; - - private final Key bucketKey; - private final long memoryEstimate; - - private Cursor cursor; - private int total; - private int loaded; - - private transient QueryResultIterator queryIterator; - - BaseReader( - Key - bucketKey, - long memoryEstimate, - ImmutableSet filterKinds) { - this.bucketKey = bucketKey; - this.memoryEstimate = memoryEstimate; - this.filterKinds = filterKinds; - } - - /** Called once at start. Cache the expected size. */ - @Override - public void beginShard() { - total = query().count(); - } - - /** Called every time we are deserialized. Create a new query or resume an existing one. */ - @Override - public void beginSlice() { - Query query = query(); - if (cursor != null) { - // The underlying query is strongly consistent, and according to the documentation at - // https://cloud.google.com/appengine/docs/java/datastore/queries#Java_Data_consistency - // "strongly consistent queries are always transactionally consistent". However, each time - // we restart the query at a cursor we have a new effective query, and "if the results for a - // query change between uses of a cursor, the query notices only changes that occur in - // results after the cursor. If a new result appears before the cursor's position for the - // query, it will not be returned when the results after the cursor are fetched." - // What this means in practice is that entities that are created after the initial query - // begins may or may not be seen by this reader, depending on whether the query was - // paused and restarted with a cursor before it would have reached the new entity. - query = query.startAt(cursor); - } - queryIterator = query.iterator(); - } - - /** Called occasionally alongside {@link #next}. */ - @Override - public Double getProgress() { - // Cap progress at 1.0, since the query's count() can increase during the run of the mapreduce - // if more entities are written, but we've cached the value once in "total". - return Math.min(1.0, ((double) loaded) / total); - } - - /** Called before we are serialized. Save a serializable cursor for this query. */ - @Override - public void endSlice() { - cursor = queryIterator.getCursor(); - } - - /** Query for children of this bucket. */ - Query query() { - Query query = ofy().load().type(EppResourceIndex.class).ancestor(bucketKey); - return filterKinds.isEmpty() ? query : query.filter("kind in", filterKinds); - } - - /** Returns the estimated memory that will be used by this reader in bytes. */ - @Override - public long estimateMemoryRequirement() { - return memoryEstimate; - } - - /** - * Get the next {@link EppResourceIndex} from the query. - * - * @throws NoSuchElementException if there are no more elements. - */ - EppResourceIndex nextEri() { - loaded++; - try { - return queryIterator.next(); - } finally { - ofy().clearSessionCache(); // Try not to leak memory. - } - } - } - - /** Reader that maps over {@link EppResourceIndex} and returns the index objects themselves. */ - private static class IndexReader extends BaseReader { - - private static final long serialVersionUID = -4816383426796766911L; - - public IndexReader(Key bucketKey) { - // Estimate 1MB of memory for this reader, which is massive overkill. - // Use an empty set for the filter kinds, which disables filtering. - super(bucketKey, ONE_MB, ImmutableSet.of()); - } - - /** - * Called for each map invocation. - * - * @throws NoSuchElementException if there are no more elements, as specified in the - * {@link InputReader#next} Javadoc. - */ - @Override - public EppResourceIndex next() throws NoSuchElementException { - return nextEri(); - } - } - - /** - * Reader that maps over {@link EppResourceIndex} and returns resource keys. - * - *

When mapping over keys we can't distinguish between Objectify polymorphic types. - */ - private static class KeyReader extends BaseReader> { - - private static final long serialVersionUID = -428232054739189774L; - - public KeyReader( - Key bucketKey, ImmutableSet> resourceClasses) { - super( - bucketKey, - ONE_MB, // Estimate 1MB of memory for this reader, which is massive overkill. - varargsToKinds(resourceClasses)); - } - - /** - * Called for each map invocation. - * - * @throws NoSuchElementException if there are no more elements, as specified in the - * {@link InputReader#next} Javadoc. - */ - @Override - @SuppressWarnings("unchecked") - public Key next() throws NoSuchElementException { - // This is a safe cast because we filtered on kind inside the query. - return (Key) nextEri().getReference().getKey(); - } - } - - /** Reader that maps over {@link EppResourceIndex} and returns resources. */ - private static class EntityReader extends BaseReader { - - private static final long serialVersionUID = -8042933349899971801L; - - /** - * The resource classes to postfilter for. - * - *

This can be {@link EppResource} or any descendant classes, regardless of whether those - * classes map directly to a kind in datastore, with the restriction that none of the classes - * is a supertype of any of the others. - */ - private final ImmutableSet> resourceClasses; - - public EntityReader( - Key bucketKey, - ImmutableSet> resourceClasses) { - super( - bucketKey, - ONE_MB * 2, // Estimate 2MB of memory for this reader, since it loads a (max 1MB) entity. - varargsToKinds(resourceClasses)); - this.resourceClasses = resourceClasses; - } - - /** - * Called for each map invocation. - * - * @throws NoSuchElementException if there are no more elements, as specified in the - * {@link InputReader#next} Javadoc. - */ - @Override - public R next() throws NoSuchElementException { - // Loop until we find a value, or nextRef() throws a NoSuchElementException. - while (true) { - Ref reference = nextEri().getReference(); - EppResource resource = reference.get(); - if (resource == null) { - logger.severefmt("Broken ERI reference: %s", reference.getKey()); - continue; - } - // Postfilter to distinguish polymorphic types (e.g. DomainBase and DomainResource). - for (Class resourceClass : resourceClasses) { - if (resourceClass.isAssignableFrom(resource.getClass())) { - @SuppressWarnings("unchecked") - R r = (R) resource; - return r; - } - } - } - } - } - - private static ImmutableSet varargsToKinds( - ImmutableSet> resourceClasses) { - // Ignore EppResource when finding kinds, since it doesn't have one and doesn't imply filtering. - return resourceClasses.contains(EppResource.class) - ? ImmutableSet.of() - : FluentIterable.from(resourceClasses).transform(CLASS_TO_KIND_FUNCTION).toSet(); - } - - private static void checkResourceClassesForInheritance( - ImmutableSet> resourceClasses) { - for (Class resourceClass : resourceClasses) { - for (Class potentialSuperclass : difference(resourceClasses, resourceClass)) { - checkArgument( - !potentialSuperclass.isAssignableFrom(resourceClass), - "Cannot specify resource classes with inheritance relationship: %s extends %s", - resourceClass, - potentialSuperclass); - } - } - } -} diff --git a/java/com/google/domain/registry/mapreduce/NullInput.java b/java/com/google/domain/registry/mapreduce/NullInput.java deleted file mode 100644 index b0788053b..000000000 --- a/java/com/google/domain/registry/mapreduce/NullInput.java +++ /dev/null @@ -1,54 +0,0 @@ -// Copyright 2016 Google Inc. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package com.google.domain.registry.mapreduce; - -import com.google.appengine.tools.mapreduce.Input; -import com.google.appengine.tools.mapreduce.InputReader; -import com.google.common.collect.ImmutableList; - -import java.util.List; -import java.util.NoSuchElementException; - -/** An input that returns a single {@code null} value. */ -public class NullInput extends Input { - - private static final long serialVersionUID = 1816836937031979851L; - - private static final class NullReader extends InputReader { - - private static final long serialVersionUID = -8176201363578913125L; - - boolean read = false; - - @Override - public T next() throws NoSuchElementException { - if (read) { - throw new NoSuchElementException(); - } - read = true; - return null; - } - - @Override - public Double getProgress() { - return read ? 1.0 : 0.0; - } - } - - @Override - public List> createReaders() { - return ImmutableList.of(new NullReader()); - } -}