mirror of
https://github.com/google/nomulus.git
synced 2025-07-06 19:23:31 +02:00
Load Datastore snapshot from backup files (#660)
* Load Datastore snapshot from backup files Defined a composite transform that loads from a Datastore export and concurrent CommitLog files, identify entities that still exist at the end of the time window, and resolve their latest states in the window.
This commit is contained in:
parent
d065ff63fc
commit
1961a5759d
7 changed files with 385 additions and 40 deletions
|
@ -23,7 +23,8 @@ import java.lang.reflect.Proxy;
|
|||
|
||||
/**
|
||||
* Sets up a placeholder {@link Environment} on a non-AppEngine platform so that Datastore Entities
|
||||
* can be deserialized. See {@code DatastoreEntityExtension} in test source for more information.
|
||||
* can be converted from/to Objectify entities. See {@code DatastoreEntityExtension} in test source
|
||||
* for more information.
|
||||
*/
|
||||
public class AppEngineEnvironment implements Closeable {
|
||||
|
||||
|
|
|
@ -16,18 +16,27 @@ package google.registry.beam.initsql;
|
|||
|
||||
import static com.google.common.base.Preconditions.checkArgument;
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
import static com.google.common.base.Preconditions.checkState;
|
||||
import static google.registry.beam.initsql.BackupPaths.getCommitLogTimestamp;
|
||||
import static google.registry.beam.initsql.BackupPaths.getExportFilePatterns;
|
||||
import static google.registry.util.DateTimeUtils.START_OF_TIME;
|
||||
import static google.registry.util.DateTimeUtils.isBeforeOrAt;
|
||||
import static java.util.Comparator.comparing;
|
||||
import static org.apache.beam.sdk.values.TypeDescriptors.kvs;
|
||||
import static org.apache.beam.sdk.values.TypeDescriptors.strings;
|
||||
|
||||
import avro.shaded.com.google.common.collect.Iterators;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Streams;
|
||||
import google.registry.backup.CommitLogImports;
|
||||
import google.registry.backup.VersionedEntity;
|
||||
import google.registry.tools.LevelDbLogReader;
|
||||
import java.util.Collection;
|
||||
import java.util.Iterator;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import org.apache.beam.sdk.coders.StringUtf8Coder;
|
||||
import org.apache.beam.sdk.io.Compression;
|
||||
import org.apache.beam.sdk.io.FileIO;
|
||||
|
@ -36,11 +45,20 @@ import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
|
|||
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
|
||||
import org.apache.beam.sdk.transforms.Create;
|
||||
import org.apache.beam.sdk.transforms.DoFn;
|
||||
import org.apache.beam.sdk.transforms.Flatten;
|
||||
import org.apache.beam.sdk.transforms.GroupByKey;
|
||||
import org.apache.beam.sdk.transforms.MapElements;
|
||||
import org.apache.beam.sdk.transforms.PTransform;
|
||||
import org.apache.beam.sdk.transforms.ParDo;
|
||||
import org.apache.beam.sdk.transforms.ProcessFunction;
|
||||
import org.apache.beam.sdk.values.KV;
|
||||
import org.apache.beam.sdk.values.PBegin;
|
||||
import org.apache.beam.sdk.values.PCollection;
|
||||
import org.apache.beam.sdk.values.PCollectionList;
|
||||
import org.apache.beam.sdk.values.PCollectionTuple;
|
||||
import org.apache.beam.sdk.values.TupleTag;
|
||||
import org.apache.beam.sdk.values.TupleTagList;
|
||||
import org.apache.beam.sdk.values.TypeDescriptor;
|
||||
import org.joda.time.DateTime;
|
||||
|
||||
/**
|
||||
|
@ -57,6 +75,110 @@ public final class Transforms {
|
|||
*/
|
||||
@VisibleForTesting static final long EXPORT_ENTITY_TIME_STAMP = START_OF_TIME.getMillis();
|
||||
|
||||
/**
|
||||
* Returns a {@link TupleTag} that can be used to retrieve entities of the given {@code kind} from
|
||||
* the Datastore snapshot returned by {@link #loadDatastoreSnapshot}.
|
||||
*/
|
||||
public static TupleTag<VersionedEntity> createTagForKind(String kind) {
|
||||
// When used with PCollectionTuple the result must retain generic type information.
|
||||
// Both the Generic param and the empty bracket below are important.
|
||||
return new TupleTag<VersionedEntity>(Transforms.class.getSimpleName() + ":" + kind) {};
|
||||
}
|
||||
|
||||
/**
|
||||
* Composite {@link PTransform transform} that loads the Datastore snapshot at {@code
|
||||
* commitLogToTime} for caller specified {@code kinds}.
|
||||
*
|
||||
* <p>Caller must provide the location of a Datastore export that started AFTER {@code
|
||||
* commitLogFromTime} and completed BEFORE {@code commitLogToTime}, as well as the root directory
|
||||
* of all CommitLog files.
|
||||
*
|
||||
* <p>Selection of {@code commitLogFromTime} and {@code commitLogToTime} should follow the
|
||||
* guidelines below to ensure that all incremental changes concurrent with the export are covered:
|
||||
*
|
||||
* <ul>
|
||||
* <li>Two or more CommitLogs should exist between {@code commitLogFromTime} and the starting
|
||||
* time of the Datastore export. This ensures that the earlier CommitLog file was complete
|
||||
* before the export started.
|
||||
* <li>Two or more CommitLogs should exit between the export completion time and {@code
|
||||
* commitLogToTime}.
|
||||
* </ul>
|
||||
*
|
||||
* <p>The output from the returned transform is a {@link PCollectionTuple} consisting of {@link
|
||||
* VersionedEntity VersionedEntities} grouped into {@link PCollection PCollections} by {@code
|
||||
* kind}.
|
||||
*/
|
||||
public static PTransform<PBegin, PCollectionTuple> loadDatastoreSnapshot(
|
||||
String exportDir,
|
||||
String commitLogDir,
|
||||
DateTime commitLogFromTime,
|
||||
DateTime commitLogToTime,
|
||||
Set<String> kinds) {
|
||||
checkArgument(kinds != null && !kinds.isEmpty(), "At least one kind is expected.");
|
||||
|
||||
// Create tags to collect entities by kind in final step.
|
||||
final ImmutableMap<String, TupleTag<VersionedEntity>> outputTags =
|
||||
kinds.stream()
|
||||
.collect(ImmutableMap.toImmutableMap(kind -> kind, Transforms::createTagForKind));
|
||||
// Arbitrarily select one tag as mainOutTag and put the remaining ones in a TupleTagList.
|
||||
// This separation is required by ParDo's config API.
|
||||
Iterator<TupleTag<VersionedEntity>> tagsIt = outputTags.values().iterator();
|
||||
final TupleTag<VersionedEntity> mainOutputTag = tagsIt.next();
|
||||
final TupleTagList additionalTags = TupleTagList.of(ImmutableList.copyOf(tagsIt));
|
||||
|
||||
return new PTransform<PBegin, PCollectionTuple>() {
|
||||
@Override
|
||||
public PCollectionTuple expand(PBegin input) {
|
||||
PCollection<VersionedEntity> exportedEntities =
|
||||
input
|
||||
.apply("Get export file patterns", getDatastoreExportFilePatterns(exportDir, kinds))
|
||||
.apply("Find export files", getFilesByPatterns())
|
||||
.apply("Load export data", loadExportDataFromFiles());
|
||||
PCollection<VersionedEntity> commitLogEntities =
|
||||
input
|
||||
.apply("Get commitlog file patterns", getCommitLogFilePatterns(commitLogDir))
|
||||
.apply("Find commitlog files", getFilesByPatterns())
|
||||
.apply(
|
||||
"Filter commitLog by time",
|
||||
filterCommitLogsByTime(commitLogFromTime, commitLogToTime))
|
||||
.apply("Load commitlog data", loadCommitLogsFromFiles(kinds));
|
||||
return PCollectionList.of(exportedEntities)
|
||||
.and(commitLogEntities)
|
||||
.apply("Merge exports and CommitLogs", Flatten.pCollections())
|
||||
.apply(
|
||||
"Key entities by Datastore Keys",
|
||||
// Converting to KV<String, VE> instead of KV<Key, VE> b/c default coder for Key
|
||||
// (SerializableCoder) is not deterministic and cannot be used with GroupBy.
|
||||
MapElements.into(kvs(strings(), TypeDescriptor.of(VersionedEntity.class)))
|
||||
.via((VersionedEntity e) -> KV.of(e.key().toString(), e)))
|
||||
.apply("Gather entities by key", GroupByKey.create())
|
||||
.apply(
|
||||
"Output latest version per entity",
|
||||
ParDo.of(
|
||||
new DoFn<KV<String, Iterable<VersionedEntity>>, VersionedEntity>() {
|
||||
@ProcessElement
|
||||
public void processElement(
|
||||
@Element KV<String, Iterable<VersionedEntity>> kv,
|
||||
MultiOutputReceiver out) {
|
||||
Optional<VersionedEntity> latest =
|
||||
Streams.stream(kv.getValue())
|
||||
.sorted(comparing(VersionedEntity::commitTimeMills).reversed())
|
||||
.findFirst();
|
||||
// Throw to abort (after default retries). Investigate, fix, and rerun.
|
||||
checkState(
|
||||
latest.isPresent(), "Unexpected key with no data", kv.getKey());
|
||||
if (latest.get().isDelete()) {
|
||||
return;
|
||||
}
|
||||
String kind = latest.get().getEntity().get().getKind();
|
||||
out.get(outputTags.get(kind)).output(latest.get());
|
||||
}
|
||||
})
|
||||
.withOutputTags(mainOutputTag, additionalTags));
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a {@link PTransform transform} that can generate a collection of patterns that match
|
||||
* all Datastore CommitLog files.
|
||||
|
@ -96,7 +218,7 @@ public final class Transforms {
|
|||
* Returns CommitLog files with timestamps between {@code fromTime} (inclusive) and {@code
|
||||
* endTime} (exclusive).
|
||||
*/
|
||||
public static PTransform<PCollection<? extends String>, PCollection<String>>
|
||||
public static PTransform<PCollection<? extends Metadata>, PCollection<Metadata>>
|
||||
filterCommitLogsByTime(DateTime fromTime, DateTime toTime) {
|
||||
return ParDo.of(new FilterCommitLogFileByTime(fromTime, toTime));
|
||||
}
|
||||
|
@ -114,9 +236,13 @@ public final class Transforms {
|
|||
|
||||
/** Returns a {@link PTransform} from file {@link Metadata} to {@link VersionedEntity}. */
|
||||
public static PTransform<PCollection<Metadata>, PCollection<VersionedEntity>>
|
||||
loadCommitLogsFromFiles() {
|
||||
loadCommitLogsFromFiles(Set<String> kinds) {
|
||||
return processFiles(
|
||||
new BackupFileReader(file -> CommitLogImports.loadEntities(file.open()).iterator()));
|
||||
new BackupFileReader(
|
||||
file ->
|
||||
CommitLogImports.loadEntities(file.open()).stream()
|
||||
.filter(e -> kinds.contains(e.key().getKind()))
|
||||
.iterator()));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -139,12 +265,11 @@ public final class Transforms {
|
|||
return input
|
||||
.apply(FileIO.readMatches().withCompression(Compression.UNCOMPRESSED))
|
||||
.apply(transformer.getClass().getSimpleName(), ParDo.of(transformer));
|
||||
// TODO(weiminyu): reshuffle to enable dynamic work rebalance per beam dev guide
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static class FilterCommitLogFileByTime extends DoFn<String, String> {
|
||||
private static class FilterCommitLogFileByTime extends DoFn<Metadata, Metadata> {
|
||||
private final DateTime fromTime;
|
||||
private final DateTime toTime;
|
||||
|
||||
|
@ -161,10 +286,10 @@ public final class Transforms {
|
|||
}
|
||||
|
||||
@ProcessElement
|
||||
public void processElement(@Element String fileName, OutputReceiver<String> out) {
|
||||
DateTime timestamp = getCommitLogTimestamp(fileName);
|
||||
public void processElement(@Element Metadata fileMeta, OutputReceiver<Metadata> out) {
|
||||
DateTime timestamp = getCommitLogTimestamp(fileMeta.resourceId().toString());
|
||||
if (isBeforeOrAt(fromTime, timestamp) && timestamp.isBefore(toTime)) {
|
||||
out.output(fileName);
|
||||
out.output(fileMeta);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -15,8 +15,6 @@
|
|||
package google.registry.beam.initsql;
|
||||
|
||||
import static com.google.common.truth.Truth.assertThat;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.junit.Assume.assumeThat;
|
||||
|
||||
import google.registry.persistence.NomulusPostgreSql;
|
||||
import google.registry.persistence.transaction.JpaTransactionManager;
|
||||
|
@ -25,28 +23,28 @@ import java.io.IOException;
|
|||
import java.io.PrintStream;
|
||||
import org.apache.beam.sdk.io.FileSystems;
|
||||
import org.apache.beam.sdk.options.PipelineOptionsFactory;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.JUnit4;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
|
||||
import org.junit.jupiter.api.io.TempDir;
|
||||
import org.testcontainers.containers.PostgreSQLContainer;
|
||||
import org.testcontainers.junit.jupiter.Container;
|
||||
import org.testcontainers.junit.jupiter.Testcontainers;
|
||||
|
||||
/** Unit tests for {@link BeamJpaModule}. */
|
||||
@RunWith(JUnit4.class) // TODO(weiminyu): upgrade to JUnit 5.
|
||||
@Testcontainers
|
||||
public class BeamJpaModuleTest {
|
||||
|
||||
@Rule
|
||||
@Container
|
||||
public PostgreSQLContainer database = new PostgreSQLContainer(NomulusPostgreSql.getDockerTag());
|
||||
|
||||
@Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
|
||||
@TempDir File tempFolder;
|
||||
|
||||
private File credentialFile;
|
||||
|
||||
@Before
|
||||
@BeforeEach
|
||||
public void beforeEach() throws IOException {
|
||||
credentialFile = temporaryFolder.newFile();
|
||||
credentialFile = new File(tempFolder, "credential");
|
||||
new PrintStream(credentialFile)
|
||||
.printf("%s %s %s", database.getJdbcUrl(), database.getUsername(), database.getPassword())
|
||||
.close();
|
||||
|
@ -76,10 +74,9 @@ public class BeamJpaModuleTest {
|
|||
* information.
|
||||
*/
|
||||
@Test
|
||||
@EnabledIfSystemProperty(named = "test.gcp_integration.env", matches = "\\S+")
|
||||
public void getJpaTransactionManager_cloudSql_authRequired() {
|
||||
String environmentName = System.getProperty("test.gcp_integration.env");
|
||||
assumeThat(environmentName, notNullValue());
|
||||
|
||||
FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());
|
||||
JpaTransactionManager jpa =
|
||||
DaggerBeamJpaModule_JpaTransactionManagerComponent.builder()
|
||||
|
|
|
@ -19,6 +19,7 @@ import static google.registry.testing.DatastoreHelper.newDomainBase;
|
|||
import static google.registry.testing.DatastoreHelper.newRegistry;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import google.registry.backup.VersionedEntity;
|
||||
import google.registry.model.contact.ContactResource;
|
||||
import google.registry.model.domain.DomainBase;
|
||||
|
@ -27,6 +28,7 @@ import google.registry.model.registry.Registry;
|
|||
import google.registry.testing.FakeClock;
|
||||
import google.registry.testing.InjectRule;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
import org.apache.beam.sdk.coders.StringUtf8Coder;
|
||||
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
|
||||
|
@ -152,28 +154,45 @@ public class CommitLogTransformsTest implements Serializable {
|
|||
|
||||
@Test
|
||||
@Category(NeedsRunner.class)
|
||||
public void filterCommitLogsByTime() {
|
||||
public void filterCommitLogsByTime() throws IOException {
|
||||
ImmutableList<String> commitLogFilenames =
|
||||
ImmutableList.of(
|
||||
"/commit_diff_until_2000-01-01T00:00:00.000Z",
|
||||
"/commit_diff_until_2000-01-01T00:00:00.001Z",
|
||||
"/commit_diff_until_2000-01-01T00:00:00.002Z",
|
||||
"/commit_diff_until_2000-01-01T00:00:00.003Z",
|
||||
"/commit_diff_until_2000-01-01T00:00:00.004Z");
|
||||
"commit_diff_until_2000-01-01T00:00:00.000Z",
|
||||
"commit_diff_until_2000-01-01T00:00:00.001Z",
|
||||
"commit_diff_until_2000-01-01T00:00:00.002Z",
|
||||
"commit_diff_until_2000-01-01T00:00:00.003Z",
|
||||
"commit_diff_until_2000-01-01T00:00:00.004Z");
|
||||
|
||||
File commitLogDir = temporaryFolder.newFolder();
|
||||
for (String name : commitLogFilenames) {
|
||||
new File(commitLogDir, name).createNewFile();
|
||||
}
|
||||
|
||||
PCollection<String> filteredFilenames =
|
||||
pipeline
|
||||
.apply(
|
||||
"Generate All Filenames",
|
||||
Create.of(commitLogFilenames).withCoder(StringUtf8Coder.of()))
|
||||
"Get commitlog file patterns",
|
||||
Transforms.getCommitLogFilePatterns(commitLogDir.getAbsolutePath()))
|
||||
.apply("Find commitlog files", Transforms.getFilesByPatterns())
|
||||
.apply(
|
||||
"Filtered by Time",
|
||||
Transforms.filterCommitLogsByTime(
|
||||
DateTime.parse("2000-01-01T00:00:00.001Z"),
|
||||
DateTime.parse("2000-01-01T00:00:00.003Z")));
|
||||
DateTime.parse("2000-01-01T00:00:00.003Z")))
|
||||
.apply(
|
||||
"Extract path strings",
|
||||
ParDo.of(
|
||||
new DoFn<Metadata, String>() {
|
||||
@ProcessElement
|
||||
public void processElement(
|
||||
@Element Metadata fileMeta, OutputReceiver<String> out) {
|
||||
out.output(fileMeta.resourceId().getFilename());
|
||||
}
|
||||
}));
|
||||
PAssert.that(filteredFilenames)
|
||||
.containsInAnyOrder(
|
||||
"/commit_diff_until_2000-01-01T00:00:00.001Z",
|
||||
"/commit_diff_until_2000-01-01T00:00:00.002Z");
|
||||
"commit_diff_until_2000-01-01T00:00:00.001Z",
|
||||
"commit_diff_until_2000-01-01T00:00:00.002Z");
|
||||
|
||||
pipeline.run();
|
||||
}
|
||||
|
@ -187,7 +206,9 @@ public class CommitLogTransformsTest implements Serializable {
|
|||
"Get CommitLog file patterns",
|
||||
Transforms.getCommitLogFilePatterns(commitLogsDir.getAbsolutePath()))
|
||||
.apply("Find CommitLogs", Transforms.getFilesByPatterns())
|
||||
.apply(Transforms.loadCommitLogsFromFiles());
|
||||
.apply(
|
||||
Transforms.loadCommitLogsFromFiles(
|
||||
ImmutableSet.of("Registry", "ContactResource", "DomainBase")));
|
||||
|
||||
InitSqlTestUtils.assertContainsExactlyElementsIn(
|
||||
entities,
|
||||
|
@ -197,4 +218,24 @@ public class CommitLogTransformsTest implements Serializable {
|
|||
|
||||
pipeline.run();
|
||||
}
|
||||
|
||||
@Test
|
||||
@Category(NeedsRunner.class)
|
||||
public void loadOneCommitLogFile_filterByKind() {
|
||||
PCollection<VersionedEntity> entities =
|
||||
pipeline
|
||||
.apply(
|
||||
"Get CommitLog file patterns",
|
||||
Transforms.getCommitLogFilePatterns(commitLogsDir.getAbsolutePath()))
|
||||
.apply("Find CommitLogs", Transforms.getFilesByPatterns())
|
||||
.apply(
|
||||
Transforms.loadCommitLogsFromFiles(ImmutableSet.of("Registry", "ContactResource")));
|
||||
|
||||
InitSqlTestUtils.assertContainsExactlyElementsIn(
|
||||
entities,
|
||||
KV.of(fakeClock.nowUtc().getMillis() - 2, store.loadAsDatastoreEntity(registry)),
|
||||
KV.of(fakeClock.nowUtc().getMillis() - 1, store.loadAsDatastoreEntity(contact)));
|
||||
|
||||
pipeline.run();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@ import static google.registry.testing.DatastoreHelper.newDomainBase;
|
|||
import static google.registry.testing.DatastoreHelper.newRegistry;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.googlecode.objectify.Key;
|
||||
import google.registry.backup.VersionedEntity;
|
||||
import google.registry.model.contact.ContactResource;
|
||||
|
@ -63,8 +64,8 @@ public class ExportloadingTransformsTest implements Serializable {
|
|||
|
||||
private static final ImmutableList<Class<?>> ALL_KINDS =
|
||||
ImmutableList.of(Registry.class, ContactResource.class, DomainBase.class);
|
||||
private static final ImmutableList<String> ALL_KIND_STRS =
|
||||
ALL_KINDS.stream().map(Key::getKind).collect(ImmutableList.toImmutableList());
|
||||
private static final ImmutableSet<String> ALL_KIND_STRS =
|
||||
ALL_KINDS.stream().map(Key::getKind).collect(ImmutableSet.toImmutableSet());
|
||||
|
||||
@Rule public final transient TemporaryFolder exportRootDir = new TemporaryFolder();
|
||||
|
||||
|
|
|
@ -30,6 +30,7 @@ import google.registry.backup.AppEngineEnvironment;
|
|||
import google.registry.backup.VersionedEntity;
|
||||
import java.io.Serializable;
|
||||
import java.util.Collection;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.stream.Stream;
|
||||
import org.apache.beam.sdk.testing.PAssert;
|
||||
import org.apache.beam.sdk.transforms.DoFn;
|
||||
|
@ -43,6 +44,9 @@ import org.apache.beam.sdk.values.TypeDescriptor;
|
|||
/** Test helpers for populating SQL with Datastore backups. */
|
||||
public final class InitSqlTestUtils {
|
||||
|
||||
// Generates unique ids to distinguish reused transforms.
|
||||
private static final AtomicInteger TRANSFORM_ID_GEN = new AtomicInteger(0);
|
||||
|
||||
/** Converts a Datastore {@link Entity} to an Objectify entity. */
|
||||
public static Object datastoreToOfyEntity(Entity entity) {
|
||||
return ofy().load().fromEntity(entity);
|
||||
|
@ -114,11 +118,12 @@ public final class InitSqlTestUtils {
|
|||
PCollection<String> errMsgs =
|
||||
actual
|
||||
.apply(
|
||||
"MapElements_" + TRANSFORM_ID_GEN.getAndIncrement(),
|
||||
MapElements.into(kvs(strings(), TypeDescriptor.of(VersionedEntity.class)))
|
||||
.via(rawEntity -> KV.of("The One Key", rawEntity)))
|
||||
.apply(GroupByKey.create())
|
||||
.apply("GroupByKey_" + TRANSFORM_ID_GEN.getAndIncrement(), GroupByKey.create())
|
||||
.apply(
|
||||
"assertContainsExactlyElementsIn",
|
||||
"assertContainsExactlyElementsIn_" + TRANSFORM_ID_GEN.getAndIncrement(),
|
||||
ParDo.of(
|
||||
new DoFn<KV<String, Iterable<VersionedEntity>>, String>() {
|
||||
@ProcessElement
|
||||
|
|
|
@ -0,0 +1,175 @@
|
|||
// Copyright 2020 The Nomulus Authors. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package google.registry.beam.initsql;
|
||||
|
||||
import static google.registry.testing.DatastoreHelper.newContactResource;
|
||||
import static google.registry.testing.DatastoreHelper.newDomainBase;
|
||||
import static google.registry.testing.DatastoreHelper.newRegistry;
|
||||
|
||||
import com.google.appengine.api.datastore.Entity;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.googlecode.objectify.Key;
|
||||
import google.registry.model.contact.ContactResource;
|
||||
import google.registry.model.domain.DomainAuthInfo;
|
||||
import google.registry.model.domain.DomainBase;
|
||||
import google.registry.model.eppcommon.AuthInfo.PasswordAuth;
|
||||
import google.registry.model.ofy.Ofy;
|
||||
import google.registry.model.registry.Registry;
|
||||
import google.registry.testing.FakeClock;
|
||||
import google.registry.testing.InjectRule;
|
||||
import java.io.File;
|
||||
import org.apache.beam.sdk.testing.TestPipeline;
|
||||
import org.apache.beam.sdk.values.KV;
|
||||
import org.apache.beam.sdk.values.PCollectionTuple;
|
||||
import org.joda.time.DateTime;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.JUnit4;
|
||||
|
||||
/**
|
||||
* Unit test for {@link Transforms#loadDatastoreSnapshot}.
|
||||
*
|
||||
* <p>The test setup involves three entities, one Registry, one Domain, and two Contacts. Events
|
||||
* happen in the following order:
|
||||
*
|
||||
* <ol>
|
||||
* <li>Registry and a filler Contact are inserted to Datastore.
|
||||
* <li>A CommitLog is persisted.
|
||||
* <li>Registry is updated.
|
||||
* <li>Another Contact and Domain are inserted into Datastore.
|
||||
* <li>Datastore is exported, but misses the newly inserted Contact.
|
||||
* <li>Filler Contact is deleted.
|
||||
* <li>A second CommitLog is persisted.
|
||||
* <li>Domain is updated in the Datastore.
|
||||
* <li>The third and last CommitLog is persisted.
|
||||
* </ol>
|
||||
*
|
||||
* The final snapshot includes Registry, Domain, and Contact. This scenario verifies that:
|
||||
*
|
||||
* <ul>
|
||||
* <li>Incremental changes committed before an export does not override the exported valie.
|
||||
* <li>Entity missed by an export can be recovered from later CommitLogs.
|
||||
* <li>Multiple changes to an entity is applied in order.
|
||||
* <li>Deletes are properly handled.
|
||||
* </ul>
|
||||
*/
|
||||
@RunWith(JUnit4.class)
|
||||
public class LoadDatastoreSnapshotTest {
|
||||
private static final DateTime START_TIME = DateTime.parse("2000-01-01T00:00:00.0Z");
|
||||
|
||||
private static final ImmutableList<Class<?>> ALL_KINDS =
|
||||
ImmutableList.of(Registry.class, ContactResource.class, DomainBase.class);
|
||||
private static final ImmutableSet<String> ALL_KIND_STRS =
|
||||
ALL_KINDS.stream().map(Key::getKind).collect(ImmutableSet.toImmutableSet());
|
||||
|
||||
@Rule public final transient TemporaryFolder temporaryFolder = new TemporaryFolder();
|
||||
|
||||
@Rule public final transient InjectRule injectRule = new InjectRule();
|
||||
|
||||
@Rule
|
||||
public final transient TestPipeline pipeline =
|
||||
TestPipeline.create().enableAbandonedNodeEnforcement(true);
|
||||
|
||||
private FakeClock fakeClock;
|
||||
private File exportRootDir;
|
||||
private File exportDir;
|
||||
private File commitLogsDir;
|
||||
|
||||
// Canned data:
|
||||
private transient Entity dsRegistry;
|
||||
private transient Entity dsContact;
|
||||
private transient Entity dsDomain;
|
||||
|
||||
private transient DateTime registryLastUpdateTime;
|
||||
private transient DateTime contactLastUpdateTime;
|
||||
private transient DateTime domainLastUpdateTime;
|
||||
|
||||
@Before
|
||||
public void beforeEach() throws Exception {
|
||||
fakeClock = new FakeClock(START_TIME);
|
||||
try (BackupTestStore store = new BackupTestStore(fakeClock)) {
|
||||
injectRule.setStaticField(Ofy.class, "clock", fakeClock);
|
||||
|
||||
exportRootDir = temporaryFolder.newFolder();
|
||||
commitLogsDir = temporaryFolder.newFolder();
|
||||
|
||||
Registry registry = newRegistry("tld1", "TLD1");
|
||||
ContactResource fillerContact = newContactResource("contact_filler");
|
||||
store.insertOrUpdate(registry, fillerContact);
|
||||
store.saveCommitLogs(commitLogsDir.getAbsolutePath());
|
||||
|
||||
registry =
|
||||
registry
|
||||
.asBuilder()
|
||||
.setCreateBillingCost(registry.getStandardCreateCost().plus(1.0d))
|
||||
.build();
|
||||
registryLastUpdateTime = fakeClock.nowUtc();
|
||||
store.insertOrUpdate(registry);
|
||||
|
||||
ContactResource contact = newContactResource("contact");
|
||||
DomainBase domain = newDomainBase("domain1.tld1", contact);
|
||||
contactLastUpdateTime = fakeClock.nowUtc();
|
||||
store.insertOrUpdate(contact, domain);
|
||||
exportDir =
|
||||
store.export(
|
||||
exportRootDir.getAbsolutePath(), ALL_KINDS, ImmutableSet.of(Key.create(contact)));
|
||||
|
||||
store.delete(fillerContact);
|
||||
store.saveCommitLogs(commitLogsDir.getAbsolutePath());
|
||||
|
||||
domain =
|
||||
domain
|
||||
.asBuilder()
|
||||
.setAuthInfo(DomainAuthInfo.create(PasswordAuth.create("NewPass")))
|
||||
.build();
|
||||
domainLastUpdateTime = fakeClock.nowUtc();
|
||||
store.insertOrUpdate(domain);
|
||||
store.saveCommitLogs(commitLogsDir.getAbsolutePath());
|
||||
|
||||
fakeClock.advanceOneMilli();
|
||||
|
||||
// Save persisted data for assertions.
|
||||
dsRegistry = store.loadAsDatastoreEntity(registry);
|
||||
dsContact = store.loadAsDatastoreEntity(contact);
|
||||
dsDomain = store.loadAsDatastoreEntity(domain);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void loadDatastoreSnapshot() {
|
||||
PCollectionTuple snapshot =
|
||||
pipeline.apply(
|
||||
Transforms.loadDatastoreSnapshot(
|
||||
exportDir.getAbsolutePath(),
|
||||
commitLogsDir.getAbsolutePath(),
|
||||
START_TIME,
|
||||
fakeClock.nowUtc(),
|
||||
ALL_KIND_STRS));
|
||||
InitSqlTestUtils.assertContainsExactlyElementsIn(
|
||||
snapshot.get(Transforms.createTagForKind("DomainBase")),
|
||||
KV.of(domainLastUpdateTime.getMillis(), dsDomain));
|
||||
InitSqlTestUtils.assertContainsExactlyElementsIn(
|
||||
snapshot.get(Transforms.createTagForKind("Registry")),
|
||||
KV.of(registryLastUpdateTime.getMillis(), dsRegistry));
|
||||
InitSqlTestUtils.assertContainsExactlyElementsIn(
|
||||
snapshot.get(Transforms.createTagForKind("ContactResource")),
|
||||
KV.of(contactLastUpdateTime.getMillis(), dsContact));
|
||||
pipeline.run();
|
||||
}
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue