mirror of
https://github.com/google/nomulus.git
synced 2025-05-29 08:50:09 +02:00
Refactor pipeline for Datastore backup loading (#628)
* Refactor pipeline for Datastore backup loading. Refactored pipeline transforms. Added testing utilities that handle assertions better. Investigated and documented challenges in serializing Ofy entities without side effects.
This commit is contained in:
parent
0820b672bb
commit
5fb337b2fe
11 changed files with 517 additions and 375 deletions
|
@ -31,7 +31,7 @@ public class AppEngineEnvironment implements Closeable {
|
||||||
|
|
||||||
private boolean isPlaceHolderNeeded;
|
private boolean isPlaceHolderNeeded;
|
||||||
|
|
||||||
AppEngineEnvironment() {
|
public AppEngineEnvironment() {
|
||||||
isPlaceHolderNeeded = ApiProxy.getCurrentEnvironment() == null;
|
isPlaceHolderNeeded = ApiProxy.getCurrentEnvironment() == null;
|
||||||
// isPlaceHolderNeeded may be true when we are invoked in a test with AppEngineRule.
|
// isPlaceHolderNeeded may be true when we are invoked in a test with AppEngineRule.
|
||||||
if (isPlaceHolderNeeded) {
|
if (isPlaceHolderNeeded) {
|
||||||
|
|
|
@ -28,11 +28,49 @@ import java.util.stream.Stream;
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A Datastore {@link Entity Entity's} timestamped state.
|
* A Datastore {@link Entity Entity's} serialized state with timestamp. The intended use case is a
|
||||||
|
* multi-stage pipeline where an Entity's Java form is not needed in most stages.
|
||||||
*
|
*
|
||||||
* <p>For a new or updated Entity, its ProtocolBuffer bytes are stored along with its {@link Key}.
|
* <p>For a new or updated Entity, its serialized bytes are stored along with its Datastore {@link
|
||||||
* For a deleted entity, only its {@link Key} is stored, and the {@link #entityProtoBytes} is left
|
* Key}. For a deleted entity, only its Datastore {@link Key} is stored, and the {@link
|
||||||
* as null.
|
* #entityProtoBytes} field is left unset.
|
||||||
|
*
|
||||||
|
* <p>Storing raw bytes is motivated by two factors. First, since I/O is frequent and the Java
|
||||||
|
* objects are rarely needed in our target use case, storing raw bytes is the most efficient
|
||||||
|
* approach. More importantly, due to our data model and our customization of {@link
|
||||||
|
* google.registry.model.ofy.ObjectifyService ObjectifyService}, it is challenging to implement a
|
||||||
|
* serializer for Objectify entities that preserves the value of all properties. Without such
|
||||||
|
* serializers, Objectify entities cannot be used in a pipeline.
|
||||||
|
*
|
||||||
|
* <p>Objectify entities do not implement {@link Serializable}; serialization of such objects is as
|
||||||
|
* follows:
|
||||||
|
*
|
||||||
|
* <ul>
|
||||||
|
* <li>Convert an Objectify entity to a Datastore {@link Entity}: {@code
|
||||||
|
* ofy().save().toEntity(..)}
|
||||||
|
* <li>Entity is serializable, but the more efficient approach is to convert an Entity to a
|
||||||
|
* ProtocolBuffer ({@link com.google.storage.onestore.v3.OnestoreEntity.EntityProto}) and then
|
||||||
|
* to raw bytes.
|
||||||
|
* </ul>
|
||||||
|
*
|
||||||
|
* <p>When the first conversion above is applied to an Objectify entity, a property value in the
|
||||||
|
* output may differ from the input in two situations:
|
||||||
|
*
|
||||||
|
* <ul>
|
||||||
|
* <li>If a property is of an assign-on-persist data type, e.g., {@link
|
||||||
|
* google.registry.model.UpdateAutoTimestamp}.
|
||||||
|
* <li>If it is related to CommitLog management, e.g., {@link google.registry.model.EppResource
|
||||||
|
* EppResource.revisions}.
|
||||||
|
* </ul>
|
||||||
|
*
|
||||||
|
* <p>Working around the side effects caused by our customization is difficult. Any solution would
|
||||||
|
* likely rely on Objectify's stack of context. However, many Objectify invocations in our code base
|
||||||
|
* are hardcoded to call the customized version of ObjectifyService, rendering Objectify's stack
|
||||||
|
* useless.
|
||||||
|
*
|
||||||
|
* <p>For now, this inability to use Objectify entities in pipelines is mostly a testing problem: we
|
||||||
|
* can not perform {@link org.apache.beam.sdk.testing.PAssert BEAM pipeline assertions} on Objectify
|
||||||
|
* entities. {@code InitSqlTestUtils.assertContainsExactlyElementsIn} is an example of a workaround.
|
||||||
*
|
*
|
||||||
* <p>Note that {@link Optional java.util.Optional} is not serializable, therefore cannot be used as
|
* <p>Note that {@link Optional java.util.Optional} is not serializable, therefore cannot be used as
|
||||||
* property type in this class.
|
* property type in this class.
|
||||||
|
|
|
@ -15,8 +15,11 @@
|
||||||
package google.registry.beam.initsql;
|
package google.registry.beam.initsql;
|
||||||
|
|
||||||
import static com.google.common.base.Preconditions.checkArgument;
|
import static com.google.common.base.Preconditions.checkArgument;
|
||||||
|
import static com.google.common.base.Preconditions.checkNotNull;
|
||||||
import static com.google.common.base.Strings.isNullOrEmpty;
|
import static com.google.common.base.Strings.isNullOrEmpty;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
|
import com.google.common.collect.Streams;
|
||||||
import org.joda.time.DateTime;
|
import org.joda.time.DateTime;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -45,6 +48,21 @@ public final class BackupPaths {
|
||||||
return String.format(EXPORT_PATTERN_TEMPLATE, exportDir, kind, WILDCARD_CHAR);
|
return String.format(EXPORT_PATTERN_TEMPLATE, exportDir, kind, WILDCARD_CHAR);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns an {@link ImmutableList} of regex patterns that match all Datastore export files of the
|
||||||
|
* given {@code kinds}.
|
||||||
|
*
|
||||||
|
* @param exportDir path to the top directory of a Datastore export
|
||||||
|
* @param kinds all entity 'kinds' to be matched
|
||||||
|
*/
|
||||||
|
public static ImmutableList<String> getExportFilePatterns(
|
||||||
|
String exportDir, Iterable<String> kinds) {
|
||||||
|
checkNotNull(kinds, "kinds");
|
||||||
|
return Streams.stream(kinds)
|
||||||
|
.map(kind -> getExportFileNamePattern(exportDir, kind))
|
||||||
|
.collect(ImmutableList.toImmutableList());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the fully qualified path of a Datastore export file with the given {@code kind} and
|
* Returns the fully qualified path of a Datastore export file with the given {@code kind} and
|
||||||
* {@code shard}.
|
* {@code shard}.
|
||||||
|
@ -60,8 +78,9 @@ public final class BackupPaths {
|
||||||
return String.format(EXPORT_PATTERN_TEMPLATE, exportDir, kind, Integer.toString(shard));
|
return String.format(EXPORT_PATTERN_TEMPLATE, exportDir, kind, Integer.toString(shard));
|
||||||
}
|
}
|
||||||
|
|
||||||
public static String getCommitLogFileNamePattern(String commitLogDir) {
|
/** Returns an {@link ImmutableList} of regex patterns that match all CommitLog files. */
|
||||||
return String.format(COMMIT_LOG_PATTERN_TEMPLATE, commitLogDir);
|
public static ImmutableList<String> getCommitLogFilePatterns(String commitLogDir) {
|
||||||
|
return ImmutableList.of(String.format(COMMIT_LOG_PATTERN_TEMPLATE, commitLogDir));
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Gets the Commit timestamp from a CommitLog file name. */
|
/** Gets the Commit timestamp from a CommitLog file name. */
|
||||||
|
|
|
@ -1,109 +0,0 @@
|
||||||
// Copyright 2020 The Nomulus Authors. All Rights Reserved.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package google.registry.beam.initsql;
|
|
||||||
|
|
||||||
import static com.google.common.base.Preconditions.checkArgument;
|
|
||||||
import static com.google.common.base.Preconditions.checkNotNull;
|
|
||||||
import static google.registry.beam.initsql.BackupPaths.getCommitLogFileNamePattern;
|
|
||||||
import static google.registry.beam.initsql.BackupPaths.getCommitLogTimestamp;
|
|
||||||
import static google.registry.beam.initsql.Transforms.processFiles;
|
|
||||||
import static google.registry.util.DateTimeUtils.isBeforeOrAt;
|
|
||||||
|
|
||||||
import google.registry.backup.CommitLogImports;
|
|
||||||
import google.registry.backup.VersionedEntity;
|
|
||||||
import java.io.IOException;
|
|
||||||
import org.apache.beam.sdk.coders.StringUtf8Coder;
|
|
||||||
import org.apache.beam.sdk.io.FileIO.ReadableFile;
|
|
||||||
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
|
|
||||||
import org.apache.beam.sdk.transforms.Create;
|
|
||||||
import org.apache.beam.sdk.transforms.DoFn;
|
|
||||||
import org.apache.beam.sdk.transforms.PTransform;
|
|
||||||
import org.apache.beam.sdk.transforms.ParDo;
|
|
||||||
import org.apache.beam.sdk.values.PBegin;
|
|
||||||
import org.apache.beam.sdk.values.PCollection;
|
|
||||||
import org.joda.time.DateTime;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* {@link org.apache.beam.sdk.transforms.PTransform Pipeline transforms} for loading from Nomulus
|
|
||||||
* CommitLog files. They are all part of a transformation that loads raw records from a sequence of
|
|
||||||
* Datastore CommitLog files, and are broken apart for testing.
|
|
||||||
*/
|
|
||||||
public class CommitLogTransforms {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns a {@link PTransform transform} that can generate a collection of patterns that match
|
|
||||||
* all Datastore CommitLog files.
|
|
||||||
*/
|
|
||||||
public static PTransform<PBegin, PCollection<String>> getCommitLogFilePatterns(
|
|
||||||
String commitLogDir) {
|
|
||||||
return Create.of(getCommitLogFileNamePattern(commitLogDir)).withCoder(StringUtf8Coder.of());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns files with timestamps between {@code fromTime} (inclusive) and {@code endTime}
|
|
||||||
* (exclusive).
|
|
||||||
*/
|
|
||||||
public static PTransform<PCollection<? extends String>, PCollection<String>>
|
|
||||||
filterCommitLogsByTime(DateTime fromTime, DateTime toTime) {
|
|
||||||
checkNotNull(fromTime, "fromTime");
|
|
||||||
checkNotNull(toTime, "toTime");
|
|
||||||
checkArgument(
|
|
||||||
fromTime.isBefore(toTime),
|
|
||||||
"Invalid time range: fromTime (%s) is before endTime (%s)",
|
|
||||||
fromTime,
|
|
||||||
toTime);
|
|
||||||
return ParDo.of(new FilterCommitLogFileByTime(fromTime, toTime));
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Returns a {@link PTransform} from file {@link Metadata} to {@link VersionedEntity}. */
|
|
||||||
public static PTransform<PCollection<Metadata>, PCollection<VersionedEntity>>
|
|
||||||
loadCommitLogsFromFiles() {
|
|
||||||
return processFiles(new LoadOneCommitLogsFile());
|
|
||||||
}
|
|
||||||
|
|
||||||
static class FilterCommitLogFileByTime extends DoFn<String, String> {
|
|
||||||
private final DateTime fromTime;
|
|
||||||
private final DateTime toTime;
|
|
||||||
|
|
||||||
public FilterCommitLogFileByTime(DateTime fromTime, DateTime toTime) {
|
|
||||||
this.fromTime = fromTime;
|
|
||||||
this.toTime = toTime;
|
|
||||||
}
|
|
||||||
|
|
||||||
@ProcessElement
|
|
||||||
public void processElement(@Element String fileName, OutputReceiver<String> out) {
|
|
||||||
DateTime timestamp = getCommitLogTimestamp(fileName);
|
|
||||||
if (isBeforeOrAt(fromTime, timestamp) && timestamp.isBefore(toTime)) {
|
|
||||||
out.output(fileName);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Reads a CommitLog file and converts its content into {@link VersionedEntity VersionedEntities}.
|
|
||||||
*/
|
|
||||||
static class LoadOneCommitLogsFile extends DoFn<ReadableFile, VersionedEntity> {
|
|
||||||
|
|
||||||
@ProcessElement
|
|
||||||
public void processElement(@Element ReadableFile file, OutputReceiver<VersionedEntity> out) {
|
|
||||||
try {
|
|
||||||
CommitLogImports.loadEntities(file.open()).forEach(out::output);
|
|
||||||
} catch (IOException e) {
|
|
||||||
// Let the pipeline retry the whole file.
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,82 +0,0 @@
|
||||||
// Copyright 2020 The Nomulus Authors. All Rights Reserved.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package google.registry.beam.initsql;
|
|
||||||
|
|
||||||
import static google.registry.beam.initsql.BackupPaths.getExportFileNamePattern;
|
|
||||||
import static google.registry.beam.initsql.Transforms.processFiles;
|
|
||||||
|
|
||||||
import com.google.common.collect.ImmutableList;
|
|
||||||
import google.registry.backup.VersionedEntity;
|
|
||||||
import google.registry.tools.LevelDbLogReader;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.Collection;
|
|
||||||
import org.apache.beam.sdk.coders.StringUtf8Coder;
|
|
||||||
import org.apache.beam.sdk.io.FileIO.ReadableFile;
|
|
||||||
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
|
|
||||||
import org.apache.beam.sdk.transforms.Create;
|
|
||||||
import org.apache.beam.sdk.transforms.DoFn;
|
|
||||||
import org.apache.beam.sdk.transforms.PTransform;
|
|
||||||
import org.apache.beam.sdk.values.PBegin;
|
|
||||||
import org.apache.beam.sdk.values.PCollection;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* {@link PTransform Pipeline transforms} for loading from Datastore export files. They are all part
|
|
||||||
* of a transformation that loads raw records from a Datastore export, and are broken apart for
|
|
||||||
* testing.
|
|
||||||
*/
|
|
||||||
public class ExportLoadingTransforms {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns a {@link PTransform transform} that can generate a collection of patterns that match
|
|
||||||
* all Datastore export files of the given {@code kinds}.
|
|
||||||
*/
|
|
||||||
public static PTransform<PBegin, PCollection<String>> getDatastoreExportFilePatterns(
|
|
||||||
String exportDir, Collection<String> kinds) {
|
|
||||||
return Create.of(
|
|
||||||
kinds.stream()
|
|
||||||
.map(kind -> getExportFileNamePattern(exportDir, kind))
|
|
||||||
.collect(ImmutableList.toImmutableList()))
|
|
||||||
.withCoder(StringUtf8Coder.of());
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Returns a {@link PTransform} from file {@link Metadata} to {@link VersionedEntity}. */
|
|
||||||
public static PTransform<PCollection<Metadata>, PCollection<VersionedEntity>>
|
|
||||||
loadExportDataFromFiles() {
|
|
||||||
return processFiles(new LoadOneExportShard());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Reads a LevelDb file and converts each raw record into a {@link VersionedEntity}. All such
|
|
||||||
* entities use {@link Long#MIN_VALUE} as timestamp, so that they go before data from CommitLogs.
|
|
||||||
*
|
|
||||||
* <p>LevelDb files are not seekable because a large object may span multiple blocks. If a
|
|
||||||
* sequential read fails, the file needs to be retried from the beginning.
|
|
||||||
*/
|
|
||||||
private static class LoadOneExportShard extends DoFn<ReadableFile, VersionedEntity> {
|
|
||||||
|
|
||||||
private static final long TIMESTAMP = Long.MIN_VALUE;
|
|
||||||
|
|
||||||
@ProcessElement
|
|
||||||
public void processElement(@Element ReadableFile file, OutputReceiver<VersionedEntity> output) {
|
|
||||||
try {
|
|
||||||
LevelDbLogReader.from(file.open())
|
|
||||||
.forEachRemaining(record -> output.output(VersionedEntity.from(TIMESTAMP, record)));
|
|
||||||
} catch (IOException e) {
|
|
||||||
// Let the pipeline retry the whole file.
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -14,22 +14,66 @@
|
||||||
|
|
||||||
package google.registry.beam.initsql;
|
package google.registry.beam.initsql;
|
||||||
|
|
||||||
|
import static com.google.common.base.Preconditions.checkArgument;
|
||||||
|
import static com.google.common.base.Preconditions.checkNotNull;
|
||||||
|
import static google.registry.beam.initsql.BackupPaths.getCommitLogTimestamp;
|
||||||
|
import static google.registry.beam.initsql.BackupPaths.getExportFilePatterns;
|
||||||
|
import static google.registry.util.DateTimeUtils.START_OF_TIME;
|
||||||
|
import static google.registry.util.DateTimeUtils.isBeforeOrAt;
|
||||||
|
|
||||||
|
import avro.shaded.com.google.common.collect.Iterators;
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import google.registry.backup.CommitLogImports;
|
||||||
import google.registry.backup.VersionedEntity;
|
import google.registry.backup.VersionedEntity;
|
||||||
|
import google.registry.tools.LevelDbLogReader;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import org.apache.beam.sdk.coders.StringUtf8Coder;
|
||||||
import org.apache.beam.sdk.io.Compression;
|
import org.apache.beam.sdk.io.Compression;
|
||||||
import org.apache.beam.sdk.io.FileIO;
|
import org.apache.beam.sdk.io.FileIO;
|
||||||
import org.apache.beam.sdk.io.FileIO.ReadableFile;
|
import org.apache.beam.sdk.io.FileIO.ReadableFile;
|
||||||
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
|
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
|
||||||
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
|
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
|
||||||
|
import org.apache.beam.sdk.transforms.Create;
|
||||||
import org.apache.beam.sdk.transforms.DoFn;
|
import org.apache.beam.sdk.transforms.DoFn;
|
||||||
import org.apache.beam.sdk.transforms.PTransform;
|
import org.apache.beam.sdk.transforms.PTransform;
|
||||||
import org.apache.beam.sdk.transforms.ParDo;
|
import org.apache.beam.sdk.transforms.ParDo;
|
||||||
|
import org.apache.beam.sdk.transforms.ProcessFunction;
|
||||||
|
import org.apache.beam.sdk.values.PBegin;
|
||||||
import org.apache.beam.sdk.values.PCollection;
|
import org.apache.beam.sdk.values.PCollection;
|
||||||
|
import org.joda.time.DateTime;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Common {@link PTransform pipeline transforms} used in pipelines that load from both Datastore
|
* {@link PTransform Pipeline transforms} used in pipelines that load from both Datastore export
|
||||||
* export files and Nomulus CommitLog files.
|
* files and Nomulus CommitLog files.
|
||||||
*/
|
*/
|
||||||
public class Transforms {
|
public final class Transforms {
|
||||||
|
|
||||||
|
private Transforms() {}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The commitTimestamp assigned to all entities loaded from a Datastore export file. The exact
|
||||||
|
* value does not matter, but it must be lower than the timestamps of real CommitLog records.
|
||||||
|
*/
|
||||||
|
@VisibleForTesting static final long EXPORT_ENTITY_TIME_STAMP = START_OF_TIME.getMillis();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a {@link PTransform transform} that can generate a collection of patterns that match
|
||||||
|
* all Datastore CommitLog files.
|
||||||
|
*/
|
||||||
|
public static PTransform<PBegin, PCollection<String>> getCommitLogFilePatterns(
|
||||||
|
String commitLogDir) {
|
||||||
|
return toStringPCollection(BackupPaths.getCommitLogFilePatterns(commitLogDir));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a {@link PTransform transform} that can generate a collection of patterns that match
|
||||||
|
* all Datastore export files of the given {@code kinds}.
|
||||||
|
*/
|
||||||
|
public static PTransform<PBegin, PCollection<String>> getDatastoreExportFilePatterns(
|
||||||
|
String exportDir, Collection<String> kinds) {
|
||||||
|
return toStringPCollection(getExportFilePatterns(exportDir, kinds));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns a {@link PTransform} from file name patterns to file {@link Metadata Metadata records}.
|
* Returns a {@link PTransform} from file name patterns to file {@link Metadata Metadata records}.
|
||||||
|
@ -43,11 +87,46 @@ public class Transforms {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns CommitLog files with timestamps between {@code fromTime} (inclusive) and {@code
|
||||||
|
* toTime} (exclusive).
|
||||||
|
*/
|
||||||
|
public static PTransform<PCollection<? extends String>, PCollection<String>>
|
||||||
|
filterCommitLogsByTime(DateTime fromTime, DateTime toTime) {
|
||||||
|
return ParDo.of(new FilterCommitLogFileByTime(fromTime, toTime));
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Returns a {@link PTransform} from file {@link Metadata} to {@link VersionedEntity}. */
|
||||||
|
public static PTransform<PCollection<Metadata>, PCollection<VersionedEntity>>
|
||||||
|
loadExportDataFromFiles() {
|
||||||
|
return processFiles(
|
||||||
|
new BackupFileReader(
|
||||||
|
file ->
|
||||||
|
Iterators.transform(
|
||||||
|
LevelDbLogReader.from(file.open()),
|
||||||
|
(byte[] bytes) -> VersionedEntity.from(EXPORT_ENTITY_TIME_STAMP, bytes))));
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Returns a {@link PTransform} from file {@link Metadata} to {@link VersionedEntity}. */
|
||||||
|
public static PTransform<PCollection<Metadata>, PCollection<VersionedEntity>>
|
||||||
|
loadCommitLogsFromFiles() {
|
||||||
|
return processFiles(
|
||||||
|
new BackupFileReader(file -> CommitLogImports.loadEntities(file.open()).iterator()));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a {@link PTransform} that produces a {@link PCollection} containing all elements in the
|
||||||
|
* given {@link Iterable}.
|
||||||
|
*/
|
||||||
|
static PTransform<PBegin, PCollection<String>> toStringPCollection(Iterable<String> strings) {
|
||||||
|
return Create.of(strings).withCoder(StringUtf8Coder.of());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns a {@link PTransform} from file {@link Metadata} to {@link VersionedEntity} using
|
* Returns a {@link PTransform} from file {@link Metadata} to {@link VersionedEntity} using
|
||||||
* caller-provided {@code transformer}.
|
* caller-provided {@code transformer}.
|
||||||
*/
|
*/
|
||||||
public static PTransform<PCollection<Metadata>, PCollection<VersionedEntity>> processFiles(
|
static PTransform<PCollection<Metadata>, PCollection<VersionedEntity>> processFiles(
|
||||||
DoFn<ReadableFile, VersionedEntity> transformer) {
|
DoFn<ReadableFile, VersionedEntity> transformer) {
|
||||||
return new PTransform<PCollection<Metadata>, PCollection<VersionedEntity>>() {
|
return new PTransform<PCollection<Metadata>, PCollection<VersionedEntity>>() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -55,7 +134,62 @@ public class Transforms {
|
||||||
return input
|
return input
|
||||||
.apply(FileIO.readMatches().withCompression(Compression.UNCOMPRESSED))
|
.apply(FileIO.readMatches().withCompression(Compression.UNCOMPRESSED))
|
||||||
.apply(transformer.getClass().getSimpleName(), ParDo.of(transformer));
|
.apply(transformer.getClass().getSimpleName(), ParDo.of(transformer));
|
||||||
|
// TODO(weiminyu): reshuffle to enable dynamic work rebalance per beam dev guide
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class FilterCommitLogFileByTime extends DoFn<String, String> {
|
||||||
|
private final DateTime fromTime;
|
||||||
|
private final DateTime toTime;
|
||||||
|
|
||||||
|
public FilterCommitLogFileByTime(DateTime fromTime, DateTime toTime) {
|
||||||
|
checkNotNull(fromTime, "fromTime");
|
||||||
|
checkNotNull(toTime, "toTime");
|
||||||
|
checkArgument(
|
||||||
|
fromTime.isBefore(toTime),
|
||||||
|
"Invalid time range: fromTime (%s) is before endTime (%s)",
|
||||||
|
fromTime,
|
||||||
|
toTime);
|
||||||
|
this.fromTime = fromTime;
|
||||||
|
this.toTime = toTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
@ProcessElement
|
||||||
|
public void processElement(@Element String fileName, OutputReceiver<String> out) {
|
||||||
|
DateTime timestamp = getCommitLogTimestamp(fileName);
|
||||||
|
if (isBeforeOrAt(fromTime, timestamp) && timestamp.isBefore(toTime)) {
|
||||||
|
out.output(fileName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reads from a Datastore backup file and converts its content into {@link VersionedEntity
|
||||||
|
* VersionedEntities}.
|
||||||
|
*
|
||||||
|
* <p>The input file may be either a LevelDb file from a Datastore export or a CommitLog file
|
||||||
|
* generated by the Nomulus server. In either case, the file contains variable-length records and
|
||||||
|
* must be read sequentially from the beginning. If the read fails, the file needs to be retried
|
||||||
|
* from the beginning.
|
||||||
|
*/
|
||||||
|
private static class BackupFileReader extends DoFn<ReadableFile, VersionedEntity> {
|
||||||
|
private final ProcessFunction<ReadableFile, Iterator<VersionedEntity>> reader;
|
||||||
|
|
||||||
|
private BackupFileReader(ProcessFunction<ReadableFile, Iterator<VersionedEntity>> reader) {
|
||||||
|
this.reader = reader;
|
||||||
|
}
|
||||||
|
|
||||||
|
@ProcessElement
|
||||||
|
public void processElement(@Element ReadableFile file, OutputReceiver<VersionedEntity> out) {
|
||||||
|
try {
|
||||||
|
reader.apply(file).forEachRemaining(out::output);
|
||||||
|
} catch (Exception e) {
|
||||||
|
// Let the pipeline use default retry strategy on the whole file. For GCP Dataflow this
|
||||||
|
// means retrying up to 4 times (may include other files grouped with this one), and failing
|
||||||
|
// the pipeline if no success.
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,15 +18,20 @@ import static com.google.common.base.Preconditions.checkState;
|
||||||
import static google.registry.model.ofy.ObjectifyService.ofy;
|
import static google.registry.model.ofy.ObjectifyService.ofy;
|
||||||
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
|
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
|
||||||
|
|
||||||
|
import com.google.appengine.api.datastore.DatastoreService;
|
||||||
|
import com.google.appengine.api.datastore.DatastoreServiceFactory;
|
||||||
import com.google.appengine.api.datastore.Entity;
|
import com.google.appengine.api.datastore.Entity;
|
||||||
|
import com.google.appengine.api.datastore.EntityNotFoundException;
|
||||||
import com.googlecode.objectify.Key;
|
import com.googlecode.objectify.Key;
|
||||||
import google.registry.backup.CommitLogExports;
|
import google.registry.backup.CommitLogExports;
|
||||||
|
import google.registry.backup.VersionedEntity;
|
||||||
import google.registry.model.ofy.CommitLogCheckpoint;
|
import google.registry.model.ofy.CommitLogCheckpoint;
|
||||||
import google.registry.testing.AppEngineRule;
|
import google.registry.testing.AppEngineRule;
|
||||||
import google.registry.testing.FakeClock;
|
import google.registry.testing.FakeClock;
|
||||||
import google.registry.tools.LevelDbFileBuilder;
|
import google.registry.tools.LevelDbFileBuilder;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.NoSuchElementException;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import org.joda.time.format.DateTimeFormat;
|
import org.joda.time.format.DateTimeFormat;
|
||||||
import org.joda.time.format.DateTimeFormatter;
|
import org.joda.time.format.DateTimeFormatter;
|
||||||
|
@ -49,6 +54,8 @@ class BackupTestStore implements AutoCloseable {
|
||||||
|
|
||||||
private final FakeClock fakeClock;
|
private final FakeClock fakeClock;
|
||||||
private AppEngineRule appEngine;
|
private AppEngineRule appEngine;
|
||||||
|
/** For fetching the persisted Datastore Entity directly. */
|
||||||
|
private DatastoreService datastoreService;
|
||||||
|
|
||||||
private CommitLogCheckpoint prevCommitLogCheckpoint;
|
private CommitLogCheckpoint prevCommitLogCheckpoint;
|
||||||
|
|
||||||
|
@ -61,31 +68,69 @@ class BackupTestStore implements AutoCloseable {
|
||||||
.withClock(fakeClock)
|
.withClock(fakeClock)
|
||||||
.build();
|
.build();
|
||||||
this.appEngine.beforeEach(null);
|
this.appEngine.beforeEach(null);
|
||||||
|
datastoreService = DatastoreServiceFactory.getDatastoreService();
|
||||||
}
|
}
|
||||||
|
|
||||||
void transact(Iterable<Object> deletes, Iterable<Object> newOrUpdated) {
|
/** Returns the timestamp of the transaction. */
|
||||||
|
long transact(Iterable<Object> deletes, Iterable<Object> newOrUpdated) {
|
||||||
|
long timestamp = fakeClock.nowUtc().getMillis();
|
||||||
tm().transact(
|
tm().transact(
|
||||||
() -> {
|
() -> {
|
||||||
ofy().delete().entities(deletes);
|
ofy().delete().entities(deletes);
|
||||||
ofy().save().entities(newOrUpdated);
|
ofy().save().entities(newOrUpdated);
|
||||||
});
|
});
|
||||||
fakeClock.advanceOneMilli();
|
fakeClock.advanceOneMilli();
|
||||||
|
return timestamp;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Inserts or updates {@code entities} in the Datastore. */
|
/**
|
||||||
|
* Inserts or updates {@code entities} in the Datastore and returns the timestamp of this
|
||||||
|
* transaction.
|
||||||
|
*/
|
||||||
@SafeVarargs
|
@SafeVarargs
|
||||||
final void insertOrUpdate(Object... entities) {
|
final long insertOrUpdate(Object... entities) {
|
||||||
|
long timestamp = fakeClock.nowUtc().getMillis();
|
||||||
tm().transact(() -> ofy().save().entities(entities).now());
|
tm().transact(() -> ofy().save().entities(entities).now());
|
||||||
fakeClock.advanceOneMilli();
|
fakeClock.advanceOneMilli();
|
||||||
|
return timestamp;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Deletes {@code entities} from the Datastore. */
|
/** Deletes {@code entities} from the Datastore and returns the timestamp of this transaction. */
|
||||||
@SafeVarargs
|
@SafeVarargs
|
||||||
final void delete(Object... entities) {
|
final long delete(Object... entities) {
|
||||||
|
long timestamp = fakeClock.nowUtc().getMillis();
|
||||||
tm().transact(() -> ofy().delete().entities(entities).now());
|
tm().transact(() -> ofy().delete().entities(entities).now());
|
||||||
fakeClock.advanceOneMilli();
|
fakeClock.advanceOneMilli();
|
||||||
|
return timestamp;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the persisted data that corresponds to {@code ofyEntity} as a Datastore {@link Entity}.
|
||||||
|
*
|
||||||
|
* <p>A typical use case for this method is in a test, when the caller has persisted a newly created
|
||||||
|
* Objectify entity and wants to find out the values of certain assign-on-persist properties. See
|
||||||
|
* {@link VersionedEntity} for more information.
|
||||||
|
*/
|
||||||
|
Entity loadAsDatastoreEntity(Object ofyEntity) {
|
||||||
|
try {
|
||||||
|
return datastoreService.get(Key.create(ofyEntity).getRaw());
|
||||||
|
} catch (EntityNotFoundException e) {
|
||||||
|
throw new NoSuchElementException(e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the persisted data that corresponds to {@code ofyEntity} as an Objectify entity.
|
||||||
|
*
|
||||||
|
* <p>See {@link #loadAsDatastoreEntity} and {@link VersionedEntity} for more information.
|
||||||
|
*/
|
||||||
|
Object loadAsOfyEntity(Object ofyEntity) {
|
||||||
|
try {
|
||||||
|
return ofy().load().fromEntity(datastoreService.get(Key.create(ofyEntity).getRaw()));
|
||||||
|
} catch (EntityNotFoundException e) {
|
||||||
|
throw new NoSuchElementException(e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
/**
|
/**
|
||||||
* Exports entities of the caller provided types and returns the directory where data is exported.
|
* Exports entities of the caller provided types and returns the directory where data is exported.
|
||||||
*
|
*
|
||||||
|
@ -96,7 +141,8 @@ class BackupTestStore implements AutoCloseable {
|
||||||
* to simulate an inconsistent export
|
* to simulate an inconsistent export
|
||||||
* @return directory where data is exported
|
* @return directory where data is exported
|
||||||
*/
|
*/
|
||||||
File export(String exportRootPath, Iterable<Class<?>> pojoTypes, Set<Key<?>> excludes)
|
File export(
|
||||||
|
String exportRootPath, Iterable<Class<?>> pojoTypes, Set<Key<? extends Object>> excludes)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
File exportDirectory = getExportDirectory(exportRootPath);
|
File exportDirectory = getExportDirectory(exportRootPath);
|
||||||
for (Class<?> pojoType : pojoTypes) {
|
for (Class<?> pojoType : pojoTypes) {
|
||||||
|
@ -119,8 +165,9 @@ class BackupTestStore implements AutoCloseable {
|
||||||
for (Object pojo : ofy().load().type(pojoType).iterable()) {
|
for (Object pojo : ofy().load().type(pojoType).iterable()) {
|
||||||
if (!excludes.contains(Key.create(pojo))) {
|
if (!excludes.contains(Key.create(pojo))) {
|
||||||
try {
|
try {
|
||||||
builder.addEntity(toEntity(pojo));
|
// Must preserve UpdateTimestamp. Do not use ofy().save().toEntity(pojo)!
|
||||||
} catch (IOException e) {
|
builder.addEntity(datastoreService.get(Key.create(pojo).getRaw()));
|
||||||
|
} catch (Exception e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -144,10 +191,6 @@ class BackupTestStore implements AutoCloseable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private Entity toEntity(Object pojo) {
|
|
||||||
return tm().transactNew(() -> ofy().save().toEntity(pojo));
|
|
||||||
}
|
|
||||||
|
|
||||||
private File getExportDirectory(String exportRootPath) {
|
private File getExportDirectory(String exportRootPath) {
|
||||||
File exportDirectory =
|
File exportDirectory =
|
||||||
new File(exportRootPath, fakeClock.nowUtc().toString(EXPORT_TIMESTAMP_FORMAT));
|
new File(exportRootPath, fakeClock.nowUtc().toString(EXPORT_TIMESTAMP_FORMAT));
|
||||||
|
|
|
@ -18,19 +18,13 @@ import static com.google.common.truth.Truth.assertThat;
|
||||||
import static com.google.common.truth.Truth.assertWithMessage;
|
import static com.google.common.truth.Truth.assertWithMessage;
|
||||||
import static com.google.common.truth.Truth8.assertThat;
|
import static com.google.common.truth.Truth8.assertThat;
|
||||||
import static google.registry.model.common.EntityGroupRoot.getCrossTldKey;
|
import static google.registry.model.common.EntityGroupRoot.getCrossTldKey;
|
||||||
import static google.registry.model.ofy.ObjectifyService.ofy;
|
|
||||||
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
|
|
||||||
import static google.registry.testing.DatastoreHelper.newContactResource;
|
import static google.registry.testing.DatastoreHelper.newContactResource;
|
||||||
import static google.registry.testing.DatastoreHelper.newDomainBase;
|
import static google.registry.testing.DatastoreHelper.newDomainBase;
|
||||||
import static google.registry.testing.DatastoreHelper.newRegistry;
|
import static google.registry.testing.DatastoreHelper.newRegistry;
|
||||||
|
|
||||||
import com.google.appengine.api.datastore.Entity;
|
|
||||||
import com.google.appengine.api.datastore.EntityTranslator;
|
|
||||||
import com.google.common.base.Predicates;
|
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
import com.google.common.collect.ImmutableSet;
|
import com.google.common.collect.ImmutableSet;
|
||||||
import com.google.common.collect.Streams;
|
import com.google.common.collect.Streams;
|
||||||
import com.google.storage.onestore.v3.OnestoreEntity.EntityProto;
|
|
||||||
import com.googlecode.objectify.Key;
|
import com.googlecode.objectify.Key;
|
||||||
import google.registry.backup.CommitLogImports;
|
import google.registry.backup.CommitLogImports;
|
||||||
import google.registry.backup.VersionedEntity;
|
import google.registry.backup.VersionedEntity;
|
||||||
|
@ -48,10 +42,9 @@ import java.io.IOException;
|
||||||
import java.nio.file.Files;
|
import java.nio.file.Files;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Optional;
|
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.function.Function;
|
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
import org.apache.beam.sdk.values.KV;
|
||||||
import org.joda.time.DateTime;
|
import org.joda.time.DateTime;
|
||||||
import org.junit.jupiter.api.AfterEach;
|
import org.junit.jupiter.api.AfterEach;
|
||||||
import org.junit.jupiter.api.BeforeEach;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
@ -63,17 +56,18 @@ import org.junit.jupiter.api.io.TempDir;
|
||||||
public class BackupTestStoreTest {
|
public class BackupTestStoreTest {
|
||||||
private static final DateTime START_TIME = DateTime.parse("2000-01-01T00:00:00.0Z");
|
private static final DateTime START_TIME = DateTime.parse("2000-01-01T00:00:00.0Z");
|
||||||
|
|
||||||
private FakeClock fakeClock;
|
|
||||||
private BackupTestStore store;
|
|
||||||
|
|
||||||
private Registry registry;
|
|
||||||
private ContactResource contact;
|
|
||||||
private DomainBase domain;
|
|
||||||
|
|
||||||
@TempDir File tempDir;
|
@TempDir File tempDir;
|
||||||
|
|
||||||
@RegisterExtension InjectRule injectRule = new InjectRule();
|
@RegisterExtension InjectRule injectRule = new InjectRule();
|
||||||
|
|
||||||
|
private FakeClock fakeClock;
|
||||||
|
private BackupTestStore store;
|
||||||
|
|
||||||
|
// Test data:
|
||||||
|
private Registry registry;
|
||||||
|
private ContactResource contact;
|
||||||
|
private DomainBase domain;
|
||||||
|
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
void beforeEach() throws Exception {
|
void beforeEach() throws Exception {
|
||||||
fakeClock = new FakeClock(START_TIME);
|
fakeClock = new FakeClock(START_TIME);
|
||||||
|
@ -82,9 +76,15 @@ public class BackupTestStoreTest {
|
||||||
|
|
||||||
registry = newRegistry("tld1", "TLD1");
|
registry = newRegistry("tld1", "TLD1");
|
||||||
store.insertOrUpdate(registry);
|
store.insertOrUpdate(registry);
|
||||||
|
|
||||||
contact = newContactResource("contact_1");
|
contact = newContactResource("contact_1");
|
||||||
domain = newDomainBase("domain1.tld1", contact);
|
domain = newDomainBase("domain1.tld1", contact);
|
||||||
store.insertOrUpdate(contact, domain);
|
store.insertOrUpdate(contact, domain);
|
||||||
|
|
||||||
|
// Save persisted data for assertions.
|
||||||
|
registry = (Registry) store.loadAsOfyEntity(registry);
|
||||||
|
contact = (ContactResource) store.loadAsOfyEntity(contact);
|
||||||
|
domain = (DomainBase) store.loadAsOfyEntity(domain);
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterEach
|
@AfterEach
|
||||||
|
@ -131,39 +131,33 @@ public class BackupTestStoreTest {
|
||||||
void export_dataReadBack() throws IOException {
|
void export_dataReadBack() throws IOException {
|
||||||
String exportRootPath = tempDir.getAbsolutePath();
|
String exportRootPath = tempDir.getAbsolutePath();
|
||||||
File exportFolder = export(exportRootPath, Collections.EMPTY_SET);
|
File exportFolder = export(exportRootPath, Collections.EMPTY_SET);
|
||||||
ImmutableList<String> tldStrings =
|
ImmutableList<Object> loadedRegistries =
|
||||||
loadPropertyFromExportedEntities(
|
loadExportedEntities(new File(exportFolder, "/all_namespaces/kind_Registry/input-0"));
|
||||||
new File(exportFolder, "/all_namespaces/kind_Registry/input-0"),
|
assertThat(loadedRegistries).containsExactly(registry);
|
||||||
Registry.class,
|
|
||||||
Registry::getTldStr);
|
ImmutableList<Object> loadedDomains =
|
||||||
assertThat(tldStrings).containsExactly("tld1");
|
loadExportedEntities(new File(exportFolder, "/all_namespaces/kind_DomainBase/input-0"));
|
||||||
ImmutableList<String> domainStrings =
|
assertThat(loadedDomains).containsExactly(domain);
|
||||||
loadPropertyFromExportedEntities(
|
|
||||||
new File(exportFolder, "/all_namespaces/kind_DomainBase/input-0"),
|
ImmutableList<Object> loadedContacts =
|
||||||
DomainBase.class,
|
loadExportedEntities(
|
||||||
DomainBase::getDomainName);
|
new File(exportFolder, "/all_namespaces/kind_ContactResource/input-0"));
|
||||||
assertThat(domainStrings).containsExactly("domain1.tld1");
|
assertThat(loadedContacts).containsExactly(contact);
|
||||||
ImmutableList<String> contactIds =
|
|
||||||
loadPropertyFromExportedEntities(
|
|
||||||
new File(exportFolder, "/all_namespaces/kind_ContactResource/input-0"),
|
|
||||||
ContactResource.class,
|
|
||||||
ContactResource::getContactId);
|
|
||||||
assertThat(contactIds).containsExactly("contact_1");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void export_excludeSomeEntity() throws IOException {
|
void export_excludeSomeEntity() throws IOException {
|
||||||
store.insertOrUpdate(newRegistry("tld2", "TLD2"));
|
Registry newRegistry = newRegistry("tld2", "TLD2");
|
||||||
|
store.insertOrUpdate(newRegistry);
|
||||||
|
newRegistry = (Registry) store.loadAsOfyEntity(newRegistry);
|
||||||
|
|
||||||
String exportRootPath = tempDir.getAbsolutePath();
|
String exportRootPath = tempDir.getAbsolutePath();
|
||||||
File exportFolder =
|
File exportFolder =
|
||||||
export(
|
export(
|
||||||
exportRootPath, ImmutableSet.of(Key.create(getCrossTldKey(), Registry.class, "tld1")));
|
exportRootPath, ImmutableSet.of(Key.create(getCrossTldKey(), Registry.class, "tld1")));
|
||||||
ImmutableList<String> tlds =
|
ImmutableList<Object> loadedRegistries =
|
||||||
loadPropertyFromExportedEntities(
|
loadExportedEntities(new File(exportFolder, "/all_namespaces/kind_Registry/input-0"));
|
||||||
new File(exportFolder, "/all_namespaces/kind_Registry/input-0"),
|
assertThat(loadedRegistries).containsExactly(newRegistry);
|
||||||
Registry.class,
|
|
||||||
Registry::getTldStr);
|
|
||||||
assertThat(tlds).containsExactly("tld2");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -178,14 +172,11 @@ public class BackupTestStoreTest {
|
||||||
File commitLogFile = store.saveCommitLogs(tempDir.getAbsolutePath());
|
File commitLogFile = store.saveCommitLogs(tempDir.getAbsolutePath());
|
||||||
assertThat(commitLogFile.exists()).isTrue();
|
assertThat(commitLogFile.exists()).isTrue();
|
||||||
ImmutableList<VersionedEntity> mutations = CommitLogImports.loadEntities(commitLogFile);
|
ImmutableList<VersionedEntity> mutations = CommitLogImports.loadEntities(commitLogFile);
|
||||||
assertThat(mutations.stream().map(VersionedEntity::getEntity).map(Optional::get))
|
InitSqlTestUtils.assertContainsExactlyElementsIn(
|
||||||
.containsExactlyElementsIn(toDatastoreEntities(registry, contact, domain));
|
mutations,
|
||||||
// Registry created at -2, contact and domain created at -1.
|
KV.of(fakeClock.nowUtc().getMillis() - 2, store.loadAsDatastoreEntity(registry)),
|
||||||
assertThat(mutations.stream().map(VersionedEntity::commitTimeMills))
|
KV.of(fakeClock.nowUtc().getMillis() - 1, store.loadAsDatastoreEntity(contact)),
|
||||||
.containsExactly(
|
KV.of(fakeClock.nowUtc().getMillis() - 1, store.loadAsDatastoreEntity(domain)));
|
||||||
fakeClock.nowUtc().getMillis() - 2,
|
|
||||||
fakeClock.nowUtc().getMillis() - 1,
|
|
||||||
fakeClock.nowUtc().getMillis() - 1);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -205,18 +196,13 @@ public class BackupTestStoreTest {
|
||||||
.build();
|
.build();
|
||||||
store.insertOrUpdate(domain, newContact);
|
store.insertOrUpdate(domain, newContact);
|
||||||
store.delete(contact);
|
store.delete(contact);
|
||||||
fakeClock.advanceOneMilli();
|
|
||||||
File commitLogFile = store.saveCommitLogs(tempDir.getAbsolutePath());
|
File commitLogFile = store.saveCommitLogs(tempDir.getAbsolutePath());
|
||||||
ImmutableList<VersionedEntity> mutations = CommitLogImports.loadEntities(commitLogFile);
|
ImmutableList<VersionedEntity> mutations = CommitLogImports.loadEntities(commitLogFile);
|
||||||
assertThat(mutations.stream().filter(VersionedEntity::isDelete).map(VersionedEntity::key))
|
InitSqlTestUtils.assertContainsExactlyElementsIn(
|
||||||
.containsExactly(Key.create(contact).getRaw());
|
mutations,
|
||||||
|
KV.of(fakeClock.nowUtc().getMillis() - 1, Key.create(contact).getRaw()),
|
||||||
assertThat(
|
KV.of(fakeClock.nowUtc().getMillis() - 2, store.loadAsDatastoreEntity(domain)),
|
||||||
mutations.stream()
|
KV.of(fakeClock.nowUtc().getMillis() - 2, store.loadAsDatastoreEntity(newContact)));
|
||||||
.filter(Predicates.not(VersionedEntity::isDelete))
|
|
||||||
.map(VersionedEntity::getEntity)
|
|
||||||
.map(Optional::get))
|
|
||||||
.containsExactlyElementsIn(toDatastoreEntities(domain, newContact));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -236,27 +222,10 @@ public class BackupTestStoreTest {
|
||||||
excludes);
|
excludes);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static <T> ImmutableList<String> loadPropertyFromExportedEntities(
|
private static ImmutableList<Object> loadExportedEntities(File dataFile) throws IOException {
|
||||||
File dataFile, Class<T> ofyEntityType, Function<T, String> getter) throws IOException {
|
|
||||||
return Streams.stream(LevelDbLogReader.from(dataFile.toPath()))
|
return Streams.stream(LevelDbLogReader.from(dataFile.toPath()))
|
||||||
.map(bytes -> toOfyEntity(bytes, ofyEntityType))
|
.map(InitSqlTestUtils::bytesToEntity)
|
||||||
.map(getter)
|
.map(InitSqlTestUtils::datastoreToOfyEntity)
|
||||||
.collect(ImmutableList.toImmutableList());
|
.collect(ImmutableList.toImmutableList());
|
||||||
}
|
}
|
||||||
|
|
||||||
private static <T> T toOfyEntity(byte[] rawRecord, Class<T> ofyEntityType) {
|
|
||||||
EntityProto proto = new EntityProto();
|
|
||||||
proto.parseFrom(rawRecord);
|
|
||||||
Entity entity = EntityTranslator.createFromPb(proto);
|
|
||||||
return ofyEntityType.cast(ofy().load().fromEntity(entity));
|
|
||||||
}
|
|
||||||
|
|
||||||
@SafeVarargs
|
|
||||||
private static ImmutableList<Entity> toDatastoreEntities(Object... ofyEntities) {
|
|
||||||
return tm().transact(
|
|
||||||
() ->
|
|
||||||
Stream.of(ofyEntities)
|
|
||||||
.map(oe -> ofy().save().toEntity(oe))
|
|
||||||
.collect(ImmutableList.toImmutableList()));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,13 +14,10 @@
|
||||||
|
|
||||||
package google.registry.beam.initsql;
|
package google.registry.beam.initsql;
|
||||||
|
|
||||||
import static google.registry.model.ofy.ObjectifyService.ofy;
|
|
||||||
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
|
|
||||||
import static google.registry.testing.DatastoreHelper.newContactResource;
|
import static google.registry.testing.DatastoreHelper.newContactResource;
|
||||||
import static google.registry.testing.DatastoreHelper.newDomainBase;
|
import static google.registry.testing.DatastoreHelper.newDomainBase;
|
||||||
import static google.registry.testing.DatastoreHelper.newRegistry;
|
import static google.registry.testing.DatastoreHelper.newRegistry;
|
||||||
|
|
||||||
import com.google.appengine.api.datastore.Entity;
|
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
import google.registry.backup.VersionedEntity;
|
import google.registry.backup.VersionedEntity;
|
||||||
import google.registry.model.contact.ContactResource;
|
import google.registry.model.contact.ContactResource;
|
||||||
|
@ -39,6 +36,7 @@ import org.apache.beam.sdk.testing.TestPipeline;
|
||||||
import org.apache.beam.sdk.transforms.Create;
|
import org.apache.beam.sdk.transforms.Create;
|
||||||
import org.apache.beam.sdk.transforms.DoFn;
|
import org.apache.beam.sdk.transforms.DoFn;
|
||||||
import org.apache.beam.sdk.transforms.ParDo;
|
import org.apache.beam.sdk.transforms.ParDo;
|
||||||
|
import org.apache.beam.sdk.values.KV;
|
||||||
import org.apache.beam.sdk.values.PCollection;
|
import org.apache.beam.sdk.values.PCollection;
|
||||||
import org.joda.time.DateTime;
|
import org.joda.time.DateTime;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
@ -50,7 +48,7 @@ import org.junit.rules.TemporaryFolder;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
import org.junit.runners.JUnit4;
|
import org.junit.runners.JUnit4;
|
||||||
|
|
||||||
/** Unit tests for {@link CommitLogTransforms}. */
|
/** Unit tests for {@link Transforms} related to loading CommitLogs. */
|
||||||
// TODO(weiminyu): Upgrade to JUnit5 when TestPipeline is upgraded. It is also easy to adapt with
|
// TODO(weiminyu): Upgrade to JUnit5 when TestPipeline is upgraded. It is also easy to adapt with
|
||||||
// a wrapper.
|
// a wrapper.
|
||||||
@RunWith(JUnit4.class)
|
@RunWith(JUnit4.class)
|
||||||
|
@ -69,9 +67,11 @@ public class CommitLogTransformsTest implements Serializable {
|
||||||
private transient BackupTestStore store;
|
private transient BackupTestStore store;
|
||||||
private File commitLogsDir;
|
private File commitLogsDir;
|
||||||
private File firstCommitLogFile;
|
private File firstCommitLogFile;
|
||||||
// Canned data that are persisted to Datastore, used by assertions in tests.
|
|
||||||
// TODO(weiminyu): use Ofy entity pojos directly.
|
// Canned data:
|
||||||
private transient ImmutableList<Entity> persistedEntities;
|
private transient Registry registry;
|
||||||
|
private transient ContactResource contact;
|
||||||
|
private transient DomainBase domain;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void beforeEach() throws Exception {
|
public void beforeEach() throws Exception {
|
||||||
|
@ -79,15 +79,17 @@ public class CommitLogTransformsTest implements Serializable {
|
||||||
store = new BackupTestStore(fakeClock);
|
store = new BackupTestStore(fakeClock);
|
||||||
injectRule.setStaticField(Ofy.class, "clock", fakeClock);
|
injectRule.setStaticField(Ofy.class, "clock", fakeClock);
|
||||||
|
|
||||||
Registry registry = newRegistry("tld1", "TLD1");
|
registry = newRegistry("tld1", "TLD1");
|
||||||
store.insertOrUpdate(registry);
|
store.insertOrUpdate(registry);
|
||||||
ContactResource contact1 = newContactResource("contact_1");
|
contact = newContactResource("contact_1");
|
||||||
DomainBase domain1 = newDomainBase("domain1.tld1", contact1);
|
domain = newDomainBase("domain1.tld1", contact);
|
||||||
store.insertOrUpdate(contact1, domain1);
|
store.insertOrUpdate(contact, domain);
|
||||||
persistedEntities =
|
|
||||||
ImmutableList.of(registry, contact1, domain1).stream()
|
// Save persisted data for assertions.
|
||||||
.map(ofyEntity -> tm().transact(() -> ofy().save().toEntity(ofyEntity)))
|
registry = (Registry) store.loadAsOfyEntity(registry);
|
||||||
.collect(ImmutableList.toImmutableList());
|
contact = (ContactResource) store.loadAsOfyEntity(contact);
|
||||||
|
domain = (DomainBase) store.loadAsOfyEntity(domain);
|
||||||
|
|
||||||
commitLogsDir = temporaryFolder.newFolder();
|
commitLogsDir = temporaryFolder.newFolder();
|
||||||
firstCommitLogFile = store.saveCommitLogs(commitLogsDir.getAbsolutePath());
|
firstCommitLogFile = store.saveCommitLogs(commitLogsDir.getAbsolutePath());
|
||||||
}
|
}
|
||||||
|
@ -106,7 +108,7 @@ public class CommitLogTransformsTest implements Serializable {
|
||||||
PCollection<String> patterns =
|
PCollection<String> patterns =
|
||||||
pipeline.apply(
|
pipeline.apply(
|
||||||
"Get CommitLog file patterns",
|
"Get CommitLog file patterns",
|
||||||
CommitLogTransforms.getCommitLogFilePatterns(commitLogsDir.getAbsolutePath()));
|
Transforms.getCommitLogFilePatterns(commitLogsDir.getAbsolutePath()));
|
||||||
|
|
||||||
ImmutableList<String> expectedPatterns =
|
ImmutableList<String> expectedPatterns =
|
||||||
ImmutableList.of(commitLogsDir.getAbsolutePath() + "/commit_diff_until_*");
|
ImmutableList.of(commitLogsDir.getAbsolutePath() + "/commit_diff_until_*");
|
||||||
|
@ -165,7 +167,7 @@ public class CommitLogTransformsTest implements Serializable {
|
||||||
Create.of(commitLogFilenames).withCoder(StringUtf8Coder.of()))
|
Create.of(commitLogFilenames).withCoder(StringUtf8Coder.of()))
|
||||||
.apply(
|
.apply(
|
||||||
"Filtered by Time",
|
"Filtered by Time",
|
||||||
CommitLogTransforms.filterCommitLogsByTime(
|
Transforms.filterCommitLogsByTime(
|
||||||
DateTime.parse("2000-01-01T00:00:00.001Z"),
|
DateTime.parse("2000-01-01T00:00:00.001Z"),
|
||||||
DateTime.parse("2000-01-01T00:00:00.003Z")));
|
DateTime.parse("2000-01-01T00:00:00.003Z")));
|
||||||
PAssert.that(filteredFilenames)
|
PAssert.that(filteredFilenames)
|
||||||
|
@ -183,40 +185,15 @@ public class CommitLogTransformsTest implements Serializable {
|
||||||
pipeline
|
pipeline
|
||||||
.apply(
|
.apply(
|
||||||
"Get CommitLog file patterns",
|
"Get CommitLog file patterns",
|
||||||
CommitLogTransforms.getCommitLogFilePatterns(commitLogsDir.getAbsolutePath()))
|
Transforms.getCommitLogFilePatterns(commitLogsDir.getAbsolutePath()))
|
||||||
.apply("Find CommitLogs", Transforms.getFilesByPatterns())
|
.apply("Find CommitLogs", Transforms.getFilesByPatterns())
|
||||||
.apply(CommitLogTransforms.loadCommitLogsFromFiles());
|
.apply(Transforms.loadCommitLogsFromFiles());
|
||||||
|
|
||||||
PCollection<Long> timestamps =
|
InitSqlTestUtils.assertContainsExactlyElementsIn(
|
||||||
entities.apply(
|
entities,
|
||||||
"Extract commitTimeMillis",
|
KV.of(fakeClock.nowUtc().getMillis() - 2, store.loadAsDatastoreEntity(registry)),
|
||||||
ParDo.of(
|
KV.of(fakeClock.nowUtc().getMillis() - 1, store.loadAsDatastoreEntity(contact)),
|
||||||
new DoFn<VersionedEntity, Long>() {
|
KV.of(fakeClock.nowUtc().getMillis() - 1, store.loadAsDatastoreEntity(domain)));
|
||||||
@ProcessElement
|
|
||||||
public void processElement(
|
|
||||||
@Element VersionedEntity entity, OutputReceiver<Long> out) {
|
|
||||||
out.output(entity.commitTimeMills());
|
|
||||||
}
|
|
||||||
}));
|
|
||||||
PAssert.that(timestamps)
|
|
||||||
.containsInAnyOrder(
|
|
||||||
fakeClock.nowUtc().getMillis() - 2,
|
|
||||||
fakeClock.nowUtc().getMillis() - 1,
|
|
||||||
fakeClock.nowUtc().getMillis() - 1);
|
|
||||||
|
|
||||||
PCollection<Entity> datastoreEntities =
|
|
||||||
entities.apply(
|
|
||||||
"To Datastore Entities",
|
|
||||||
ParDo.of(
|
|
||||||
new DoFn<VersionedEntity, Entity>() {
|
|
||||||
@ProcessElement
|
|
||||||
public void processElement(
|
|
||||||
@Element VersionedEntity entity, OutputReceiver<Entity> out) {
|
|
||||||
entity.getEntity().ifPresent(out::output);
|
|
||||||
}
|
|
||||||
}));
|
|
||||||
|
|
||||||
PAssert.that(datastoreEntities).containsInAnyOrder(persistedEntities);
|
|
||||||
|
|
||||||
pipeline.run();
|
pipeline.run();
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,13 +14,10 @@
|
||||||
|
|
||||||
package google.registry.beam.initsql;
|
package google.registry.beam.initsql;
|
||||||
|
|
||||||
import static google.registry.model.ofy.ObjectifyService.ofy;
|
|
||||||
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
|
|
||||||
import static google.registry.testing.DatastoreHelper.newContactResource;
|
import static google.registry.testing.DatastoreHelper.newContactResource;
|
||||||
import static google.registry.testing.DatastoreHelper.newDomainBase;
|
import static google.registry.testing.DatastoreHelper.newDomainBase;
|
||||||
import static google.registry.testing.DatastoreHelper.newRegistry;
|
import static google.registry.testing.DatastoreHelper.newRegistry;
|
||||||
|
|
||||||
import com.google.appengine.api.datastore.Entity;
|
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
import com.googlecode.objectify.Key;
|
import com.googlecode.objectify.Key;
|
||||||
import google.registry.backup.VersionedEntity;
|
import google.registry.backup.VersionedEntity;
|
||||||
|
@ -41,6 +38,7 @@ import org.apache.beam.sdk.testing.TestPipeline;
|
||||||
import org.apache.beam.sdk.transforms.Create;
|
import org.apache.beam.sdk.transforms.Create;
|
||||||
import org.apache.beam.sdk.transforms.DoFn;
|
import org.apache.beam.sdk.transforms.DoFn;
|
||||||
import org.apache.beam.sdk.transforms.ParDo;
|
import org.apache.beam.sdk.transforms.ParDo;
|
||||||
|
import org.apache.beam.sdk.values.KV;
|
||||||
import org.apache.beam.sdk.values.PCollection;
|
import org.apache.beam.sdk.values.PCollection;
|
||||||
import org.joda.time.DateTime;
|
import org.joda.time.DateTime;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
@ -53,7 +51,7 @@ import org.junit.runner.RunWith;
|
||||||
import org.junit.runners.JUnit4;
|
import org.junit.runners.JUnit4;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Unit tests for {@link ExportLoadingTransforms}.
|
* Unit tests for {@link Transforms} related to loading Datastore exports.
|
||||||
*
|
*
|
||||||
* <p>This class implements {@link Serializable} so that test {@link DoFn} classes may be inlined.
|
* <p>This class implements {@link Serializable} so that test {@link DoFn} classes may be inlined.
|
||||||
*/
|
*/
|
||||||
|
@ -79,9 +77,11 @@ public class ExportloadingTransformsTest implements Serializable {
|
||||||
private FakeClock fakeClock;
|
private FakeClock fakeClock;
|
||||||
private transient BackupTestStore store;
|
private transient BackupTestStore store;
|
||||||
private File exportDir;
|
private File exportDir;
|
||||||
// Canned data that are persisted to Datastore, used by assertions in tests.
|
|
||||||
// TODO(weiminyu): use Ofy entity pojos directly.
|
// Canned data:
|
||||||
private transient ImmutableList<Entity> persistedEntities;
|
private transient Registry registry;
|
||||||
|
private transient ContactResource contact;
|
||||||
|
private transient DomainBase domain;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void beforeEach() throws Exception {
|
public void beforeEach() throws Exception {
|
||||||
|
@ -89,15 +89,17 @@ public class ExportloadingTransformsTest implements Serializable {
|
||||||
store = new BackupTestStore(fakeClock);
|
store = new BackupTestStore(fakeClock);
|
||||||
injectRule.setStaticField(Ofy.class, "clock", fakeClock);
|
injectRule.setStaticField(Ofy.class, "clock", fakeClock);
|
||||||
|
|
||||||
Registry registry = newRegistry("tld1", "TLD1");
|
registry = newRegistry("tld1", "TLD1");
|
||||||
store.insertOrUpdate(registry);
|
store.insertOrUpdate(registry);
|
||||||
ContactResource contact1 = newContactResource("contact_1");
|
|
||||||
DomainBase domain1 = newDomainBase("domain1.tld1", contact1);
|
contact = newContactResource("contact_1");
|
||||||
store.insertOrUpdate(contact1, domain1);
|
domain = newDomainBase("domain1.tld1", contact);
|
||||||
persistedEntities =
|
store.insertOrUpdate(contact, domain);
|
||||||
ImmutableList.of(registry, contact1, domain1).stream()
|
|
||||||
.map(ofyEntity -> tm().transact(() -> ofy().save().toEntity(ofyEntity)))
|
// Save persisted data for assertions.
|
||||||
.collect(ImmutableList.toImmutableList());
|
registry = (Registry) store.loadAsOfyEntity(registry);
|
||||||
|
contact = (ContactResource) store.loadAsOfyEntity(contact);
|
||||||
|
domain = (DomainBase) store.loadAsOfyEntity(domain);
|
||||||
|
|
||||||
exportDir =
|
exportDir =
|
||||||
store.export(exportRootDir.getRoot().getAbsolutePath(), ALL_KINDS, Collections.EMPTY_SET);
|
store.export(exportRootDir.getRoot().getAbsolutePath(), ALL_KINDS, Collections.EMPTY_SET);
|
||||||
|
@ -117,8 +119,7 @@ public class ExportloadingTransformsTest implements Serializable {
|
||||||
PCollection<String> patterns =
|
PCollection<String> patterns =
|
||||||
pipeline.apply(
|
pipeline.apply(
|
||||||
"Get Datastore file patterns",
|
"Get Datastore file patterns",
|
||||||
ExportLoadingTransforms.getDatastoreExportFilePatterns(
|
Transforms.getDatastoreExportFilePatterns(exportDir.getAbsolutePath(), ALL_KIND_STRS));
|
||||||
exportDir.getAbsolutePath(), ALL_KIND_STRS));
|
|
||||||
|
|
||||||
ImmutableList<String> expectedPatterns =
|
ImmutableList<String> expectedPatterns =
|
||||||
ImmutableList.of(
|
ImmutableList.of(
|
||||||
|
@ -172,29 +173,20 @@ public class ExportloadingTransformsTest implements Serializable {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void loadDataFromFiles() {
|
public void loadDataFromFiles() {
|
||||||
PCollection<VersionedEntity> taggedRecords =
|
PCollection<VersionedEntity> entities =
|
||||||
pipeline
|
pipeline
|
||||||
.apply(
|
.apply(
|
||||||
"Get Datastore file patterns",
|
"Get Datastore file patterns",
|
||||||
ExportLoadingTransforms.getDatastoreExportFilePatterns(
|
Transforms.getDatastoreExportFilePatterns(
|
||||||
exportDir.getAbsolutePath(), ALL_KIND_STRS))
|
exportDir.getAbsolutePath(), ALL_KIND_STRS))
|
||||||
.apply("Find Datastore files", Transforms.getFilesByPatterns())
|
.apply("Find Datastore files", Transforms.getFilesByPatterns())
|
||||||
.apply("Load from Datastore files", ExportLoadingTransforms.loadExportDataFromFiles());
|
.apply("Load from Datastore files", Transforms.loadExportDataFromFiles());
|
||||||
|
|
||||||
// Transform bytes to pojo for analysis
|
InitSqlTestUtils.assertContainsExactlyElementsIn(
|
||||||
PCollection<Entity> entities =
|
entities,
|
||||||
taggedRecords.apply(
|
KV.of(Transforms.EXPORT_ENTITY_TIME_STAMP, store.loadAsDatastoreEntity(registry)),
|
||||||
"Raw records to Entity",
|
KV.of(Transforms.EXPORT_ENTITY_TIME_STAMP, store.loadAsDatastoreEntity(contact)),
|
||||||
ParDo.of(
|
KV.of(Transforms.EXPORT_ENTITY_TIME_STAMP, store.loadAsDatastoreEntity(domain)));
|
||||||
new DoFn<VersionedEntity, Entity>() {
|
|
||||||
@ProcessElement
|
|
||||||
public void processElement(
|
|
||||||
@Element VersionedEntity versionedEntity, OutputReceiver<Entity> out) {
|
|
||||||
out.output(versionedEntity.getEntity().get());
|
|
||||||
}
|
|
||||||
}));
|
|
||||||
|
|
||||||
PAssert.that(entities).containsInAnyOrder(persistedEntities);
|
|
||||||
|
|
||||||
pipeline.run();
|
pipeline.run();
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,161 @@
|
||||||
|
// Copyright 2020 The Nomulus Authors. All Rights Reserved.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package google.registry.beam.initsql;
|
||||||
|
|
||||||
|
import static com.google.common.truth.Truth8.assertThat;
|
||||||
|
import static google.registry.model.ofy.ObjectifyService.ofy;
|
||||||
|
import static org.apache.beam.sdk.values.TypeDescriptors.kvs;
|
||||||
|
import static org.apache.beam.sdk.values.TypeDescriptors.strings;
|
||||||
|
|
||||||
|
import com.google.appengine.api.datastore.Entity;
|
||||||
|
import com.google.appengine.api.datastore.EntityTranslator;
|
||||||
|
import com.google.appengine.api.datastore.Key;
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
|
import com.google.common.collect.Streams;
|
||||||
|
import com.google.common.truth.Truth;
|
||||||
|
import com.google.storage.onestore.v3.OnestoreEntity.EntityProto;
|
||||||
|
import google.registry.backup.AppEngineEnvironment;
|
||||||
|
import google.registry.backup.VersionedEntity;
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
import org.apache.beam.sdk.testing.PAssert;
|
||||||
|
import org.apache.beam.sdk.transforms.DoFn;
|
||||||
|
import org.apache.beam.sdk.transforms.GroupByKey;
|
||||||
|
import org.apache.beam.sdk.transforms.MapElements;
|
||||||
|
import org.apache.beam.sdk.transforms.ParDo;
|
||||||
|
import org.apache.beam.sdk.values.KV;
|
||||||
|
import org.apache.beam.sdk.values.PCollection;
|
||||||
|
import org.apache.beam.sdk.values.TypeDescriptor;
|
||||||
|
|
||||||
|
/** Test helpers for populating SQL with Datastore backups. */
|
||||||
|
public final class InitSqlTestUtils {
|
||||||
|
|
||||||
|
/** Converts a Datastore {@link Entity} to an Objectify entity. */
|
||||||
|
public static Object datastoreToOfyEntity(Entity entity) {
|
||||||
|
return ofy().load().fromEntity(entity);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Serializes a Datastore {@link Entity} to byte array. */
|
||||||
|
public static byte[] entityToBytes(Entity entity) {
|
||||||
|
return EntityTranslator.convertToPb(entity).toByteArray();
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Deserializes raw bytes into {@link Entity}. */
|
||||||
|
public static Entity bytesToEntity(byte[] bytes) {
|
||||||
|
EntityProto proto = new EntityProto();
|
||||||
|
proto.parseFrom(bytes);
|
||||||
|
return EntityTranslator.createFromPb(proto);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Asserts that the {@code actual} {@link Collection} of {@link VersionedEntity VersionedEntities}
|
||||||
|
* contains exactly the same elements in the {@code expected} array.
|
||||||
|
*
|
||||||
|
* <p>Each {@code expected} {@link KV key-value pair} refers to a versioned state of an Ofy
|
||||||
|
* entity. The {@link KV#getKey key} is the timestamp, while the {@link KV#getValue value} is
|
||||||
|
* either a Datastore {@link Entity} (for an existing entity) or a Datastore {@link Key} (for a
|
||||||
|
* deleted entity).
|
||||||
|
*
|
||||||
|
* <p>The {@Entity} instances in both actual and expected data are converted to Objectify entities
|
||||||
|
* so that value-equality checks can be performed. Datastore {@link Entity#equals Entity's equals
|
||||||
|
* method} only checks key-equality.
|
||||||
|
*/
|
||||||
|
@SafeVarargs
|
||||||
|
public static void assertContainsExactlyElementsIn(
|
||||||
|
Collection<VersionedEntity> actual, KV<Long, Serializable>... expected) {
|
||||||
|
assertThat(actual.stream().map(InitSqlTestUtils::rawEntityToOfyWithTimestamp))
|
||||||
|
.containsExactlyElementsIn(
|
||||||
|
Stream.of(expected)
|
||||||
|
.map(InitSqlTestUtils::expectedToOfyWithTimestamp)
|
||||||
|
.collect(ImmutableList.toImmutableList()));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Asserts that the {@code actual} {@link PCollection} of {@link VersionedEntity
|
||||||
|
* VersionedEntities} contains exactly the same elements in the {@code expected} array.
|
||||||
|
*
|
||||||
|
* <p>This method makes assertions in the pipeline and only use {@link PAssert} on the result.
|
||||||
|
* This has two advantages over {@code PAssert}:
|
||||||
|
*
|
||||||
|
* <ul>
|
||||||
|
* <li>It supports assertions on 'containsExactlyElementsIn', which is not available in {@code
|
||||||
|
* PAssert}.
|
||||||
|
* <li>It supports assertions on Objectify entities, which {@code PAssert} cannot not do.
|
||||||
|
* Compared with PAssert-compatible options like {@code google.registry.tools.EntityWrapper}
|
||||||
|
* or {@link EntityProto}, Objectify entities in Java give better-formatted error messages
|
||||||
|
* when assertions fail.
|
||||||
|
* </ul>
|
||||||
|
*
|
||||||
|
* <p>Each {@code expected} {@link KV key-value pair} refers to a versioned state of an Ofy
|
||||||
|
* entity. The {@link KV#getKey key} is the timestamp, while the {@link KV#getValue value} is
|
||||||
|
* either a Datastore {@link Entity} (for an existing entity) or a Datastore {@link Key} (for a
|
||||||
|
* deleted entity).
|
||||||
|
*
|
||||||
|
* <p>The {@Entity} instances in both actual and expected data are converted to Objectify entities
|
||||||
|
* so that value-equality checks can be performed. Datastore {@link Entity#equals Entity's equals
|
||||||
|
* method} only checks key-equality.
|
||||||
|
*/
|
||||||
|
@SafeVarargs
|
||||||
|
public static void assertContainsExactlyElementsIn(
|
||||||
|
PCollection<VersionedEntity> actual, KV<Long, Serializable>... expected) {
|
||||||
|
PCollection<String> errMsgs =
|
||||||
|
actual
|
||||||
|
.apply(
|
||||||
|
MapElements.into(kvs(strings(), TypeDescriptor.of(VersionedEntity.class)))
|
||||||
|
.via(rawEntity -> KV.of("The One Key", rawEntity)))
|
||||||
|
.apply(GroupByKey.create())
|
||||||
|
.apply(
|
||||||
|
"assertContainsExactlyElementsIn",
|
||||||
|
ParDo.of(
|
||||||
|
new DoFn<KV<String, Iterable<VersionedEntity>>, String>() {
|
||||||
|
@ProcessElement
|
||||||
|
public void processElement(
|
||||||
|
@Element KV<String, Iterable<VersionedEntity>> input,
|
||||||
|
OutputReceiver<String> out) {
|
||||||
|
try (AppEngineEnvironment env = new AppEngineEnvironment()) {
|
||||||
|
ImmutableList<KV<Long, Object>> actual =
|
||||||
|
Streams.stream(input.getValue())
|
||||||
|
.map(InitSqlTestUtils::rawEntityToOfyWithTimestamp)
|
||||||
|
.collect(ImmutableList.toImmutableList());
|
||||||
|
try {
|
||||||
|
Truth.assertThat(actual)
|
||||||
|
.containsExactlyElementsIn(
|
||||||
|
Stream.of(expected)
|
||||||
|
.map(InitSqlTestUtils::expectedToOfyWithTimestamp)
|
||||||
|
.collect(ImmutableList.toImmutableList()));
|
||||||
|
} catch (AssertionError e) {
|
||||||
|
out.output(e.toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
PAssert.that(errMsgs).empty();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static KV<Long, Object> rawEntityToOfyWithTimestamp(VersionedEntity rawEntity) {
|
||||||
|
return KV.of(
|
||||||
|
rawEntity.commitTimeMills(),
|
||||||
|
rawEntity.getEntity().map(InitSqlTestUtils::datastoreToOfyEntity).orElse(rawEntity.key()));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static KV<Long, Object> expectedToOfyWithTimestamp(KV<Long, Serializable> kv) {
|
||||||
|
return KV.of(
|
||||||
|
kv.getKey(),
|
||||||
|
kv.getValue() instanceof Key
|
||||||
|
? kv.getValue()
|
||||||
|
: datastoreToOfyEntity((Entity) kv.getValue()));
|
||||||
|
}
|
||||||
|
}
|
Loading…
Add table
Add a link
Reference in a new issue