Load CommitLog into BEAM pipeline (#618)

* Load CommitLog into BEAM pipeline

Created tools that can generate CommitLogs on a test
datastore.

Defined BEAM transforms that load from CommitLog files and
added a few simple tests.

This is work-in-progress. Next step is to build a consistent
backup by merging Datastore exports and CommitLogs.
This commit is contained in:
Weimin Yu 2020-06-11 11:38:53 -04:00 committed by GitHub
parent 1c62728886
commit d3fd826dc1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 1111 additions and 149 deletions

View file

@ -0,0 +1,231 @@
// Copyright 2017 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.backup;
import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Iterables.concat;
import static com.google.common.collect.Lists.partition;
import static google.registry.backup.BackupUtils.serializeEntity;
import static google.registry.model.ofy.CommitLogBucket.getBucketKey;
import static google.registry.model.ofy.ObjectifyService.ofy;
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
import static google.registry.util.DateTimeUtils.START_OF_TIME;
import static google.registry.util.DateTimeUtils.isAtOrAfter;
import static java.util.Comparator.comparingLong;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Streams;
import com.googlecode.objectify.Key;
import google.registry.model.ImmutableObject;
import google.registry.model.ofy.CommitLogBucket;
import google.registry.model.ofy.CommitLogCheckpoint;
import google.registry.model.ofy.CommitLogCheckpointRoot;
import google.registry.model.ofy.CommitLogManifest;
import google.registry.model.ofy.CommitLogMutation;
import google.registry.util.Clock;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
import org.joda.time.DateTime;
/**
* Helpers for exporting the diff between two commit log checkpoints to a local file.
*
* <p>In production, CommitLogs are saved periodically by cron jobs. During each job, the {@link
* CommitLogCheckpointAction} is invoked first to compute a {@link CommitLogCheckpoint} and persist
* it in Datastore. Then the {@link ExportCommitLogDiffAction} is invoked to export the diffs
* accumulated between the previous and current checkpoints to a file.
*
* <p>The {@link #computeCheckpoint(Clock)} method is copied with simplification from {@link
* CommitLogCheckpointAction}, and the {@link #saveCommitLogs(String, CommitLogCheckpoint,
* CommitLogCheckpoint)} method is copied with simplification from {@link
* ExportCommitLogDiffAction}. We opted for copying instead of refactoring to reduce risk to
* production code.
*/
public final class CommitLogExports {
public static final String DIFF_FILE_PREFIX = "commit_diff_until_";
private static final int EXPORT_DIFF_BATCH_SIZE = 100;
private CommitLogExports() {}
/**
 * Computes the next {@link CommitLogCheckpoint} and saves it, together with a matching {@link
 * CommitLogCheckpointRoot}, inside one transaction. See the class javadoc for background.
 *
 * <p>Inside the transaction, {@code checkState} rejects the new checkpoint unless its time is
 * strictly after the last written time recorded on the current root.
 *
 * @param clock the clock assigned to the {@link CommitLogCheckpointStrategy}
 * @return the checkpoint that was computed and saved
 */
public static CommitLogCheckpoint computeCheckpoint(Clock clock) {
  CommitLogCheckpointStrategy checkpointStrategy = new CommitLogCheckpointStrategy();
  checkpointStrategy.clock = clock;
  checkpointStrategy.ofy = ofy();
  final CommitLogCheckpoint newCheckpoint = checkpointStrategy.computeCheckpoint();
  final DateTime newCheckpointTime = newCheckpoint.getCheckpointTime();
  tm().transact(
          () -> {
            // Refuse to go backwards: the root records when a checkpoint was last written.
            DateTime previouslyWritten = CommitLogCheckpointRoot.loadRoot().getLastWrittenTime();
            checkState(
                newCheckpointTime.isAfter(previouslyWritten),
                "Newer checkpoint already written at time: %s",
                previouslyWritten);
            CommitLogCheckpointRoot updatedRoot = CommitLogCheckpointRoot.create(newCheckpointTime);
            ofy().saveWithoutBackup().entities(newCheckpoint, updatedRoot);
          });
  return newCheckpoint;
}
/**
* Writes the commit log diff between two checkpoints to a local file and returns that file.
*
* <p>The file is created under {@code commitLogDir} and is named {@link #DIFF_FILE_PREFIX}
* followed by the checkpoint time of {@code checkpoint}. The serialized {@code checkpoint} is
* written first, followed by the manifests (and their mutations) found between the two
* checkpoints. Please refer to class javadoc for background.
*
* @param commitLogDir directory in which the diff file is created
* @param prevCheckpoint exclusive lower bound of the diff, or null if there is no lower bound
* @param checkpoint inclusive upper bound of the diff
* @return the file that was written
* @throws IllegalArgumentException if {@code prevCheckpoint} is non-null and its time is either
*     before {@code START_OF_TIME} or not strictly before the time of {@code checkpoint}
* @throws RuntimeException wrapping any {@link IOException} raised while writing the file
*/
public static File saveCommitLogs(
String commitLogDir,
@Nullable CommitLogCheckpoint prevCheckpoint,
CommitLogCheckpoint checkpoint) {
// A null prevCheckpoint is always accepted; otherwise it must precede the current checkpoint.
checkArgument(
prevCheckpoint == null
|| (isAtOrAfter(prevCheckpoint.getCheckpointTime(), START_OF_TIME)
&& prevCheckpoint.getCheckpointTime().isBefore(checkpoint.getCheckpointTime())),
"Inversed checkpoint: prev is %s, current is %s.",
Optional.ofNullable(prevCheckpoint)
.map(CommitLogCheckpoint::getCheckpointTime)
.map(DateTime::toString)
.orElse("null"),
checkpoint.getCheckpointTime().toString());
// Load the keys of all the manifests to include in this diff.
List<Key<CommitLogManifest>> sortedKeys = loadAllDiffKeys(prevCheckpoint, checkpoint);
// Create the output file in the local commit log directory and wrap it in a buffered stream.
// The try-with-resources statement closes the stream on every exit path, including the early
// return below.
File commitLogFile =
new File(commitLogDir + "/" + DIFF_FILE_PREFIX + checkpoint.getCheckpointTime());
try (OutputStream commitLogStream =
new BufferedOutputStream(new FileOutputStream(commitLogFile))) {
// Export the upper checkpoint itself.
serializeEntity(checkpoint, commitLogStream);
// If there are no manifests to export, stop early, now that we've written out the file with
// the checkpoint itself (which is needed for restores, even if it's empty).
if (sortedKeys.isEmpty()) {
return commitLogFile;
}
// Write to the file in chunks, one per fixed batch of commit logs. While processing one batch,
// asynchronously load the entities for the next one.
List<List<Key<CommitLogManifest>>> keyChunks = partition(sortedKeys, EXPORT_DIFF_BATCH_SIZE);
// Objectify's map return type is asynchronous. Calling .values() will block until it loads.
Map<?, CommitLogManifest> nextChunkToExport = ofy().load().keys(keyChunks.get(0));
for (int i = 0; i < keyChunks.size(); i++) {
// Force the async load to finish.
Collection<CommitLogManifest> chunkValues = nextChunkToExport.values();
// Since there is no hard bound on how much data this might be, take care not to let the
// Objectify session cache fill up and potentially run out of memory. This is the only safe
// point to do this since at this point there is no async load in progress.
ofy().clearSessionCache();
// Kick off the next async load, which can happen in parallel to writing the current chunk.
if (i + 1 < keyChunks.size()) {
nextChunkToExport = ofy().load().keys(keyChunks.get(i + 1));
}
exportChunk(commitLogStream, chunkValues);
}
} catch (IOException e) {
// Callers are not expected to handle I/O failures; rethrow unchecked with the cause attached.
throw new RuntimeException(e);
}
return commitLogFile;
}
/**
 * Collects the manifest keys of every bucket named in {@code upperCheckpoint} and returns them in
 * a transaction-consistent chronological order.
 *
 * @param lowerCheckpoint exclusive lower bound on keys in this diff, or null if no lower bound
 * @param upperCheckpoint inclusive upper bound on keys in this diff
 */
private static ImmutableList<Key<CommitLogManifest>> loadAllDiffKeys(
    @Nullable final CommitLogCheckpoint lowerCheckpoint,
    final CommitLogCheckpoint upperCheckpoint) {
  // Only keys (no entity data) are fetched, one query per bucket. Sorting by the manifest id
  // (its timestamp) is transaction-consistent by virtue of our checkpoint strategy and our
  // customized Ofy; see CommitLogCheckpointStrategy for the proof. Ties are broken on the parent
  // bucket's id so that the resulting order is deterministic.
  return upperCheckpoint.getBucketTimestamps().keySet().stream()
      .map(bucketNum -> loadDiffKeysFromBucket(lowerCheckpoint, upperCheckpoint, bucketNum))
      .flatMap(Streams::stream)
      .sorted(
          comparingLong(Key<CommitLogManifest>::getId)
              .thenComparingLong(manifestKey -> manifestKey.getParent().getId()))
      .collect(toImmutableList());
}
/**
 * Queries a single bucket for the manifest keys that fall between the two checkpoints.
 *
 * @param lowerCheckpoint exclusive lower bound on keys in this diff, or null if no lower bound
 * @param upperCheckpoint inclusive upper bound on keys in this diff
 * @param bucketNum the bucket to load diff keys from
 */
private static Iterable<Key<CommitLogManifest>> loadDiffKeysFromBucket(
    @Nullable CommitLogCheckpoint lowerCheckpoint,
    CommitLogCheckpoint upperCheckpoint,
    int bucketNum) {
  // The lower checkpoint may be absent, or may lack a timestamp for this bucket (because the
  // bucket count was increased between these checkpoints). Either way START_OF_TIME then serves
  // as the effective exclusive lower bound.
  DateTime recordedLowerTime =
      (lowerCheckpoint == null) ? null : lowerCheckpoint.getBucketTimestamps().get(bucketNum);
  DateTime exclusiveLowerTime = firstNonNull(recordedLowerTime, START_OF_TIME);
  // START_OF_TIME=0 is not a valid id in a key, so shift both bounds up by one millisecond.
  // The range lowerBound < x <= upperBound thereby becomes lowerBound <= x < upperBound.
  DateTime inclusiveLowerBound = exclusiveLowerTime.plusMillis(1);
  DateTime exclusiveUpperBound =
      upperCheckpoint.getBucketTimestamps().get(bucketNum).plusMillis(1);
  // Equal bounds describe an empty range, so there is no point in running the query.
  if (inclusiveLowerBound.equals(exclusiveUpperBound)) {
    return ImmutableSet.of();
  }
  Key<CommitLogBucket> bucketKey = getBucketKey(bucketNum);
  Key<CommitLogManifest> firstIncludedKey =
      CommitLogManifest.createKey(bucketKey, inclusiveLowerBound);
  Key<CommitLogManifest> firstExcludedKey =
      CommitLogManifest.createKey(bucketKey, exclusiveUpperBound);
  return ofy()
      .load()
      .type(CommitLogManifest.class)
      .ancestor(bucketKey)
      .filterKey(">=", firstIncludedKey)
      .filterKey("<", firstExcludedKey)
      .keys();
}
/**
 * Serializes one chunk of manifests, each followed by its associated mutations, to the given
 * stream.
 *
 * <p>Despite the parameter name, the only caller in this class passes a stream backed by a local
 * file.
 *
 * @param gcsStream the stream that receives the serialized entities
 * @param chunk the manifests to write
 * @throws IOException if serializing an entity to the stream fails
 */
private static void exportChunk(OutputStream gcsStream, Collection<CommitLogManifest> chunk)
    throws IOException {
  // First set up the mutation queries for every manifest in the chunk, and only afterwards walk
  // the results, so that each manifest is immediately followed by its own mutations.
  ImmutableList.Builder<Iterable<? extends ImmutableObject>> pendingGroups =
      ImmutableList.builder();
  for (CommitLogManifest manifest : chunk) {
    pendingGroups.add(ImmutableList.of(manifest));
    pendingGroups.add(ofy().load().type(CommitLogMutation.class).ancestor(manifest));
  }
  Iterable<ImmutableObject> orderedEntities = concat(pendingGroups.build());
  for (ImmutableObject toWrite : orderedEntities) {
    serializeEntity(toWrite, gcsStream);
  }
}
}

View file

@ -1,50 +0,0 @@
// Copyright 2020 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.beam.initsql;
import static com.google.common.truth.Truth.assertThat;
import static google.registry.beam.initsql.BackupPaths.getKindFromFileName;
import static org.junit.jupiter.api.Assertions.assertThrows;
import org.junit.jupiter.api.Test;
/** Unit tests for the kind-extraction helper in {@link google.registry.beam.initsql.BackupPaths}. */
public class BackupPathsTest {

  /** An empty file name cannot contain a kind and must be rejected. */
  @Test
  void getKindFromFileName_empty() {
    String emptyFileName = "";
    assertThrows(IllegalArgumentException.class, () -> getKindFromFileName(emptyFileName));
  }

  /** A path whose kind segment has nothing after the {@code kind_} prefix must be rejected. */
  @Test
  void getKindFromFileName_notMatch() {
    String fileNameWithoutKind = "/tmp/all_namespaces/kind_/input-0";
    assertThrows(IllegalArgumentException.class, () -> getKindFromFileName(fileNameWithoutKind));
  }

  /** A well-formed export path yields the text following the {@code kind_} prefix. */
  @Test
  void getKindFromFileName_success() {
    String exportFileName = "scheme:/somepath/all_namespaces/kind_mykind/input-something";
    String kind = getKindFromFileName(exportFileName);
    assertThat(kind).isEqualTo("mykind");
  }

  /** Regex metacharacters, spaces and slashes inside the kind are returned verbatim. */
  @Test
  void getKindFromFileName_specialChar_success() {
    String exportFileName = "scheme:/somepath/all_namespaces/kind_.*+? /(a)/input-something";
    String kind = getKindFromFileName(exportFileName);
    assertThat(kind).isEqualTo(".*+? /(a)");
  }
}

View file

@ -20,6 +20,8 @@ import static google.registry.persistence.transaction.TransactionManagerFactory.
import com.google.appengine.api.datastore.Entity;
import com.googlecode.objectify.Key;
import google.registry.backup.CommitLogExports;
import google.registry.model.ofy.CommitLogCheckpoint;
import google.registry.testing.AppEngineRule;
import google.registry.testing.FakeClock;
import google.registry.tools.LevelDbFileBuilder;
@ -35,6 +37,10 @@ import org.joda.time.format.DateTimeFormatter;
* <p>A Datastore backup consists of an unsynchronized data export and a sequence of incremental
* Commit Logs that overlap with the export process. Together they can be used to recreate a
* consistent snapshot of the Datastore.
*
* <p>For convenience of test-writing, the {@link #fakeClock} is advanced by 1 millisecond after
* every transaction is invoked on this store, ensuring strictly increasing timestamps on causally
* dependent transactions. In production, the same ordering is ensured by sleep and retry.
*/
class BackupTestStore implements AutoCloseable {
@ -44,6 +50,8 @@ class BackupTestStore implements AutoCloseable {
private final FakeClock fakeClock;
private AppEngineRule appEngine;
private CommitLogCheckpoint prevCommitLogCheckpoint;
BackupTestStore(FakeClock fakeClock) throws Exception {
this.fakeClock = fakeClock;
this.appEngine =
@ -55,16 +63,27 @@ class BackupTestStore implements AutoCloseable {
this.appEngine.beforeEach(null);
}
/**
* Deletes {@code deletes} and saves {@code newOrUpdated} within a single transaction, then
* advances the {@link #fakeClock} by one millisecond.
*
* @param deletes the entities to delete from the Datastore
* @param newOrUpdated the entities to insert or update in the Datastore
*/
void transact(Iterable<Object> deletes, Iterable<Object> newOrUpdated) {
tm().transact(
() -> {
// NOTE(review): neither call is followed by now(); presumably both operations are completed
// when the enclosing transaction commits. Confirm.
ofy().delete().entities(deletes);
ofy().save().entities(newOrUpdated);
});
// Move the clock forward so that the next transaction gets a strictly later timestamp.
fakeClock.advanceOneMilli();
}
/**
* Inserts or updates {@code entities} in the Datastore in one transaction, then advances the
* {@link #fakeClock} by one millisecond.
*
* @param entities the entities to save
*/
@SafeVarargs
final void insertOrUpdate(Object... entities) {
tm().transact(() -> ofy().save().entities(entities).now());
// Move the clock forward so that the next transaction gets a strictly later timestamp.
fakeClock.advanceOneMilli();
}
/**
* Deletes {@code entities} from the Datastore in one transaction, then advances the {@link
* #fakeClock} by one millisecond.
*
* @param entities the entities to delete
*/
@SafeVarargs
final void delete(Object... entities) {
tm().transact(() -> ofy().delete().entities(entities).now());
// Move the clock forward so that the next transaction gets a strictly later timestamp.
fakeClock.advanceOneMilli();
}
/**
@ -109,8 +128,12 @@ class BackupTestStore implements AutoCloseable {
builder.build();
}
void saveCommitLog() {
throw new UnsupportedOperationException("Not implemented yet");
/**
* Computes a new commit log checkpoint and exports the changes made since the previous call to
* a file in {@code commitLogDir}.
*
* <p>The lower bound of the export is the checkpoint remembered from the previous invocation of
* this method; on the first invocation that field has not been assigned yet, so {@code null} is
* passed as the lower bound.
*
* @param commitLogDir directory in which the commit log file is created
* @return the commit log file that was written
*/
File saveCommitLogs(String commitLogDir) {
CommitLogCheckpoint checkpoint = CommitLogExports.computeCheckpoint(fakeClock);
File commitLogFile =
CommitLogExports.saveCommitLogs(commitLogDir, prevCommitLogCheckpoint, checkpoint);
// Remember this checkpoint so that the next export starts where this one ended.
prevCommitLogCheckpoint = checkpoint;
return commitLogFile;
}
@Override

View file

@ -19,27 +19,36 @@ import static com.google.common.truth.Truth.assertWithMessage;
import static com.google.common.truth.Truth8.assertThat;
import static google.registry.model.common.EntityGroupRoot.getCrossTldKey;
import static google.registry.model.ofy.ObjectifyService.ofy;
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
import static google.registry.testing.DatastoreHelper.newContactResource;
import static google.registry.testing.DatastoreHelper.newDomainBase;
import static google.registry.testing.DatastoreHelper.newRegistry;
import com.google.appengine.api.datastore.Entity;
import com.google.appengine.api.datastore.EntityTranslator;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Streams;
import com.google.storage.onestore.v3.OnestoreEntity.EntityProto;
import com.googlecode.objectify.Key;
import google.registry.backup.CommitLogImports;
import google.registry.backup.VersionedEntity;
import google.registry.model.contact.ContactResource;
import google.registry.model.domain.DesignatedContact;
import google.registry.model.domain.DomainBase;
import google.registry.model.ofy.Ofy;
import google.registry.model.registry.Registry;
import google.registry.persistence.VKey;
import google.registry.testing.FakeClock;
import google.registry.testing.InjectRule;
import google.registry.tools.LevelDbLogReader;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Stream;
@ -47,6 +56,7 @@ import org.joda.time.DateTime;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;
/** Unit tests for {@link BackupTestStore}. */
@ -56,17 +66,25 @@ public class BackupTestStoreTest {
private FakeClock fakeClock;
private BackupTestStore store;
private Registry registry;
private ContactResource contact;
private DomainBase domain;
@TempDir File tempDir;
@RegisterExtension InjectRule injectRule = new InjectRule();
@BeforeEach
void beforeEach() throws Exception {
fakeClock = new FakeClock(START_TIME);
store = new BackupTestStore(fakeClock);
injectRule.setStaticField(Ofy.class, "clock", fakeClock);
store.insertOrUpdate(newRegistry("tld1", "TLD1"));
ContactResource contact1 = newContactResource("contact_1");
DomainBase domain1 = newDomainBase("domain1.tld1", contact1);
store.insertOrUpdate(contact1, domain1);
registry = newRegistry("tld1", "TLD1");
store.insertOrUpdate(registry);
contact = newContactResource("contact_1");
domain = newDomainBase("domain1.tld1", contact);
store.insertOrUpdate(contact, domain);
}
@AfterEach
@ -77,7 +95,8 @@ public class BackupTestStoreTest {
@Test
void export_filesCreated() throws IOException {
String exportRootPath = tempDir.getAbsolutePath();
File exportFolder = new File(exportRootPath, "2000-01-01T00:00:00_000");
assertThat(fakeClock.nowUtc().toString()).isEqualTo("2000-01-01T00:00:00.002Z");
File exportFolder = new File(exportRootPath, "2000-01-01T00:00:00_002");
assertWithMessage("Directory %s should not exist.", exportFolder.getAbsoluteFile())
.that(exportFolder.exists())
.isFalse();
@ -100,7 +119,7 @@ public class BackupTestStoreTest {
void export_folderNameChangesWithTime() throws IOException {
String exportRootPath = tempDir.getAbsolutePath();
fakeClock.advanceOneMilli();
File exportFolder = new File(exportRootPath, "2000-01-01T00:00:00_001");
File exportFolder = new File(exportRootPath, "2000-01-01T00:00:00_003");
assertWithMessage("Directory %s should not exist.", exportFolder.getAbsoluteFile())
.that(exportFolder.exists())
.isFalse();
@ -147,6 +166,69 @@ public class BackupTestStoreTest {
assertThat(tlds).containsExactly("tld2");
}
/** Saving commit logs produces a file named after the checkpoint time. */
@Test
void saveCommitLogs_fileCreated() {
  String outputDir = tempDir.getAbsolutePath();
  File savedFile = store.saveCommitLogs(outputDir);
  assertThat(savedFile.exists()).isTrue();
  String expectedFileName = "commit_diff_until_2000-01-01T00:00:00.002Z";
  assertThat(savedFile.getName()).isEqualTo(expectedFileName);
}
/** The first commit log file holds the entities inserted during setup and their commit times. */
@Test
void saveCommitLogs_inserts() {
  File savedFile = store.saveCommitLogs(tempDir.getAbsolutePath());
  assertThat(savedFile.exists()).isTrue();
  ImmutableList<VersionedEntity> loaded = CommitLogImports.loadEntities(savedFile);
  ImmutableList<Entity> expectedEntities = toDatastoreEntities(registry, contact, domain);
  assertThat(loaded.stream().map(VersionedEntity::getEntity).map(Optional::get))
      .containsExactlyElementsIn(expectedEntities);
  // Relative to the current clock reading, the registry was created 2 ms ago, while the contact
  // and the domain were created together 1 ms ago.
  long nowMillis = fakeClock.nowUtc().getMillis();
  assertThat(loaded.stream().map(VersionedEntity::commitTimeMills))
      .containsExactly(nowMillis - 2, nowMillis - 1, nowMillis - 1);
}
/** A commit log file written after a delete records the deleted key next to the saved entities. */
@Test
void saveCommitLogs_deletes() {
  String outputDir = tempDir.getAbsolutePath();
  // Write a first commit log file so that the setup data is not part of the file under test.
  fakeClock.advanceOneMilli();
  store.saveCommitLogs(outputDir);
  // Point the domain at a replacement contact, then delete the original contact.
  ContactResource replacementContact = newContactResource("contact2");
  VKey<ContactResource> replacementKey = replacementContact.createVKey();
  ImmutableSet<DesignatedContact> replacementDesignations =
      ImmutableSet.of(
          DesignatedContact.create(DesignatedContact.Type.ADMIN, replacementKey),
          DesignatedContact.create(DesignatedContact.Type.TECH, replacementKey));
  domain =
      domain
          .asBuilder()
          .setRegistrant(replacementKey)
          .setContacts(replacementDesignations)
          .build();
  store.insertOrUpdate(domain, replacementContact);
  store.delete(contact);
  fakeClock.advanceOneMilli();
  File savedFile = store.saveCommitLogs(outputDir);
  ImmutableList<VersionedEntity> loaded = CommitLogImports.loadEntities(savedFile);
  assertThat(loaded.stream().filter(VersionedEntity::isDelete).map(VersionedEntity::key))
      .containsExactly(Key.create(contact).getRaw());
  ImmutableList<Entity> expectedSaves = toDatastoreEntities(domain, replacementContact);
  assertThat(
          loaded.stream()
              .filter(Predicates.not(VersionedEntity::isDelete))
              .map(VersionedEntity::getEntity)
              .map(Optional::get))
      .containsExactlyElementsIn(expectedSaves);
}
/** A commit log file is still written when nothing changed since the previous one. */
@Test
void saveCommitLogs_empty() {
  String outputDir = tempDir.getAbsolutePath();
  fakeClock.advanceOneMilli();
  store.saveCommitLogs(outputDir);
  // No Datastore changes happen between the two exports; only the clock moves.
  fakeClock.advanceOneMilli();
  File secondFile = store.saveCommitLogs(outputDir);
  assertThat(secondFile.exists()).isTrue();
  ImmutableList<VersionedEntity> loaded = CommitLogImports.loadEntities(secondFile);
  assertThat(loaded).isEmpty();
}
private File export(String exportRootPath, Set<Key<?>> excludes) throws IOException {
return store.export(
exportRootPath,
@ -168,4 +250,13 @@ public class BackupTestStoreTest {
Entity entity = EntityTranslator.createFromPb(proto);
return ofyEntityType.cast(ofy().load().fromEntity(entity));
}
/**
 * Converts Objectify entities to raw Datastore {@link Entity} objects, in the given order. The
 * conversion runs inside a single transaction.
 */
@SafeVarargs
private static ImmutableList<Entity> toDatastoreEntities(Object... ofyEntities) {
  return tm().transact(
          () -> {
            ImmutableList.Builder<Entity> converted = ImmutableList.builder();
            for (Object ofyEntity : ofyEntities) {
              converted.add(ofy().save().toEntity(ofyEntity));
            }
            return converted.build();
          });
}
}

View file

@ -0,0 +1,223 @@
// Copyright 2020 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.beam.initsql;
import static google.registry.model.ofy.ObjectifyService.ofy;
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
import static google.registry.testing.DatastoreHelper.newContactResource;
import static google.registry.testing.DatastoreHelper.newDomainBase;
import static google.registry.testing.DatastoreHelper.newRegistry;
import com.google.appengine.api.datastore.Entity;
import com.google.common.collect.ImmutableList;
import google.registry.backup.VersionedEntity;
import google.registry.model.contact.ContactResource;
import google.registry.model.domain.DomainBase;
import google.registry.model.ofy.Ofy;
import google.registry.model.registry.Registry;
import google.registry.testing.FakeClock;
import google.registry.testing.InjectRule;
import java.io.File;
import java.io.Serializable;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.DateTime;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** Unit tests for {@link CommitLogTransforms}. */
// TODO(weiminyu): Upgrade to JUnit5 when TestPipeline is upgraded. It is also easy to adapt with
// a wrapper.
@RunWith(JUnit4.class)
public class CommitLogTransformsTest implements Serializable {
private static final DateTime START_TIME = DateTime.parse("2000-01-01T00:00:00.0Z");
@Rule public final transient TemporaryFolder temporaryFolder = new TemporaryFolder();
@Rule public final transient InjectRule injectRule = new InjectRule();
@Rule
public final transient TestPipeline pipeline =
TestPipeline.create().enableAbandonedNodeEnforcement(true);
private FakeClock fakeClock;
private transient BackupTestStore store;
private File commitLogsDir;
private File firstCommitLogFile;
// Canned data that are persisted to Datastore, used by assertions in tests.
// TODO(weiminyu): use Ofy entity pojos directly.
private transient ImmutableList<Entity> persistedEntities;
/**
 * Populates a fresh test store with a registry, a contact and a domain, records their Datastore
 * representations for later assertions, and writes the first commit log file.
 */
@Before
public void beforeEach() throws Exception {
  fakeClock = new FakeClock(START_TIME);
  store = new BackupTestStore(fakeClock);
  injectRule.setStaticField(Ofy.class, "clock", fakeClock);

  // The registry goes in first, in its own transaction; contact and domain share the next one.
  Registry tld1Registry = newRegistry("tld1", "TLD1");
  store.insertOrUpdate(tld1Registry);
  ContactResource registrant = newContactResource("contact_1");
  DomainBase registeredDomain = newDomainBase("domain1.tld1", registrant);
  store.insertOrUpdate(registrant, registeredDomain);

  // Convert each entity to its Datastore form, one transaction per entity.
  ImmutableList.Builder<Entity> datastoreForms = ImmutableList.builder();
  for (Object ofyEntity : ImmutableList.of(tld1Registry, registrant, registeredDomain)) {
    datastoreForms.add(tm().transact(() -> ofy().save().toEntity(ofyEntity)));
  }
  persistedEntities = datastoreForms.build();

  commitLogsDir = temporaryFolder.newFolder();
  firstCommitLogFile = store.saveCommitLogs(commitLogsDir.getAbsolutePath());
}
/** Closes the test store, if one was created, and drops the reference to it. */
@After
public void afterEach() throws Exception {
  if (store == null) {
    return;
  }
  store.close();
  store = null;
}
/** The commit log pattern for a directory is that directory plus the diff-file wildcard. */
@Test
@Category(NeedsRunner.class)
public void getCommitLogFilePatterns() {
  String commitLogsPath = commitLogsDir.getAbsolutePath();
  PCollection<String> actualPatterns =
      pipeline.apply(
          "Get CommitLog file patterns",
          CommitLogTransforms.getCommitLogFilePatterns(commitLogsPath));

  ImmutableList<String> expectedPatterns =
      ImmutableList.of(commitLogsPath + "/commit_diff_until_*");
  PAssert.that(actualPatterns).containsInAnyOrder(expectedPatterns);

  pipeline.run();
}
/** Matching the commit log pattern finds exactly the file written during setup. */
@Test
@Category(NeedsRunner.class)
public void getFilesByPatterns() {
  String commitLogPattern = commitLogsDir.getAbsolutePath() + "/commit_diff_until_*";
  PCollection<Metadata> matchedFiles =
      pipeline
          .apply(
              "File patterns to metadata",
              Create.of(commitLogPattern).withCoder(StringUtf8Coder.of()))
          .apply(Transforms.getFilesByPatterns());

  // Reduce each metadata object to its path string so it can be compared with a file name.
  PCollection<String> matchedPaths =
      matchedFiles.apply(
          "File metadata to path string",
          ParDo.of(
              new DoFn<Metadata, String>() {
                @ProcessElement
                public void processElement(
                    @Element Metadata fileMetadata, OutputReceiver<String> receiver) {
                  receiver.output(fileMetadata.resourceId().toString());
                }
              }));

  ImmutableList<String> expectedPaths = ImmutableList.of(firstCommitLogFile.getAbsolutePath());
  PAssert.that(matchedPaths).containsInAnyOrder(expectedPaths);

  pipeline.run();
}
/** Of five file names, only those stamped .001 and .002 pass a filter built from .001 and .003. */
@Test
@Category(NeedsRunner.class)
public void filterCommitLogsByTime() {
  ImmutableList<String> allFilenames =
      ImmutableList.of(
          "/commit_diff_until_2000-01-01T00:00:00.000Z",
          "/commit_diff_until_2000-01-01T00:00:00.001Z",
          "/commit_diff_until_2000-01-01T00:00:00.002Z",
          "/commit_diff_until_2000-01-01T00:00:00.003Z",
          "/commit_diff_until_2000-01-01T00:00:00.004Z");
  DateTime fromTime = DateTime.parse("2000-01-01T00:00:00.001Z");
  DateTime toTime = DateTime.parse("2000-01-01T00:00:00.003Z");

  PCollection<String> keptFilenames =
      pipeline
          .apply("Generate All Filenames", Create.of(allFilenames).withCoder(StringUtf8Coder.of()))
          .apply("Filtered by Time", CommitLogTransforms.filterCommitLogsByTime(fromTime, toTime));

  PAssert.that(keptFilenames)
      .containsInAnyOrder(
          "/commit_diff_until_2000-01-01T00:00:00.001Z",
          "/commit_diff_until_2000-01-01T00:00:00.002Z");

  pipeline.run();
}
/** Loading the single commit log file yields the persisted entities with their commit times. */
@Test
@Category(NeedsRunner.class)
public void loadOneCommitLogFile() {
  // Expected commit times are expressed relative to the current reading of the fake clock.
  long nowMillis = fakeClock.nowUtc().getMillis();

  PCollection<VersionedEntity> loadedEntities =
      pipeline
          .apply(
              "Get CommitLog file patterns",
              CommitLogTransforms.getCommitLogFilePatterns(commitLogsDir.getAbsolutePath()))
          .apply("Find CommitLogs", Transforms.getFilesByPatterns())
          .apply(CommitLogTransforms.loadCommitLogsFromFiles());

  PCollection<Long> commitTimes =
      loadedEntities.apply(
          "Extract commitTimeMillis",
          ParDo.of(
              new DoFn<VersionedEntity, Long>() {
                @ProcessElement
                public void processElement(
                    @Element VersionedEntity versioned, OutputReceiver<Long> receiver) {
                  receiver.output(versioned.commitTimeMills());
                }
              }));
  PAssert.that(commitTimes).containsInAnyOrder(nowMillis - 2, nowMillis - 1, nowMillis - 1);

  // Records without an entity payload produce no output element here.
  PCollection<Entity> loadedDatastoreEntities =
      loadedEntities.apply(
          "To Datastore Entities",
          ParDo.of(
              new DoFn<VersionedEntity, Entity>() {
                @ProcessElement
                public void processElement(
                    @Element VersionedEntity versioned, OutputReceiver<Entity> receiver) {
                  versioned.getEntity().ifPresent(receiver::output);
                }
              }));
  PAssert.that(loadedDatastoreEntities).containsInAnyOrder(persistedEntities);

  pipeline.run();
}
}

View file

@ -21,14 +21,15 @@ import static google.registry.testing.DatastoreHelper.newDomainBase;
import static google.registry.testing.DatastoreHelper.newRegistry;
import com.google.appengine.api.datastore.Entity;
import com.google.appengine.api.datastore.EntityTranslator;
import com.google.common.collect.ImmutableList;
import com.google.storage.onestore.v3.OnestoreEntity.EntityProto;
import com.googlecode.objectify.Key;
import google.registry.backup.VersionedEntity;
import google.registry.model.contact.ContactResource;
import google.registry.model.domain.DomainBase;
import google.registry.model.ofy.Ofy;
import google.registry.model.registry.Registry;
import google.registry.testing.FakeClock;
import google.registry.testing.InjectRule;
import java.io.File;
import java.io.Serializable;
import java.util.Collections;
@ -40,8 +41,8 @@ import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.DateTime;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@ -60,6 +61,8 @@ import org.junit.runners.JUnit4;
// a wrapper.
@RunWith(JUnit4.class)
public class ExportloadingTransformsTest implements Serializable {
private static final DateTime START_TIME = DateTime.parse("2000-01-01T00:00:00.0Z");
private static final ImmutableList<Class<?>> ALL_KINDS =
ImmutableList.of(Registry.class, ContactResource.class, DomainBase.class);
private static final ImmutableList<String> ALL_KIND_STRS =
@ -67,6 +70,8 @@ public class ExportloadingTransformsTest implements Serializable {
@Rule public final transient TemporaryFolder exportRootDir = new TemporaryFolder();
@Rule public final transient InjectRule injectRule = new InjectRule();
@Rule
public final transient TestPipeline pipeline =
TestPipeline.create().enableAbandonedNodeEnforcement(true);
@ -80,8 +85,9 @@ public class ExportloadingTransformsTest implements Serializable {
@Before
public void beforeEach() throws Exception {
fakeClock = new FakeClock();
fakeClock = new FakeClock(START_TIME);
store = new BackupTestStore(fakeClock);
injectRule.setStaticField(Ofy.class, "clock", fakeClock);
Registry registry = newRegistry("tld1", "TLD1");
store.insertOrUpdate(registry);
@ -107,7 +113,7 @@ public class ExportloadingTransformsTest implements Serializable {
@Test
@Category(NeedsRunner.class)
public void getBackupDataFilePatterns() {
public void getExportFilePatterns() {
PCollection<String> patterns =
pipeline.apply(
"Get Datastore file patterns",
@ -138,7 +144,7 @@ public class ExportloadingTransformsTest implements Serializable {
exportDir.getAbsolutePath()
+ "/all_namespaces/kind_ContactResource/input-*")
.withCoder(StringUtf8Coder.of()))
.apply(ExportLoadingTransforms.getFilesByPatterns());
.apply(Transforms.getFilesByPatterns());
// Transform fileMetas to file names for assertions.
PCollection<String> fileNames =
@ -166,25 +172,25 @@ public class ExportloadingTransformsTest implements Serializable {
@Test
public void loadDataFromFiles() {
PCollection<KV<String, byte[]>> taggedRecords =
PCollection<VersionedEntity> taggedRecords =
pipeline
.apply(
"Get Datastore file patterns",
ExportLoadingTransforms.getDatastoreExportFilePatterns(
exportDir.getAbsolutePath(), ALL_KIND_STRS))
.apply("Find Datastore files", ExportLoadingTransforms.getFilesByPatterns())
.apply("Load from Datastore files", ExportLoadingTransforms.loadDataFromFiles());
.apply("Find Datastore files", Transforms.getFilesByPatterns())
.apply("Load from Datastore files", ExportLoadingTransforms.loadExportDataFromFiles());
// Transform bytes to pojo for analysis
PCollection<Entity> entities =
taggedRecords.apply(
"Raw records to Entity",
ParDo.of(
new DoFn<KV<String, byte[]>, Entity>() {
new DoFn<VersionedEntity, Entity>() {
@ProcessElement
public void processElement(
@Element KV<String, byte[]> kv, OutputReceiver<Entity> out) {
out.output(parseBytes(kv.getValue()));
@Element VersionedEntity versionedEntity, OutputReceiver<Entity> out) {
out.output(versionedEntity.getEntity().get());
}
}));
@ -192,10 +198,4 @@ public class ExportloadingTransformsTest implements Serializable {
pipeline.run();
}
private static Entity parseBytes(byte[] record) {
EntityProto proto = new EntityProto();
proto.parseFrom(record);
return EntityTranslator.createFromPb(proto);
}
}