diff --git a/java/google/registry/mapreduce/inputs/CommitLogManifestInput.java b/java/google/registry/mapreduce/inputs/CommitLogManifestInput.java new file mode 100644 index 000000000..449ccdfc0 --- /dev/null +++ b/java/google/registry/mapreduce/inputs/CommitLogManifestInput.java @@ -0,0 +1,55 @@ +// Copyright 2017 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.mapreduce.inputs; + +import com.google.appengine.tools.mapreduce.Input; +import com.google.appengine.tools.mapreduce.InputReader; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; +import com.googlecode.objectify.Key; +import google.registry.model.ofy.CommitLogBucket; +import google.registry.model.ofy.CommitLogManifest; +import java.util.List; +import org.joda.time.DateTime; + +/** Base class for {@link Input} classes that map over {@link CommitLogManifest}. */ +public class CommitLogManifestInput extends Input { + + private static final long serialVersionUID = 2043552272352286428L; + + /** + * Cutoff date for result. + * + * If present, all resulting CommitLogManifest will be dated prior to this date. + */ + private final Optional olderThan; + + public CommitLogManifestInput(Optional olderThan) { + this.olderThan = olderThan; + } + + @Override + public List> createReaders() { + ImmutableList.Builder> readers = new ImmutableList.Builder<>(); + for (Key bucketKey : CommitLogBucket.getAllBucketKeys()) { + readers.add(bucketToReader(bucketKey)); + } + return readers.build(); + } + + private InputReader bucketToReader(Key bucketKey) { + return new CommitLogManifestReader(bucketKey, olderThan); + } +} diff --git a/java/google/registry/mapreduce/inputs/CommitLogManifestReader.java b/java/google/registry/mapreduce/inputs/CommitLogManifestReader.java new file mode 100644 index 000000000..b73cba06c --- /dev/null +++ b/java/google/registry/mapreduce/inputs/CommitLogManifestReader.java @@ -0,0 +1,135 @@ +// Copyright 2017 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.mapreduce.inputs; + +import static google.registry.model.ofy.ObjectifyService.ofy; + +import com.google.appengine.api.datastore.Cursor; +import com.google.appengine.api.datastore.QueryResultIterator; +import com.google.appengine.tools.mapreduce.InputReader; +import com.google.common.base.Optional; +import com.googlecode.objectify.Key; +import com.googlecode.objectify.cmd.Query; +import google.registry.model.ofy.CommitLogBucket; +import google.registry.model.ofy.CommitLogManifest; +import java.util.NoSuchElementException; +import org.joda.time.DateTime; + +/** {@link InputReader} that maps over {@link CommitLogManifest}. */ +class CommitLogManifestReader extends InputReader { + + private static final long serialVersionUID = 5117046535590539778L; + + /** + * Memory estimation for this reader. + * + * Elements are relatively small (parent key, Id, and a set of deleted keys), so this should be + * more than enough. + */ + private static final long MEMORY_ESTIMATE = 100 * 1024; + + private final Key bucketKey; + + /** + * Cutoff date for result. + * + * If present, all resulting CommitLogManifest will be dated prior to this date. + */ + private final Optional olderThan; + + private Cursor cursor; + private int total; + private int loaded; + + private transient QueryResultIterator queryIterator; + + CommitLogManifestReader(Key bucketKey, Optional olderThan) { + this.bucketKey = bucketKey; + this.olderThan = olderThan; + } + + /** Called once at start. Cache the expected size. */ + @Override + public void beginShard() { + total = query().count(); + } + + /** Called every time we are deserialized. Create a new query or resume an existing one. */ + @Override + public void beginSlice() { + Query query = query(); + if (cursor != null) { + // The underlying query is strongly consistent, and according to the documentation at + // https://cloud.google.com/appengine/docs/java/datastore/queries#Java_Data_consistency + // "strongly consistent queries are always transactionally consistent". However, each time + // we restart the query at a cursor we have a new effective query, and "if the results for a + // query change between uses of a cursor, the query notices only changes that occur in + // results after the cursor. If a new result appears before the cursor's position for the + // query, it will not be returned when the results after the cursor are fetched." + // What this means in practice is that entities that are created after the initial query + // begins may or may not be seen by this reader, depending on whether the query was + // paused and restarted with a cursor before it would have reached the new entity. + query = query.startAt(cursor); + } + queryIterator = query.iterator(); + } + + /** Called occasionally alongside {@link #next}. */ + @Override + public Double getProgress() { + // Cap progress at 1.0, since the query's count() can increase during the run of the mapreduce + // if more entities are written, but we've cached the value once in "total". + return Math.min(1.0, ((double) loaded) / total); + } + + /** Called before we are serialized. Save a serializable cursor for this query. */ + @Override + public void endSlice() { + cursor = queryIterator.getCursor(); + } + + /** Query for children of this bucket. */ + Query query() { + Query query = ofy().load().type(CommitLogManifest.class).ancestor(bucketKey); + if (olderThan.isPresent()) { + query = query.filterKey( + "<", + Key.create(bucketKey, CommitLogManifest.class, olderThan.get().getMillis())); + } + return query; + } + + /** Returns the estimated memory that will be used by this reader in bytes. */ + @Override + public long estimateMemoryRequirement() { + return MEMORY_ESTIMATE; + } + + /** + * Get the next {@link CommitLogManifest} from the query. + * + * @throws NoSuchElementException if there are no more elements. + */ + @Override + public CommitLogManifest next() { + loaded++; + try { + return queryIterator.next(); + } finally { + ofy().clearSessionCache(); // Try not to leak memory. + } + } +} + diff --git a/javatests/google/registry/mapreduce/inputs/CommitLogManifestInputTest.java b/javatests/google/registry/mapreduce/inputs/CommitLogManifestInputTest.java new file mode 100644 index 000000000..df16bc5b1 --- /dev/null +++ b/javatests/google/registry/mapreduce/inputs/CommitLogManifestInputTest.java @@ -0,0 +1,133 @@ +// Copyright 2017 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.mapreduce.inputs; + +import static com.google.common.truth.Truth.assertThat; +import static com.google.common.truth.Truth.assert_; + +import com.google.appengine.tools.mapreduce.Input; +import com.google.appengine.tools.mapreduce.InputReader; +import com.google.common.base.Optional; +import com.googlecode.objectify.Key; +import google.registry.model.ofy.CommitLogBucket; +import google.registry.model.ofy.CommitLogManifest; +import google.registry.testing.AppEngineRule; +import google.registry.testing.DatastoreHelper; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Set; +import org.joda.time.DateTime; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Unit tests for {@link CommitLogManifestInput}. */ +@RunWith(JUnit4.class) +public final class CommitLogManifestInputTest { + + private static final DateTime DATE_TIME_OLD = DateTime.parse("2015-12-19T12:00Z"); + private static final DateTime DATE_TIME_OLD2 = DateTime.parse("2016-12-19T11:59Z"); + + private static final DateTime DATE_TIME_THRESHOLD = DateTime.parse("2016-12-19T12:00Z"); + + private static final DateTime DATE_TIME_NEW = DateTime.parse("2016-12-19T12:01Z"); + private static final DateTime DATE_TIME_NEW2 = DateTime.parse("2017-12-19T12:00Z"); + + @Rule + public final AppEngineRule appEngine = AppEngineRule.builder().withDatastore().build(); + + @Test + public void testInputOlderThan_allFound() throws Exception { + Set created = new HashSet<>(); + for (int i = 1; i <= 3; i++) { + created.add(createManifest(CommitLogBucket.getBucketKey(i), DATE_TIME_OLD)); + } + List seen = new ArrayList<>(); + Input input = new CommitLogManifestInput(Optional.of(DATE_TIME_THRESHOLD)); + for (InputReader reader + : input.createReaders()) { + reader.beginShard(); + reader.beginSlice(); + seen.add(reader.next()); + try { + reader.next(); + assert_().fail("Unexpected element"); + } catch (NoSuchElementException expected) { + } + } + assertThat(seen).containsExactlyElementsIn(created); + } + + @Test + public void testInputOlderThan_skipsNew() throws Exception { + Set old = new HashSet<>(); + for (int i = 1; i <= 3; i++) { + createManifest(CommitLogBucket.getBucketKey(i), DATE_TIME_NEW); + createManifest(CommitLogBucket.getBucketKey(i), DATE_TIME_NEW2); + old.add(createManifest(CommitLogBucket.getBucketKey(i), DATE_TIME_OLD)); + old.add(createManifest(CommitLogBucket.getBucketKey(i), DATE_TIME_OLD2)); + } + List seen = new ArrayList<>(); + Input input = new CommitLogManifestInput(Optional.of(DATE_TIME_THRESHOLD)); + for (InputReader reader + : input.createReaders()) { + reader.beginShard(); + reader.beginSlice(); + try { + for (int i = 0; i < 10; i++) { + seen.add(reader.next()); + } + assert_().fail("Unexpected element"); + } catch (NoSuchElementException expected) { + } + } + assertThat(seen).containsExactlyElementsIn(old); + } + + @Test + public void testInputAll() throws Exception { + Set created = new HashSet<>(); + for (int i = 1; i <= 3; i++) { + created.add(createManifest(CommitLogBucket.getBucketKey(i), DATE_TIME_NEW)); + created.add(createManifest(CommitLogBucket.getBucketKey(i), DATE_TIME_NEW2)); + created.add(createManifest(CommitLogBucket.getBucketKey(i), DATE_TIME_OLD)); + created.add(createManifest(CommitLogBucket.getBucketKey(i), DATE_TIME_OLD2)); + } + List seen = new ArrayList<>(); + Input input = new CommitLogManifestInput(Optional.absent()); + for (InputReader reader + : input.createReaders()) { + reader.beginShard(); + reader.beginSlice(); + try { + for (int i = 0; i < 10; i++) { + seen.add(reader.next()); + } + assert_().fail("Unexpected element"); + } catch (NoSuchElementException expected) { + } + } + assertThat(seen).containsExactlyElementsIn(created); + } + + private static CommitLogManifest createManifest(Key parent, DateTime dateTime) { + CommitLogManifest commitLogManifest = CommitLogManifest.create(parent, dateTime, null); + DatastoreHelper.persistResource(commitLogManifest); + return commitLogManifest; + } +}