End-to-end Datastore to SQL pipeline (#707)

* End-to-end Datastore to SQL pipeline

Defined InitSqlPipeline that performs end-to-end migration from
a Datastore backup to a SQL database.

Also fixed/refined multiple tests related to this migration.
This commit is contained in:
Weimin Yu 2020-07-24 09:57:43 -04:00 committed by GitHub
parent 91b7d92cf8
commit 6591e0672a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
26 changed files with 1925 additions and 114 deletions

View file

@ -0,0 +1,72 @@
// Copyright 2020 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.beam.initsql;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.Supplier;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.testcontainers.containers.JdbcDatabaseContainer;
/**
* Helpers for setting up {@link BeamJpaModule} in tests.
*
* <p>This extension is often used with a Database container and/or temporary file folder. User must
* make sure that all dependent extensions are set up before this extension, e.g., by assigning
* {@link org.junit.jupiter.api.Order orders}.
*/
public final class BeamJpaExtension implements BeforeEachCallback, AfterEachCallback, Serializable {
private final transient JdbcDatabaseContainer<?> database;
private final transient Supplier<Path> credentialPathSupplier;
private transient BeamJpaModule beamJpaModule;
private File credentialFile;
public BeamJpaExtension(Supplier<Path> credentialPathSupplier, JdbcDatabaseContainer database) {
this.database = database;
this.credentialPathSupplier = credentialPathSupplier;
}
public File getCredentialFile() {
return credentialFile;
}
public BeamJpaModule getBeamJpaModule() {
if (beamJpaModule != null) {
return beamJpaModule;
}
return beamJpaModule = new BeamJpaModule(credentialFile.getAbsolutePath());
}
@Override
public void beforeEach(ExtensionContext context) throws IOException {
credentialFile = Files.createFile(credentialPathSupplier.get()).toFile();
new PrintStream(credentialFile)
.printf("%s %s %s", database.getJdbcUrl(), database.getUsername(), database.getPassword())
.close();
}
@Override
public void afterEach(ExtensionContext context) {
credentialFile.delete();
}
}

View file

@ -19,12 +19,10 @@ import static com.google.common.truth.Truth.assertThat;
import google.registry.persistence.NomulusPostgreSql;
import google.registry.persistence.transaction.JpaTransactionManager;
import google.registry.testing.DatastoreEntityExtension;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.file.Path;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
import org.junit.jupiter.api.extension.RegisterExtension;
@ -35,31 +33,28 @@ import org.testcontainers.junit.jupiter.Testcontainers;
/** Unit tests for {@link BeamJpaModule}. */
@Testcontainers
public class BeamJpaModuleTest {
@Container
public PostgreSQLContainer database = new PostgreSQLContainer(NomulusPostgreSql.getDockerTag());
class BeamJpaModuleTest {
@RegisterExtension
public DatastoreEntityExtension datastoreEntityExtension = new DatastoreEntityExtension();
final DatastoreEntityExtension datastoreEntityExtension = new DatastoreEntityExtension();
@TempDir File tempFolder;
@Container
final PostgreSQLContainer database = new PostgreSQLContainer(NomulusPostgreSql.getDockerTag());
private File credentialFile;
@SuppressWarnings("WeakerAccess")
@TempDir
Path tmpDir;
@BeforeEach
public void beforeEach() throws IOException {
credentialFile = new File(tempFolder, "credential");
new PrintStream(credentialFile)
.printf("%s %s %s", database.getJdbcUrl(), database.getUsername(), database.getPassword())
.close();
}
@RegisterExtension
@Order(Order.DEFAULT + 1)
final BeamJpaExtension beamJpaExtension =
new BeamJpaExtension(() -> tmpDir.resolve("credential.dat"), database);
@Test
void getJpaTransactionManager_local() {
JpaTransactionManager jpa =
DaggerBeamJpaModule_JpaTransactionManagerComponent.builder()
.beamJpaModule(new BeamJpaModule(credentialFile.getAbsolutePath()))
.beamJpaModule(beamJpaExtension.getBeamJpaModule())
.build()
.localDbJpaTransactionManager();
assertThat(
@ -80,7 +75,7 @@ public class BeamJpaModuleTest {
*/
@Test
@EnabledIfSystemProperty(named = "test.gcp_integration.env", matches = "\\S+")
public void getJpaTransactionManager_cloudSql_authRequired() {
void getJpaTransactionManager_cloudSql_authRequired() {
String environmentName = System.getProperty("test.gcp_integration.env");
FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());
JpaTransactionManager jpa =

View file

@ -0,0 +1,223 @@
// Copyright 2020 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.beam.initsql;
import static google.registry.model.ImmutableObjectSubject.assertAboutImmutableObjects;
import static google.registry.model.ofy.ObjectifyService.ofy;
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
import static google.registry.testing.DatastoreHelper.cloneAndSetAutoTimestamps;
import static google.registry.testing.DatastoreHelper.createTld;
import static google.registry.testing.DatastoreHelper.persistResource;
import static google.registry.util.DateTimeUtils.START_OF_TIME;
import static org.junit.Assert.assertThrows;
import com.google.appengine.api.datastore.Entity;
import com.google.common.collect.ImmutableSet;
import com.googlecode.objectify.Key;
import google.registry.model.billing.BillingEvent;
import google.registry.model.billing.BillingEvent.OneTime;
import google.registry.model.contact.ContactResource;
import google.registry.model.domain.DesignatedContact;
import google.registry.model.domain.DomainAuthInfo;
import google.registry.model.domain.DomainBase;
import google.registry.model.domain.GracePeriod;
import google.registry.model.domain.launch.LaunchNotice;
import google.registry.model.domain.rgp.GracePeriodStatus;
import google.registry.model.domain.secdns.DelegationSignerData;
import google.registry.model.eppcommon.AuthInfo.PasswordAuth;
import google.registry.model.eppcommon.StatusValue;
import google.registry.model.eppcommon.Trid;
import google.registry.model.host.HostResource;
import google.registry.model.ofy.Ofy;
import google.registry.model.poll.PollMessage;
import google.registry.model.reporting.HistoryEntry;
import google.registry.model.transfer.DomainTransferData;
import google.registry.model.transfer.TransferStatus;
import google.registry.persistence.VKey;
import google.registry.testing.AppEngineRule;
import google.registry.testing.DatastoreHelper;
import google.registry.testing.FakeClock;
import google.registry.testing.InjectRule;
import org.joda.time.Instant;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
/** Unit tests for {@link DomainBaseUtil}. */
public class DomainBaseUtilTest {
private final FakeClock fakeClock = new FakeClock(Instant.now());
private DomainBase domain;
private Entity domainEntity;
private Key<OneTime> oneTimeBillKey;
private Key<BillingEvent.Recurring> recurringBillKey;
private Key<DomainBase> domainKey;
@RegisterExtension
AppEngineRule appEngineRule =
AppEngineRule.builder().withDatastore().withClock(fakeClock).build();
@RegisterExtension InjectRule injectRule = new InjectRule();
@BeforeEach
void beforeEach() {
injectRule.setStaticField(Ofy.class, "clock", fakeClock);
createTld("com");
domainKey = Key.create(null, DomainBase.class, "4-COM");
VKey<HostResource> hostKey =
persistResource(
new HostResource.Builder()
.setHostName("ns1.example.com")
.setSuperordinateDomain(VKey.from(domainKey))
.setRepoId("1-COM")
.build())
.createVKey();
VKey<ContactResource> contact1Key =
persistResource(
new ContactResource.Builder()
.setContactId("contact_id1")
.setRepoId("2-COM")
.build())
.createVKey();
VKey<ContactResource> contact2Key =
persistResource(
new ContactResource.Builder()
.setContactId("contact_id2")
.setRepoId("3-COM")
.build())
.createVKey();
Key<HistoryEntry> historyEntryKey =
Key.create(persistResource(new HistoryEntry.Builder().setParent(domainKey).build()));
oneTimeBillKey = Key.create(historyEntryKey, BillingEvent.OneTime.class, 1);
recurringBillKey = Key.create(historyEntryKey, BillingEvent.Recurring.class, 2);
Key<PollMessage.Autorenew> autorenewPollKey =
Key.create(historyEntryKey, PollMessage.Autorenew.class, 3);
Key<PollMessage.OneTime> onetimePollKey =
Key.create(historyEntryKey, PollMessage.OneTime.class, 1);
// Set up a new persisted domain entity.
domain =
persistResource(
cloneAndSetAutoTimestamps(
new DomainBase.Builder()
.setDomainName("example.com")
.setRepoId("4-COM")
.setCreationClientId("a registrar")
.setLastEppUpdateTime(fakeClock.nowUtc())
.setLastEppUpdateClientId("AnotherRegistrar")
.setLastTransferTime(fakeClock.nowUtc())
.setStatusValues(
ImmutableSet.of(
StatusValue.CLIENT_DELETE_PROHIBITED,
StatusValue.SERVER_DELETE_PROHIBITED,
StatusValue.SERVER_TRANSFER_PROHIBITED,
StatusValue.SERVER_UPDATE_PROHIBITED,
StatusValue.SERVER_RENEW_PROHIBITED,
StatusValue.SERVER_HOLD))
.setRegistrant(contact1Key)
.setContacts(
ImmutableSet.of(
DesignatedContact.create(DesignatedContact.Type.ADMIN, contact2Key)))
.setNameservers(ImmutableSet.of(hostKey))
.setSubordinateHosts(ImmutableSet.of("ns1.example.com"))
.setPersistedCurrentSponsorClientId("losing")
.setRegistrationExpirationTime(fakeClock.nowUtc().plusYears(1))
.setAuthInfo(DomainAuthInfo.create(PasswordAuth.create("password")))
.setDsData(
ImmutableSet.of(DelegationSignerData.create(1, 2, 3, new byte[] {0, 1, 2})))
.setLaunchNotice(
LaunchNotice.create("tcnid", "validatorId", START_OF_TIME, START_OF_TIME))
.setTransferData(
new DomainTransferData.Builder()
.setGainingClientId("gaining")
.setLosingClientId("losing")
.setPendingTransferExpirationTime(fakeClock.nowUtc())
.setServerApproveEntities(
ImmutableSet.of(
VKey.from(oneTimeBillKey),
VKey.from(recurringBillKey),
VKey.from(autorenewPollKey)))
.setServerApproveBillingEvent(VKey.from(oneTimeBillKey))
.setServerApproveAutorenewEvent(VKey.from(recurringBillKey))
.setServerApproveAutorenewPollMessage(VKey.from(autorenewPollKey))
.setTransferRequestTime(fakeClock.nowUtc().plusDays(1))
.setTransferStatus(TransferStatus.SERVER_APPROVED)
.setTransferRequestTrid(Trid.create("client-trid", "server-trid"))
.build())
.setDeletePollMessage(onetimePollKey)
.setAutorenewBillingEvent(recurringBillKey)
.setAutorenewPollMessage(autorenewPollKey)
.setSmdId("smdid")
.addGracePeriod(
GracePeriod.create(
GracePeriodStatus.ADD,
fakeClock.nowUtc().plusDays(1),
"registrar",
null))
.build()));
domainEntity = tm().transact(() -> ofy().toEntity(domain));
}
@Test
void removeBillingAndPollAndHosts_allFkeysPresent() {
DomainBase domainTransformedByOfy =
domain
.asBuilder()
.setAutorenewBillingEvent(null)
.setAutorenewPollMessage(null)
.setNameservers(ImmutableSet.of())
.setDeletePollMessage(null)
.setTransferData(null)
.build();
DomainBase domainTransformedByUtil =
(DomainBase) ofy().toPojo(DomainBaseUtil.removeBillingAndPollAndHosts(domainEntity));
// Compensates for the missing INACTIVE status.
domainTransformedByUtil = domainTransformedByUtil.asBuilder().build();
assertAboutImmutableObjects()
.that(domainTransformedByUtil)
.isEqualExceptFields(domainTransformedByOfy, "revisions");
}
@Test
void removeBillingAndPollAndHosts_noFkeysPresent() {
DomainBase domainWithoutFKeys =
domain
.asBuilder()
.setAutorenewBillingEvent(null)
.setAutorenewPollMessage(null)
.setNameservers(ImmutableSet.of())
.setDeletePollMessage(null)
.setTransferData(null)
.build();
Entity entityWithoutFkeys = tm().transact(() -> ofy().toEntity(domainWithoutFKeys));
DomainBase domainTransformedByUtil =
(DomainBase) ofy().toPojo(DomainBaseUtil.removeBillingAndPollAndHosts(entityWithoutFkeys));
// Compensates for the missing INACTIVE status.
domainTransformedByUtil = domainTransformedByUtil.asBuilder().build();
assertAboutImmutableObjects()
.that(domainTransformedByUtil)
.isEqualExceptFields(domainWithoutFKeys, "revisions");
}
@Test
void removeBillingAndPollAndHosts_notDomainBase() {
Entity contactEntity =
tm().transact(() -> ofy().toEntity(DatastoreHelper.newContactResource("contact")));
assertThrows(
IllegalArgumentException.class,
() -> DomainBaseUtil.removeBillingAndPollAndHosts(contactEntity));
}
}

View file

@ -0,0 +1,67 @@
// Copyright 2020 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.beam.initsql;
import static google.registry.testing.truth.TextDiffSubject.assertWithMessageAboutUrlSource;
import com.google.common.io.Resources;
import google.registry.beam.TestPipelineExtension;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.net.URL;
import org.apache.beam.runners.core.construction.renderer.PipelineDotRenderer;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
/** Manages visualization of {@link InitSqlPipeline}. */
class InitSqlPipelineGraphTest {
private static final String GOLDEN_DOT_FILE = "pipeline_golden.dot";
private static final String[] OPTIONS_ARGS =
new String[] {
"--commitLogStartTimestamp=2000-01-01TZ",
"--commitLogEndTimestamp=2000-01-02TZ",
"--datastoreExportDir=/somedir",
"--commitLogDir=/someotherdir",
"--environment=alpha"
};
private static final transient InitSqlPipelineOptions options =
PipelineOptionsFactory.fromArgs(OPTIONS_ARGS)
.withValidation()
.as(InitSqlPipelineOptions.class);
@RegisterExtension
final transient TestPipelineExtension testPipeline =
TestPipelineExtension.create().enableAbandonedNodeEnforcement(false);
@Test
public void createPipeline_compareGraph() throws IOException {
new InitSqlPipeline(options, testPipeline).setupPipeline();
String dotString = PipelineDotRenderer.toDotString(testPipeline);
URL goldenDotUrl = Resources.getResource(InitSqlPipelineGraphTest.class, GOLDEN_DOT_FILE);
File outputFile = new File(new File(goldenDotUrl.getFile()).getParent(), "pipeline_curr.dot");
try (PrintStream ps = new PrintStream(outputFile)) {
ps.print(dotString);
}
assertWithMessageAboutUrlSource(
"InitSqlPipeline graph changed. Run :core:updateInitSqlPipelineGraph to update.")
.that(outputFile.toURI().toURL())
.hasSameContentAs(goldenDotUrl);
}
}

View file

@ -12,24 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.model;
package google.registry.beam.initsql;
import static com.google.common.truth.Truth.assertThat;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.junit.jupiter.api.Test;
import java.util.Objects;
/** Unit tests for {@link google.registry.beam.initsql.InitSqlPipelineOptions}. * */
public class InitSqlPipelineOptionsTest {
/** Test helpers for {@link EppResource}. */
public final class EppResourceTestUtils {
private EppResourceTestUtils() {}
public static <E extends EppResource> void assertEqualsIgnoreLastUpdateTime(
E actual, E expected) {
if (Objects.equals(actual, expected)) {
return;
}
actual = (E) actual.asBuilder().build();
actual.updateTimestamp = expected.getUpdateTimestamp();
assertThat(actual).isEqualTo(expected);
@Test
void registerToValidate() {
PipelineOptionsFactory.register(InitSqlPipelineOptions.class);
}
}

View file

@ -0,0 +1,283 @@
// Copyright 2020 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.beam.initsql;
import static com.google.common.truth.Truth.assertThat;
import static google.registry.model.ImmutableObjectSubject.assertAboutImmutableObjects;
import static google.registry.model.ImmutableObjectSubject.immutableObjectCorrespondence;
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
import static google.registry.testing.DatastoreHelper.newRegistry;
import static google.registry.testing.DatastoreHelper.persistResource;
import static google.registry.util.DateTimeUtils.START_OF_TIME;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.googlecode.objectify.Key;
import google.registry.backup.AppEngineEnvironment;
import google.registry.beam.TestPipelineExtension;
import google.registry.model.billing.BillingEvent;
import google.registry.model.contact.ContactResource;
import google.registry.model.domain.DesignatedContact;
import google.registry.model.domain.DomainAuthInfo;
import google.registry.model.domain.DomainBase;
import google.registry.model.domain.GracePeriod;
import google.registry.model.domain.launch.LaunchNotice;
import google.registry.model.domain.rgp.GracePeriodStatus;
import google.registry.model.domain.secdns.DelegationSignerData;
import google.registry.model.eppcommon.AuthInfo.PasswordAuth;
import google.registry.model.eppcommon.StatusValue;
import google.registry.model.eppcommon.Trid;
import google.registry.model.host.HostResource;
import google.registry.model.ofy.Ofy;
import google.registry.model.poll.PollMessage;
import google.registry.model.registrar.Registrar;
import google.registry.model.registry.Registry;
import google.registry.model.reporting.HistoryEntry;
import google.registry.model.transfer.DomainTransferData;
import google.registry.model.transfer.TransferStatus;
import google.registry.persistence.VKey;
import google.registry.persistence.transaction.JpaTestRules;
import google.registry.persistence.transaction.JpaTestRules.JpaIntegrationTestRule;
import google.registry.testing.AppEngineRule;
import google.registry.testing.DatastoreEntityExtension;
import google.registry.testing.FakeClock;
import google.registry.testing.InjectRule;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.joda.time.DateTime;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;
/** Unit tests for {@link InitSqlPipeline}. */
class InitSqlPipelineTest {
private static final DateTime START_TIME = DateTime.parse("2000-01-01T00:00:00.0Z");
private static final ImmutableList<Class<?>> ALL_KINDS =
ImmutableList.of(
Registry.class,
Registrar.class,
ContactResource.class,
HostResource.class,
DomainBase.class,
HistoryEntry.class);
private transient FakeClock fakeClock = new FakeClock(START_TIME);
@RegisterExtension
@Order(Order.DEFAULT - 1)
final transient DatastoreEntityExtension datastore = new DatastoreEntityExtension();
@RegisterExtension final transient InjectRule injectRule = new InjectRule();
@SuppressWarnings("WeakerAccess")
@TempDir
transient Path tmpDir;
@RegisterExtension
final transient TestPipelineExtension testPipeline =
TestPipelineExtension.create().enableAbandonedNodeEnforcement(true);
@RegisterExtension
final transient JpaIntegrationTestRule database =
new JpaTestRules.Builder().withClock(fakeClock).buildIntegrationTestRule();
// Must not be transient!
@RegisterExtension
@Order(Order.DEFAULT + 1)
final BeamJpaExtension beamJpaExtension =
new BeamJpaExtension(() -> tmpDir.resolve("credential.dat"), database.getDatabase());
private File exportRootDir;
private File exportDir;
private File commitLogDir;
private transient Registrar registrar1;
private transient Registrar registrar2;
private transient DomainBase domain;
private transient ContactResource contact1;
private transient ContactResource contact2;
private transient HostResource hostResource;
private transient HistoryEntry historyEntry;
@BeforeEach
public void beforeEach() throws Exception {
try (BackupTestStore store = new BackupTestStore(fakeClock)) {
injectRule.setStaticField(Ofy.class, "clock", fakeClock);
exportRootDir = Files.createDirectory(tmpDir.resolve("exports")).toFile();
persistResource(newRegistry("com", "COM"));
registrar1 = persistResource(AppEngineRule.makeRegistrar1());
registrar2 = persistResource(AppEngineRule.makeRegistrar2());
Key<DomainBase> domainKey = Key.create(null, DomainBase.class, "4-COM");
hostResource =
persistResource(
new HostResource.Builder()
.setHostName("ns1.example.com")
.setSuperordinateDomain(VKey.from(domainKey))
.setRepoId("1-COM")
.setCreationClientId(registrar1.getClientId())
.setPersistedCurrentSponsorClientId(registrar2.getClientId())
.build());
contact1 =
persistResource(
new ContactResource.Builder()
.setContactId("contact_id1")
.setRepoId("2-COM")
.setCreationClientId(registrar1.getClientId())
.setPersistedCurrentSponsorClientId(registrar2.getClientId())
.build());
contact2 =
persistResource(
new ContactResource.Builder()
.setContactId("contact_id2")
.setRepoId("3-COM")
.setCreationClientId(registrar1.getClientId())
.setPersistedCurrentSponsorClientId(registrar1.getClientId())
.build());
historyEntry = persistResource(new HistoryEntry.Builder().setParent(domainKey).build());
Key<HistoryEntry> historyEntryKey = Key.create(historyEntry);
Key<BillingEvent.OneTime> oneTimeBillKey =
Key.create(historyEntryKey, BillingEvent.OneTime.class, 1);
Key<BillingEvent.Recurring> recurringBillKey =
Key.create(historyEntryKey, BillingEvent.Recurring.class, 2);
Key<PollMessage.Autorenew> autorenewPollKey =
Key.create(historyEntryKey, PollMessage.Autorenew.class, 3);
Key<PollMessage.OneTime> onetimePollKey =
Key.create(historyEntryKey, PollMessage.OneTime.class, 1);
domain =
persistResource(
new DomainBase.Builder()
.setDomainName("example.com")
.setRepoId("4-COM")
.setCreationClientId(registrar1.getClientId())
.setLastEppUpdateTime(fakeClock.nowUtc())
.setLastEppUpdateClientId(registrar2.getClientId())
.setLastTransferTime(fakeClock.nowUtc())
.setStatusValues(
ImmutableSet.of(
StatusValue.CLIENT_DELETE_PROHIBITED,
StatusValue.SERVER_DELETE_PROHIBITED,
StatusValue.SERVER_TRANSFER_PROHIBITED,
StatusValue.SERVER_UPDATE_PROHIBITED,
StatusValue.SERVER_RENEW_PROHIBITED,
StatusValue.SERVER_HOLD))
.setRegistrant(contact1.createVKey())
.setContacts(
ImmutableSet.of(
DesignatedContact.create(
DesignatedContact.Type.ADMIN, contact2.createVKey())))
.setNameservers(ImmutableSet.of(hostResource.createVKey()))
.setSubordinateHosts(ImmutableSet.of("ns1.example.com"))
.setPersistedCurrentSponsorClientId(registrar2.getClientId())
.setRegistrationExpirationTime(fakeClock.nowUtc().plusYears(1))
.setAuthInfo(DomainAuthInfo.create(PasswordAuth.create("password")))
.setDsData(
ImmutableSet.of(DelegationSignerData.create(1, 2, 3, new byte[] {0, 1, 2})))
.setLaunchNotice(
LaunchNotice.create("tcnid", "validatorId", START_OF_TIME, START_OF_TIME))
.setTransferData(
new DomainTransferData.Builder()
.setGainingClientId(registrar1.getClientId())
.setLosingClientId(registrar2.getClientId())
.setPendingTransferExpirationTime(fakeClock.nowUtc())
.setServerApproveEntities(
ImmutableSet.of(
VKey.from(oneTimeBillKey),
VKey.from(recurringBillKey),
VKey.from(autorenewPollKey)))
.setServerApproveBillingEvent(VKey.from(oneTimeBillKey))
.setServerApproveAutorenewEvent(VKey.from(recurringBillKey))
.setServerApproveAutorenewPollMessage(VKey.from(autorenewPollKey))
.setTransferRequestTime(fakeClock.nowUtc().plusDays(1))
.setTransferStatus(TransferStatus.SERVER_APPROVED)
.setTransferRequestTrid(Trid.create("client-trid", "server-trid"))
.build())
.setDeletePollMessage(onetimePollKey)
.setAutorenewBillingEvent(recurringBillKey)
.setAutorenewPollMessage(autorenewPollKey)
.setSmdId("smdid")
.addGracePeriod(
GracePeriod.create(
GracePeriodStatus.ADD, fakeClock.nowUtc().plusDays(1), "registrar", null))
.build());
exportDir = store.export(exportRootDir.getAbsolutePath(), ALL_KINDS, ImmutableSet.of());
commitLogDir = Files.createDirectory(tmpDir.resolve("commits")).toFile();
}
}
@Test
public void runPipeline() {
InitSqlPipelineOptions options =
PipelineOptionsFactory.fromArgs(
"--sqlCredentialUrlOverride="
+ beamJpaExtension.getCredentialFile().getAbsolutePath(),
"--commitLogStartTimestamp=" + START_TIME,
"--commitLogEndTimestamp=" + fakeClock.nowUtc().plusMillis(1),
"--datastoreExportDir=" + exportDir.getAbsolutePath(),
"--commitLogDir=" + commitLogDir.getAbsolutePath())
.withValidation()
.as(InitSqlPipelineOptions.class);
InitSqlPipeline initSqlPipeline = new InitSqlPipeline(options, testPipeline);
initSqlPipeline.run().waitUntilFinish();
try (AppEngineEnvironment env = new AppEngineEnvironment("test")) {
assertHostResourceEquals(
jpaTm().transact(() -> jpaTm().load(hostResource.createVKey())), hostResource);
assertThat(jpaTm().transact(() -> jpaTm().loadAll(Registrar.class)))
.comparingElementsUsing(immutableObjectCorrespondence("lastUpdateTime"))
.containsExactly(registrar1, registrar2);
assertThat(jpaTm().transact(() -> jpaTm().loadAll(ContactResource.class)))
.comparingElementsUsing(immutableObjectCorrespondence("revisions", "updateTimestamp"))
.containsExactly(contact1, contact2);
assertCleansedDomainEquals(jpaTm().transact(() -> jpaTm().load(domain.createVKey())), domain);
}
}
private static void assertHostResourceEquals(HostResource actual, HostResource expected) {
assertAboutImmutableObjects()
.that(actual)
.isEqualExceptFields(expected, "superordinateDomain", "revisions", "updateTimestamp");
assertThat(actual.getSuperordinateDomain().getSqlKey())
.isEqualTo(expected.getSuperordinateDomain().getSqlKey());
}
private static void assertCleansedDomainEquals(DomainBase actual, DomainBase expected) {
assertAboutImmutableObjects()
.that(actual)
.isEqualExceptFields(
expected,
"adminContact",
"registrantContact",
"gracePeriods",
"dsData",
"allContacts",
"revisions",
"updateTimestamp",
"autorenewBillingEvent",
"autorenewPollMessage",
"deletePollMessage",
"nsHosts",
"transferData");
assertThat(actual.getAdminContact().getSqlKey())
.isEqualTo(expected.getAdminContact().getSqlKey());
assertThat(actual.getRegistrant().getSqlKey()).isEqualTo(expected.getRegistrant().getSqlKey());
// TODO(weiminyu): compare gracePeriods, allContacts and dsData, when SQL model supports them.
}
}

View file

@ -15,12 +15,14 @@
package google.registry.beam.initsql;
import static com.google.common.truth.Truth.assertThat;
import static google.registry.model.ImmutableObjectSubject.immutableObjectCorrespondence;
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
import com.google.appengine.api.datastore.Entity;
import com.google.common.collect.ImmutableList;
import google.registry.backup.VersionedEntity;
import google.registry.beam.TestPipelineExtension;
import google.registry.model.ImmutableObject;
import google.registry.model.contact.ContactResource;
import google.registry.model.ofy.Ofy;
import google.registry.model.registrar.Registrar;
@ -31,10 +33,7 @@ import google.registry.testing.DatastoreEntityExtension;
import google.registry.testing.DatastoreHelper;
import google.registry.testing.FakeClock;
import google.registry.testing.InjectRule;
import java.io.File;
import java.io.PrintStream;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Collectors;
import org.apache.beam.sdk.transforms.Create;
@ -52,16 +51,16 @@ class WriteToSqlTest implements Serializable {
private final FakeClock fakeClock = new FakeClock(START_TIME);
@RegisterExtension
@Order(Order.DEFAULT - 1)
final transient DatastoreEntityExtension datastore = new DatastoreEntityExtension();
@RegisterExtension final transient InjectRule injectRule = new InjectRule();
@RegisterExtension
final transient JpaIntegrationTestRule database =
new JpaTestRules.Builder().withClock(fakeClock).buildIntegrationTestRule();
@RegisterExtension
@Order(value = 1)
final transient DatastoreEntityExtension datastore = new DatastoreEntityExtension();
@SuppressWarnings("WeakerAccess")
@TempDir
transient Path tmpDir;
@ -70,9 +69,13 @@ class WriteToSqlTest implements Serializable {
final transient TestPipelineExtension testPipeline =
TestPipelineExtension.create().enableAbandonedNodeEnforcement(true);
private ImmutableList<Entity> contacts;
// Must not be transient!
@RegisterExtension
@Order(Order.DEFAULT + 1)
public final BeamJpaExtension beamJpaExtension =
new BeamJpaExtension(() -> tmpDir.resolve("credential.dat"), database.getDatabase());
private File credentialFile;
private ImmutableList<Entity> contacts;
@BeforeEach
void beforeEach() throws Exception {
@ -93,14 +96,6 @@ class WriteToSqlTest implements Serializable {
}
contacts = builder.build();
}
credentialFile = Files.createFile(tmpDir.resolve("credential.dat")).toFile();
new PrintStream(credentialFile)
.printf(
"%s %s %s",
database.getDatabaseUrl(),
database.getDatabaseUsername(),
database.getDatabasePassword())
.close();
}
@Test
@ -119,14 +114,18 @@ class WriteToSqlTest implements Serializable {
4,
() ->
DaggerBeamJpaModule_JpaTransactionManagerComponent.builder()
.beamJpaModule(new BeamJpaModule(credentialFile.getAbsolutePath()))
.beamJpaModule(beamJpaExtension.getBeamJpaModule())
.build()
.localDbJpaTransactionManager()));
testPipeline.run().waitUntilFinish();
ImmutableList<?> sqlContacts = jpaTm().transact(() -> jpaTm().loadAll(ContactResource.class));
// TODO(weiminyu): compare load entities with originals. Note: lastUpdateTimes won't match by
// design. Need an elegant way to deal with this.bbq
assertThat(sqlContacts).hasSize(3);
assertThat(sqlContacts)
.comparingElementsUsing(immutableObjectCorrespondence("revisions", "updateTimestamp"))
.containsExactlyElementsIn(
contacts.stream()
.map(InitSqlTestUtils::datastoreToOfyEntity)
.map(ImmutableObject.class::cast)
.collect(ImmutableList.toImmutableList()));
}
}

View file

@ -16,7 +16,6 @@ package google.registry.model.contact;
import static com.google.common.truth.Truth.assertThat;
import static com.google.common.truth.Truth8.assertThat;
import static google.registry.model.EppResourceTestUtils.assertEqualsIgnoreLastUpdateTime;
import static google.registry.model.EppResourceUtils.loadByForeignKey;
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
import static google.registry.testing.ContactResourceSubject.assertAboutContacts;
@ -31,6 +30,7 @@ import static org.junit.Assert.assertThrows;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import google.registry.model.EntityTestCase;
import google.registry.model.ImmutableObjectSubject;
import google.registry.model.billing.BillingEvent;
import google.registry.model.contact.Disclose.PostalInfoChoice;
import google.registry.model.contact.PostalInfo.Type;
@ -155,7 +155,9 @@ public class ContactResourceTest extends EntityTestCase {
.setServerApproveEntities(null)
.build())
.build();
assertEqualsIgnoreLastUpdateTime(persisted, fixed);
ImmutableObjectSubject.assertAboutImmutableObjects()
.that(persisted)
.isEqualExceptFields(fixed, "updateTimestamp");
}
@Test

View file

@ -14,7 +14,7 @@
package google.registry.model.domain;
import static google.registry.model.EppResourceTestUtils.assertEqualsIgnoreLastUpdateTime;
import static google.registry.model.ImmutableObjectSubject.assertAboutImmutableObjects;
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
import static google.registry.testing.SqlHelper.assertThrowForeignKeyViolation;
import static google.registry.testing.SqlHelper.saveRegistrar;
@ -154,7 +154,9 @@ public class DomainBaseSqlTest {
DomainBase org = domain.asBuilder().setCreationTime(result.getCreationTime()).build();
// Note that the equality comparison forces a lazy load of all fields.
assertEqualsIgnoreLastUpdateTime(result, org);
assertAboutImmutableObjects()
.that(result)
.isEqualExceptFields(org, "updateTimestamp");
});
}

View file

@ -135,14 +135,14 @@ public class JpaTestRules {
@Override
public void beforeEach(ExtensionContext context) throws Exception {
this.currentTestClassName = context.getRequiredTestClass().getName();
integrationTestRule.before();
integrationTestRule.beforeEach(null);
jpaEntityCoverage.before();
}
@Override
public void afterEach(ExtensionContext context) throws Exception {
jpaEntityCoverage.after();
integrationTestRule.after();
integrationTestRule.afterEach(null);
this.currentTestClassName = null;
}
}

View file

@ -216,25 +216,17 @@ abstract class JpaTransactionManagerRule extends ExternalResource
}
@Override
public void before() throws Exception {
protected void before() throws Exception {
beforeEach(null);
}
@Override
public void after() {
protected void after() {
afterEach(null);
}
public String getDatabaseUrl() {
return database.getJdbcUrl();
}
public String getDatabaseUsername() {
return database.getUsername();
}
public String getDatabasePassword() {
return database.getPassword();
public JdbcDatabaseContainer getDatabase() {
return database;
}
private void resetTablesAndSequences() {

View file

@ -30,6 +30,7 @@ import com.google.appengine.tools.development.testing.LocalServiceTestHelper;
import com.google.appengine.tools.development.testing.LocalTaskQueueTestConfig;
import com.google.appengine.tools.development.testing.LocalURLFetchServiceTestConfig;
import com.google.appengine.tools.development.testing.LocalUserServiceTestConfig;
import com.google.apphosting.api.ApiProxy;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@ -521,6 +522,8 @@ public final class AppEngineRule extends ExternalResource
} finally {
temporaryFolder.delete();
}
// Clean up environment setting left behind by AppEngine test instance.
ApiProxy.setEnvironmentForCurrentThread(null);
}
/**

View file

@ -0,0 +1,584 @@
digraph {
rankdir=LR
subgraph cluster_0 {
label = ""
subgraph cluster_1 {
label = "Load Datastore snapshot"
subgraph cluster_2 {
label = "Load Datastore snapshot/Get export file patterns"
3 [label="Read(CreateSource)"]
}
subgraph cluster_4 {
label = "Load Datastore snapshot/Find export files"
subgraph cluster_5 {
label = "Load Datastore snapshot/Find export files/FileIO.MatchAll"
subgraph cluster_6 {
label = "Load Datastore snapshot/Find export files/FileIO.MatchAll/Match filepatterns"
7 [label="ParMultiDo(Match)"]
3 -> 7 [style=solid label=""]
}
subgraph cluster_8 {
label = "Load Datastore snapshot/Find export files/FileIO.MatchAll/Reshuffle.ViaRandomKey"
subgraph cluster_9 {
label = "Load Datastore snapshot/Find export files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Pair with random key"
10 [label="ParMultiDo(AssignShard)"]
7 -> 10 [style=solid label=""]
}
subgraph cluster_11 {
label = "Load Datastore snapshot/Find export files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Reshuffle"
subgraph cluster_12 {
label = "Load Datastore snapshot/Find export files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Reshuffle/Window.Into()"
13 [label="Window.Assign"]
10 -> 13 [style=solid label=""]
}
subgraph cluster_14 {
label = "Load Datastore snapshot/Find export files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Reshuffle/ReifyOriginalTimestamps"
subgraph cluster_15 {
label = "Load Datastore snapshot/Find export files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Reshuffle/ReifyOriginalTimestamps/ParDo(Anonymous)"
16 [label="ParMultiDo(Anonymous)"]
13 -> 16 [style=solid label=""]
}
}
17 [label="GroupByKey"]
16 -> 17 [style=solid label=""]
subgraph cluster_18 {
label = "Load Datastore snapshot/Find export files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Reshuffle/ExpandIterable"
19 [label="ParMultiDo(Anonymous)"]
17 -> 19 [style=solid label=""]
}
subgraph cluster_20 {
label = "Load Datastore snapshot/Find export files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Reshuffle/RestoreOriginalTimestamps"
subgraph cluster_21 {
label = "Load Datastore snapshot/Find export files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Reshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard"
subgraph cluster_22 {
label = "Load Datastore snapshot/Find export files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Reshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard/ParDo(Anonymous)"
23 [label="ParMultiDo(Anonymous)"]
19 -> 23 [style=solid label=""]
}
}
subgraph cluster_24 {
label = "Load Datastore snapshot/Find export files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Reshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues"
subgraph cluster_25 {
label = "Load Datastore snapshot/Find export files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Reshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)"
26 [label="ParMultiDo(Anonymous)"]
23 -> 26 [style=solid label=""]
}
}
}
}
subgraph cluster_27 {
label = "Load Datastore snapshot/Find export files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Values"
subgraph cluster_28 {
label = "Load Datastore snapshot/Find export files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Values/Values"
subgraph cluster_29 {
label = "Load Datastore snapshot/Find export files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Values/Values/Map"
30 [label="ParMultiDo(Anonymous)"]
26 -> 30 [style=solid label=""]
}
}
}
}
}
}
subgraph cluster_31 {
label = "Load Datastore snapshot/Load export data"
subgraph cluster_32 {
label = "Load Datastore snapshot/Load export data/FileIO.ReadMatches"
subgraph cluster_33 {
label = "Load Datastore snapshot/Load export data/FileIO.ReadMatches/ParDo(ToReadableFile)"
34 [label="ParMultiDo(ToReadableFile)"]
30 -> 34 [style=solid label=""]
}
}
subgraph cluster_35 {
label = "Load Datastore snapshot/Load export data/BackupFileReader"
36 [label="ParMultiDo(BackupFileReader)"]
34 -> 36 [style=solid label=""]
}
}
subgraph cluster_37 {
label = "Load Datastore snapshot/Get commitlog file patterns"
38 [label="Read(CreateSource)"]
}
subgraph cluster_39 {
label = "Load Datastore snapshot/Find commitlog files"
subgraph cluster_40 {
label = "Load Datastore snapshot/Find commitlog files/FileIO.MatchAll"
subgraph cluster_41 {
label = "Load Datastore snapshot/Find commitlog files/FileIO.MatchAll/Match filepatterns"
42 [label="ParMultiDo(Match)"]
38 -> 42 [style=solid label=""]
}
subgraph cluster_43 {
label = "Load Datastore snapshot/Find commitlog files/FileIO.MatchAll/Reshuffle.ViaRandomKey"
subgraph cluster_44 {
label = "Load Datastore snapshot/Find commitlog files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Pair with random key"
45 [label="ParMultiDo(AssignShard)"]
42 -> 45 [style=solid label=""]
}
subgraph cluster_46 {
label = "Load Datastore snapshot/Find commitlog files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Reshuffle"
subgraph cluster_47 {
label = "Load Datastore snapshot/Find commitlog files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Reshuffle/Window.Into()"
48 [label="Window.Assign"]
45 -> 48 [style=solid label=""]
}
subgraph cluster_49 {
label = "Load Datastore snapshot/Find commitlog files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Reshuffle/ReifyOriginalTimestamps"
subgraph cluster_50 {
label = "Load Datastore snapshot/Find commitlog files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Reshuffle/ReifyOriginalTimestamps/ParDo(Anonymous)"
51 [label="ParMultiDo(Anonymous)"]
48 -> 51 [style=solid label=""]
}
}
52 [label="GroupByKey"]
51 -> 52 [style=solid label=""]
subgraph cluster_53 {
label = "Load Datastore snapshot/Find commitlog files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Reshuffle/ExpandIterable"
54 [label="ParMultiDo(Anonymous)"]
52 -> 54 [style=solid label=""]
}
subgraph cluster_55 {
label = "Load Datastore snapshot/Find commitlog files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Reshuffle/RestoreOriginalTimestamps"
subgraph cluster_56 {
label = "Load Datastore snapshot/Find commitlog files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Reshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard"
subgraph cluster_57 {
label = "Load Datastore snapshot/Find commitlog files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Reshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard/ParDo(Anonymous)"
58 [label="ParMultiDo(Anonymous)"]
54 -> 58 [style=solid label=""]
}
}
subgraph cluster_59 {
label = "Load Datastore snapshot/Find commitlog files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Reshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues"
subgraph cluster_60 {
label = "Load Datastore snapshot/Find commitlog files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Reshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)"
61 [label="ParMultiDo(Anonymous)"]
58 -> 61 [style=solid label=""]
}
}
}
}
subgraph cluster_62 {
label = "Load Datastore snapshot/Find commitlog files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Values"
subgraph cluster_63 {
label = "Load Datastore snapshot/Find commitlog files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Values/Values"
subgraph cluster_64 {
label = "Load Datastore snapshot/Find commitlog files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Values/Values/Map"
65 [label="ParMultiDo(Anonymous)"]
61 -> 65 [style=solid label=""]
}
}
}
}
}
}
subgraph cluster_66 {
label = "Load Datastore snapshot/Filter commitLog by time"
67 [label="ParMultiDo(FilterCommitLogFileByTime)"]
65 -> 67 [style=solid label=""]
}
subgraph cluster_68 {
label = "Load Datastore snapshot/Load commitlog data"
subgraph cluster_69 {
label = "Load Datastore snapshot/Load commitlog data/FileIO.ReadMatches"
subgraph cluster_70 {
label = "Load Datastore snapshot/Load commitlog data/FileIO.ReadMatches/ParDo(ToReadableFile)"
71 [label="ParMultiDo(ToReadableFile)"]
67 -> 71 [style=solid label=""]
}
}
subgraph cluster_72 {
label = "Load Datastore snapshot/Load commitlog data/BackupFileReader"
73 [label="ParMultiDo(BackupFileReader)"]
71 -> 73 [style=solid label=""]
}
}
74 [label="Flatten.PCollections"]
36 -> 74 [style=solid label=""]
73 -> 74 [style=solid label=""]
subgraph cluster_75 {
label = "Load Datastore snapshot/Key entities by Datastore Keys"
subgraph cluster_76 {
label = "Load Datastore snapshot/Key entities by Datastore Keys/Map"
77 [label="ParMultiDo(Anonymous)"]
74 -> 77 [style=solid label=""]
}
}
78 [label="GroupByKey"]
77 -> 78 [style=solid label=""]
79 [label="ParMultiDo(Anonymous)"]
78 -> 79 [style=solid label=""]
}
subgraph cluster_80 {
label = "Write to sql: Transforms:Registrar"
subgraph cluster_81 {
label = "Write to sql: Transforms:Registrar/Shard data for Transforms:Registrar"
subgraph cluster_82 {
label = "Write to sql: Transforms:Registrar/Shard data for Transforms:Registrar/Map"
83 [label="ParMultiDo(Anonymous)"]
79 -> 83 [style=solid label=""]
}
}
subgraph cluster_84 {
label = "Write to sql: Transforms:Registrar/Batch output by shard Transforms:Registrar"
subgraph cluster_85 {
label = "Write to sql: Transforms:Registrar/Batch output by shard Transforms:Registrar/ParDo(GroupIntoBatches)"
86 [label="ParMultiDo(GroupIntoBatches)"]
83 -> 86 [style=solid label=""]
}
}
subgraph cluster_87 {
label = "Write to sql: Transforms:Registrar/Write in batch for Transforms:Registrar"
88 [label="ParMultiDo(SqlBatchWriter)"]
86 -> 88 [style=solid label=""]
}
}
subgraph cluster_89 {
label = "Wait on Transforms:Registrar"
subgraph cluster_90 {
label = "Wait on Transforms:Registrar/To wait view 0"
subgraph cluster_91 {
label = "Wait on Transforms:Registrar/To wait view 0/Window.Into()"
92 [label="Flatten.PCollections"]
88 -> 92 [style=solid label=""]
}
subgraph cluster_93 {
label = "Wait on Transforms:Registrar/To wait view 0/ParDo(CollectWindows)"
94 [label="ParMultiDo(CollectWindows)"]
92 -> 94 [style=solid label=""]
}
subgraph cluster_95 {
label = "Wait on Transforms:Registrar/To wait view 0/Sample.Any"
subgraph cluster_96 {
label = "Wait on Transforms:Registrar/To wait view 0/Sample.Any/Combine.globally(SampleAny)"
subgraph cluster_97 {
label = "Wait on Transforms:Registrar/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys"
subgraph cluster_98 {
label = "Wait on Transforms:Registrar/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys"
subgraph cluster_99 {
label = "Wait on Transforms:Registrar/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys/Map"
100 [label="ParMultiDo(Anonymous)"]
94 -> 100 [style=solid label=""]
}
}
}
subgraph cluster_101 {
label = "Wait on Transforms:Registrar/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)"
102 [label="GroupByKey"]
100 -> 102 [style=solid label=""]
subgraph cluster_103 {
label = "Wait on Transforms:Registrar/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues"
subgraph cluster_104 {
label = "Wait on Transforms:Registrar/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues/ParDo(Anonymous)"
105 [label="ParMultiDo(Anonymous)"]
102 -> 105 [style=solid label=""]
}
}
}
subgraph cluster_106 {
label = "Wait on Transforms:Registrar/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values"
subgraph cluster_107 {
label = "Wait on Transforms:Registrar/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values"
subgraph cluster_108 {
label = "Wait on Transforms:Registrar/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values/Map"
109 [label="ParMultiDo(Anonymous)"]
105 -> 109 [style=solid label=""]
}
}
}
}
subgraph cluster_110 {
label = "Wait on Transforms:Registrar/To wait view 0/Sample.Any/Flatten.Iterables"
subgraph cluster_111 {
label = "Wait on Transforms:Registrar/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables"
subgraph cluster_112 {
label = "Wait on Transforms:Registrar/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables/FlatMap"
113 [label="ParMultiDo(Anonymous)"]
109 -> 113 [style=solid label=""]
}
}
}
}
subgraph cluster_114 {
label = "Wait on Transforms:Registrar/To wait view 0/View.AsList"
subgraph cluster_115 {
label = "Wait on Transforms:Registrar/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization"
subgraph cluster_116 {
label = "Wait on Transforms:Registrar/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)"
117 [label="ParMultiDo(VoidKeyToMultimapMaterialization)"]
113 -> 117 [style=solid label=""]
}
}
118 [label="View.CreatePCollectionView"]
117 -> 118 [style=solid label=""]
}
}
subgraph cluster_119 {
label = "Wait on Transforms:Registrar/Wait"
subgraph cluster_120 {
label = "Wait on Transforms:Registrar/Wait/Map"
121 [label="ParMultiDo(Anonymous)"]
79 -> 121 [style=solid label=""]
117 -> 121 [style=dashed label=""]
}
}
}
subgraph cluster_122 {
label = "Write to sql: Transforms:ContactResource"
subgraph cluster_123 {
label = "Write to sql: Transforms:ContactResource/Shard data for Transforms:ContactResource"
subgraph cluster_124 {
label = "Write to sql: Transforms:ContactResource/Shard data for Transforms:ContactResource/Map"
125 [label="ParMultiDo(Anonymous)"]
121 -> 125 [style=solid label=""]
}
}
subgraph cluster_126 {
label = "Write to sql: Transforms:ContactResource/Batch output by shard Transforms:ContactResource"
subgraph cluster_127 {
label = "Write to sql: Transforms:ContactResource/Batch output by shard Transforms:ContactResource/ParDo(GroupIntoBatches)"
128 [label="ParMultiDo(GroupIntoBatches)"]
125 -> 128 [style=solid label=""]
}
}
subgraph cluster_129 {
label = "Write to sql: Transforms:ContactResource/Write in batch for Transforms:ContactResource"
130 [label="ParMultiDo(SqlBatchWriter)"]
128 -> 130 [style=solid label=""]
}
}
subgraph cluster_131 {
label = "Remove circular foreign keys from DomainBase"
132 [label="ParMultiDo(RemoveDomainBaseForeignKeys)"]
79 -> 132 [style=solid label=""]
}
subgraph cluster_133 {
label = "Wait on phase one"
subgraph cluster_134 {
label = "Wait on phase one/To wait view 0"
subgraph cluster_135 {
label = "Wait on phase one/To wait view 0/Window.Into()"
136 [label="Flatten.PCollections"]
130 -> 136 [style=solid label=""]
}
subgraph cluster_137 {
label = "Wait on phase one/To wait view 0/ParDo(CollectWindows)"
138 [label="ParMultiDo(CollectWindows)"]
136 -> 138 [style=solid label=""]
}
subgraph cluster_139 {
label = "Wait on phase one/To wait view 0/Sample.Any"
subgraph cluster_140 {
label = "Wait on phase one/To wait view 0/Sample.Any/Combine.globally(SampleAny)"
subgraph cluster_141 {
label = "Wait on phase one/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys"
subgraph cluster_142 {
label = "Wait on phase one/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys"
subgraph cluster_143 {
label = "Wait on phase one/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys/Map"
144 [label="ParMultiDo(Anonymous)"]
138 -> 144 [style=solid label=""]
}
}
}
subgraph cluster_145 {
label = "Wait on phase one/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)"
146 [label="GroupByKey"]
144 -> 146 [style=solid label=""]
subgraph cluster_147 {
label = "Wait on phase one/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues"
subgraph cluster_148 {
label = "Wait on phase one/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues/ParDo(Anonymous)"
149 [label="ParMultiDo(Anonymous)"]
146 -> 149 [style=solid label=""]
}
}
}
subgraph cluster_150 {
label = "Wait on phase one/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values"
subgraph cluster_151 {
label = "Wait on phase one/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values"
subgraph cluster_152 {
label = "Wait on phase one/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values/Map"
153 [label="ParMultiDo(Anonymous)"]
149 -> 153 [style=solid label=""]
}
}
}
}
subgraph cluster_154 {
label = "Wait on phase one/To wait view 0/Sample.Any/Flatten.Iterables"
subgraph cluster_155 {
label = "Wait on phase one/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables"
subgraph cluster_156 {
label = "Wait on phase one/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables/FlatMap"
157 [label="ParMultiDo(Anonymous)"]
153 -> 157 [style=solid label=""]
}
}
}
}
subgraph cluster_158 {
label = "Wait on phase one/To wait view 0/View.AsList"
subgraph cluster_159 {
label = "Wait on phase one/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization"
subgraph cluster_160 {
label = "Wait on phase one/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)"
161 [label="ParMultiDo(VoidKeyToMultimapMaterialization)"]
157 -> 161 [style=solid label=""]
}
}
162 [label="View.CreatePCollectionView"]
161 -> 162 [style=solid label=""]
}
}
subgraph cluster_163 {
label = "Wait on phase one/Wait"
subgraph cluster_164 {
label = "Wait on phase one/Wait/Map"
165 [label="ParMultiDo(Anonymous)"]
132 -> 165 [style=solid label=""]
161 -> 165 [style=dashed label=""]
}
}
}
subgraph cluster_166 {
label = "Write to sql: DomainBase without circular foreign keys"
subgraph cluster_167 {
label = "Write to sql: DomainBase without circular foreign keys/Shard data for DomainBase without circular foreign keys"
subgraph cluster_168 {
label = "Write to sql: DomainBase without circular foreign keys/Shard data for DomainBase without circular foreign keys/Map"
169 [label="ParMultiDo(Anonymous)"]
165 -> 169 [style=solid label=""]
}
}
subgraph cluster_170 {
label = "Write to sql: DomainBase without circular foreign keys/Batch output by shard DomainBase without circular foreign keys"
subgraph cluster_171 {
label = "Write to sql: DomainBase without circular foreign keys/Batch output by shard DomainBase without circular foreign keys/ParDo(GroupIntoBatches)"
172 [label="ParMultiDo(GroupIntoBatches)"]
169 -> 172 [style=solid label=""]
}
}
subgraph cluster_173 {
label = "Write to sql: DomainBase without circular foreign keys/Write in batch for DomainBase without circular foreign keys"
174 [label="ParMultiDo(SqlBatchWriter)"]
172 -> 174 [style=solid label=""]
}
}
subgraph cluster_175 {
label = "Wait on DomainBaseNoFkeys"
subgraph cluster_176 {
label = "Wait on DomainBaseNoFkeys/To wait view 0"
subgraph cluster_177 {
label = "Wait on DomainBaseNoFkeys/To wait view 0/Window.Into()"
178 [label="Flatten.PCollections"]
174 -> 178 [style=solid label=""]
}
subgraph cluster_179 {
label = "Wait on DomainBaseNoFkeys/To wait view 0/ParDo(CollectWindows)"
180 [label="ParMultiDo(CollectWindows)"]
178 -> 180 [style=solid label=""]
}
subgraph cluster_181 {
label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any"
subgraph cluster_182 {
label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Combine.globally(SampleAny)"
subgraph cluster_183 {
label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys"
subgraph cluster_184 {
label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys"
subgraph cluster_185 {
label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys/Map"
186 [label="ParMultiDo(Anonymous)"]
180 -> 186 [style=solid label=""]
}
}
}
subgraph cluster_187 {
label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)"
188 [label="GroupByKey"]
186 -> 188 [style=solid label=""]
subgraph cluster_189 {
label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues"
subgraph cluster_190 {
label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues/ParDo(Anonymous)"
191 [label="ParMultiDo(Anonymous)"]
188 -> 191 [style=solid label=""]
}
}
}
subgraph cluster_192 {
label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values"
subgraph cluster_193 {
label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values"
subgraph cluster_194 {
label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values/Map"
195 [label="ParMultiDo(Anonymous)"]
191 -> 195 [style=solid label=""]
}
}
}
}
subgraph cluster_196 {
label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Flatten.Iterables"
subgraph cluster_197 {
label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables"
subgraph cluster_198 {
label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables/FlatMap"
199 [label="ParMultiDo(Anonymous)"]
195 -> 199 [style=solid label=""]
}
}
}
}
subgraph cluster_200 {
label = "Wait on DomainBaseNoFkeys/To wait view 0/View.AsList"
subgraph cluster_201 {
label = "Wait on DomainBaseNoFkeys/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization"
subgraph cluster_202 {
label = "Wait on DomainBaseNoFkeys/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)"
203 [label="ParMultiDo(VoidKeyToMultimapMaterialization)"]
199 -> 203 [style=solid label=""]
}
}
204 [label="View.CreatePCollectionView"]
203 -> 204 [style=solid label=""]
}
}
subgraph cluster_205 {
label = "Wait on DomainBaseNoFkeys/Wait"
subgraph cluster_206 {
label = "Wait on DomainBaseNoFkeys/Wait/Map"
207 [label="ParMultiDo(Anonymous)"]
79 -> 207 [style=solid label=""]
203 -> 207 [style=dashed label=""]
}
}
}
subgraph cluster_208 {
label = "Write to sql: Transforms:HostResource"
subgraph cluster_209 {
label = "Write to sql: Transforms:HostResource/Shard data for Transforms:HostResource"
subgraph cluster_210 {
label = "Write to sql: Transforms:HostResource/Shard data for Transforms:HostResource/Map"
211 [label="ParMultiDo(Anonymous)"]
207 -> 211 [style=solid label=""]
}
}
subgraph cluster_212 {
label = "Write to sql: Transforms:HostResource/Batch output by shard Transforms:HostResource"
subgraph cluster_213 {
label = "Write to sql: Transforms:HostResource/Batch output by shard Transforms:HostResource/ParDo(GroupIntoBatches)"
214 [label="ParMultiDo(GroupIntoBatches)"]
211 -> 214 [style=solid label=""]
}
}
subgraph cluster_215 {
label = "Write to sql: Transforms:HostResource/Write in batch for Transforms:HostResource"
216 [label="ParMultiDo(SqlBatchWriter)"]
214 -> 216 [style=solid label=""]
}
}
}
}

Binary file not shown.

After

Width:  |  Height:  |  Size: 929 KiB