From 5fb337b2fe57e6c7b92d22d4c36e7dd704670f4d Mon Sep 17 00:00:00 2001 From: Weimin Yu Date: Wed, 17 Jun 2020 22:10:14 -0400 Subject: [PATCH] Refactor pipeline for Datastore backup loading (#628) * Refactor pipeline for Datastore backup loading Refactored pipeline transforms. Added testing utilities that handle assertions better. Investigated and documented challenges in serializing Ofy entities without side effects. --- .../registry/backup/AppEngineEnvironment.java | 2 +- .../registry/backup/VersionedEntity.java | 46 ++++- .../registry/beam/initsql/BackupPaths.java | 23 ++- .../beam/initsql/CommitLogTransforms.java | 109 ------------ .../beam/initsql/ExportLoadingTransforms.java | 82 --------- .../registry/beam/initsql/Transforms.java | 142 ++++++++++++++- .../beam/initsql/BackupTestStore.java | 67 ++++++-- .../beam/initsql/BackupTestStoreTest.java | 125 +++++--------- .../beam/initsql/CommitLogTransformsTest.java | 75 +++----- .../initsql/ExportloadingTransformsTest.java | 60 +++---- .../beam/initsql/InitSqlTestUtils.java | 161 ++++++++++++++++++ 11 files changed, 517 insertions(+), 375 deletions(-) delete mode 100644 core/src/main/java/google/registry/beam/initsql/CommitLogTransforms.java delete mode 100644 core/src/main/java/google/registry/beam/initsql/ExportLoadingTransforms.java create mode 100644 core/src/test/java/google/registry/beam/initsql/InitSqlTestUtils.java diff --git a/core/src/main/java/google/registry/backup/AppEngineEnvironment.java b/core/src/main/java/google/registry/backup/AppEngineEnvironment.java index 7c955370e..f22bf3621 100644 --- a/core/src/main/java/google/registry/backup/AppEngineEnvironment.java +++ b/core/src/main/java/google/registry/backup/AppEngineEnvironment.java @@ -31,7 +31,7 @@ public class AppEngineEnvironment implements Closeable { private boolean isPlaceHolderNeeded; - AppEngineEnvironment() { + public AppEngineEnvironment() { isPlaceHolderNeeded = ApiProxy.getCurrentEnvironment() == null; // isPlaceHolderNeeded may be true 
when we are invoked in a test with AppEngineRule. if (isPlaceHolderNeeded) { diff --git a/core/src/main/java/google/registry/backup/VersionedEntity.java b/core/src/main/java/google/registry/backup/VersionedEntity.java index 2cbd56150..36570fad4 100644 --- a/core/src/main/java/google/registry/backup/VersionedEntity.java +++ b/core/src/main/java/google/registry/backup/VersionedEntity.java @@ -28,11 +28,49 @@ import java.util.stream.Stream; import javax.annotation.Nullable; /** - * A Datastore {@link Entity Entity's} timestamped state. + * A Datastore {@link Entity Entity's} serialized state with timestamp. The intended use case is a + * multi-stage pipeline where an Entity's Java form is not needed in most stages. * - *

For a new or updated Entity, its ProtocolBuffer bytes are stored along with its {@link Key}. - * For a deleted entity, only its {@link Key} is stored, and the {@link #entityProtoBytes} is left - * as null. + *

For a new or updated Entity, its serialized bytes are stored along with its Datastore {@link + * Key}. For a deleted entity, only its Datastore {@link Key} is stored, and the {@link + * #entityProtoBytes} field is left unset. + * + *

Storing raw bytes is motivated by two factors. First, since I/O is frequent and the Java + * objects are rarely needed in our target use case, storing raw bytes is the most efficient + * approach. More importantly, due to our data model and our customization of {@link + * google.registry.model.ofy.ObjectifyService ObjectifyService}, it is challenging to implement a + * serializer for Objectify entities that preserves the value of all properties. Without such + * serializers, Objectify entities cannot be used in a pipeline. + * + *

Objectify entities do not implement {@link Serializable}; serialization of such objects is as + * follows: + * + *

+ * + *

When the first conversion above is applied to an Objectify entity, a property value in the + * output may differ from the input in two situations: + * + *

+ * + *

Working around the side effects caused by our customization is difficult. Any solution would + * likely rely on Objectify's stack of context. However, many Objectify invocations in our code base + * are hardcoded to call the customized version of ObjectifyService, rendering Objectify's stack + * useless. + * + *

For now, this inability to use Objectify entities in pipelines is mostly a testing problem: we + * cannot perform {@link org.apache.beam.sdk.testing.PAssert BEAM pipeline assertions} on Objectify + * entities. {@code InitSqlTestUtils.assertContainsExactlyElementsIn} is an example of a workaround. * *

Note that {@link Optional java.util.Optional} is not serializable, therefore cannot be used as * property type in this class. diff --git a/core/src/main/java/google/registry/beam/initsql/BackupPaths.java b/core/src/main/java/google/registry/beam/initsql/BackupPaths.java index 0a4c498a6..52973eb29 100644 --- a/core/src/main/java/google/registry/beam/initsql/BackupPaths.java +++ b/core/src/main/java/google/registry/beam/initsql/BackupPaths.java @@ -15,8 +15,11 @@ package google.registry.beam.initsql; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Strings.isNullOrEmpty; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Streams; import org.joda.time.DateTime; /** @@ -45,6 +48,21 @@ public final class BackupPaths { return String.format(EXPORT_PATTERN_TEMPLATE, exportDir, kind, WILDCARD_CHAR); } + /** + * Returns an {@link ImmutableList} of regex patterns that match all Datastore export files of the + * given {@code kinds}. + * + * @param exportDir path to the top directory of a Datastore export + * @param kinds all entity 'kinds' to be matched + */ + public static ImmutableList getExportFilePatterns( + String exportDir, Iterable kinds) { + checkNotNull(kinds, "kinds"); + return Streams.stream(kinds) + .map(kind -> getExportFileNamePattern(exportDir, kind)) + .collect(ImmutableList.toImmutableList()); + } + /** * Returns the fully qualified path of a Datastore export file with the given {@code kind} and * {@code shard}. @@ -60,8 +78,9 @@ public final class BackupPaths { return String.format(EXPORT_PATTERN_TEMPLATE, exportDir, kind, Integer.toString(shard)); } - public static String getCommitLogFileNamePattern(String commitLogDir) { - return String.format(COMMIT_LOG_PATTERN_TEMPLATE, commitLogDir); + /** Returns an {@link ImmutableList} of regex patterns that match all CommitLog files. 
*/ + public static ImmutableList getCommitLogFilePatterns(String commitLogDir) { + return ImmutableList.of(String.format(COMMIT_LOG_PATTERN_TEMPLATE, commitLogDir)); } /** Gets the Commit timestamp from a CommitLog file name. */ diff --git a/core/src/main/java/google/registry/beam/initsql/CommitLogTransforms.java b/core/src/main/java/google/registry/beam/initsql/CommitLogTransforms.java deleted file mode 100644 index 429722d8a..000000000 --- a/core/src/main/java/google/registry/beam/initsql/CommitLogTransforms.java +++ /dev/null @@ -1,109 +0,0 @@ -// Copyright 2020 The Nomulus Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package google.registry.beam.initsql; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; -import static google.registry.beam.initsql.BackupPaths.getCommitLogFileNamePattern; -import static google.registry.beam.initsql.BackupPaths.getCommitLogTimestamp; -import static google.registry.beam.initsql.Transforms.processFiles; -import static google.registry.util.DateTimeUtils.isBeforeOrAt; - -import google.registry.backup.CommitLogImports; -import google.registry.backup.VersionedEntity; -import java.io.IOException; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.io.FileIO.ReadableFile; -import org.apache.beam.sdk.io.fs.MatchResult.Metadata; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.PBegin; -import org.apache.beam.sdk.values.PCollection; -import org.joda.time.DateTime; - -/** - * {@link org.apache.beam.sdk.transforms.PTransform Pipeline transforms} for loading from Nomulus - * CommitLog files. They are all part of a transformation that loads raw records from a sequence of - * Datastore CommitLog files, and are broken apart for testing. - */ -public class CommitLogTransforms { - - /** - * Returns a {@link PTransform transform} that can generate a collection of patterns that match - * all Datastore CommitLog files. - */ - public static PTransform> getCommitLogFilePatterns( - String commitLogDir) { - return Create.of(getCommitLogFileNamePattern(commitLogDir)).withCoder(StringUtf8Coder.of()); - } - - /** - * Returns files with timestamps between {@code fromTime} (inclusive) and {@code endTime} - * (exclusive). 
- */ - public static PTransform, PCollection> - filterCommitLogsByTime(DateTime fromTime, DateTime toTime) { - checkNotNull(fromTime, "fromTime"); - checkNotNull(toTime, "toTime"); - checkArgument( - fromTime.isBefore(toTime), - "Invalid time range: fromTime (%s) is before endTime (%s)", - fromTime, - toTime); - return ParDo.of(new FilterCommitLogFileByTime(fromTime, toTime)); - } - - /** Returns a {@link PTransform} from file {@link Metadata} to {@link VersionedEntity}. */ - public static PTransform, PCollection> - loadCommitLogsFromFiles() { - return processFiles(new LoadOneCommitLogsFile()); - } - - static class FilterCommitLogFileByTime extends DoFn { - private final DateTime fromTime; - private final DateTime toTime; - - public FilterCommitLogFileByTime(DateTime fromTime, DateTime toTime) { - this.fromTime = fromTime; - this.toTime = toTime; - } - - @ProcessElement - public void processElement(@Element String fileName, OutputReceiver out) { - DateTime timestamp = getCommitLogTimestamp(fileName); - if (isBeforeOrAt(fromTime, timestamp) && timestamp.isBefore(toTime)) { - out.output(fileName); - } - } - } - - /** - * Reads a CommitLog file and converts its content into {@link VersionedEntity VersionedEntities}. - */ - static class LoadOneCommitLogsFile extends DoFn { - - @ProcessElement - public void processElement(@Element ReadableFile file, OutputReceiver out) { - try { - CommitLogImports.loadEntities(file.open()).forEach(out::output); - } catch (IOException e) { - // Let the pipeline retry the whole file. - throw new RuntimeException(e); - } - } - } -} diff --git a/core/src/main/java/google/registry/beam/initsql/ExportLoadingTransforms.java b/core/src/main/java/google/registry/beam/initsql/ExportLoadingTransforms.java deleted file mode 100644 index 4f2324c54..000000000 --- a/core/src/main/java/google/registry/beam/initsql/ExportLoadingTransforms.java +++ /dev/null @@ -1,82 +0,0 @@ -// Copyright 2020 The Nomulus Authors. All Rights Reserved. 
-// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package google.registry.beam.initsql; - -import static google.registry.beam.initsql.BackupPaths.getExportFileNamePattern; -import static google.registry.beam.initsql.Transforms.processFiles; - -import com.google.common.collect.ImmutableList; -import google.registry.backup.VersionedEntity; -import google.registry.tools.LevelDbLogReader; -import java.io.IOException; -import java.util.Collection; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.io.FileIO.ReadableFile; -import org.apache.beam.sdk.io.fs.MatchResult.Metadata; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.PBegin; -import org.apache.beam.sdk.values.PCollection; - -/** - * {@link PTransform Pipeline transforms} for loading from Datastore export files. They are all part - * of a transformation that loads raw records from a Datastore export, and are broken apart for - * testing. - */ -public class ExportLoadingTransforms { - - /** - * Returns a {@link PTransform transform} that can generate a collection of patterns that match - * all Datastore export files of the given {@code kinds}. 
- */ - public static PTransform> getDatastoreExportFilePatterns( - String exportDir, Collection kinds) { - return Create.of( - kinds.stream() - .map(kind -> getExportFileNamePattern(exportDir, kind)) - .collect(ImmutableList.toImmutableList())) - .withCoder(StringUtf8Coder.of()); - } - - /** Returns a {@link PTransform} from file {@link Metadata} to {@link VersionedEntity}. */ - public static PTransform, PCollection> - loadExportDataFromFiles() { - return processFiles(new LoadOneExportShard()); - } - - /** - * Reads a LevelDb file and converts each raw record into a {@link VersionedEntity}. All such - * entities use {@link Long#MIN_VALUE} as timestamp, so that they go before data from CommitLogs. - * - *

LevelDb files are not seekable because a large object may span multiple blocks. If a - * sequential read fails, the file needs to be retried from the beginning. - */ - private static class LoadOneExportShard extends DoFn { - - private static final long TIMESTAMP = Long.MIN_VALUE; - - @ProcessElement - public void processElement(@Element ReadableFile file, OutputReceiver output) { - try { - LevelDbLogReader.from(file.open()) - .forEachRemaining(record -> output.output(VersionedEntity.from(TIMESTAMP, record))); - } catch (IOException e) { - // Let the pipeline retry the whole file. - throw new RuntimeException(e); - } - } - } -} diff --git a/core/src/main/java/google/registry/beam/initsql/Transforms.java b/core/src/main/java/google/registry/beam/initsql/Transforms.java index 8eefc0cef..343e5aa29 100644 --- a/core/src/main/java/google/registry/beam/initsql/Transforms.java +++ b/core/src/main/java/google/registry/beam/initsql/Transforms.java @@ -14,22 +14,66 @@ package google.registry.beam.initsql; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static google.registry.beam.initsql.BackupPaths.getCommitLogTimestamp; +import static google.registry.beam.initsql.BackupPaths.getExportFilePatterns; +import static google.registry.util.DateTimeUtils.START_OF_TIME; +import static google.registry.util.DateTimeUtils.isBeforeOrAt; + +import avro.shaded.com.google.common.collect.Iterators; +import com.google.common.annotations.VisibleForTesting; +import google.registry.backup.CommitLogImports; import google.registry.backup.VersionedEntity; +import google.registry.tools.LevelDbLogReader; +import java.util.Collection; +import java.util.Iterator; +import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.Compression; import org.apache.beam.sdk.io.FileIO; import org.apache.beam.sdk.io.FileIO.ReadableFile; import org.apache.beam.sdk.io.fs.EmptyMatchTreatment; import 
org.apache.beam.sdk.io.fs.MatchResult.Metadata; +import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.ProcessFunction; +import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; +import org.joda.time.DateTime; /** - * Common {@link PTransform pipeline transforms} used in pipelines that load from both Datastore - * export files and Nomulus CommitLog files. + * {@link PTransform Pipeline transforms} used in pipelines that load from both Datastore export + * files and Nomulus CommitLog files. */ -public class Transforms { +public final class Transforms { + + private Transforms() {} + + /** + * The commitTimestamp assigned to all entities loaded from a Datastore export file. The exact + * value does not matter, but it must be lower than the timestamps of real CommitLog records. + */ + @VisibleForTesting static final long EXPORT_ENTITY_TIME_STAMP = START_OF_TIME.getMillis(); + + /** + * Returns a {@link PTransform transform} that can generate a collection of patterns that match + * all Datastore CommitLog files. + */ + public static PTransform> getCommitLogFilePatterns( + String commitLogDir) { + return toStringPCollection(BackupPaths.getCommitLogFilePatterns(commitLogDir)); + } + + /** + * Returns a {@link PTransform transform} that can generate a collection of patterns that match + * all Datastore export files of the given {@code kinds}. + */ + public static PTransform> getDatastoreExportFilePatterns( + String exportDir, Collection kinds) { + return toStringPCollection(getExportFilePatterns(exportDir, kinds)); + } /** * Returns a {@link PTransform} from file name patterns to file {@link Metadata Metadata records}. 
@@ -43,11 +87,46 @@ public class Transforms { }; } + /** + * Returns CommitLog files with timestamps between {@code fromTime} (inclusive) and {@code + * toTime} (exclusive). + */ + public static PTransform, PCollection> + filterCommitLogsByTime(DateTime fromTime, DateTime toTime) { + return ParDo.of(new FilterCommitLogFileByTime(fromTime, toTime)); + } + + /** Returns a {@link PTransform} from file {@link Metadata} to {@link VersionedEntity}. */ + public static PTransform, PCollection> + loadExportDataFromFiles() { + return processFiles( + new BackupFileReader( + file -> + Iterators.transform( + LevelDbLogReader.from(file.open()), + (byte[] bytes) -> VersionedEntity.from(EXPORT_ENTITY_TIME_STAMP, bytes)))); + } + + /** Returns a {@link PTransform} from file {@link Metadata} to {@link VersionedEntity}. */ + public static PTransform, PCollection> + loadCommitLogsFromFiles() { + return processFiles( + new BackupFileReader(file -> CommitLogImports.loadEntities(file.open()).iterator())); + } + + /** + * Returns a {@link PTransform} that produces a {@link PCollection} containing all elements in the + * given {@link Iterable}. + */ + static PTransform> toStringPCollection(Iterable strings) { + return Create.of(strings).withCoder(StringUtf8Coder.of()); + } + /** * Returns a {@link PTransform} from file {@link Metadata} to {@link VersionedEntity} using * caller-provided {@code transformer}. 
*/ - public static PTransform, PCollection> processFiles( + static PTransform, PCollection> processFiles( DoFn transformer) { return new PTransform, PCollection>() { @Override @@ -55,7 +134,62 @@ public class Transforms { return input .apply(FileIO.readMatches().withCompression(Compression.UNCOMPRESSED)) .apply(transformer.getClass().getSimpleName(), ParDo.of(transformer)); + // TODO(weiminyu): reshuffle to enable dynamic work rebalance per beam dev guide } }; } + + private static class FilterCommitLogFileByTime extends DoFn { + private final DateTime fromTime; + private final DateTime toTime; + + public FilterCommitLogFileByTime(DateTime fromTime, DateTime toTime) { + checkNotNull(fromTime, "fromTime"); + checkNotNull(toTime, "toTime"); + checkArgument( + fromTime.isBefore(toTime), + "Invalid time range: fromTime (%s) is before endTime (%s)", + fromTime, + toTime); + this.fromTime = fromTime; + this.toTime = toTime; + } + + @ProcessElement + public void processElement(@Element String fileName, OutputReceiver out) { + DateTime timestamp = getCommitLogTimestamp(fileName); + if (isBeforeOrAt(fromTime, timestamp) && timestamp.isBefore(toTime)) { + out.output(fileName); + } + } + } + + /** + * Reads from a Datastore backup file and converts its content into {@link VersionedEntity + * VersionedEntities}. + * + *

The input file may be either a LevelDb file from a Datastore export or a CommitLog file + * generated by the Nomulus server. In either case, the file contains variable-length records and + * must be read sequentially from the beginning. If the read fails, the file needs to be retried + * from the beginning. + */ + private static class BackupFileReader extends DoFn { + private final ProcessFunction> reader; + + private BackupFileReader(ProcessFunction> reader) { + this.reader = reader; + } + + @ProcessElement + public void processElement(@Element ReadableFile file, OutputReceiver out) { + try { + reader.apply(file).forEachRemaining(out::output); + } catch (Exception e) { + // Let the pipeline use default retry strategy on the whole file. For GCP Dataflow this + // means retrying up to 4 times (may include other files grouped with this one), and failing + // the pipeline if no success. + throw new RuntimeException(e); + } + } + } } diff --git a/core/src/test/java/google/registry/beam/initsql/BackupTestStore.java b/core/src/test/java/google/registry/beam/initsql/BackupTestStore.java index 2b5dd33e1..be19740bf 100644 --- a/core/src/test/java/google/registry/beam/initsql/BackupTestStore.java +++ b/core/src/test/java/google/registry/beam/initsql/BackupTestStore.java @@ -18,15 +18,20 @@ import static com.google.common.base.Preconditions.checkState; import static google.registry.model.ofy.ObjectifyService.ofy; import static google.registry.persistence.transaction.TransactionManagerFactory.tm; +import com.google.appengine.api.datastore.DatastoreService; +import com.google.appengine.api.datastore.DatastoreServiceFactory; import com.google.appengine.api.datastore.Entity; +import com.google.appengine.api.datastore.EntityNotFoundException; import com.googlecode.objectify.Key; import google.registry.backup.CommitLogExports; +import google.registry.backup.VersionedEntity; import google.registry.model.ofy.CommitLogCheckpoint; import google.registry.testing.AppEngineRule; import 
google.registry.testing.FakeClock; import google.registry.tools.LevelDbFileBuilder; import java.io.File; import java.io.IOException; +import java.util.NoSuchElementException; import java.util.Set; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; @@ -49,6 +54,8 @@ class BackupTestStore implements AutoCloseable { private final FakeClock fakeClock; private AppEngineRule appEngine; + /** For fetching the persisted Datastore Entity directly. */ + private DatastoreService datastoreService; private CommitLogCheckpoint prevCommitLogCheckpoint; @@ -61,31 +68,69 @@ class BackupTestStore implements AutoCloseable { .withClock(fakeClock) .build(); this.appEngine.beforeEach(null); + datastoreService = DatastoreServiceFactory.getDatastoreService(); } - void transact(Iterable deletes, Iterable newOrUpdated) { + /** Returns the timestamp of the transaction. */ + long transact(Iterable deletes, Iterable newOrUpdated) { + long timestamp = fakeClock.nowUtc().getMillis(); tm().transact( () -> { ofy().delete().entities(deletes); ofy().save().entities(newOrUpdated); }); fakeClock.advanceOneMilli(); + return timestamp; } - /** Inserts or updates {@code entities} in the Datastore. */ + /** + * Inserts or updates {@code entities} in the Datastore and returns the timestamp of this + * transaction. + */ @SafeVarargs - final void insertOrUpdate(Object... entities) { + final long insertOrUpdate(Object... entities) { + long timestamp = fakeClock.nowUtc().getMillis(); tm().transact(() -> ofy().save().entities(entities).now()); fakeClock.advanceOneMilli(); + return timestamp; } - /** Deletes {@code entities} from the Datastore. */ + /** Deletes {@code entities} from the Datastore and returns the timestamp of this transaction. */ @SafeVarargs - final void delete(Object... entities) { + final long delete(Object... 
entities) { + long timestamp = fakeClock.nowUtc().getMillis(); tm().transact(() -> ofy().delete().entities(entities).now()); fakeClock.advanceOneMilli(); + return timestamp; } + /** + * Returns the persisted data that corresponds to {@code ofyEntity} as a Datastore {@link Entity}. + * + *

A typical use case for this method is in a test, when the caller has persisted a newly + * created Objectify entity and wants to find out the values of certain assign-on-persist + * properties. See {@link VersionedEntity} for more information. + */ + Entity loadAsDatastoreEntity(Object ofyEntity) { + try { + return datastoreService.get(Key.create(ofyEntity).getRaw()); + } catch (EntityNotFoundException e) { + throw new NoSuchElementException(e.getMessage()); + } + } + + /** + * Returns the persisted data that corresponds to {@code ofyEntity} as an Objectify entity. + * + *

See {@link #loadAsDatastoreEntity} and {@link VersionedEntity} for more information. + */ + Object loadAsOfyEntity(Object ofyEntity) { + try { + return ofy().load().fromEntity(datastoreService.get(Key.create(ofyEntity).getRaw())); + } catch (EntityNotFoundException e) { + throw new NoSuchElementException(e.getMessage()); + } + } /** * Exports entities of the caller provided types and returns the directory where data is exported. * @@ -96,7 +141,8 @@ class BackupTestStore implements AutoCloseable { * to simulate an inconsistent export * @return directory where data is exported */ - File export(String exportRootPath, Iterable> pojoTypes, Set> excludes) + File export( + String exportRootPath, Iterable> pojoTypes, Set> excludes) throws IOException { File exportDirectory = getExportDirectory(exportRootPath); for (Class pojoType : pojoTypes) { @@ -119,8 +165,9 @@ class BackupTestStore implements AutoCloseable { for (Object pojo : ofy().load().type(pojoType).iterable()) { if (!excludes.contains(Key.create(pojo))) { try { - builder.addEntity(toEntity(pojo)); - } catch (IOException e) { + // Must preserve UpdateTimestamp. Do not use ofy().save().toEntity(pojo)! 
+ builder.addEntity(datastoreService.get(Key.create(pojo).getRaw())); + } catch (Exception e) { throw new RuntimeException(e); } } @@ -144,10 +191,6 @@ class BackupTestStore implements AutoCloseable { } } - private Entity toEntity(Object pojo) { - return tm().transactNew(() -> ofy().save().toEntity(pojo)); - } - private File getExportDirectory(String exportRootPath) { File exportDirectory = new File(exportRootPath, fakeClock.nowUtc().toString(EXPORT_TIMESTAMP_FORMAT)); diff --git a/core/src/test/java/google/registry/beam/initsql/BackupTestStoreTest.java b/core/src/test/java/google/registry/beam/initsql/BackupTestStoreTest.java index ecdf5fd64..d14512e0a 100644 --- a/core/src/test/java/google/registry/beam/initsql/BackupTestStoreTest.java +++ b/core/src/test/java/google/registry/beam/initsql/BackupTestStoreTest.java @@ -18,19 +18,13 @@ import static com.google.common.truth.Truth.assertThat; import static com.google.common.truth.Truth.assertWithMessage; import static com.google.common.truth.Truth8.assertThat; import static google.registry.model.common.EntityGroupRoot.getCrossTldKey; -import static google.registry.model.ofy.ObjectifyService.ofy; -import static google.registry.persistence.transaction.TransactionManagerFactory.tm; import static google.registry.testing.DatastoreHelper.newContactResource; import static google.registry.testing.DatastoreHelper.newDomainBase; import static google.registry.testing.DatastoreHelper.newRegistry; -import com.google.appengine.api.datastore.Entity; -import com.google.appengine.api.datastore.EntityTranslator; -import com.google.common.base.Predicates; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Streams; -import com.google.storage.onestore.v3.OnestoreEntity.EntityProto; import com.googlecode.objectify.Key; import google.registry.backup.CommitLogImports; import google.registry.backup.VersionedEntity; @@ -48,10 +42,9 @@ import java.io.IOException; 
import java.nio.file.Files; import java.nio.file.Path; import java.util.Collections; -import java.util.Optional; import java.util.Set; -import java.util.function.Function; import java.util.stream.Stream; +import org.apache.beam.sdk.values.KV; import org.joda.time.DateTime; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -63,17 +56,18 @@ import org.junit.jupiter.api.io.TempDir; public class BackupTestStoreTest { private static final DateTime START_TIME = DateTime.parse("2000-01-01T00:00:00.0Z"); - private FakeClock fakeClock; - private BackupTestStore store; - - private Registry registry; - private ContactResource contact; - private DomainBase domain; - @TempDir File tempDir; @RegisterExtension InjectRule injectRule = new InjectRule(); + private FakeClock fakeClock; + private BackupTestStore store; + + // Test data: + private Registry registry; + private ContactResource contact; + private DomainBase domain; + @BeforeEach void beforeEach() throws Exception { fakeClock = new FakeClock(START_TIME); @@ -82,9 +76,15 @@ public class BackupTestStoreTest { registry = newRegistry("tld1", "TLD1"); store.insertOrUpdate(registry); + contact = newContactResource("contact_1"); domain = newDomainBase("domain1.tld1", contact); store.insertOrUpdate(contact, domain); + + // Save persisted data for assertions. 
+ registry = (Registry) store.loadAsOfyEntity(registry); + contact = (ContactResource) store.loadAsOfyEntity(contact); + domain = (DomainBase) store.loadAsOfyEntity(domain); } @AfterEach @@ -131,39 +131,33 @@ public class BackupTestStoreTest { void export_dataReadBack() throws IOException { String exportRootPath = tempDir.getAbsolutePath(); File exportFolder = export(exportRootPath, Collections.EMPTY_SET); - ImmutableList tldStrings = - loadPropertyFromExportedEntities( - new File(exportFolder, "/all_namespaces/kind_Registry/input-0"), - Registry.class, - Registry::getTldStr); - assertThat(tldStrings).containsExactly("tld1"); - ImmutableList domainStrings = - loadPropertyFromExportedEntities( - new File(exportFolder, "/all_namespaces/kind_DomainBase/input-0"), - DomainBase.class, - DomainBase::getDomainName); - assertThat(domainStrings).containsExactly("domain1.tld1"); - ImmutableList contactIds = - loadPropertyFromExportedEntities( - new File(exportFolder, "/all_namespaces/kind_ContactResource/input-0"), - ContactResource.class, - ContactResource::getContactId); - assertThat(contactIds).containsExactly("contact_1"); + ImmutableList loadedRegistries = + loadExportedEntities(new File(exportFolder, "/all_namespaces/kind_Registry/input-0")); + assertThat(loadedRegistries).containsExactly(registry); + + ImmutableList loadedDomains = + loadExportedEntities(new File(exportFolder, "/all_namespaces/kind_DomainBase/input-0")); + assertThat(loadedDomains).containsExactly(domain); + + ImmutableList loadedContacts = + loadExportedEntities( + new File(exportFolder, "/all_namespaces/kind_ContactResource/input-0")); + assertThat(loadedContacts).containsExactly(contact); } @Test void export_excludeSomeEntity() throws IOException { - store.insertOrUpdate(newRegistry("tld2", "TLD2")); + Registry newRegistry = newRegistry("tld2", "TLD2"); + store.insertOrUpdate(newRegistry); + newRegistry = (Registry) store.loadAsOfyEntity(newRegistry); + String exportRootPath = 
tempDir.getAbsolutePath(); File exportFolder = export( exportRootPath, ImmutableSet.of(Key.create(getCrossTldKey(), Registry.class, "tld1"))); - ImmutableList tlds = - loadPropertyFromExportedEntities( - new File(exportFolder, "/all_namespaces/kind_Registry/input-0"), - Registry.class, - Registry::getTldStr); - assertThat(tlds).containsExactly("tld2"); + ImmutableList loadedRegistries = + loadExportedEntities(new File(exportFolder, "/all_namespaces/kind_Registry/input-0")); + assertThat(loadedRegistries).containsExactly(newRegistry); } @Test @@ -178,14 +172,11 @@ public class BackupTestStoreTest { File commitLogFile = store.saveCommitLogs(tempDir.getAbsolutePath()); assertThat(commitLogFile.exists()).isTrue(); ImmutableList mutations = CommitLogImports.loadEntities(commitLogFile); - assertThat(mutations.stream().map(VersionedEntity::getEntity).map(Optional::get)) - .containsExactlyElementsIn(toDatastoreEntities(registry, contact, domain)); - // Registry created at -2, contract and domain created at -1. 
- assertThat(mutations.stream().map(VersionedEntity::commitTimeMills)) - .containsExactly( - fakeClock.nowUtc().getMillis() - 2, - fakeClock.nowUtc().getMillis() - 1, - fakeClock.nowUtc().getMillis() - 1); + InitSqlTestUtils.assertContainsExactlyElementsIn( + mutations, + KV.of(fakeClock.nowUtc().getMillis() - 2, store.loadAsDatastoreEntity(registry)), + KV.of(fakeClock.nowUtc().getMillis() - 1, store.loadAsDatastoreEntity(contact)), + KV.of(fakeClock.nowUtc().getMillis() - 1, store.loadAsDatastoreEntity(domain))); } @Test @@ -205,18 +196,13 @@ public class BackupTestStoreTest { .build(); store.insertOrUpdate(domain, newContact); store.delete(contact); - fakeClock.advanceOneMilli(); File commitLogFile = store.saveCommitLogs(tempDir.getAbsolutePath()); ImmutableList mutations = CommitLogImports.loadEntities(commitLogFile); - assertThat(mutations.stream().filter(VersionedEntity::isDelete).map(VersionedEntity::key)) - .containsExactly(Key.create(contact).getRaw()); - - assertThat( - mutations.stream() - .filter(Predicates.not(VersionedEntity::isDelete)) - .map(VersionedEntity::getEntity) - .map(Optional::get)) - .containsExactlyElementsIn(toDatastoreEntities(domain, newContact)); + InitSqlTestUtils.assertContainsExactlyElementsIn( + mutations, + KV.of(fakeClock.nowUtc().getMillis() - 1, Key.create(contact).getRaw()), + KV.of(fakeClock.nowUtc().getMillis() - 2, store.loadAsDatastoreEntity(domain)), + KV.of(fakeClock.nowUtc().getMillis() - 2, store.loadAsDatastoreEntity(newContact))); } @Test @@ -236,27 +222,10 @@ public class BackupTestStoreTest { excludes); } - private static ImmutableList loadPropertyFromExportedEntities( - File dataFile, Class ofyEntityType, Function getter) throws IOException { + private static ImmutableList loadExportedEntities(File dataFile) throws IOException { return Streams.stream(LevelDbLogReader.from(dataFile.toPath())) - .map(bytes -> toOfyEntity(bytes, ofyEntityType)) - .map(getter) + .map(InitSqlTestUtils::bytesToEntity) + 
.map(InitSqlTestUtils::datastoreToOfyEntity) .collect(ImmutableList.toImmutableList()); } - - private static T toOfyEntity(byte[] rawRecord, Class ofyEntityType) { - EntityProto proto = new EntityProto(); - proto.parseFrom(rawRecord); - Entity entity = EntityTranslator.createFromPb(proto); - return ofyEntityType.cast(ofy().load().fromEntity(entity)); - } - - @SafeVarargs - private static ImmutableList toDatastoreEntities(Object... ofyEntities) { - return tm().transact( - () -> - Stream.of(ofyEntities) - .map(oe -> ofy().save().toEntity(oe)) - .collect(ImmutableList.toImmutableList())); - } } diff --git a/core/src/test/java/google/registry/beam/initsql/CommitLogTransformsTest.java b/core/src/test/java/google/registry/beam/initsql/CommitLogTransformsTest.java index d4240cd63..63df45316 100644 --- a/core/src/test/java/google/registry/beam/initsql/CommitLogTransformsTest.java +++ b/core/src/test/java/google/registry/beam/initsql/CommitLogTransformsTest.java @@ -14,13 +14,10 @@ package google.registry.beam.initsql; -import static google.registry.model.ofy.ObjectifyService.ofy; -import static google.registry.persistence.transaction.TransactionManagerFactory.tm; import static google.registry.testing.DatastoreHelper.newContactResource; import static google.registry.testing.DatastoreHelper.newDomainBase; import static google.registry.testing.DatastoreHelper.newRegistry; -import com.google.appengine.api.datastore.Entity; import com.google.common.collect.ImmutableList; import google.registry.backup.VersionedEntity; import google.registry.model.contact.ContactResource; @@ -39,6 +36,7 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.joda.time.DateTime; import org.junit.After; @@ -50,7 +48,7 @@ import org.junit.rules.TemporaryFolder; import 
org.junit.runner.RunWith; import org.junit.runners.JUnit4; -/** Unit tests for {@link CommitLogTransforms}. */ +/** Unit tests for {@link Transforms} related to loading CommitLogs. */ // TODO(weiminyu): Upgrade to JUnit5 when TestPipeline is upgraded. It is also easy to adapt with // a wrapper. @RunWith(JUnit4.class) @@ -69,9 +67,11 @@ public class CommitLogTransformsTest implements Serializable { private transient BackupTestStore store; private File commitLogsDir; private File firstCommitLogFile; - // Canned data that are persisted to Datastore, used by assertions in tests. - // TODO(weiminyu): use Ofy entity pojos directly. - private transient ImmutableList persistedEntities; + + // Canned data: + private transient Registry registry; + private transient ContactResource contact; + private transient DomainBase domain; @Before public void beforeEach() throws Exception { @@ -79,15 +79,17 @@ public class CommitLogTransformsTest implements Serializable { store = new BackupTestStore(fakeClock); injectRule.setStaticField(Ofy.class, "clock", fakeClock); - Registry registry = newRegistry("tld1", "TLD1"); + registry = newRegistry("tld1", "TLD1"); store.insertOrUpdate(registry); - ContactResource contact1 = newContactResource("contact_1"); - DomainBase domain1 = newDomainBase("domain1.tld1", contact1); - store.insertOrUpdate(contact1, domain1); - persistedEntities = - ImmutableList.of(registry, contact1, domain1).stream() - .map(ofyEntity -> tm().transact(() -> ofy().save().toEntity(ofyEntity))) - .collect(ImmutableList.toImmutableList()); + contact = newContactResource("contact_1"); + domain = newDomainBase("domain1.tld1", contact); + store.insertOrUpdate(contact, domain); + + // Save persisted data for assertions. 
+ registry = (Registry) store.loadAsOfyEntity(registry); + contact = (ContactResource) store.loadAsOfyEntity(contact); + domain = (DomainBase) store.loadAsOfyEntity(domain); + commitLogsDir = temporaryFolder.newFolder(); firstCommitLogFile = store.saveCommitLogs(commitLogsDir.getAbsolutePath()); } @@ -106,7 +108,7 @@ public class CommitLogTransformsTest implements Serializable { PCollection patterns = pipeline.apply( "Get CommitLog file patterns", - CommitLogTransforms.getCommitLogFilePatterns(commitLogsDir.getAbsolutePath())); + Transforms.getCommitLogFilePatterns(commitLogsDir.getAbsolutePath())); ImmutableList expectedPatterns = ImmutableList.of(commitLogsDir.getAbsolutePath() + "/commit_diff_until_*"); @@ -165,7 +167,7 @@ public class CommitLogTransformsTest implements Serializable { Create.of(commitLogFilenames).withCoder(StringUtf8Coder.of())) .apply( "Filtered by Time", - CommitLogTransforms.filterCommitLogsByTime( + Transforms.filterCommitLogsByTime( DateTime.parse("2000-01-01T00:00:00.001Z"), DateTime.parse("2000-01-01T00:00:00.003Z"))); PAssert.that(filteredFilenames) @@ -183,40 +185,15 @@ public class CommitLogTransformsTest implements Serializable { pipeline .apply( "Get CommitLog file patterns", - CommitLogTransforms.getCommitLogFilePatterns(commitLogsDir.getAbsolutePath())) + Transforms.getCommitLogFilePatterns(commitLogsDir.getAbsolutePath())) .apply("Find CommitLogs", Transforms.getFilesByPatterns()) - .apply(CommitLogTransforms.loadCommitLogsFromFiles()); + .apply(Transforms.loadCommitLogsFromFiles()); - PCollection timestamps = - entities.apply( - "Extract commitTimeMillis", - ParDo.of( - new DoFn() { - @ProcessElement - public void processElement( - @Element VersionedEntity entity, OutputReceiver out) { - out.output(entity.commitTimeMills()); - } - })); - PAssert.that(timestamps) - .containsInAnyOrder( - fakeClock.nowUtc().getMillis() - 2, - fakeClock.nowUtc().getMillis() - 1, - fakeClock.nowUtc().getMillis() - 1); - - PCollection 
datastoreEntities = - entities.apply( - "To Datastore Entities", - ParDo.of( - new DoFn() { - @ProcessElement - public void processElement( - @Element VersionedEntity entity, OutputReceiver out) { - entity.getEntity().ifPresent(out::output); - } - })); - - PAssert.that(datastoreEntities).containsInAnyOrder(persistedEntities); + InitSqlTestUtils.assertContainsExactlyElementsIn( + entities, + KV.of(fakeClock.nowUtc().getMillis() - 2, store.loadAsDatastoreEntity(registry)), + KV.of(fakeClock.nowUtc().getMillis() - 1, store.loadAsDatastoreEntity(contact)), + KV.of(fakeClock.nowUtc().getMillis() - 1, store.loadAsDatastoreEntity(domain))); pipeline.run(); } diff --git a/core/src/test/java/google/registry/beam/initsql/ExportloadingTransformsTest.java b/core/src/test/java/google/registry/beam/initsql/ExportloadingTransformsTest.java index a08f69d49..c61038304 100644 --- a/core/src/test/java/google/registry/beam/initsql/ExportloadingTransformsTest.java +++ b/core/src/test/java/google/registry/beam/initsql/ExportloadingTransformsTest.java @@ -14,13 +14,10 @@ package google.registry.beam.initsql; -import static google.registry.model.ofy.ObjectifyService.ofy; -import static google.registry.persistence.transaction.TransactionManagerFactory.tm; import static google.registry.testing.DatastoreHelper.newContactResource; import static google.registry.testing.DatastoreHelper.newDomainBase; import static google.registry.testing.DatastoreHelper.newRegistry; -import com.google.appengine.api.datastore.Entity; import com.google.common.collect.ImmutableList; import com.googlecode.objectify.Key; import google.registry.backup.VersionedEntity; @@ -41,6 +38,7 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.joda.time.DateTime; import org.junit.After; @@ -53,7 
+51,7 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; /** - * Unit tests for {@link ExportLoadingTransforms}. + * Unit tests for {@link Transforms} related to loading Datastore exports. * *

This class implements {@link Serializable} so that test {@link DoFn} classes may be inlined. */ @@ -79,9 +77,11 @@ public class ExportloadingTransformsTest implements Serializable { private FakeClock fakeClock; private transient BackupTestStore store; private File exportDir; - // Canned data that are persisted to Datastore, used by assertions in tests. - // TODO(weiminyu): use Ofy entity pojos directly. - private transient ImmutableList persistedEntities; + + // Canned data: + private transient Registry registry; + private transient ContactResource contact; + private transient DomainBase domain; @Before public void beforeEach() throws Exception { @@ -89,15 +89,17 @@ public class ExportloadingTransformsTest implements Serializable { store = new BackupTestStore(fakeClock); injectRule.setStaticField(Ofy.class, "clock", fakeClock); - Registry registry = newRegistry("tld1", "TLD1"); + registry = newRegistry("tld1", "TLD1"); store.insertOrUpdate(registry); - ContactResource contact1 = newContactResource("contact_1"); - DomainBase domain1 = newDomainBase("domain1.tld1", contact1); - store.insertOrUpdate(contact1, domain1); - persistedEntities = - ImmutableList.of(registry, contact1, domain1).stream() - .map(ofyEntity -> tm().transact(() -> ofy().save().toEntity(ofyEntity))) - .collect(ImmutableList.toImmutableList()); + + contact = newContactResource("contact_1"); + domain = newDomainBase("domain1.tld1", contact); + store.insertOrUpdate(contact, domain); + + // Save persisted data for assertions. 
+ registry = (Registry) store.loadAsOfyEntity(registry); + contact = (ContactResource) store.loadAsOfyEntity(contact); + domain = (DomainBase) store.loadAsOfyEntity(domain); exportDir = store.export(exportRootDir.getRoot().getAbsolutePath(), ALL_KINDS, Collections.EMPTY_SET); @@ -117,8 +119,7 @@ public class ExportloadingTransformsTest implements Serializable { PCollection patterns = pipeline.apply( "Get Datastore file patterns", - ExportLoadingTransforms.getDatastoreExportFilePatterns( - exportDir.getAbsolutePath(), ALL_KIND_STRS)); + Transforms.getDatastoreExportFilePatterns(exportDir.getAbsolutePath(), ALL_KIND_STRS)); ImmutableList expectedPatterns = ImmutableList.of( @@ -172,29 +173,20 @@ public class ExportloadingTransformsTest implements Serializable { @Test public void loadDataFromFiles() { - PCollection taggedRecords = + PCollection entities = pipeline .apply( "Get Datastore file patterns", - ExportLoadingTransforms.getDatastoreExportFilePatterns( + Transforms.getDatastoreExportFilePatterns( exportDir.getAbsolutePath(), ALL_KIND_STRS)) .apply("Find Datastore files", Transforms.getFilesByPatterns()) - .apply("Load from Datastore files", ExportLoadingTransforms.loadExportDataFromFiles()); + .apply("Load from Datastore files", Transforms.loadExportDataFromFiles()); - // Transform bytes to pojo for analysis - PCollection entities = - taggedRecords.apply( - "Raw records to Entity", - ParDo.of( - new DoFn() { - @ProcessElement - public void processElement( - @Element VersionedEntity versionedEntity, OutputReceiver out) { - out.output(versionedEntity.getEntity().get()); - } - })); - - PAssert.that(entities).containsInAnyOrder(persistedEntities); + InitSqlTestUtils.assertContainsExactlyElementsIn( + entities, + KV.of(Transforms.EXPORT_ENTITY_TIME_STAMP, store.loadAsDatastoreEntity(registry)), + KV.of(Transforms.EXPORT_ENTITY_TIME_STAMP, store.loadAsDatastoreEntity(contact)), + KV.of(Transforms.EXPORT_ENTITY_TIME_STAMP, store.loadAsDatastoreEntity(domain))); 
pipeline.run(); } diff --git a/core/src/test/java/google/registry/beam/initsql/InitSqlTestUtils.java b/core/src/test/java/google/registry/beam/initsql/InitSqlTestUtils.java new file mode 100644 index 000000000..6e3f1fb95 --- /dev/null +++ b/core/src/test/java/google/registry/beam/initsql/InitSqlTestUtils.java @@ -0,0 +1,161 @@ +// Copyright 2020 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.beam.initsql; + +import static com.google.common.truth.Truth8.assertThat; +import static google.registry.model.ofy.ObjectifyService.ofy; +import static org.apache.beam.sdk.values.TypeDescriptors.kvs; +import static org.apache.beam.sdk.values.TypeDescriptors.strings; + +import com.google.appengine.api.datastore.Entity; +import com.google.appengine.api.datastore.EntityTranslator; +import com.google.appengine.api.datastore.Key; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Streams; +import com.google.common.truth.Truth; +import com.google.storage.onestore.v3.OnestoreEntity.EntityProto; +import google.registry.backup.AppEngineEnvironment; +import google.registry.backup.VersionedEntity; +import java.io.Serializable; +import java.util.Collection; +import java.util.stream.Stream; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.MapElements; 
+import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptor; + +/** Test helpers for populating SQL with Datastore backups. */ +public final class InitSqlTestUtils { + + /** Converts a Datastore {@link Entity} to an Objectify entity. */ + public static Object datastoreToOfyEntity(Entity entity) { + return ofy().load().fromEntity(entity); + } + + /** Serializes a Datastore {@link Entity} to byte array. */ + public static byte[] entityToBytes(Entity entity) { + return EntityTranslator.convertToPb(entity).toByteArray(); + } + + /** Deserializes raw bytes into {@link Entity}. */ + public static Entity bytesToEntity(byte[] bytes) { + EntityProto proto = new EntityProto(); + proto.parseFrom(bytes); + return EntityTranslator.createFromPb(proto); + } + + /** + * Asserts that the {@code actual} {@link Collection} of {@link VersionedEntity VersionedEntities} + * contains exactly the same elements in the {@code expected} array. + * + *

Each {@code expected} {@link KV key-value pair} refers to a versioned state of an Ofy + * entity. The {@link KV#getKey key} is the timestamp, while the {@link KV#getValue value} is + * either a Datastore {@link Entity} (for an existing entity) or a Datastore {@link Key} (for a + * deleted entity). + * + *

The {@link Entity} instances in both actual and expected data are converted to Objectify entities + * so that value-equality checks can be performed. Datastore {@link Entity#equals Entity's equals + * method} only checks key-equality. + */ + @SafeVarargs + public static void assertContainsExactlyElementsIn( + Collection actual, KV... expected) { + assertThat(actual.stream().map(InitSqlTestUtils::rawEntityToOfyWithTimestamp)) + .containsExactlyElementsIn( + Stream.of(expected) + .map(InitSqlTestUtils::expectedToOfyWithTimestamp) + .collect(ImmutableList.toImmutableList())); + } + + /** + * Asserts that the {@code actual} {@link PCollection} of {@link VersionedEntity + * VersionedEntities} contains exactly the same elements in the {@code expected} array. + * + *

This method makes assertions in the pipeline and only uses {@link PAssert} on the result. + * This has two advantages over {@code PAssert}: + * + *

    + *
  • It supports assertions on 'containsExactlyElementsIn', which is not available in {@code + * PAssert}. + *
  • It supports assertions on Objectify entities, which {@code PAssert} cannot do. + * Compared with PAssert-compatible options like {@code google.registry.tools.EntityWrapper} + * or {@link EntityProto}, Objectify entities in Java give better-formatted error messages + * when assertions fail. + *
+ * + *

Each {@code expected} {@link KV key-value pair} refers to a versioned state of an Ofy + * entity. The {@link KV#getKey key} is the timestamp, while the {@link KV#getValue value} is + * either a Datastore {@link Entity} (for an existing entity) or a Datastore {@link Key} (for a + * deleted entity). + * + *

The {@link Entity} instances in both actual and expected data are converted to Objectify entities + * so that value-equality checks can be performed. Datastore {@link Entity#equals Entity's equals + * method} only checks key-equality. + */ + @SafeVarargs + public static void assertContainsExactlyElementsIn( + PCollection actual, KV... expected) { + PCollection errMsgs = + actual + .apply( + MapElements.into(kvs(strings(), TypeDescriptor.of(VersionedEntity.class))) + .via(rawEntity -> KV.of("The One Key", rawEntity))) + .apply(GroupByKey.create()) + .apply( + "assertContainsExactlyElementsIn", + ParDo.of( + new DoFn>, String>() { + @ProcessElement + public void processElement( + @Element KV> input, + OutputReceiver out) { + try (AppEngineEnvironment env = new AppEngineEnvironment()) { + ImmutableList> actual = + Streams.stream(input.getValue()) + .map(InitSqlTestUtils::rawEntityToOfyWithTimestamp) + .collect(ImmutableList.toImmutableList()); + try { + Truth.assertThat(actual) + .containsExactlyElementsIn( + Stream.of(expected) + .map(InitSqlTestUtils::expectedToOfyWithTimestamp) + .collect(ImmutableList.toImmutableList())); + } catch (AssertionError e) { + out.output(e.toString()); + } + } + } + })); + PAssert.that(errMsgs).empty(); + } + + private static KV rawEntityToOfyWithTimestamp(VersionedEntity rawEntity) { + return KV.of( + rawEntity.commitTimeMills(), + rawEntity.getEntity().map(InitSqlTestUtils::datastoreToOfyEntity).orElse(rawEntity.key())); + } + + private static KV expectedToOfyWithTimestamp(KV kv) { + return KV.of( + kv.getKey(), + kv.getValue() instanceof Key + ? kv.getValue() + : datastoreToOfyEntity((Entity) kv.getValue())); + } +}