Refactor pipline for Datastore backup loading (#628)

* Refactor pipline for Datastore backup loading

Refactored pipeline transforms.

Added testing utilities that handles assertions better.

Investigated and documented challenges in serializing Ofy entities
without side effects.
This commit is contained in:
Weimin Yu 2020-06-17 22:10:14 -04:00 committed by GitHub
parent 69a1d04c18
commit d43564172f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 517 additions and 375 deletions

View file

@ -18,15 +18,20 @@ import static com.google.common.base.Preconditions.checkState;
import static google.registry.model.ofy.ObjectifyService.ofy;
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
import com.google.appengine.api.datastore.DatastoreService;
import com.google.appengine.api.datastore.DatastoreServiceFactory;
import com.google.appengine.api.datastore.Entity;
import com.google.appengine.api.datastore.EntityNotFoundException;
import com.googlecode.objectify.Key;
import google.registry.backup.CommitLogExports;
import google.registry.backup.VersionedEntity;
import google.registry.model.ofy.CommitLogCheckpoint;
import google.registry.testing.AppEngineRule;
import google.registry.testing.FakeClock;
import google.registry.tools.LevelDbFileBuilder;
import java.io.File;
import java.io.IOException;
import java.util.NoSuchElementException;
import java.util.Set;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
@ -49,6 +54,8 @@ class BackupTestStore implements AutoCloseable {
private final FakeClock fakeClock;
private AppEngineRule appEngine;
/** For fetching the persisted Datastore Entity directly. */
private DatastoreService datastoreService;
private CommitLogCheckpoint prevCommitLogCheckpoint;
@ -61,31 +68,69 @@ class BackupTestStore implements AutoCloseable {
.withClock(fakeClock)
.build();
this.appEngine.beforeEach(null);
datastoreService = DatastoreServiceFactory.getDatastoreService();
}
void transact(Iterable<Object> deletes, Iterable<Object> newOrUpdated) {
/** Returns the timestamp of the transaction. */
long transact(Iterable<Object> deletes, Iterable<Object> newOrUpdated) {
long timestamp = fakeClock.nowUtc().getMillis();
tm().transact(
() -> {
ofy().delete().entities(deletes);
ofy().save().entities(newOrUpdated);
});
fakeClock.advanceOneMilli();
return timestamp;
}
/** Inserts or updates {@code entities} in the Datastore. */
/**
* Inserts or updates {@code entities} in the Datastore and returns the timestamp of this
* transaction.
*/
@SafeVarargs
final void insertOrUpdate(Object... entities) {
final long insertOrUpdate(Object... entities) {
long timestamp = fakeClock.nowUtc().getMillis();
tm().transact(() -> ofy().save().entities(entities).now());
fakeClock.advanceOneMilli();
return timestamp;
}
/** Deletes {@code entities} from the Datastore. */
/** Deletes {@code entities} from the Datastore and returns the timestamp of this transaction. */
@SafeVarargs
final void delete(Object... entities) {
final long delete(Object... entities) {
long timestamp = fakeClock.nowUtc().getMillis();
tm().transact(() -> ofy().delete().entities(entities).now());
fakeClock.advanceOneMilli();
return timestamp;
}
/**
* Returns the persisted data that corresponds to {@code ofyEntity} as a Datastore {@link Entity}.
*
* <p>A typical use case for this method is in a test, when the caller has persisted newly created
* Objectify entity and want to find out the values of certain assign-on-persist properties. See
* {@link VersionedEntity} for more information.
*/
Entity loadAsDatastoreEntity(Object ofyEntity) {
try {
return datastoreService.get(Key.create(ofyEntity).getRaw());
} catch (EntityNotFoundException e) {
throw new NoSuchElementException(e.getMessage());
}
}
/**
* Returns the persisted data that corresponds to {@code ofyEntity} as an Objectify entity.
*
* <p>See {@link #loadAsDatastoreEntity} and {@link VersionedEntity} for more information.
*/
Object loadAsOfyEntity(Object ofyEntity) {
try {
return ofy().load().fromEntity(datastoreService.get(Key.create(ofyEntity).getRaw()));
} catch (EntityNotFoundException e) {
throw new NoSuchElementException(e.getMessage());
}
}
/**
* Exports entities of the caller provided types and returns the directory where data is exported.
*
@ -96,7 +141,8 @@ class BackupTestStore implements AutoCloseable {
* to simulate an inconsistent export
* @return directory where data is exported
*/
File export(String exportRootPath, Iterable<Class<?>> pojoTypes, Set<Key<?>> excludes)
File export(
String exportRootPath, Iterable<Class<?>> pojoTypes, Set<Key<? extends Object>> excludes)
throws IOException {
File exportDirectory = getExportDirectory(exportRootPath);
for (Class<?> pojoType : pojoTypes) {
@ -119,8 +165,9 @@ class BackupTestStore implements AutoCloseable {
for (Object pojo : ofy().load().type(pojoType).iterable()) {
if (!excludes.contains(Key.create(pojo))) {
try {
builder.addEntity(toEntity(pojo));
} catch (IOException e) {
// Must preserve UpdateTimestamp. Do not use ofy().save().toEntity(pojo)!
builder.addEntity(datastoreService.get(Key.create(pojo).getRaw()));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@ -144,10 +191,6 @@ class BackupTestStore implements AutoCloseable {
}
}
private Entity toEntity(Object pojo) {
return tm().transactNew(() -> ofy().save().toEntity(pojo));
}
private File getExportDirectory(String exportRootPath) {
File exportDirectory =
new File(exportRootPath, fakeClock.nowUtc().toString(EXPORT_TIMESTAMP_FORMAT));

View file

@ -18,19 +18,13 @@ import static com.google.common.truth.Truth.assertThat;
import static com.google.common.truth.Truth.assertWithMessage;
import static com.google.common.truth.Truth8.assertThat;
import static google.registry.model.common.EntityGroupRoot.getCrossTldKey;
import static google.registry.model.ofy.ObjectifyService.ofy;
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
import static google.registry.testing.DatastoreHelper.newContactResource;
import static google.registry.testing.DatastoreHelper.newDomainBase;
import static google.registry.testing.DatastoreHelper.newRegistry;
import com.google.appengine.api.datastore.Entity;
import com.google.appengine.api.datastore.EntityTranslator;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Streams;
import com.google.storage.onestore.v3.OnestoreEntity.EntityProto;
import com.googlecode.objectify.Key;
import google.registry.backup.CommitLogImports;
import google.registry.backup.VersionedEntity;
@ -48,10 +42,9 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Stream;
import org.apache.beam.sdk.values.KV;
import org.joda.time.DateTime;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@ -63,17 +56,18 @@ import org.junit.jupiter.api.io.TempDir;
public class BackupTestStoreTest {
private static final DateTime START_TIME = DateTime.parse("2000-01-01T00:00:00.0Z");
private FakeClock fakeClock;
private BackupTestStore store;
private Registry registry;
private ContactResource contact;
private DomainBase domain;
@TempDir File tempDir;
@RegisterExtension InjectRule injectRule = new InjectRule();
private FakeClock fakeClock;
private BackupTestStore store;
// Test data:
private Registry registry;
private ContactResource contact;
private DomainBase domain;
@BeforeEach
void beforeEach() throws Exception {
fakeClock = new FakeClock(START_TIME);
@ -82,9 +76,15 @@ public class BackupTestStoreTest {
registry = newRegistry("tld1", "TLD1");
store.insertOrUpdate(registry);
contact = newContactResource("contact_1");
domain = newDomainBase("domain1.tld1", contact);
store.insertOrUpdate(contact, domain);
// Save persisted data for assertions.
registry = (Registry) store.loadAsOfyEntity(registry);
contact = (ContactResource) store.loadAsOfyEntity(contact);
domain = (DomainBase) store.loadAsOfyEntity(domain);
}
@AfterEach
@ -131,39 +131,33 @@ public class BackupTestStoreTest {
void export_dataReadBack() throws IOException {
String exportRootPath = tempDir.getAbsolutePath();
File exportFolder = export(exportRootPath, Collections.EMPTY_SET);
ImmutableList<String> tldStrings =
loadPropertyFromExportedEntities(
new File(exportFolder, "/all_namespaces/kind_Registry/input-0"),
Registry.class,
Registry::getTldStr);
assertThat(tldStrings).containsExactly("tld1");
ImmutableList<String> domainStrings =
loadPropertyFromExportedEntities(
new File(exportFolder, "/all_namespaces/kind_DomainBase/input-0"),
DomainBase.class,
DomainBase::getDomainName);
assertThat(domainStrings).containsExactly("domain1.tld1");
ImmutableList<String> contactIds =
loadPropertyFromExportedEntities(
new File(exportFolder, "/all_namespaces/kind_ContactResource/input-0"),
ContactResource.class,
ContactResource::getContactId);
assertThat(contactIds).containsExactly("contact_1");
ImmutableList<Object> loadedRegistries =
loadExportedEntities(new File(exportFolder, "/all_namespaces/kind_Registry/input-0"));
assertThat(loadedRegistries).containsExactly(registry);
ImmutableList<Object> loadedDomains =
loadExportedEntities(new File(exportFolder, "/all_namespaces/kind_DomainBase/input-0"));
assertThat(loadedDomains).containsExactly(domain);
ImmutableList<Object> loadedContacts =
loadExportedEntities(
new File(exportFolder, "/all_namespaces/kind_ContactResource/input-0"));
assertThat(loadedContacts).containsExactly(contact);
}
@Test
void export_excludeSomeEntity() throws IOException {
store.insertOrUpdate(newRegistry("tld2", "TLD2"));
Registry newRegistry = newRegistry("tld2", "TLD2");
store.insertOrUpdate(newRegistry);
newRegistry = (Registry) store.loadAsOfyEntity(newRegistry);
String exportRootPath = tempDir.getAbsolutePath();
File exportFolder =
export(
exportRootPath, ImmutableSet.of(Key.create(getCrossTldKey(), Registry.class, "tld1")));
ImmutableList<String> tlds =
loadPropertyFromExportedEntities(
new File(exportFolder, "/all_namespaces/kind_Registry/input-0"),
Registry.class,
Registry::getTldStr);
assertThat(tlds).containsExactly("tld2");
ImmutableList<Object> loadedRegistries =
loadExportedEntities(new File(exportFolder, "/all_namespaces/kind_Registry/input-0"));
assertThat(loadedRegistries).containsExactly(newRegistry);
}
@Test
@ -178,14 +172,11 @@ public class BackupTestStoreTest {
File commitLogFile = store.saveCommitLogs(tempDir.getAbsolutePath());
assertThat(commitLogFile.exists()).isTrue();
ImmutableList<VersionedEntity> mutations = CommitLogImports.loadEntities(commitLogFile);
assertThat(mutations.stream().map(VersionedEntity::getEntity).map(Optional::get))
.containsExactlyElementsIn(toDatastoreEntities(registry, contact, domain));
// Registry created at -2, contract and domain created at -1.
assertThat(mutations.stream().map(VersionedEntity::commitTimeMills))
.containsExactly(
fakeClock.nowUtc().getMillis() - 2,
fakeClock.nowUtc().getMillis() - 1,
fakeClock.nowUtc().getMillis() - 1);
InitSqlTestUtils.assertContainsExactlyElementsIn(
mutations,
KV.of(fakeClock.nowUtc().getMillis() - 2, store.loadAsDatastoreEntity(registry)),
KV.of(fakeClock.nowUtc().getMillis() - 1, store.loadAsDatastoreEntity(contact)),
KV.of(fakeClock.nowUtc().getMillis() - 1, store.loadAsDatastoreEntity(domain)));
}
@Test
@ -205,18 +196,13 @@ public class BackupTestStoreTest {
.build();
store.insertOrUpdate(domain, newContact);
store.delete(contact);
fakeClock.advanceOneMilli();
File commitLogFile = store.saveCommitLogs(tempDir.getAbsolutePath());
ImmutableList<VersionedEntity> mutations = CommitLogImports.loadEntities(commitLogFile);
assertThat(mutations.stream().filter(VersionedEntity::isDelete).map(VersionedEntity::key))
.containsExactly(Key.create(contact).getRaw());
assertThat(
mutations.stream()
.filter(Predicates.not(VersionedEntity::isDelete))
.map(VersionedEntity::getEntity)
.map(Optional::get))
.containsExactlyElementsIn(toDatastoreEntities(domain, newContact));
InitSqlTestUtils.assertContainsExactlyElementsIn(
mutations,
KV.of(fakeClock.nowUtc().getMillis() - 1, Key.create(contact).getRaw()),
KV.of(fakeClock.nowUtc().getMillis() - 2, store.loadAsDatastoreEntity(domain)),
KV.of(fakeClock.nowUtc().getMillis() - 2, store.loadAsDatastoreEntity(newContact)));
}
@Test
@ -236,27 +222,10 @@ public class BackupTestStoreTest {
excludes);
}
private static <T> ImmutableList<String> loadPropertyFromExportedEntities(
File dataFile, Class<T> ofyEntityType, Function<T, String> getter) throws IOException {
private static ImmutableList<Object> loadExportedEntities(File dataFile) throws IOException {
return Streams.stream(LevelDbLogReader.from(dataFile.toPath()))
.map(bytes -> toOfyEntity(bytes, ofyEntityType))
.map(getter)
.map(InitSqlTestUtils::bytesToEntity)
.map(InitSqlTestUtils::datastoreToOfyEntity)
.collect(ImmutableList.toImmutableList());
}
private static <T> T toOfyEntity(byte[] rawRecord, Class<T> ofyEntityType) {
EntityProto proto = new EntityProto();
proto.parseFrom(rawRecord);
Entity entity = EntityTranslator.createFromPb(proto);
return ofyEntityType.cast(ofy().load().fromEntity(entity));
}
@SafeVarargs
private static ImmutableList<Entity> toDatastoreEntities(Object... ofyEntities) {
return tm().transact(
() ->
Stream.of(ofyEntities)
.map(oe -> ofy().save().toEntity(oe))
.collect(ImmutableList.toImmutableList()));
}
}

View file

@ -14,13 +14,10 @@
package google.registry.beam.initsql;
import static google.registry.model.ofy.ObjectifyService.ofy;
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
import static google.registry.testing.DatastoreHelper.newContactResource;
import static google.registry.testing.DatastoreHelper.newDomainBase;
import static google.registry.testing.DatastoreHelper.newRegistry;
import com.google.appengine.api.datastore.Entity;
import com.google.common.collect.ImmutableList;
import google.registry.backup.VersionedEntity;
import google.registry.model.contact.ContactResource;
@ -39,6 +36,7 @@ import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.DateTime;
import org.junit.After;
@ -50,7 +48,7 @@ import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** Unit tests for {@link CommitLogTransforms}. */
/** Unit tests for {@link Transforms} related to loading CommitLogs. */
// TODO(weiminyu): Upgrade to JUnit5 when TestPipeline is upgraded. It is also easy to adapt with
// a wrapper.
@RunWith(JUnit4.class)
@ -69,9 +67,11 @@ public class CommitLogTransformsTest implements Serializable {
private transient BackupTestStore store;
private File commitLogsDir;
private File firstCommitLogFile;
// Canned data that are persisted to Datastore, used by assertions in tests.
// TODO(weiminyu): use Ofy entity pojos directly.
private transient ImmutableList<Entity> persistedEntities;
// Canned data:
private transient Registry registry;
private transient ContactResource contact;
private transient DomainBase domain;
@Before
public void beforeEach() throws Exception {
@ -79,15 +79,17 @@ public class CommitLogTransformsTest implements Serializable {
store = new BackupTestStore(fakeClock);
injectRule.setStaticField(Ofy.class, "clock", fakeClock);
Registry registry = newRegistry("tld1", "TLD1");
registry = newRegistry("tld1", "TLD1");
store.insertOrUpdate(registry);
ContactResource contact1 = newContactResource("contact_1");
DomainBase domain1 = newDomainBase("domain1.tld1", contact1);
store.insertOrUpdate(contact1, domain1);
persistedEntities =
ImmutableList.of(registry, contact1, domain1).stream()
.map(ofyEntity -> tm().transact(() -> ofy().save().toEntity(ofyEntity)))
.collect(ImmutableList.toImmutableList());
contact = newContactResource("contact_1");
domain = newDomainBase("domain1.tld1", contact);
store.insertOrUpdate(contact, domain);
// Save persisted data for assertions.
registry = (Registry) store.loadAsOfyEntity(registry);
contact = (ContactResource) store.loadAsOfyEntity(contact);
domain = (DomainBase) store.loadAsOfyEntity(domain);
commitLogsDir = temporaryFolder.newFolder();
firstCommitLogFile = store.saveCommitLogs(commitLogsDir.getAbsolutePath());
}
@ -106,7 +108,7 @@ public class CommitLogTransformsTest implements Serializable {
PCollection<String> patterns =
pipeline.apply(
"Get CommitLog file patterns",
CommitLogTransforms.getCommitLogFilePatterns(commitLogsDir.getAbsolutePath()));
Transforms.getCommitLogFilePatterns(commitLogsDir.getAbsolutePath()));
ImmutableList<String> expectedPatterns =
ImmutableList.of(commitLogsDir.getAbsolutePath() + "/commit_diff_until_*");
@ -165,7 +167,7 @@ public class CommitLogTransformsTest implements Serializable {
Create.of(commitLogFilenames).withCoder(StringUtf8Coder.of()))
.apply(
"Filtered by Time",
CommitLogTransforms.filterCommitLogsByTime(
Transforms.filterCommitLogsByTime(
DateTime.parse("2000-01-01T00:00:00.001Z"),
DateTime.parse("2000-01-01T00:00:00.003Z")));
PAssert.that(filteredFilenames)
@ -183,40 +185,15 @@ public class CommitLogTransformsTest implements Serializable {
pipeline
.apply(
"Get CommitLog file patterns",
CommitLogTransforms.getCommitLogFilePatterns(commitLogsDir.getAbsolutePath()))
Transforms.getCommitLogFilePatterns(commitLogsDir.getAbsolutePath()))
.apply("Find CommitLogs", Transforms.getFilesByPatterns())
.apply(CommitLogTransforms.loadCommitLogsFromFiles());
.apply(Transforms.loadCommitLogsFromFiles());
PCollection<Long> timestamps =
entities.apply(
"Extract commitTimeMillis",
ParDo.of(
new DoFn<VersionedEntity, Long>() {
@ProcessElement
public void processElement(
@Element VersionedEntity entity, OutputReceiver<Long> out) {
out.output(entity.commitTimeMills());
}
}));
PAssert.that(timestamps)
.containsInAnyOrder(
fakeClock.nowUtc().getMillis() - 2,
fakeClock.nowUtc().getMillis() - 1,
fakeClock.nowUtc().getMillis() - 1);
PCollection<Entity> datastoreEntities =
entities.apply(
"To Datastore Entities",
ParDo.of(
new DoFn<VersionedEntity, Entity>() {
@ProcessElement
public void processElement(
@Element VersionedEntity entity, OutputReceiver<Entity> out) {
entity.getEntity().ifPresent(out::output);
}
}));
PAssert.that(datastoreEntities).containsInAnyOrder(persistedEntities);
InitSqlTestUtils.assertContainsExactlyElementsIn(
entities,
KV.of(fakeClock.nowUtc().getMillis() - 2, store.loadAsDatastoreEntity(registry)),
KV.of(fakeClock.nowUtc().getMillis() - 1, store.loadAsDatastoreEntity(contact)),
KV.of(fakeClock.nowUtc().getMillis() - 1, store.loadAsDatastoreEntity(domain)));
pipeline.run();
}

View file

@ -14,13 +14,10 @@
package google.registry.beam.initsql;
import static google.registry.model.ofy.ObjectifyService.ofy;
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
import static google.registry.testing.DatastoreHelper.newContactResource;
import static google.registry.testing.DatastoreHelper.newDomainBase;
import static google.registry.testing.DatastoreHelper.newRegistry;
import com.google.appengine.api.datastore.Entity;
import com.google.common.collect.ImmutableList;
import com.googlecode.objectify.Key;
import google.registry.backup.VersionedEntity;
@ -41,6 +38,7 @@ import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.DateTime;
import org.junit.After;
@ -53,7 +51,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
* Unit tests for {@link ExportLoadingTransforms}.
* Unit tests for {@link Transforms} related to loading Datastore exports.
*
* <p>This class implements {@link Serializable} so that test {@link DoFn} classes may be inlined.
*/
@ -79,9 +77,11 @@ public class ExportloadingTransformsTest implements Serializable {
private FakeClock fakeClock;
private transient BackupTestStore store;
private File exportDir;
// Canned data that are persisted to Datastore, used by assertions in tests.
// TODO(weiminyu): use Ofy entity pojos directly.
private transient ImmutableList<Entity> persistedEntities;
// Canned data:
private transient Registry registry;
private transient ContactResource contact;
private transient DomainBase domain;
@Before
public void beforeEach() throws Exception {
@ -89,15 +89,17 @@ public class ExportloadingTransformsTest implements Serializable {
store = new BackupTestStore(fakeClock);
injectRule.setStaticField(Ofy.class, "clock", fakeClock);
Registry registry = newRegistry("tld1", "TLD1");
registry = newRegistry("tld1", "TLD1");
store.insertOrUpdate(registry);
ContactResource contact1 = newContactResource("contact_1");
DomainBase domain1 = newDomainBase("domain1.tld1", contact1);
store.insertOrUpdate(contact1, domain1);
persistedEntities =
ImmutableList.of(registry, contact1, domain1).stream()
.map(ofyEntity -> tm().transact(() -> ofy().save().toEntity(ofyEntity)))
.collect(ImmutableList.toImmutableList());
contact = newContactResource("contact_1");
domain = newDomainBase("domain1.tld1", contact);
store.insertOrUpdate(contact, domain);
// Save persisted data for assertions.
registry = (Registry) store.loadAsOfyEntity(registry);
contact = (ContactResource) store.loadAsOfyEntity(contact);
domain = (DomainBase) store.loadAsOfyEntity(domain);
exportDir =
store.export(exportRootDir.getRoot().getAbsolutePath(), ALL_KINDS, Collections.EMPTY_SET);
@ -117,8 +119,7 @@ public class ExportloadingTransformsTest implements Serializable {
PCollection<String> patterns =
pipeline.apply(
"Get Datastore file patterns",
ExportLoadingTransforms.getDatastoreExportFilePatterns(
exportDir.getAbsolutePath(), ALL_KIND_STRS));
Transforms.getDatastoreExportFilePatterns(exportDir.getAbsolutePath(), ALL_KIND_STRS));
ImmutableList<String> expectedPatterns =
ImmutableList.of(
@ -172,29 +173,20 @@ public class ExportloadingTransformsTest implements Serializable {
@Test
public void loadDataFromFiles() {
PCollection<VersionedEntity> taggedRecords =
PCollection<VersionedEntity> entities =
pipeline
.apply(
"Get Datastore file patterns",
ExportLoadingTransforms.getDatastoreExportFilePatterns(
Transforms.getDatastoreExportFilePatterns(
exportDir.getAbsolutePath(), ALL_KIND_STRS))
.apply("Find Datastore files", Transforms.getFilesByPatterns())
.apply("Load from Datastore files", ExportLoadingTransforms.loadExportDataFromFiles());
.apply("Load from Datastore files", Transforms.loadExportDataFromFiles());
// Transform bytes to pojo for analysis
PCollection<Entity> entities =
taggedRecords.apply(
"Raw records to Entity",
ParDo.of(
new DoFn<VersionedEntity, Entity>() {
@ProcessElement
public void processElement(
@Element VersionedEntity versionedEntity, OutputReceiver<Entity> out) {
out.output(versionedEntity.getEntity().get());
}
}));
PAssert.that(entities).containsInAnyOrder(persistedEntities);
InitSqlTestUtils.assertContainsExactlyElementsIn(
entities,
KV.of(Transforms.EXPORT_ENTITY_TIME_STAMP, store.loadAsDatastoreEntity(registry)),
KV.of(Transforms.EXPORT_ENTITY_TIME_STAMP, store.loadAsDatastoreEntity(contact)),
KV.of(Transforms.EXPORT_ENTITY_TIME_STAMP, store.loadAsDatastoreEntity(domain)));
pipeline.run();
}

View file

@ -0,0 +1,161 @@
// Copyright 2020 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.beam.initsql;
import static com.google.common.truth.Truth8.assertThat;
import static google.registry.model.ofy.ObjectifyService.ofy;
import static org.apache.beam.sdk.values.TypeDescriptors.kvs;
import static org.apache.beam.sdk.values.TypeDescriptors.strings;
import com.google.appengine.api.datastore.Entity;
import com.google.appengine.api.datastore.EntityTranslator;
import com.google.appengine.api.datastore.Key;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Streams;
import com.google.common.truth.Truth;
import com.google.storage.onestore.v3.OnestoreEntity.EntityProto;
import google.registry.backup.AppEngineEnvironment;
import google.registry.backup.VersionedEntity;
import java.io.Serializable;
import java.util.Collection;
import java.util.stream.Stream;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
/** Test helpers for populating SQL with Datastore backups. */
public final class InitSqlTestUtils {
/** Converts a Datastore {@link Entity} to an Objectify entity. */
public static Object datastoreToOfyEntity(Entity entity) {
return ofy().load().fromEntity(entity);
}
/** Serializes a Datastore {@link Entity} to byte array. */
public static byte[] entityToBytes(Entity entity) {
return EntityTranslator.convertToPb(entity).toByteArray();
}
/** Deserializes raw bytes into {@link Entity}. */
public static Entity bytesToEntity(byte[] bytes) {
EntityProto proto = new EntityProto();
proto.parseFrom(bytes);
return EntityTranslator.createFromPb(proto);
}
/**
* Asserts that the {@code actual} {@link Collection} of {@link VersionedEntity VersionedEntities}
* contains exactly the same elements in the {@code expected} array.
*
* <p>Each {@code expected} {@link KV key-value pair} refers to a versioned state of an Ofy
* entity. The {@link KV#getKey key} is the timestamp, while the {@link KV#getValue value} is
* either a Datastore {@link Entity} (for an existing entity) or a Datastore {@link Key} (for a
* deleted entity).
*
* <p>The {@Entity} instances in both actual and expected data are converted to Objectify entities
* so that value-equality checks can be performed. Datastore {@link Entity#equals Entity's equals
* method} only checks key-equality.
*/
@SafeVarargs
public static void assertContainsExactlyElementsIn(
Collection<VersionedEntity> actual, KV<Long, Serializable>... expected) {
assertThat(actual.stream().map(InitSqlTestUtils::rawEntityToOfyWithTimestamp))
.containsExactlyElementsIn(
Stream.of(expected)
.map(InitSqlTestUtils::expectedToOfyWithTimestamp)
.collect(ImmutableList.toImmutableList()));
}
/**
* Asserts that the {@code actual} {@link PCollection} of {@link VersionedEntity
* VersionedEntities} contains exactly the same elements in the {@code expected} array.
*
* <p>This method makes assertions in the pipeline and only use {@link PAssert} on the result.
* This has two advantages over {@code PAssert}:
*
* <ul>
* <li>It supports assertions on 'containsExactlyElementsIn', which is not available in {@code
* PAssert}.
* <li>It supports assertions on Objectify entities, which {@code PAssert} cannot not do.
* Compared with PAssert-compatible options like {@code google.registry.tools.EntityWrapper}
* or {@link EntityProto}, Objectify entities in Java give better-formatted error messages
* when assertions fail.
* </ul>
*
* <p>Each {@code expected} {@link KV key-value pair} refers to a versioned state of an Ofy
* entity. The {@link KV#getKey key} is the timestamp, while the {@link KV#getValue value} is
* either a Datastore {@link Entity} (for an existing entity) or a Datastore {@link Key} (for a
* deleted entity).
*
* <p>The {@Entity} instances in both actual and expected data are converted to Objectify entities
* so that value-equality checks can be performed. Datastore {@link Entity#equals Entity's equals
* method} only checks key-equality.
*/
@SafeVarargs
public static void assertContainsExactlyElementsIn(
PCollection<VersionedEntity> actual, KV<Long, Serializable>... expected) {
PCollection<String> errMsgs =
actual
.apply(
MapElements.into(kvs(strings(), TypeDescriptor.of(VersionedEntity.class)))
.via(rawEntity -> KV.of("The One Key", rawEntity)))
.apply(GroupByKey.create())
.apply(
"assertContainsExactlyElementsIn",
ParDo.of(
new DoFn<KV<String, Iterable<VersionedEntity>>, String>() {
@ProcessElement
public void processElement(
@Element KV<String, Iterable<VersionedEntity>> input,
OutputReceiver<String> out) {
try (AppEngineEnvironment env = new AppEngineEnvironment()) {
ImmutableList<KV<Long, Object>> actual =
Streams.stream(input.getValue())
.map(InitSqlTestUtils::rawEntityToOfyWithTimestamp)
.collect(ImmutableList.toImmutableList());
try {
Truth.assertThat(actual)
.containsExactlyElementsIn(
Stream.of(expected)
.map(InitSqlTestUtils::expectedToOfyWithTimestamp)
.collect(ImmutableList.toImmutableList()));
} catch (AssertionError e) {
out.output(e.toString());
}
}
}
}));
PAssert.that(errMsgs).empty();
}
private static KV<Long, Object> rawEntityToOfyWithTimestamp(VersionedEntity rawEntity) {
return KV.of(
rawEntity.commitTimeMills(),
rawEntity.getEntity().map(InitSqlTestUtils::datastoreToOfyEntity).orElse(rawEntity.key()));
}
private static KV<Long, Object> expectedToOfyWithTimestamp(KV<Long, Serializable> kv) {
return KV.of(
kv.getKey(),
kv.getValue() instanceof Key
? kv.getValue()
: datastoreToOfyEntity((Entity) kv.getValue()));
}
}