diff --git a/core/src/main/java/google/registry/backup/AppEngineEnvironment.java b/core/src/main/java/google/registry/backup/AppEngineEnvironment.java
index 7c955370e..f22bf3621 100644
--- a/core/src/main/java/google/registry/backup/AppEngineEnvironment.java
+++ b/core/src/main/java/google/registry/backup/AppEngineEnvironment.java
@@ -31,7 +31,7 @@ public class AppEngineEnvironment implements Closeable {
private boolean isPlaceHolderNeeded;
- AppEngineEnvironment() {
+ public AppEngineEnvironment() {
isPlaceHolderNeeded = ApiProxy.getCurrentEnvironment() == null;
// isPlaceHolderNeeded may be true when we are invoked in a test with AppEngineRule.
if (isPlaceHolderNeeded) {
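
Note: AppEngineEnvironment implements Closeable, so with the now-public constructor a placeholder environment can be scoped with try-with-resources. A minimal usage sketch; the worker code inside the block is hypothetical:

    try (AppEngineEnvironment env = new AppEngineEnvironment()) {
      // A placeholder environment is installed only if the current thread has none.
      // Code that needs ApiProxy.getCurrentEnvironment(), e.g., Datastore
      // Entity/EntityProto conversions, can run here.
    }
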
diff --git a/core/src/main/java/google/registry/backup/VersionedEntity.java b/core/src/main/java/google/registry/backup/VersionedEntity.java
index 2cbd56150..36570fad4 100644
--- a/core/src/main/java/google/registry/backup/VersionedEntity.java
+++ b/core/src/main/java/google/registry/backup/VersionedEntity.java
@@ -28,11 +28,49 @@ import java.util.stream.Stream;
import javax.annotation.Nullable;
/**
- * A Datastore {@link Entity Entity's} timestamped state.
+ * A Datastore {@link Entity Entity's} serialized state with timestamp. The intended use case is a
+ * multi-stage pipeline where an Entity's Java form is not needed in most stages.
*
- * <p>For a new or updated Entity, its ProtocolBuffer bytes are stored along with its {@link Key}.
- * For a deleted entity, only its {@link Key} is stored, and the {@link #entityProtoBytes} is left
- * as null.
+ * <p>For a new or updated Entity, its serialized bytes are stored along with its Datastore {@link
+ * Key}. For a deleted entity, only its Datastore {@link Key} is stored, and the {@link
+ * #entityProtoBytes} field is left unset.
+ *
+ * <p>Storing raw bytes is motivated by two factors. First, since I/O is frequent and the Java
+ * objects are rarely needed in our target use case, storing raw bytes is the most efficient
+ * approach. More importantly, due to our data model and our customization of {@link
+ * google.registry.model.ofy.ObjectifyService ObjectifyService}, it is challenging to implement a
+ * serializer for Objectify entities that preserves the value of all properties. Without such
+ * serializers, Objectify entities cannot be used in a pipeline.
+ *
+ * <p>Objectify entities do not implement {@link Serializable}; serialization of such objects
+ * works as follows:
+ *
+ * <ol>
+ *   <li>Convert an Objectify entity to a Datastore {@link Entity}: {@code
+ *       ofy().save().toEntity(..)}
+ *   <li>Entity is serializable, but the more efficient approach is to convert an Entity to a
+ *       ProtocolBuffer ({@link com.google.storage.onestore.v3.OnestoreEntity.EntityProto}) and
+ *       then to raw bytes.
+ * </ol>
+ *
+ * <p>When the first conversion above is applied to an Objectify entity, a property value in the
+ * output may differ from the input in two situations:
+ *
+ * <ul>
+ *   <li>If a property is of an assign-on-persist data type, e.g., {@link
+ *       google.registry.model.UpdateAutoTimestamp}.
+ *   <li>If it is related to CommitLog management, e.g., {@link google.registry.model.EppResource
+ *       EppResource.revisions}.
+ * </ul>
+ *
+ * <p>Working around the side effects caused by our customization is difficult. Any solution would
+ * likely rely on Objectify's stack of contexts. However, many Objectify invocations in our code
+ * base are hardcoded to call the customized version of ObjectifyService, rendering Objectify's
+ * stack useless.
+ *
+ * <p>For now, this inability to use Objectify entities in pipelines is mostly a testing problem:
+ * we cannot perform {@link org.apache.beam.sdk.testing.PAssert BEAM pipeline assertions} on
+ * Objectify entities. {@code InitSqlTestUtils.assertContainsExactlyElementsIn} is an example of a
+ * workaround.
 *
 * <p>Note that {@link Optional java.util.Optional} is not serializable, therefore cannot be used as
 * property type in this class.
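
Note: a minimal sketch of the serialization chain described in the Javadoc above, using the App Engine EntityTranslator API; someOfyEntity is a hypothetical Objectify entity:

    // Objectify entity -> Datastore Entity -> EntityProto -> raw bytes.
    Entity entity = ofy().save().toEntity(someOfyEntity);
    byte[] bytes = EntityTranslator.convertToPb(entity).toByteArray();

    // Raw bytes -> EntityProto -> Datastore Entity.
    EntityProto proto = new EntityProto();
    proto.parseFrom(bytes);
    Entity restored = EntityTranslator.createFromPb(proto);
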
diff --git a/core/src/main/java/google/registry/beam/initsql/BackupPaths.java b/core/src/main/java/google/registry/beam/initsql/BackupPaths.java
index 0a4c498a6..52973eb29 100644
--- a/core/src/main/java/google/registry/beam/initsql/BackupPaths.java
+++ b/core/src/main/java/google/registry/beam/initsql/BackupPaths.java
@@ -15,8 +15,11 @@
package google.registry.beam.initsql;
import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Strings.isNullOrEmpty;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Streams;
import org.joda.time.DateTime;
/**
@@ -45,6 +48,21 @@ public final class BackupPaths {
return String.format(EXPORT_PATTERN_TEMPLATE, exportDir, kind, WILDCARD_CHAR);
}
+ /**
+ * Returns an {@link ImmutableList} of regex patterns that match all Datastore export files of the
+ * given {@code kinds}.
+ *
+ * @param exportDir path to the top directory of a Datastore export
+ * @param kinds all entity 'kinds' to be matched
+ */
+  public static ImmutableList<String> getExportFilePatterns(
+      String exportDir, Iterable<String> kinds) {
+ checkNotNull(kinds, "kinds");
+ return Streams.stream(kinds)
+ .map(kind -> getExportFileNamePattern(exportDir, kind))
+ .collect(ImmutableList.toImmutableList());
+ }
+
/**
* Returns the fully qualified path of a Datastore export file with the given {@code kind} and
* {@code shard}.
@@ -60,8 +78,9 @@ public final class BackupPaths {
return String.format(EXPORT_PATTERN_TEMPLATE, exportDir, kind, Integer.toString(shard));
}
- public static String getCommitLogFileNamePattern(String commitLogDir) {
- return String.format(COMMIT_LOG_PATTERN_TEMPLATE, commitLogDir);
+ /** Returns an {@link ImmutableList} of regex patterns that match all CommitLog files. */
+  public static ImmutableList<String> getCommitLogFilePatterns(String commitLogDir) {
+ return ImmutableList.of(String.format(COMMIT_LOG_PATTERN_TEMPLATE, commitLogDir));
}
/** Gets the Commit timestamp from a CommitLog file name. */
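
Note: a usage sketch for the two pattern methods; the export directory, CommitLog directory, and kinds below are hypothetical:

    ImmutableList<String> exportPatterns =
        BackupPaths.getExportFilePatterns(
            "gs://bucket/2020-06-01T00:00:00_12345", ImmutableList.of("Registrar", "DomainBase"));
    ImmutableList<String> commitLogPatterns =
        BackupPaths.getCommitLogFilePatterns("gs://bucket/commit_logs");
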
diff --git a/core/src/main/java/google/registry/beam/initsql/CommitLogTransforms.java b/core/src/main/java/google/registry/beam/initsql/CommitLogTransforms.java
deleted file mode 100644
index 429722d8a..000000000
--- a/core/src/main/java/google/registry/beam/initsql/CommitLogTransforms.java
+++ /dev/null
@@ -1,109 +0,0 @@
-// Copyright 2020 The Nomulus Authors. All Rights Reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package google.registry.beam.initsql;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static google.registry.beam.initsql.BackupPaths.getCommitLogFileNamePattern;
-import static google.registry.beam.initsql.BackupPaths.getCommitLogTimestamp;
-import static google.registry.beam.initsql.Transforms.processFiles;
-import static google.registry.util.DateTimeUtils.isBeforeOrAt;
-
-import google.registry.backup.CommitLogImports;
-import google.registry.backup.VersionedEntity;
-import java.io.IOException;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.FileIO.ReadableFile;
-import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.DateTime;
-
-/**
- * {@link org.apache.beam.sdk.transforms.PTransform Pipeline transforms} for loading from Nomulus
- * CommitLog files. They are all part of a transformation that loads raw records from a sequence of
- * Datastore CommitLog files, and are broken apart for testing.
- */
-public class CommitLogTransforms {
-
- /**
- * Returns a {@link PTransform transform} that can generate a collection of patterns that match
- * all Datastore CommitLog files.
- */
-  public static PTransform<PBegin, PCollection<String>> getCommitLogFilePatterns(
-      String commitLogDir) {
- return Create.of(getCommitLogFileNamePattern(commitLogDir)).withCoder(StringUtf8Coder.of());
- }
-
- /**
- * Returns files with timestamps between {@code fromTime} (inclusive) and {@code endTime}
- * (exclusive).
- */
-  public static PTransform<PCollection<String>, PCollection<String>>
-      filterCommitLogsByTime(DateTime fromTime, DateTime toTime) {
- checkNotNull(fromTime, "fromTime");
- checkNotNull(toTime, "toTime");
- checkArgument(
- fromTime.isBefore(toTime),
- "Invalid time range: fromTime (%s) is before endTime (%s)",
- fromTime,
- toTime);
- return ParDo.of(new FilterCommitLogFileByTime(fromTime, toTime));
- }
-
- /** Returns a {@link PTransform} from file {@link Metadata} to {@link VersionedEntity}. */
-  public static PTransform<PCollection<Metadata>, PCollection<VersionedEntity>>
-      loadCommitLogsFromFiles() {
- return processFiles(new LoadOneCommitLogsFile());
- }
-
-  static class FilterCommitLogFileByTime extends DoFn<String, String> {
- private final DateTime fromTime;
- private final DateTime toTime;
-
- public FilterCommitLogFileByTime(DateTime fromTime, DateTime toTime) {
- this.fromTime = fromTime;
- this.toTime = toTime;
- }
-
- @ProcessElement
-    public void processElement(@Element String fileName, OutputReceiver<String> out) {
- DateTime timestamp = getCommitLogTimestamp(fileName);
- if (isBeforeOrAt(fromTime, timestamp) && timestamp.isBefore(toTime)) {
- out.output(fileName);
- }
- }
- }
-
- /**
- * Reads a CommitLog file and converts its content into {@link VersionedEntity VersionedEntities}.
- */
-  static class LoadOneCommitLogsFile extends DoFn<ReadableFile, VersionedEntity> {
-
- @ProcessElement
-    public void processElement(@Element ReadableFile file, OutputReceiver<VersionedEntity> out) {
- try {
- CommitLogImports.loadEntities(file.open()).forEach(out::output);
- } catch (IOException e) {
- // Let the pipeline retry the whole file.
- throw new RuntimeException(e);
- }
- }
- }
-}
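
Note: FilterCommitLogFileByTime (re-created in Transforms.java below) keeps a file exactly when its embedded timestamp falls in the half-open interval [fromTime, toTime). A worked example with hypothetical values:

    DateTime fromTime = DateTime.parse("2020-06-01T00:00:00Z");
    DateTime toTime = DateTime.parse("2020-06-02T00:00:00Z");
    DateTime timestamp = getCommitLogTimestamp(fileName); // fileName is hypothetical

    // Kept iff fromTime <= timestamp < toTime.
    boolean kept = isBeforeOrAt(fromTime, timestamp) && timestamp.isBefore(toTime);
    // timestamp == fromTime -> kept (inclusive); timestamp == toTime -> dropped (exclusive).
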
diff --git a/core/src/main/java/google/registry/beam/initsql/ExportLoadingTransforms.java b/core/src/main/java/google/registry/beam/initsql/ExportLoadingTransforms.java
deleted file mode 100644
index 4f2324c54..000000000
--- a/core/src/main/java/google/registry/beam/initsql/ExportLoadingTransforms.java
+++ /dev/null
@@ -1,82 +0,0 @@
-// Copyright 2020 The Nomulus Authors. All Rights Reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package google.registry.beam.initsql;
-
-import static google.registry.beam.initsql.BackupPaths.getExportFileNamePattern;
-import static google.registry.beam.initsql.Transforms.processFiles;
-
-import com.google.common.collect.ImmutableList;
-import google.registry.backup.VersionedEntity;
-import google.registry.tools.LevelDbLogReader;
-import java.io.IOException;
-import java.util.Collection;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.FileIO.ReadableFile;
-import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-
-/**
- * {@link PTransform Pipeline transforms} for loading from Datastore export files. They are all part
- * of a transformation that loads raw records from a Datastore export, and are broken apart for
- * testing.
- */
-public class ExportLoadingTransforms {
-
- /**
- * Returns a {@link PTransform transform} that can generate a collection of patterns that match
- * all Datastore export files of the given {@code kinds}.
- */
-  public static PTransform<PBegin, PCollection<String>> getDatastoreExportFilePatterns(
-      String exportDir, Collection<String> kinds) {
- return Create.of(
- kinds.stream()
- .map(kind -> getExportFileNamePattern(exportDir, kind))
- .collect(ImmutableList.toImmutableList()))
- .withCoder(StringUtf8Coder.of());
- }
-
- /** Returns a {@link PTransform} from file {@link Metadata} to {@link VersionedEntity}. */
-  public static PTransform<PCollection<Metadata>, PCollection<VersionedEntity>>
-      loadExportDataFromFiles() {
- return processFiles(new LoadOneExportShard());
- }
-
- /**
- * Reads a LevelDb file and converts each raw record into a {@link VersionedEntity}. All such
- * entities use {@link Long#MIN_VALUE} as timestamp, so that they go before data from CommitLogs.
- *
- * <p>LevelDb files are not seekable because a large object may span multiple blocks. If a
- * sequential read fails, the file needs to be retried from the beginning.
- */
-  private static class LoadOneExportShard extends DoFn<ReadableFile, VersionedEntity> {
-
- private static final long TIMESTAMP = Long.MIN_VALUE;
-
- @ProcessElement
-    public void processElement(@Element ReadableFile file, OutputReceiver<VersionedEntity> output) {
- try {
- LevelDbLogReader.from(file.open())
- .forEachRemaining(record -> output.output(VersionedEntity.from(TIMESTAMP, record)));
- } catch (IOException e) {
- // Let the pipeline retry the whole file.
- throw new RuntimeException(e);
- }
- }
- }
-}
diff --git a/core/src/main/java/google/registry/beam/initsql/Transforms.java b/core/src/main/java/google/registry/beam/initsql/Transforms.java
index 8eefc0cef..343e5aa29 100644
--- a/core/src/main/java/google/registry/beam/initsql/Transforms.java
+++ b/core/src/main/java/google/registry/beam/initsql/Transforms.java
@@ -14,22 +14,66 @@
package google.registry.beam.initsql;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static google.registry.beam.initsql.BackupPaths.getCommitLogTimestamp;
+import static google.registry.beam.initsql.BackupPaths.getExportFilePatterns;
+import static google.registry.util.DateTimeUtils.START_OF_TIME;
+import static google.registry.util.DateTimeUtils.isBeforeOrAt;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterators;
+import google.registry.backup.CommitLogImports;
import google.registry.backup.VersionedEntity;
+import google.registry.tools.LevelDbLogReader;
+import java.util.Collection;
+import java.util.Iterator;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileIO.ReadableFile;
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.ProcessFunction;
+import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.DateTime;
/**
- * Common {@link PTransform pipeline transforms} used in pipelines that load from both Datastore
- * export files and Nomulus CommitLog files.
+ * {@link PTransform Pipeline transforms} used in pipelines that load from both Datastore export
+ * files and Nomulus CommitLog files.
*/
-public class Transforms {
+public final class Transforms {
+
+ private Transforms() {}
+
+ /**
+ * The commitTimestamp assigned to all entities loaded from a Datastore export file. The exact
+ * value does not matter, but it must be lower than the timestamps of real CommitLog records.
+ */
+ @VisibleForTesting static final long EXPORT_ENTITY_TIME_STAMP = START_OF_TIME.getMillis();
+
+ /**
+ * Returns a {@link PTransform transform} that can generate a collection of patterns that match
+ * all Datastore CommitLog files.
+ */
+  public static PTransform<PBegin, PCollection<String>> getCommitLogFilePatterns(
+      String commitLogDir) {
+ return toStringPCollection(BackupPaths.getCommitLogFilePatterns(commitLogDir));
+ }
+
+ /**
+ * Returns a {@link PTransform transform} that can generate a collection of patterns that match
+ * all Datastore export files of the given {@code kinds}.
+ */
+  public static PTransform<PBegin, PCollection<String>> getDatastoreExportFilePatterns(
+      String exportDir, Collection<String> kinds) {
+ return toStringPCollection(getExportFilePatterns(exportDir, kinds));
+ }
/**
* Returns a {@link PTransform} from file name patterns to file {@link Metadata Metadata records}.
@@ -43,11 +87,46 @@ public class Transforms {
};
}
+ /**
+ * Returns CommitLog files with timestamps between {@code fromTime} (inclusive) and {@code
+   * toTime} (exclusive).
+ */
+  public static PTransform<PCollection<String>, PCollection<String>>
+      filterCommitLogsByTime(DateTime fromTime, DateTime toTime) {
+ return ParDo.of(new FilterCommitLogFileByTime(fromTime, toTime));
+ }
+
+  /**
+   * Returns a {@link PTransform} from Datastore export file {@link Metadata} to {@link
+   * VersionedEntity}.
+   */
+  public static PTransform<PCollection<Metadata>, PCollection<VersionedEntity>>
+      loadExportDataFromFiles() {
+ return processFiles(
+ new BackupFileReader(
+ file ->
+ Iterators.transform(
+ LevelDbLogReader.from(file.open()),
+ (byte[] bytes) -> VersionedEntity.from(EXPORT_ENTITY_TIME_STAMP, bytes))));
+ }
+
+  /**
+   * Returns a {@link PTransform} from CommitLog file {@link Metadata} to {@link VersionedEntity}.
+   */
+  public static PTransform<PCollection<Metadata>, PCollection<VersionedEntity>>
+      loadCommitLogsFromFiles() {
+ return processFiles(
+ new BackupFileReader(file -> CommitLogImports.loadEntities(file.open()).iterator()));
+ }
+
+ /**
+ * Returns a {@link PTransform} that produces a {@link PCollection} containing all elements in the
+ * given {@link Iterable}.
+ */
+  static PTransform<PBegin, PCollection<String>> toStringPCollection(Iterable<String> strings) {
+ return Create.of(strings).withCoder(StringUtf8Coder.of());
+ }
+
/**
* Returns a {@link PTransform} from file {@link Metadata} to {@link VersionedEntity} using
* caller-provided {@code transformer}.
*/
-  public static PTransform<PCollection<Metadata>, PCollection<VersionedEntity>> processFiles(
+  static PTransform<PCollection<Metadata>, PCollection<VersionedEntity>> processFiles(
       DoFn<ReadableFile, VersionedEntity> transformer) {
     return new PTransform<PCollection<Metadata>, PCollection<VersionedEntity>>() {
@Override
@@ -55,7 +134,62 @@ public class Transforms {
return input
.apply(FileIO.readMatches().withCompression(Compression.UNCOMPRESSED))
.apply(transformer.getClass().getSimpleName(), ParDo.of(transformer));
+ // TODO(weiminyu): reshuffle to enable dynamic work rebalance per beam dev guide
}
};
}
+
+  private static class FilterCommitLogFileByTime extends DoFn<String, String> {
+ private final DateTime fromTime;
+ private final DateTime toTime;
+
+ public FilterCommitLogFileByTime(DateTime fromTime, DateTime toTime) {
+ checkNotNull(fromTime, "fromTime");
+ checkNotNull(toTime, "toTime");
+ checkArgument(
+ fromTime.isBefore(toTime),
+ "Invalid time range: fromTime (%s) is before endTime (%s)",
+ fromTime,
+ toTime);
+ this.fromTime = fromTime;
+ this.toTime = toTime;
+ }
+
+ @ProcessElement
+    public void processElement(@Element String fileName, OutputReceiver<String> out) {
+ DateTime timestamp = getCommitLogTimestamp(fileName);
+ if (isBeforeOrAt(fromTime, timestamp) && timestamp.isBefore(toTime)) {
+ out.output(fileName);
+ }
+ }
+ }
+
+ /**
+ * Reads from a Datastore backup file and converts its content into {@link VersionedEntity
+ * VersionedEntities}.
+   *
+   * <p>The input file may be either a LevelDb file from a Datastore export or a CommitLog file
+ * generated by the Nomulus server. In either case, the file contains variable-length records and
+ * must be read sequentially from the beginning. If the read fails, the file needs to be retried
+ * from the beginning.
+ */
+  private static class BackupFileReader extends DoFn<ReadableFile, VersionedEntity> {
+    private final ProcessFunction<ReadableFile, Iterator<VersionedEntity>> reader;
+
+    private BackupFileReader(ProcessFunction<ReadableFile, Iterator<VersionedEntity>> reader) {
+ this.reader = reader;
+ }
+
+ @ProcessElement
+    public void processElement(@Element ReadableFile file, OutputReceiver<VersionedEntity> out) {
+ try {
+ reader.apply(file).forEachRemaining(out::output);
+ } catch (Exception e) {
+        // Let the pipeline use its default retry strategy on the whole file. For GCP Dataflow
+        // this means retrying up to 4 times (a retried bundle may include other files grouped
+        // with this one), and failing the pipeline if there is still no success.
+ throw new RuntimeException(e);
+ }
+ }
+ }
}
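
Note: a sketch of how the consolidated transforms compose into the two load stages. The transform that matches file name patterns to Metadata is the one documented above; its name (getFilesByPatterns) is assumed here, and the pipeline, directories, kinds, and times are hypothetical:

    PCollection<VersionedEntity> exportEntities =
        pipeline
            .apply(Transforms.getDatastoreExportFilePatterns(
                exportDir, ImmutableList.of("Registrar")))
            .apply(Transforms.getFilesByPatterns()) // name assumed from the Javadoc above
            .apply(Transforms.loadExportDataFromFiles());

    PCollection<VersionedEntity> commitLogEntities =
        pipeline
            .apply(Transforms.getCommitLogFilePatterns(commitLogDir))
            .apply(Transforms.getFilesByPatterns())
            .apply(Transforms.filterCommitLogsByTime(fromTime, toTime))
            .apply(Transforms.loadCommitLogsFromFiles());
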
diff --git a/core/src/test/java/google/registry/beam/initsql/BackupTestStore.java b/core/src/test/java/google/registry/beam/initsql/BackupTestStore.java
index 2b5dd33e1..be19740bf 100644
--- a/core/src/test/java/google/registry/beam/initsql/BackupTestStore.java
+++ b/core/src/test/java/google/registry/beam/initsql/BackupTestStore.java
@@ -18,15 +18,20 @@ import static com.google.common.base.Preconditions.checkState;
import static google.registry.model.ofy.ObjectifyService.ofy;
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
+import com.google.appengine.api.datastore.DatastoreService;
+import com.google.appengine.api.datastore.DatastoreServiceFactory;
import com.google.appengine.api.datastore.Entity;
+import com.google.appengine.api.datastore.EntityNotFoundException;
import com.googlecode.objectify.Key;
import google.registry.backup.CommitLogExports;
+import google.registry.backup.VersionedEntity;
import google.registry.model.ofy.CommitLogCheckpoint;
import google.registry.testing.AppEngineRule;
import google.registry.testing.FakeClock;
import google.registry.tools.LevelDbFileBuilder;
import java.io.File;
import java.io.IOException;
+import java.util.NoSuchElementException;
import java.util.Set;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
@@ -49,6 +54,8 @@ class BackupTestStore implements AutoCloseable {
private final FakeClock fakeClock;
private AppEngineRule appEngine;
+ /** For fetching the persisted Datastore Entity directly. */
+ private DatastoreService datastoreService;
private CommitLogCheckpoint prevCommitLogCheckpoint;
@@ -61,31 +68,69 @@ class BackupTestStore implements AutoCloseable {
.withClock(fakeClock)
.build();
this.appEngine.beforeEach(null);
+ datastoreService = DatastoreServiceFactory.getDatastoreService();
}
- void transact(Iterable