diff --git a/core/src/main/java/google/registry/backup/AppEngineEnvironment.java b/core/src/main/java/google/registry/backup/AppEngineEnvironment.java index f22bf3621..c4ca47466 100644 --- a/core/src/main/java/google/registry/backup/AppEngineEnvironment.java +++ b/core/src/main/java/google/registry/backup/AppEngineEnvironment.java @@ -23,7 +23,8 @@ import java.lang.reflect.Proxy; /** * Sets up a placeholder {@link Environment} on a non-AppEngine platform so that Datastore Entities - * can be deserialized. See {@code DatastoreEntityExtension} in test source for more information. + * can be converted from/to Objectify entities. See {@code DatastoreEntityExtension} in test source + * for more information. */ public class AppEngineEnvironment implements Closeable { diff --git a/core/src/main/java/google/registry/beam/initsql/Transforms.java b/core/src/main/java/google/registry/beam/initsql/Transforms.java index 3f05997dc..1afb034c5 100644 --- a/core/src/main/java/google/registry/beam/initsql/Transforms.java +++ b/core/src/main/java/google/registry/beam/initsql/Transforms.java @@ -16,18 +16,27 @@ package google.registry.beam.initsql; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; import static google.registry.beam.initsql.BackupPaths.getCommitLogTimestamp; import static google.registry.beam.initsql.BackupPaths.getExportFilePatterns; import static google.registry.util.DateTimeUtils.START_OF_TIME; import static google.registry.util.DateTimeUtils.isBeforeOrAt; +import static java.util.Comparator.comparing; +import static org.apache.beam.sdk.values.TypeDescriptors.kvs; +import static org.apache.beam.sdk.values.TypeDescriptors.strings; import avro.shaded.com.google.common.collect.Iterators; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Streams; import google.registry.backup.CommitLogImports; import google.registry.backup.VersionedEntity; import google.registry.tools.LevelDbLogReader; import java.util.Collection; import java.util.Iterator; +import java.util.Optional; +import java.util.Set; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.Compression; import org.apache.beam.sdk.io.FileIO; @@ -36,11 +45,20 @@ import org.apache.beam.sdk.io.fs.EmptyMatchTreatment; import org.apache.beam.sdk.io.fs.MatchResult.Metadata; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.ProcessFunction; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.sdk.values.TypeDescriptor; import org.joda.time.DateTime; /** @@ -57,6 +75,110 @@ public final class Transforms { */ @VisibleForTesting static final long EXPORT_ENTITY_TIME_STAMP = START_OF_TIME.getMillis(); + /** + * Returns a {@link TupleTag} that can be used to retrieve entities of the given {@code kind} from + * the Datastore snapshot returned by {@link #loadDatastoreSnapshot}. + */ + public static TupleTag createTagForKind(String kind) { + // When used with PCollectionTuple the result must retain generic type information. + // Both the Generic param and the empty bracket below are important. + return new TupleTag(Transforms.class.getSimpleName() + ":" + kind) {}; + } + + /** + * Composite {@link PTransform transform} that loads the Datastore snapshot at {@code + * commitLogToTime} for caller specified {@code kinds}. + * + *

Caller must provide the location of a Datastore export that started AFTER {@code + * commitLogFromTime} and completed BEFORE {@code commitLogToTime}, as well as the root directory + * of all CommitLog files. + * + *

Selection of {@code commitLogFromTime} and {@code commitLogToTime} should follow the + * guidelines below to ensure that all incremental changes concurrent with the export are covered: + * + *

+ * + *

The output from the returned transform is a {@link PCollectionTuple} consisting of {@link + * VersionedEntity VersionedEntities} grouped into {@link PCollection PCollections} by {@code + * kind}. + */ + public static PTransform loadDatastoreSnapshot( + String exportDir, + String commitLogDir, + DateTime commitLogFromTime, + DateTime commitLogToTime, + Set kinds) { + checkArgument(kinds != null && !kinds.isEmpty(), "At least one kind is expected."); + + // Create tags to collect entities by kind in final step. + final ImmutableMap> outputTags = + kinds.stream() + .collect(ImmutableMap.toImmutableMap(kind -> kind, Transforms::createTagForKind)); + // Arbitrarily select one tag as mainOutTag and put the remaining ones in a TupleTagList. + // This separation is required by ParDo's config API. + Iterator> tagsIt = outputTags.values().iterator(); + final TupleTag mainOutputTag = tagsIt.next(); + final TupleTagList additionalTags = TupleTagList.of(ImmutableList.copyOf(tagsIt)); + + return new PTransform() { + @Override + public PCollectionTuple expand(PBegin input) { + PCollection exportedEntities = + input + .apply("Get export file patterns", getDatastoreExportFilePatterns(exportDir, kinds)) + .apply("Find export files", getFilesByPatterns()) + .apply("Load export data", loadExportDataFromFiles()); + PCollection commitLogEntities = + input + .apply("Get commitlog file patterns", getCommitLogFilePatterns(commitLogDir)) + .apply("Find commitlog files", getFilesByPatterns()) + .apply( + "Filter commitLog by time", + filterCommitLogsByTime(commitLogFromTime, commitLogToTime)) + .apply("Load commitlog data", loadCommitLogsFromFiles(kinds)); + return PCollectionList.of(exportedEntities) + .and(commitLogEntities) + .apply("Merge exports and CommitLogs", Flatten.pCollections()) + .apply( + "Key entities by Datastore Keys", + // Converting to KV instead of KV b/c default coder for Key + // (SerializableCoder) is not deterministic and cannot be used with GroupBy. + MapElements.into(kvs(strings(), TypeDescriptor.of(VersionedEntity.class))) + .via((VersionedEntity e) -> KV.of(e.key().toString(), e))) + .apply("Gather entities by key", GroupByKey.create()) + .apply( + "Output latest version per entity", + ParDo.of( + new DoFn>, VersionedEntity>() { + @ProcessElement + public void processElement( + @Element KV> kv, + MultiOutputReceiver out) { + Optional latest = + Streams.stream(kv.getValue()) + .sorted(comparing(VersionedEntity::commitTimeMills).reversed()) + .findFirst(); + // Throw to abort (after default retries). Investigate, fix, and rerun. + checkState( + latest.isPresent(), "Unexpected key with no data", kv.getKey()); + if (latest.get().isDelete()) { + return; + } + String kind = latest.get().getEntity().get().getKind(); + out.get(outputTags.get(kind)).output(latest.get()); + } + }) + .withOutputTags(mainOutputTag, additionalTags)); + } + }; + } + /** * Returns a {@link PTransform transform} that can generate a collection of patterns that match * all Datastore CommitLog files. @@ -96,7 +218,7 @@ public final class Transforms { * Returns CommitLog files with timestamps between {@code fromTime} (inclusive) and {@code * endTime} (exclusive). */ - public static PTransform, PCollection> + public static PTransform, PCollection> filterCommitLogsByTime(DateTime fromTime, DateTime toTime) { return ParDo.of(new FilterCommitLogFileByTime(fromTime, toTime)); } @@ -114,9 +236,13 @@ public final class Transforms { /** Returns a {@link PTransform} from file {@link Metadata} to {@link VersionedEntity}. */ public static PTransform, PCollection> - loadCommitLogsFromFiles() { + loadCommitLogsFromFiles(Set kinds) { return processFiles( - new BackupFileReader(file -> CommitLogImports.loadEntities(file.open()).iterator())); + new BackupFileReader( + file -> + CommitLogImports.loadEntities(file.open()).stream() + .filter(e -> kinds.contains(e.key().getKind())) + .iterator())); } /** @@ -139,12 +265,11 @@ public final class Transforms { return input .apply(FileIO.readMatches().withCompression(Compression.UNCOMPRESSED)) .apply(transformer.getClass().getSimpleName(), ParDo.of(transformer)); - // TODO(weiminyu): reshuffle to enable dynamic work rebalance per beam dev guide } }; } - private static class FilterCommitLogFileByTime extends DoFn { + private static class FilterCommitLogFileByTime extends DoFn { private final DateTime fromTime; private final DateTime toTime; @@ -161,10 +286,10 @@ public final class Transforms { } @ProcessElement - public void processElement(@Element String fileName, OutputReceiver out) { - DateTime timestamp = getCommitLogTimestamp(fileName); + public void processElement(@Element Metadata fileMeta, OutputReceiver out) { + DateTime timestamp = getCommitLogTimestamp(fileMeta.resourceId().toString()); if (isBeforeOrAt(fromTime, timestamp) && timestamp.isBefore(toTime)) { - out.output(fileName); + out.output(fileMeta); } } } diff --git a/core/src/test/java/google/registry/beam/initsql/BeamJpaModuleTest.java b/core/src/test/java/google/registry/beam/initsql/BeamJpaModuleTest.java index 347c967f3..57249f5e5 100644 --- a/core/src/test/java/google/registry/beam/initsql/BeamJpaModuleTest.java +++ b/core/src/test/java/google/registry/beam/initsql/BeamJpaModuleTest.java @@ -15,8 +15,6 @@ package google.registry.beam.initsql; import static com.google.common.truth.Truth.assertThat; -import static org.hamcrest.Matchers.notNullValue; -import static org.junit.Assume.assumeThat; import google.registry.persistence.NomulusPostgreSql; import google.registry.persistence.transaction.JpaTransactionManager; @@ -25,28 +23,28 @@ import java.io.IOException; import java.io.PrintStream; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledIfSystemProperty; +import org.junit.jupiter.api.io.TempDir; import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; /** Unit tests for {@link BeamJpaModule}. */ -@RunWith(JUnit4.class) // TODO(weiminyu): upgrade to JUnit 5. +@Testcontainers public class BeamJpaModuleTest { - @Rule + @Container public PostgreSQLContainer database = new PostgreSQLContainer(NomulusPostgreSql.getDockerTag()); - @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); + @TempDir File tempFolder; private File credentialFile; - @Before + @BeforeEach public void beforeEach() throws IOException { - credentialFile = temporaryFolder.newFile(); + credentialFile = new File(tempFolder, "credential"); new PrintStream(credentialFile) .printf("%s %s %s", database.getJdbcUrl(), database.getUsername(), database.getPassword()) .close(); @@ -76,10 +74,9 @@ public class BeamJpaModuleTest { * information. */ @Test + @EnabledIfSystemProperty(named = "test.gcp_integration.env", matches = "\\S+") public void getJpaTransactionManager_cloudSql_authRequired() { String environmentName = System.getProperty("test.gcp_integration.env"); - assumeThat(environmentName, notNullValue()); - FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create()); JpaTransactionManager jpa = DaggerBeamJpaModule_JpaTransactionManagerComponent.builder() diff --git a/core/src/test/java/google/registry/beam/initsql/CommitLogTransformsTest.java b/core/src/test/java/google/registry/beam/initsql/CommitLogTransformsTest.java index 63df45316..9d2b9215f 100644 --- a/core/src/test/java/google/registry/beam/initsql/CommitLogTransformsTest.java +++ b/core/src/test/java/google/registry/beam/initsql/CommitLogTransformsTest.java @@ -19,6 +19,7 @@ import static google.registry.testing.DatastoreHelper.newDomainBase; import static google.registry.testing.DatastoreHelper.newRegistry; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import google.registry.backup.VersionedEntity; import google.registry.model.contact.ContactResource; import google.registry.model.domain.DomainBase; @@ -27,6 +28,7 @@ import google.registry.model.registry.Registry; import google.registry.testing.FakeClock; import google.registry.testing.InjectRule; import java.io.File; +import java.io.IOException; import java.io.Serializable; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.fs.MatchResult.Metadata; @@ -152,28 +154,45 @@ public class CommitLogTransformsTest implements Serializable { @Test @Category(NeedsRunner.class) - public void filterCommitLogsByTime() { + public void filterCommitLogsByTime() throws IOException { ImmutableList commitLogFilenames = ImmutableList.of( - "/commit_diff_until_2000-01-01T00:00:00.000Z", - "/commit_diff_until_2000-01-01T00:00:00.001Z", - "/commit_diff_until_2000-01-01T00:00:00.002Z", - "/commit_diff_until_2000-01-01T00:00:00.003Z", - "/commit_diff_until_2000-01-01T00:00:00.004Z"); + "commit_diff_until_2000-01-01T00:00:00.000Z", + "commit_diff_until_2000-01-01T00:00:00.001Z", + "commit_diff_until_2000-01-01T00:00:00.002Z", + "commit_diff_until_2000-01-01T00:00:00.003Z", + "commit_diff_until_2000-01-01T00:00:00.004Z"); + + File commitLogDir = temporaryFolder.newFolder(); + for (String name : commitLogFilenames) { + new File(commitLogDir, name).createNewFile(); + } + PCollection filteredFilenames = pipeline .apply( - "Generate All Filenames", - Create.of(commitLogFilenames).withCoder(StringUtf8Coder.of())) + "Get commitlog file patterns", + Transforms.getCommitLogFilePatterns(commitLogDir.getAbsolutePath())) + .apply("Find commitlog files", Transforms.getFilesByPatterns()) .apply( "Filtered by Time", Transforms.filterCommitLogsByTime( DateTime.parse("2000-01-01T00:00:00.001Z"), - DateTime.parse("2000-01-01T00:00:00.003Z"))); + DateTime.parse("2000-01-01T00:00:00.003Z"))) + .apply( + "Extract path strings", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement( + @Element Metadata fileMeta, OutputReceiver out) { + out.output(fileMeta.resourceId().getFilename()); + } + })); PAssert.that(filteredFilenames) .containsInAnyOrder( - "/commit_diff_until_2000-01-01T00:00:00.001Z", - "/commit_diff_until_2000-01-01T00:00:00.002Z"); + "commit_diff_until_2000-01-01T00:00:00.001Z", + "commit_diff_until_2000-01-01T00:00:00.002Z"); pipeline.run(); } @@ -187,7 +206,9 @@ public class CommitLogTransformsTest implements Serializable { "Get CommitLog file patterns", Transforms.getCommitLogFilePatterns(commitLogsDir.getAbsolutePath())) .apply("Find CommitLogs", Transforms.getFilesByPatterns()) - .apply(Transforms.loadCommitLogsFromFiles()); + .apply( + Transforms.loadCommitLogsFromFiles( + ImmutableSet.of("Registry", "ContactResource", "DomainBase"))); InitSqlTestUtils.assertContainsExactlyElementsIn( entities, @@ -197,4 +218,24 @@ public class CommitLogTransformsTest implements Serializable { pipeline.run(); } + + @Test + @Category(NeedsRunner.class) + public void loadOneCommitLogFile_filterByKind() { + PCollection entities = + pipeline + .apply( + "Get CommitLog file patterns", + Transforms.getCommitLogFilePatterns(commitLogsDir.getAbsolutePath())) + .apply("Find CommitLogs", Transforms.getFilesByPatterns()) + .apply( + Transforms.loadCommitLogsFromFiles(ImmutableSet.of("Registry", "ContactResource"))); + + InitSqlTestUtils.assertContainsExactlyElementsIn( + entities, + KV.of(fakeClock.nowUtc().getMillis() - 2, store.loadAsDatastoreEntity(registry)), + KV.of(fakeClock.nowUtc().getMillis() - 1, store.loadAsDatastoreEntity(contact))); + + pipeline.run(); + } } diff --git a/core/src/test/java/google/registry/beam/initsql/ExportloadingTransformsTest.java b/core/src/test/java/google/registry/beam/initsql/ExportloadingTransformsTest.java index c61038304..8a19ef600 100644 --- a/core/src/test/java/google/registry/beam/initsql/ExportloadingTransformsTest.java +++ b/core/src/test/java/google/registry/beam/initsql/ExportloadingTransformsTest.java @@ -19,6 +19,7 @@ import static google.registry.testing.DatastoreHelper.newDomainBase; import static google.registry.testing.DatastoreHelper.newRegistry; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import com.googlecode.objectify.Key; import google.registry.backup.VersionedEntity; import google.registry.model.contact.ContactResource; @@ -63,8 +64,8 @@ public class ExportloadingTransformsTest implements Serializable { private static final ImmutableList> ALL_KINDS = ImmutableList.of(Registry.class, ContactResource.class, DomainBase.class); - private static final ImmutableList ALL_KIND_STRS = - ALL_KINDS.stream().map(Key::getKind).collect(ImmutableList.toImmutableList()); + private static final ImmutableSet ALL_KIND_STRS = + ALL_KINDS.stream().map(Key::getKind).collect(ImmutableSet.toImmutableSet()); @Rule public final transient TemporaryFolder exportRootDir = new TemporaryFolder(); diff --git a/core/src/test/java/google/registry/beam/initsql/InitSqlTestUtils.java b/core/src/test/java/google/registry/beam/initsql/InitSqlTestUtils.java index 6e3f1fb95..15cf11996 100644 --- a/core/src/test/java/google/registry/beam/initsql/InitSqlTestUtils.java +++ b/core/src/test/java/google/registry/beam/initsql/InitSqlTestUtils.java @@ -30,6 +30,7 @@ import google.registry.backup.AppEngineEnvironment; import google.registry.backup.VersionedEntity; import java.io.Serializable; import java.util.Collection; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.DoFn; @@ -43,6 +44,9 @@ import org.apache.beam.sdk.values.TypeDescriptor; /** Test helpers for populating SQL with Datastore backups. */ public final class InitSqlTestUtils { + // Generates unique ids to distinguish reused transforms. + private static final AtomicInteger TRANSFORM_ID_GEN = new AtomicInteger(0); + /** Converts a Datastore {@link Entity} to an Objectify entity. */ public static Object datastoreToOfyEntity(Entity entity) { return ofy().load().fromEntity(entity); @@ -114,11 +118,12 @@ public final class InitSqlTestUtils { PCollection errMsgs = actual .apply( + "MapElements_" + TRANSFORM_ID_GEN.getAndIncrement(), MapElements.into(kvs(strings(), TypeDescriptor.of(VersionedEntity.class))) .via(rawEntity -> KV.of("The One Key", rawEntity))) - .apply(GroupByKey.create()) + .apply("GroupByKey_" + TRANSFORM_ID_GEN.getAndIncrement(), GroupByKey.create()) .apply( - "assertContainsExactlyElementsIn", + "assertContainsExactlyElementsIn_" + TRANSFORM_ID_GEN.getAndIncrement(), ParDo.of( new DoFn>, String>() { @ProcessElement diff --git a/core/src/test/java/google/registry/beam/initsql/LoadDatastoreSnapshotTest.java b/core/src/test/java/google/registry/beam/initsql/LoadDatastoreSnapshotTest.java new file mode 100644 index 000000000..44599bcd9 --- /dev/null +++ b/core/src/test/java/google/registry/beam/initsql/LoadDatastoreSnapshotTest.java @@ -0,0 +1,175 @@ +// Copyright 2020 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.beam.initsql; + +import static google.registry.testing.DatastoreHelper.newContactResource; +import static google.registry.testing.DatastoreHelper.newDomainBase; +import static google.registry.testing.DatastoreHelper.newRegistry; + +import com.google.appengine.api.datastore.Entity; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.googlecode.objectify.Key; +import google.registry.model.contact.ContactResource; +import google.registry.model.domain.DomainAuthInfo; +import google.registry.model.domain.DomainBase; +import google.registry.model.eppcommon.AuthInfo.PasswordAuth; +import google.registry.model.ofy.Ofy; +import google.registry.model.registry.Registry; +import google.registry.testing.FakeClock; +import google.registry.testing.InjectRule; +import java.io.File; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.joda.time.DateTime; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Unit test for {@link Transforms#loadDatastoreSnapshot}. + * + *

The test setup involves three entities, one Registry, one Domain, and two Contacts. Events + * happen in the following order: + * + *

    + *
  1. Registry and a filler Contact are inserted to Datastore. + *
  2. A CommitLog is persisted. + *
  3. Registry is updated. + *
  4. Another Contact and Domain are inserted into Datastore. + *
  5. Datastore is exported, but misses the newly inserted Contact. + *
  6. Filler Contact is deleted. + *
  7. A second CommitLog is persisted. + *
  8. Domain is updated in the Datastore. + *
  9. The third and last CommitLog is persisted. + *
+ * + * The final snapshot includes Registry, Domain, and Contact. This scenario verifies that: + * + * + */ +@RunWith(JUnit4.class) +public class LoadDatastoreSnapshotTest { + private static final DateTime START_TIME = DateTime.parse("2000-01-01T00:00:00.0Z"); + + private static final ImmutableList> ALL_KINDS = + ImmutableList.of(Registry.class, ContactResource.class, DomainBase.class); + private static final ImmutableSet ALL_KIND_STRS = + ALL_KINDS.stream().map(Key::getKind).collect(ImmutableSet.toImmutableSet()); + + @Rule public final transient TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Rule public final transient InjectRule injectRule = new InjectRule(); + + @Rule + public final transient TestPipeline pipeline = + TestPipeline.create().enableAbandonedNodeEnforcement(true); + + private FakeClock fakeClock; + private File exportRootDir; + private File exportDir; + private File commitLogsDir; + + // Canned data: + private transient Entity dsRegistry; + private transient Entity dsContact; + private transient Entity dsDomain; + + private transient DateTime registryLastUpdateTime; + private transient DateTime contactLastUpdateTime; + private transient DateTime domainLastUpdateTime; + + @Before + public void beforeEach() throws Exception { + fakeClock = new FakeClock(START_TIME); + try (BackupTestStore store = new BackupTestStore(fakeClock)) { + injectRule.setStaticField(Ofy.class, "clock", fakeClock); + + exportRootDir = temporaryFolder.newFolder(); + commitLogsDir = temporaryFolder.newFolder(); + + Registry registry = newRegistry("tld1", "TLD1"); + ContactResource fillerContact = newContactResource("contact_filler"); + store.insertOrUpdate(registry, fillerContact); + store.saveCommitLogs(commitLogsDir.getAbsolutePath()); + + registry = + registry + .asBuilder() + .setCreateBillingCost(registry.getStandardCreateCost().plus(1.0d)) + .build(); + registryLastUpdateTime = fakeClock.nowUtc(); + store.insertOrUpdate(registry); + + ContactResource contact = newContactResource("contact"); + DomainBase domain = newDomainBase("domain1.tld1", contact); + contactLastUpdateTime = fakeClock.nowUtc(); + store.insertOrUpdate(contact, domain); + exportDir = + store.export( + exportRootDir.getAbsolutePath(), ALL_KINDS, ImmutableSet.of(Key.create(contact))); + + store.delete(fillerContact); + store.saveCommitLogs(commitLogsDir.getAbsolutePath()); + + domain = + domain + .asBuilder() + .setAuthInfo(DomainAuthInfo.create(PasswordAuth.create("NewPass"))) + .build(); + domainLastUpdateTime = fakeClock.nowUtc(); + store.insertOrUpdate(domain); + store.saveCommitLogs(commitLogsDir.getAbsolutePath()); + + fakeClock.advanceOneMilli(); + + // Save persisted data for assertions. + dsRegistry = store.loadAsDatastoreEntity(registry); + dsContact = store.loadAsDatastoreEntity(contact); + dsDomain = store.loadAsDatastoreEntity(domain); + } + } + + @Test + public void loadDatastoreSnapshot() { + PCollectionTuple snapshot = + pipeline.apply( + Transforms.loadDatastoreSnapshot( + exportDir.getAbsolutePath(), + commitLogsDir.getAbsolutePath(), + START_TIME, + fakeClock.nowUtc(), + ALL_KIND_STRS)); + InitSqlTestUtils.assertContainsExactlyElementsIn( + snapshot.get(Transforms.createTagForKind("DomainBase")), + KV.of(domainLastUpdateTime.getMillis(), dsDomain)); + InitSqlTestUtils.assertContainsExactlyElementsIn( + snapshot.get(Transforms.createTagForKind("Registry")), + KV.of(registryLastUpdateTime.getMillis(), dsRegistry)); + InitSqlTestUtils.assertContainsExactlyElementsIn( + snapshot.get(Transforms.createTagForKind("ContactResource")), + KV.of(contactLastUpdateTime.getMillis(), dsContact)); + pipeline.run(); + } +}