diff --git a/core/build.gradle b/core/build.gradle index 0e0df1ec1..e0e5062e8 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -828,9 +828,8 @@ task buildToolImage(dependsOn: nomulus, type: Exec) { commandLine 'docker', 'build', '-t', 'nomulus-tool', '.' } -task generateInitSqlPipelineGraph(type: Test) { - include "**/InitSqlPipelineGraphTest.*" - testNameIncludePatterns = ["**createPipeline_compareGraph"] +task generateInitSqlPipelineGraph(type: FilteringTest) { + tests = ['InitSqlPipelineGraphTest.createPipeline_compareGraph'] ignoreFailures = true } diff --git a/core/src/main/java/google/registry/beam/initsql/DomainBaseUtil.java b/core/src/main/java/google/registry/beam/initsql/DomainBaseUtil.java index c81f01197..a83daedcb 100644 --- a/core/src/main/java/google/registry/beam/initsql/DomainBaseUtil.java +++ b/core/src/main/java/google/registry/beam/initsql/DomainBaseUtil.java @@ -26,17 +26,15 @@ final class DomainBaseUtil { private DomainBaseUtil() {} /** - * Removes {@link google.registry.model.billing.BillingEvent.Recurring}, {@link - * google.registry.model.poll.PollMessage PollMessages} and {@link - * google.registry.model.host.HostResource name servers} from a Datastore {@link Entity} that - * represents an Ofy {@link google.registry.model.domain.DomainBase}. This breaks the cycle of - * foreign key constraints between these entity kinds, allowing {@code DomainBases} to be inserted - * into the SQL database. See {@link InitSqlPipeline} for a use case, where the full {@code - * DomainBases} are written again during the last stage of the pipeline. + * Removes properties that contain foreign keys from a Datastore {@link Entity} that represents an + * Ofy {@link google.registry.model.domain.DomainBase}. This breaks the cycle of foreign key + * constraints between entity kinds, allowing {@code DomainBases} to be inserted into the SQL + * database. See {@link InitSqlPipeline} for a use case, where the full {@code DomainBases} are + * written again during the last stage of the pipeline. * *

The returned object may be in bad state. Specifically, {@link * google.registry.model.eppcommon.StatusValue#INACTIVE} is not added after name servers are - * removed. This only impacts tests. + * removed. This only impacts tests that manipulate Datastore entities directly. * *

This operation is performed on an Datastore {@link Entity} instead of Ofy Java object * because Objectify requires access to a Datastore service when converting an Ofy object to a @@ -70,6 +68,9 @@ final class DomainBaseUtil { domainBase.getProperties().keySet().stream() .filter(s -> s.startsWith("transferData.")) .forEach(s -> clone.removeProperty(s)); + domainBase.getProperties().keySet().stream() + .filter(s -> s.startsWith("gracePeriods.")) + .forEach(s -> clone.removeProperty(s)); return clone; } } diff --git a/core/src/main/java/google/registry/beam/initsql/InitSqlPipeline.java b/core/src/main/java/google/registry/beam/initsql/InitSqlPipeline.java index 1d2593d93..970414fa4 100644 --- a/core/src/main/java/google/registry/beam/initsql/InitSqlPipeline.java +++ b/core/src/main/java/google/registry/beam/initsql/InitSqlPipeline.java @@ -34,6 +34,7 @@ import google.registry.model.registrar.Registrar; import google.registry.model.registrar.RegistrarContact; import google.registry.model.registry.Registry; import google.registry.model.reporting.HistoryEntry; +import google.registry.persistence.PersistenceModule.TransactionIsolationLevel; import google.registry.persistence.transaction.JpaTransactionManager; import java.io.Serializable; import java.util.Collection; @@ -77,14 +78,22 @@ import org.joda.time.DateTime; * HistoryEntry}. *

  • {@link BillingEvent.OneTime}: references {@code Registrar}, {@code DomainBase}, {@code * BillingEvent.Recurring}, {@code HistoryEntry} and {@code AllocationToken}. - *
  • {@link BillingEvent.Modification}: SQL model TBD. Will reference {@code Registrar}, {@code - * DomainBase} and {@code BillingEvent.OneTime}. *
  • {@link BillingEvent.Cancellation}: references {@code Registrar}, {@code DomainBase}, {@code * BillingEvent.Recurring}, {@code BillingEvent.OneTime}, and {@code HistoryEntry}. *
  • {@link PollMessage}: references {@code Registrar}, {@code DomainBase}, {@code * ContactResource}, {@code HostResource}, and {@code HistoryEntry}. *
  • {@link DomainBase}, original copy from Datastore. * + * + *

    This pipeline expects that the source Datastore has at least one entity in each of the types + * above. This assumption allows us to construct a simpler pipeline graph that can be visually + * examined, and is true in all intended use cases. However, tests must not violate this assumption + * when setting up data, otherwise they may run into foreign key constraint violations. The reason + * is that this pipeline uses the {@link Wait} transform to order the persistence by entity type. + * However, the wait is skipped if the target type has no data, resulting in subsequent entity types + * starting prematurely. E.g., if a Datastore has no {@code RegistrarContact} entities, the pipeline + * may start writing {@code DomainBase} entities before all {@code Registry}, {@code Registrar} and + * {@code ContactResource} entities have been persisted. */ public class InitSqlPipeline implements Serializable { @@ -93,24 +102,23 @@ public class InitSqlPipeline implements Serializable { * DomainBase}. */ private static final ImmutableList> PHASE_ONE_ORDERED = - ImmutableList.of(Registry.class, Registrar.class, ContactResource.class); + ImmutableList.of( + Registry.class, Registrar.class, ContactResource.class, RegistrarContact.class); /** * Datastore kinds to be written to the SQL database after the cleansed version of {@link * DomainBase}. - * - *

    The following entities are missing from the list: - * - *

    */ - // TODO(weiminyu): add more entities when available. private static final ImmutableList> PHASE_TWO_ORDERED = - ImmutableList.of(HostResource.class); + ImmutableList.of( + HostResource.class, + HistoryEntry.class, + AllocationToken.class, + BillingEvent.Recurring.class, + BillingEvent.OneTime.class, + BillingEvent.Cancellation.class, + PollMessage.class, + DomainBase.class); private final InitSqlPipelineOptions options; @@ -226,7 +234,11 @@ public class InitSqlPipeline implements Serializable { transformId, options.getMaxConcurrentSqlWriters(), options.getSqlWriteBatchSize(), - new JpaSupplierFactory(credentialFileUrl, options.getCloudKmsProjectId(), jpaGetter))); + new JpaSupplierFactory( + credentialFileUrl, + options.getCloudKmsProjectId(), + jpaGetter, + TransactionIsolationLevel.TRANSACTION_READ_UNCOMMITTED))); } private static ImmutableList toKindStrings(Collection> entityClasses) { diff --git a/core/src/main/java/google/registry/beam/initsql/JpaSupplierFactory.java b/core/src/main/java/google/registry/beam/initsql/JpaSupplierFactory.java index 19607b76f..8d2c6bc19 100644 --- a/core/src/main/java/google/registry/beam/initsql/JpaSupplierFactory.java +++ b/core/src/main/java/google/registry/beam/initsql/JpaSupplierFactory.java @@ -16,6 +16,7 @@ package google.registry.beam.initsql; import google.registry.beam.initsql.BeamJpaModule.JpaTransactionManagerComponent; import google.registry.beam.initsql.Transforms.SerializableSupplier; +import google.registry.persistence.PersistenceModule.TransactionIsolationLevel; import google.registry.persistence.transaction.JpaTransactionManager; import javax.annotation.Nullable; import org.apache.beam.sdk.transforms.SerializableFunction; @@ -28,21 +29,32 @@ public class JpaSupplierFactory implements SerializableSupplier jpaGetter; + @Nullable private final TransactionIsolationLevel isolationLevelOverride; public JpaSupplierFactory( String credentialFileUrl, @Nullable String cloudKmsProjectId, SerializableFunction jpaGetter) { + this(credentialFileUrl, cloudKmsProjectId, jpaGetter, null); + } + + public JpaSupplierFactory( + String credentialFileUrl, + @Nullable String cloudKmsProjectId, + SerializableFunction jpaGetter, + @Nullable TransactionIsolationLevel isolationLevelOverride) { this.credentialFileUrl = credentialFileUrl; this.cloudKmsProjectId = cloudKmsProjectId; this.jpaGetter = jpaGetter; + this.isolationLevelOverride = isolationLevelOverride; } @Override public JpaTransactionManager get() { return jpaGetter.apply( DaggerBeamJpaModule_JpaTransactionManagerComponent.builder() - .beamJpaModule(new BeamJpaModule(credentialFileUrl, cloudKmsProjectId)) + .beamJpaModule( + new BeamJpaModule(credentialFileUrl, cloudKmsProjectId, isolationLevelOverride)) .build()); } } diff --git a/core/src/main/java/google/registry/beam/initsql/Transforms.java b/core/src/main/java/google/registry/beam/initsql/Transforms.java index 4416116e1..68fad974c 100644 --- a/core/src/main/java/google/registry/beam/initsql/Transforms.java +++ b/core/src/main/java/google/registry/beam/initsql/Transforms.java @@ -17,11 +17,9 @@ package google.registry.beam.initsql; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; -import static com.google.common.base.Throwables.throwIfUnchecked; import static google.registry.beam.initsql.BackupPaths.getCommitLogTimestamp; import static google.registry.beam.initsql.BackupPaths.getExportFilePatterns; import static google.registry.model.ofy.ObjectifyService.ofy; -import static google.registry.persistence.JpaRetries.isFailedTxnRetriable; import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; import static google.registry.persistence.transaction.TransactionManagerFactory.setJpaTm; import static google.registry.util.DateTimeUtils.START_OF_TIME; @@ -38,27 +36,34 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Streams; +import com.googlecode.objectify.Key; import google.registry.backup.AppEngineEnvironment; import google.registry.backup.CommitLogImports; import google.registry.backup.VersionedEntity; import google.registry.model.domain.DomainBase; import google.registry.model.ofy.ObjectifyService; +import google.registry.model.reporting.HistoryEntry; import google.registry.persistence.transaction.JpaTransactionManager; +import google.registry.schema.replay.DatastoreAndSqlEntity; +import google.registry.schema.replay.SqlEntity; import google.registry.tools.LevelDbLogReader; -import google.registry.util.SystemSleeper; import java.io.Serializable; import java.util.Collection; import java.util.Iterator; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.ThreadLocalRandom; import java.util.function.Supplier; +import javax.annotation.Nullable; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.Compression; import org.apache.beam.sdk.io.FileIO; import org.apache.beam.sdk.io.FileIO.ReadableFile; import org.apache.beam.sdk.io.fs.EmptyMatchTreatment; import org.apache.beam.sdk.io.fs.MatchResult.Metadata; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; @@ -78,7 +83,6 @@ import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; import org.apache.beam.sdk.values.TypeDescriptor; import org.joda.time.DateTime; -import org.joda.time.Duration; /** * {@link PTransform Pipeline transforms} used in pipelines that load from both Datastore export @@ -288,7 +292,7 @@ public final class Transforms { maxWriters, batchSize, jpaSupplier, - (e) -> ofy().toPojo(e.getEntity().get()), + Transforms::convertVersionedEntityToSqlEntity, TypeDescriptor.of(VersionedEntity.class)); } @@ -329,11 +333,50 @@ public final class Transforms { .apply("Batch output by shard " + transformId, GroupIntoBatches.ofSize(batchSize)) .apply( "Write in batch for " + transformId, - ParDo.of(new SqlBatchWriter(jpaSupplier, jpaConverter))); + ParDo.of(new SqlBatchWriter(transformId, jpaSupplier, jpaConverter))); } }; } + private static Key toOfyKey(Object ofyEntity) { + return Key.create(ofyEntity); + } + + private static boolean isMigratable(Entity entity) { + if (entity.getKind().equals("HistoryEntry")) { + // DOMAIN_APPLICATION_CREATE is deprecated type and should not be migrated. + // The Enum name DOMAIN_APPLICATION_CREATE no longer exists in Java and cannot + // be deserialized. + return !Objects.equals(entity.getProperty("type"), "DOMAIN_APPLICATION_CREATE"); + } + return true; + } + + private static SqlEntity toSqlEntity(Object ofyEntity) { + if (ofyEntity instanceof HistoryEntry) { + HistoryEntry ofyHistory = (HistoryEntry) ofyEntity; + return (SqlEntity) ofyHistory.toChildHistoryEntity(); + } + return ((DatastoreAndSqlEntity) ofyEntity).toSqlEntity().get(); + } + + /** + * Converts a {@link VersionedEntity} to an JPA entity for persistence. + * + * @return An object to be persisted to SQL, or null if the input is not to be migrated. (Not + * using Optional in return because as a one-use method, we do not want to invest the effort + * to make Optional work with BEAM) + */ + @Nullable + private static Object convertVersionedEntityToSqlEntity(VersionedEntity dsEntity) { + return dsEntity + .getEntity() + .filter(Transforms::isMigratable) + .map(e -> ofy().toPojo(e)) + .map(Transforms::toSqlEntity) + .orElse(null); + } + /** Interface for serializable {@link Supplier suppliers}. */ public interface SerializableSupplier extends Supplier, Serializable {} @@ -428,22 +471,22 @@ public final class Transforms { private static int instanceCount = 0; private static JpaTransactionManager originalJpa; + private Counter counter; + private final SerializableSupplier jpaSupplier; private final SerializableFunction jpaConverter; - private transient SystemSleeper sleeper; - SqlBatchWriter( + String type, SerializableSupplier jpaSupplier, SerializableFunction jpaConverter) { + counter = Metrics.counter("SQL_WRITE", type); this.jpaSupplier = jpaSupplier; this.jpaConverter = jpaConverter; } @Setup public void setup() { - sleeper = new SystemSleeper(); - try (AppEngineEnvironment env = new AppEngineEnvironment()) { ObjectifyService.initOfy(); } @@ -474,31 +517,29 @@ public final class Transforms { ImmutableList ofyEntities = Streams.stream(kv.getValue()) .map(this.jpaConverter::apply) + // TODO(b/177340730): post migration delete the line below. + .filter(Objects::nonNull) .collect(ImmutableList.toImmutableList()); - retry(() -> jpaTm().transact(() -> jpaTm().putAll(ofyEntities))); + try { + jpaTm().transact(() -> jpaTm().putAll(ofyEntities)); + counter.inc(ofyEntities.size()); + } catch (RuntimeException e) { + processSingly(ofyEntities); + } } } - // TODO(b/160632289): Enhance Retrier and use it here. - private void retry(Runnable runnable) { - int maxAttempts = 5; - int initialDelayMillis = 100; - double jitterRatio = 0.2; - - for (int attempt = 0; attempt < maxAttempts; attempt++) { + /** + * Writes entities in a failed batch one by one to identify the first bad entity and throws a + * {@link RuntimeException} on it. + */ + private void processSingly(ImmutableList ofyEntities) { + for (Object ofyEntity : ofyEntities) { try { - runnable.run(); - return; - } catch (Throwable throwable) { - if (!isFailedTxnRetriable(throwable)) { - throwIfUnchecked(throwable); - throw new RuntimeException(throwable); - } - int sleepMillis = (1 << attempt) * initialDelayMillis; - int jitter = - ThreadLocalRandom.current().nextInt((int) (sleepMillis * jitterRatio)) - - (int) (sleepMillis * jitterRatio / 2); - sleeper.sleepUninterruptibly(Duration.millis(sleepMillis + jitter)); + jpaTm().transact(() -> jpaTm().put(ofyEntity)); + counter.inc(); + } catch (RuntimeException e) { + throw new RuntimeException(toOfyKey(ofyEntity).toString(), e); } } } diff --git a/core/src/test/java/google/registry/beam/initsql/DomainBaseUtilTest.java b/core/src/test/java/google/registry/beam/initsql/DomainBaseUtilTest.java index ce83380c4..301994c69 100644 --- a/core/src/test/java/google/registry/beam/initsql/DomainBaseUtilTest.java +++ b/core/src/test/java/google/registry/beam/initsql/DomainBaseUtilTest.java @@ -179,6 +179,7 @@ public class DomainBaseUtilTest { .setNameservers(ImmutableSet.of()) .setDeletePollMessage(null) .setTransferData(null) + .setGracePeriods(ImmutableSet.of()) .build(); DomainBase domainTransformedByUtil = (DomainBase) ofy().toPojo(DomainBaseUtil.removeBillingAndPollAndHosts(domainEntity)); @@ -199,6 +200,7 @@ public class DomainBaseUtilTest { .setNameservers(ImmutableSet.of()) .setDeletePollMessage(null) .setTransferData(null) + .setGracePeriods(ImmutableSet.of()) .build(); Entity entityWithoutFkeys = tm().transact(() -> ofy().toEntity(domainWithoutFKeys)); DomainBase domainTransformedByUtil = diff --git a/core/src/test/java/google/registry/beam/initsql/InitSqlPipelineTest.java b/core/src/test/java/google/registry/beam/initsql/InitSqlPipelineTest.java index 6452f8b51..779cc3773 100644 --- a/core/src/test/java/google/registry/beam/initsql/InitSqlPipelineTest.java +++ b/core/src/test/java/google/registry/beam/initsql/InitSqlPipelineTest.java @@ -17,9 +17,12 @@ package google.registry.beam.initsql; import static com.google.common.truth.Truth.assertThat; import static google.registry.model.ImmutableObjectSubject.assertAboutImmutableObjects; import static google.registry.model.ImmutableObjectSubject.immutableObjectCorrespondence; +import static google.registry.model.domain.token.AllocationToken.TokenType.SINGLE_USE; import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; import static google.registry.testing.DatabaseHelper.newRegistry; import static google.registry.testing.DatabaseHelper.persistResource; +import static google.registry.testing.DatabaseHelper.persistSimpleResource; +import static google.registry.util.DateTimeUtils.END_OF_TIME; import static google.registry.util.DateTimeUtils.START_OF_TIME; import com.google.common.collect.ImmutableList; @@ -27,7 +30,10 @@ import com.google.common.collect.ImmutableSet; import com.googlecode.objectify.Key; import google.registry.backup.AppEngineEnvironment; import google.registry.beam.TestPipelineExtension; +import google.registry.flows.domain.DomainFlowUtils; import google.registry.model.billing.BillingEvent; +import google.registry.model.billing.BillingEvent.Flag; +import google.registry.model.billing.BillingEvent.Reason; import google.registry.model.contact.ContactResource; import google.registry.model.domain.DesignatedContact; import google.registry.model.domain.DomainAuthInfo; @@ -36,6 +42,7 @@ import google.registry.model.domain.GracePeriod; import google.registry.model.domain.launch.LaunchNotice; import google.registry.model.domain.rgp.GracePeriodStatus; import google.registry.model.domain.secdns.DelegationSignerData; +import google.registry.model.domain.token.AllocationToken; import google.registry.model.eppcommon.AuthInfo.PasswordAuth; import google.registry.model.eppcommon.StatusValue; import google.registry.model.eppcommon.Trid; @@ -43,6 +50,7 @@ import google.registry.model.host.HostResource; import google.registry.model.ofy.Ofy; import google.registry.model.poll.PollMessage; import google.registry.model.registrar.Registrar; +import google.registry.model.registrar.RegistrarContact; import google.registry.model.registry.Registry; import google.registry.model.reporting.HistoryEntry; import google.registry.model.transfer.DomainTransferData; @@ -58,6 +66,7 @@ import java.io.File; import java.nio.file.Files; import java.nio.file.Path; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.joda.money.Money; import org.joda.time.DateTime; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Order; @@ -69,14 +78,24 @@ import org.junit.jupiter.api.io.TempDir; class InitSqlPipelineTest { private static final DateTime START_TIME = DateTime.parse("2000-01-01T00:00:00.0Z"); + /** + * All kinds of entities to be set up in the Datastore. Must contain all kinds known to {@link + * InitSqlPipeline}. + */ private static final ImmutableList> ALL_KINDS = ImmutableList.of( Registry.class, Registrar.class, ContactResource.class, - HostResource.class, + RegistrarContact.class, DomainBase.class, - HistoryEntry.class); + HostResource.class, + HistoryEntry.class, + AllocationToken.class, + BillingEvent.Recurring.class, + BillingEvent.OneTime.class, + BillingEvent.Cancellation.class, + PollMessage.class); private transient FakeClock fakeClock = new FakeClock(START_TIME); @@ -152,16 +171,75 @@ class InitSqlPipelineTest { .setCreationClientId(registrar1.getClientId()) .setPersistedCurrentSponsorClientId(registrar1.getClientId()) .build()); - historyEntry = persistResource(new HistoryEntry.Builder().setParent(domainKey).build()); + persistSimpleResource( + new RegistrarContact.Builder() + .setParent(registrar1) + .setName("John Abused") + .setEmailAddress("johnabuse@example.com") + .setVisibleInWhoisAsAdmin(true) + .setVisibleInWhoisAsTech(false) + .setPhoneNumber("+1.2125551213") + .setFaxNumber("+1.2125551213") + .setTypes(ImmutableSet.of(RegistrarContact.Type.ABUSE, RegistrarContact.Type.ADMIN)) + .build()); + historyEntry = + persistResource( + new HistoryEntry.Builder() + .setParent(domainKey) + .setModificationTime(fakeClock.nowUtc()) + .setType(HistoryEntry.Type.DOMAIN_CREATE) + .build()); + persistResource( + new AllocationToken.Builder().setToken("abc123").setTokenType(SINGLE_USE).build()); Key historyEntryKey = Key.create(historyEntry); - Key oneTimeBillKey = - Key.create(historyEntryKey, BillingEvent.OneTime.class, 1); - VKey recurringBillKey = - VKey.from(Key.create(historyEntryKey, BillingEvent.Recurring.class, 2)); - VKey autorenewPollKey = - VKey.from(Key.create(historyEntryKey, PollMessage.Autorenew.class, 3)); - VKey onetimePollKey = - VKey.from(Key.create(historyEntryKey, PollMessage.OneTime.class, 1)); + BillingEvent.OneTime onetimeBillEvent = + new BillingEvent.OneTime.Builder() + .setId(1) + .setReason(Reason.RENEW) + .setTargetId("example.com") + .setClientId("TheRegistrar") + .setCost(Money.parse("USD 44.00")) + .setPeriodYears(4) + .setEventTime(fakeClock.nowUtc()) + .setBillingTime(fakeClock.nowUtc()) + .setParent(historyEntryKey) + .build(); + persistResource(onetimeBillEvent); + Key oneTimeBillKey = Key.create(onetimeBillEvent); + BillingEvent.Recurring recurringBillEvent = + new BillingEvent.Recurring.Builder() + .setId(2) + .setReason(Reason.RENEW) + .setFlags(ImmutableSet.of(Flag.AUTO_RENEW)) + .setTargetId("example.com") + .setClientId("TheRegistrar") + .setEventTime(fakeClock.nowUtc()) + .setRecurrenceEndTime(END_OF_TIME) + .setParent(historyEntryKey) + .build(); + persistResource(recurringBillEvent); + VKey recurringBillKey = recurringBillEvent.createVKey(); + PollMessage.Autorenew autorenewPollMessage = + new PollMessage.Autorenew.Builder() + .setId(3L) + .setTargetId("example.com") + .setClientId("TheRegistrar") + .setEventTime(fakeClock.nowUtc()) + .setMsg("Domain was auto-renewed.") + .setParent(historyEntry) + .build(); + persistResource(autorenewPollMessage); + VKey autorenewPollKey = autorenewPollMessage.createVKey(); + PollMessage.OneTime oneTimePollMessage = + new PollMessage.OneTime.Builder() + .setId(1L) + .setParent(historyEntry) + .setEventTime(fakeClock.nowUtc()) + .setClientId("TheRegistrar") + .setMsg(DomainFlowUtils.COLLISION_MESSAGE) + .build(); + persistResource(oneTimePollMessage); + VKey onetimePollKey = oneTimePollMessage.createVKey(); domain = persistResource( new DomainBase.Builder() @@ -220,6 +298,16 @@ class InitSqlPipelineTest { "TheRegistrar", null)) .build()); + persistResource( + new BillingEvent.Cancellation.Builder() + .setReason(Reason.RENEW) + .setTargetId(domain.getDomainName()) + .setClientId(domain.getCurrentSponsorClientId()) + .setEventTime(fakeClock.nowUtc()) + .setBillingTime(fakeClock.nowUtc()) + .setRecurringEventKey(recurringBillEvent.createVKey()) + .setParent(historyEntryKey) + .build()); exportDir = store.export(exportRootDir.getAbsolutePath(), ALL_KINDS, ImmutableSet.of()); commitLogDir = Files.createDirectory(tmpDir.resolve("commits")).toFile(); } @@ -248,8 +336,7 @@ class InitSqlPipelineTest { assertThat(jpaTm().transact(() -> jpaTm().loadAllOf(ContactResource.class))) .comparingElementsUsing(immutableObjectCorrespondence("revisions", "updateTimestamp")) .containsExactly(contact1, contact2); - assertCleansedDomainEquals( - jpaTm().transact(() -> jpaTm().loadByKey(domain.createVKey())), domain); + assertDomainEquals(jpaTm().transact(() -> jpaTm().loadByKey(domain.createVKey())), domain); } } @@ -261,29 +348,26 @@ class InitSqlPipelineTest { .isEqualTo(expected.getSuperordinateDomain().getSqlKey()); } - private static void assertCleansedDomainEquals(DomainBase actual, DomainBase expected) { + private static void assertDomainEquals(DomainBase actual, DomainBase expected) { assertAboutImmutableObjects() .that(actual) .isEqualExceptFields( expected, - "adminContact", - "registrantContact", - "gracePeriods", - "dsData", - "allContacts", "revisions", "updateTimestamp", - "autorenewBillingEvent", - "autorenewBillingEventHistoryId", "autorenewPollMessage", - "autorenewPollMessageHistoryId", "deletePollMessage", - "deletePollMessageHistoryId", "nsHosts", + "gracePeriods", "transferData"); assertThat(actual.getAdminContact().getSqlKey()) .isEqualTo(expected.getAdminContact().getSqlKey()); assertThat(actual.getRegistrant().getSqlKey()).isEqualTo(expected.getRegistrant().getSqlKey()); - // TODO(weiminyu): compare gracePeriods, allContacts and dsData, when SQL model supports them. + assertThat(actual.getNsHosts()).isEqualTo(expected.getNsHosts()); + assertThat(actual.getAutorenewPollMessage().getOfyKey()) + .isEqualTo(expected.getAutorenewPollMessage().getOfyKey()); + assertThat(actual.getDeletePollMessage().getOfyKey()) + .isEqualTo(expected.getDeletePollMessage().getOfyKey()); + // TODO(weiminyu): check gracePeriods and transferData when it is easier to do } } diff --git a/core/src/test/resources/google/registry/beam/initsql/pipeline_golden.dot b/core/src/test/resources/google/registry/beam/initsql/pipeline_golden.dot index cbe670ae6..d9d516349 100644 --- a/core/src/test/resources/google/registry/beam/initsql/pipeline_golden.dot +++ b/core/src/test/resources/google/registry/beam/initsql/pipeline_golden.dot @@ -462,172 +462,172 @@ digraph { } } subgraph cluster_173 { - label = "Remove circular foreign keys from DomainBase" - 174 [label="ParMultiDo(RemoveDomainBaseForeignKeys)"] - 79 -> 174 [style=solid label=""] - } - subgraph cluster_175 { - label = "Wait on phase one" - subgraph cluster_176 { - label = "Wait on phase one/To wait view 0" + label = "Wait on Transforms:ContactResource" + subgraph cluster_174 { + label = "Wait on Transforms:ContactResource/To wait view 0" + subgraph cluster_175 { + label = "Wait on Transforms:ContactResource/To wait view 0/Window.Into()" + 176 [label="Flatten.PCollections"] + 172 -> 176 [style=solid label=""] + } subgraph cluster_177 { - label = "Wait on phase one/To wait view 0/Window.Into()" - 178 [label="Flatten.PCollections"] - 172 -> 178 [style=solid label=""] + label = "Wait on Transforms:ContactResource/To wait view 0/ParDo(CollectWindows)" + 178 [label="ParMultiDo(CollectWindows)"] + 176 -> 178 [style=solid label=""] } subgraph cluster_179 { - label = "Wait on phase one/To wait view 0/ParDo(CollectWindows)" - 180 [label="ParMultiDo(CollectWindows)"] - 178 -> 180 [style=solid label=""] - } - subgraph cluster_181 { - label = "Wait on phase one/To wait view 0/Sample.Any" - subgraph cluster_182 { - label = "Wait on phase one/To wait view 0/Sample.Any/Combine.globally(SampleAny)" - subgraph cluster_183 { - label = "Wait on phase one/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys" - subgraph cluster_184 { - label = "Wait on phase one/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys" - subgraph cluster_185 { - label = "Wait on phase one/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys/Map" - 186 [label="ParMultiDo(Anonymous)"] - 180 -> 186 [style=solid label=""] + label = "Wait on Transforms:ContactResource/To wait view 0/Sample.Any" + subgraph cluster_180 { + label = "Wait on Transforms:ContactResource/To wait view 0/Sample.Any/Combine.globally(SampleAny)" + subgraph cluster_181 { + label = "Wait on Transforms:ContactResource/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys" + subgraph cluster_182 { + label = "Wait on Transforms:ContactResource/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys" + subgraph cluster_183 { + label = "Wait on Transforms:ContactResource/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys/Map" + 184 [label="ParMultiDo(Anonymous)"] + 178 -> 184 [style=solid label=""] } } } - subgraph cluster_187 { - label = "Wait on phase one/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)" - 188 [label="GroupByKey"] - 186 -> 188 [style=solid label=""] - subgraph cluster_189 { - label = "Wait on phase one/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues" - subgraph cluster_190 { - label = "Wait on phase one/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues/ParDo(Anonymous)" - 191 [label="ParMultiDo(Anonymous)"] - 188 -> 191 [style=solid label=""] + subgraph cluster_185 { + label = "Wait on Transforms:ContactResource/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)" + 186 [label="GroupByKey"] + 184 -> 186 [style=solid label=""] + subgraph cluster_187 { + label = "Wait on Transforms:ContactResource/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues" + subgraph cluster_188 { + label = "Wait on Transforms:ContactResource/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues/ParDo(Anonymous)" + 189 [label="ParMultiDo(Anonymous)"] + 186 -> 189 [style=solid label=""] } } } - subgraph cluster_192 { - label = "Wait on phase one/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values" - subgraph cluster_193 { - label = "Wait on phase one/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values" - subgraph cluster_194 { - label = "Wait on phase one/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values/Map" - 195 [label="ParMultiDo(Anonymous)"] - 191 -> 195 [style=solid label=""] + subgraph cluster_190 { + label = "Wait on Transforms:ContactResource/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values" + subgraph cluster_191 { + label = "Wait on Transforms:ContactResource/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values" + subgraph cluster_192 { + label = "Wait on Transforms:ContactResource/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values/Map" + 193 [label="ParMultiDo(Anonymous)"] + 189 -> 193 [style=solid label=""] } } } } - subgraph cluster_196 { - label = "Wait on phase one/To wait view 0/Sample.Any/Flatten.Iterables" - subgraph cluster_197 { - label = "Wait on phase one/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables" - subgraph cluster_198 { - label = "Wait on phase one/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables/FlatMap" - 199 [label="ParMultiDo(Anonymous)"] - 195 -> 199 [style=solid label=""] + subgraph cluster_194 { + label = "Wait on Transforms:ContactResource/To wait view 0/Sample.Any/Flatten.Iterables" + subgraph cluster_195 { + label = "Wait on Transforms:ContactResource/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables" + subgraph cluster_196 { + label = "Wait on Transforms:ContactResource/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables/FlatMap" + 197 [label="ParMultiDo(Anonymous)"] + 193 -> 197 [style=solid label=""] } } } } - subgraph cluster_200 { - label = "Wait on phase one/To wait view 0/View.AsList" - subgraph cluster_201 { - label = "Wait on phase one/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization" - subgraph cluster_202 { - label = "Wait on phase one/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)" - 203 [label="ParMultiDo(VoidKeyToMultimapMaterialization)"] - 199 -> 203 [style=solid label=""] + subgraph cluster_198 { + label = "Wait on Transforms:ContactResource/To wait view 0/View.AsList" + subgraph cluster_199 { + label = "Wait on Transforms:ContactResource/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization" + subgraph cluster_200 { + label = "Wait on Transforms:ContactResource/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)" + 201 [label="ParMultiDo(VoidKeyToMultimapMaterialization)"] + 197 -> 201 [style=solid label=""] } } - 204 [label="View.CreatePCollectionView"] - 203 -> 204 [style=solid label=""] + 202 [label="View.CreatePCollectionView"] + 201 -> 202 [style=solid label=""] } } - subgraph cluster_205 { - label = "Wait on phase one/Wait" - subgraph cluster_206 { - label = "Wait on phase one/Wait/Map" - 207 [label="ParMultiDo(Anonymous)"] - 174 -> 207 [style=solid label=""] - 203 -> 207 [style=dashed label=""] + subgraph cluster_203 { + label = "Wait on Transforms:ContactResource/Wait" + subgraph cluster_204 { + label = "Wait on Transforms:ContactResource/Wait/Map" + 205 [label="ParMultiDo(Anonymous)"] + 79 -> 205 [style=solid label=""] + 201 -> 205 [style=dashed label=""] } } } - subgraph cluster_208 { - label = "Write to sql: DomainBase without circular foreign keys" - subgraph cluster_209 { - label = "Write to sql: DomainBase without circular foreign keys/Shard data for DomainBase without circular foreign keys" - subgraph cluster_210 { - label = "Write to sql: DomainBase without circular foreign keys/Shard data for DomainBase without circular foreign keys/Map" - 211 [label="ParMultiDo(Anonymous)"] - 207 -> 211 [style=solid label=""] + subgraph cluster_206 { + label = "Write to sql: Transforms:RegistrarContact" + subgraph cluster_207 { + label = "Write to sql: Transforms:RegistrarContact/Shard data for Transforms:RegistrarContact" + subgraph cluster_208 { + label = "Write to sql: Transforms:RegistrarContact/Shard data for Transforms:RegistrarContact/Map" + 209 [label="ParMultiDo(Anonymous)"] + 205 -> 209 [style=solid label=""] } } - subgraph cluster_212 { - label = "Write to sql: DomainBase without circular foreign keys/Batch output by shard DomainBase without circular foreign keys" - subgraph cluster_213 { - label = "Write to sql: DomainBase without circular foreign keys/Batch output by shard DomainBase without circular foreign keys/ParDo(GroupIntoBatches)" - 214 [label="ParMultiDo(GroupIntoBatches)"] - 211 -> 214 [style=solid label=""] + subgraph cluster_210 { + label = "Write to sql: Transforms:RegistrarContact/Batch output by shard Transforms:RegistrarContact" + subgraph cluster_211 { + label = "Write to sql: Transforms:RegistrarContact/Batch output by shard Transforms:RegistrarContact/ParDo(GroupIntoBatches)" + 212 [label="ParMultiDo(GroupIntoBatches)"] + 209 -> 212 [style=solid label=""] } } - subgraph cluster_215 { - label = "Write to sql: DomainBase without circular foreign keys/Write in batch for DomainBase without circular foreign keys" - 216 [label="ParMultiDo(SqlBatchWriter)"] - 214 -> 216 [style=solid label=""] + subgraph cluster_213 { + label = "Write to sql: Transforms:RegistrarContact/Write in batch for Transforms:RegistrarContact" + 214 [label="ParMultiDo(SqlBatchWriter)"] + 212 -> 214 [style=solid label=""] } } + subgraph cluster_215 { + label = "Remove circular foreign keys from DomainBase" + 216 [label="ParMultiDo(RemoveDomainBaseForeignKeys)"] + 79 -> 216 [style=solid label=""] + } subgraph cluster_217 { - label = "Wait on DomainBaseNoFkeys" + label = "Wait on phase one" subgraph cluster_218 { - label = "Wait on DomainBaseNoFkeys/To wait view 0" + label = "Wait on phase one/To wait view 0" subgraph cluster_219 { - label = "Wait on DomainBaseNoFkeys/To wait view 0/Window.Into()" + label = "Wait on phase one/To wait view 0/Window.Into()" 220 [label="Flatten.PCollections"] - 216 -> 220 [style=solid label=""] + 214 -> 220 [style=solid label=""] } subgraph cluster_221 { - label = "Wait on DomainBaseNoFkeys/To wait view 0/ParDo(CollectWindows)" + label = "Wait on phase one/To wait view 0/ParDo(CollectWindows)" 222 [label="ParMultiDo(CollectWindows)"] 220 -> 222 [style=solid label=""] } subgraph cluster_223 { - label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any" + label = "Wait on phase one/To wait view 0/Sample.Any" subgraph cluster_224 { - label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Combine.globally(SampleAny)" + label = "Wait on phase one/To wait view 0/Sample.Any/Combine.globally(SampleAny)" subgraph cluster_225 { - label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys" + label = "Wait on phase one/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys" subgraph cluster_226 { - label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys" + label = "Wait on phase one/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys" subgraph cluster_227 { - label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys/Map" + label = "Wait on phase one/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys/Map" 228 [label="ParMultiDo(Anonymous)"] 222 -> 228 [style=solid label=""] } } } subgraph cluster_229 { - label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)" + label = "Wait on phase one/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)" 230 [label="GroupByKey"] 228 -> 230 [style=solid label=""] subgraph cluster_231 { - label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues" + label = "Wait on phase one/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues" subgraph cluster_232 { - label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues/ParDo(Anonymous)" + label = "Wait on phase one/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues/ParDo(Anonymous)" 233 [label="ParMultiDo(Anonymous)"] 230 -> 233 [style=solid label=""] } } } subgraph cluster_234 { - label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values" + label = "Wait on phase one/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values" subgraph cluster_235 { - label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values" + label = "Wait on phase one/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values" subgraph cluster_236 { - label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values/Map" + label = "Wait on phase one/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values/Map" 237 [label="ParMultiDo(Anonymous)"] 233 -> 237 [style=solid label=""] } @@ -635,11 +635,11 @@ digraph { } } subgraph cluster_238 { - label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Flatten.Iterables" + label = "Wait on phase one/To wait view 0/Sample.Any/Flatten.Iterables" subgraph cluster_239 { - label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables" + label = "Wait on phase one/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables" subgraph cluster_240 { - label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables/FlatMap" + label = "Wait on phase one/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables/FlatMap" 241 [label="ParMultiDo(Anonymous)"] 237 -> 241 [style=solid label=""] } @@ -647,11 +647,11 @@ digraph { } } subgraph cluster_242 { - label = "Wait on DomainBaseNoFkeys/To wait view 0/View.AsList" + label = "Wait on phase one/To wait view 0/View.AsList" subgraph cluster_243 { - label = "Wait on DomainBaseNoFkeys/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization" + label = "Wait on phase one/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization" subgraph cluster_244 { - label = "Wait on DomainBaseNoFkeys/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)" + label = "Wait on phase one/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)" 245 [label="ParMultiDo(VoidKeyToMultimapMaterialization)"] 241 -> 245 [style=solid label=""] } @@ -661,38 +661,950 @@ digraph { } } subgraph cluster_247 { - label = "Wait on DomainBaseNoFkeys/Wait" + label = "Wait on phase one/Wait" subgraph cluster_248 { - label = "Wait on DomainBaseNoFkeys/Wait/Map" + label = "Wait on phase one/Wait/Map" 249 [label="ParMultiDo(Anonymous)"] - 79 -> 249 [style=solid label=""] + 216 -> 249 [style=solid label=""] 245 -> 249 [style=dashed label=""] } } } subgraph cluster_250 { - label = "Write to sql: Transforms:HostResource" + label = "Write to sql: DomainBase without circular foreign keys" subgraph cluster_251 { - label = "Write to sql: Transforms:HostResource/Shard data for Transforms:HostResource" + label = "Write to sql: DomainBase without circular foreign keys/Shard data for DomainBase without circular foreign keys" subgraph cluster_252 { - label = "Write to sql: Transforms:HostResource/Shard data for Transforms:HostResource/Map" + label = "Write to sql: DomainBase without circular foreign keys/Shard data for DomainBase without circular foreign keys/Map" 253 [label="ParMultiDo(Anonymous)"] 249 -> 253 [style=solid label=""] } } subgraph cluster_254 { - label = "Write to sql: Transforms:HostResource/Batch output by shard Transforms:HostResource" + label = "Write to sql: DomainBase without circular foreign keys/Batch output by shard DomainBase without circular foreign keys" subgraph cluster_255 { - label = "Write to sql: Transforms:HostResource/Batch output by shard Transforms:HostResource/ParDo(GroupIntoBatches)" + label = "Write to sql: DomainBase without circular foreign keys/Batch output by shard DomainBase without circular foreign keys/ParDo(GroupIntoBatches)" 256 [label="ParMultiDo(GroupIntoBatches)"] 253 -> 256 [style=solid label=""] } } subgraph cluster_257 { - label = "Write to sql: Transforms:HostResource/Write in batch for Transforms:HostResource" + label = "Write to sql: DomainBase without circular foreign keys/Write in batch for DomainBase without circular foreign keys" 258 [label="ParMultiDo(SqlBatchWriter)"] 256 -> 258 [style=solid label=""] } } + subgraph cluster_259 { + label = "Wait on DomainBaseNoFkeys" + subgraph cluster_260 { + label = "Wait on DomainBaseNoFkeys/To wait view 0" + subgraph cluster_261 { + label = "Wait on DomainBaseNoFkeys/To wait view 0/Window.Into()" + 262 [label="Flatten.PCollections"] + 258 -> 262 [style=solid label=""] + } + subgraph cluster_263 { + label = "Wait on DomainBaseNoFkeys/To wait view 0/ParDo(CollectWindows)" + 264 [label="ParMultiDo(CollectWindows)"] + 262 -> 264 [style=solid label=""] + } + subgraph cluster_265 { + label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any" + subgraph cluster_266 { + label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Combine.globally(SampleAny)" + subgraph cluster_267 { + label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys" + subgraph cluster_268 { + label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys" + subgraph cluster_269 { + label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys/Map" + 270 [label="ParMultiDo(Anonymous)"] + 264 -> 270 [style=solid label=""] + } + } + } + subgraph cluster_271 { + label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)" + 272 [label="GroupByKey"] + 270 -> 272 [style=solid label=""] + subgraph cluster_273 { + label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues" + subgraph cluster_274 { + label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues/ParDo(Anonymous)" + 275 [label="ParMultiDo(Anonymous)"] + 272 -> 275 [style=solid label=""] + } + } + } + subgraph cluster_276 { + label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values" + subgraph cluster_277 { + label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values" + subgraph cluster_278 { + label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values/Map" + 279 [label="ParMultiDo(Anonymous)"] + 275 -> 279 [style=solid label=""] + } + } + } + } + subgraph cluster_280 { + label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Flatten.Iterables" + subgraph cluster_281 { + label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables" + subgraph cluster_282 { + label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables/FlatMap" + 283 [label="ParMultiDo(Anonymous)"] + 279 -> 283 [style=solid label=""] + } + } + } + } + subgraph cluster_284 { + label = "Wait on DomainBaseNoFkeys/To wait view 0/View.AsList" + subgraph cluster_285 { + label = "Wait on DomainBaseNoFkeys/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization" + subgraph cluster_286 { + label = "Wait on DomainBaseNoFkeys/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)" + 287 [label="ParMultiDo(VoidKeyToMultimapMaterialization)"] + 283 -> 287 [style=solid label=""] + } + } + 288 [label="View.CreatePCollectionView"] + 287 -> 288 [style=solid label=""] + } + } + subgraph cluster_289 { + label = "Wait on DomainBaseNoFkeys/Wait" + subgraph cluster_290 { + label = "Wait on DomainBaseNoFkeys/Wait/Map" + 291 [label="ParMultiDo(Anonymous)"] + 79 -> 291 [style=solid label=""] + 287 -> 291 [style=dashed label=""] + } + } + } + subgraph cluster_292 { + label = "Write to sql: Transforms:HostResource" + subgraph cluster_293 { + label = "Write to sql: Transforms:HostResource/Shard data for Transforms:HostResource" + subgraph cluster_294 { + label = "Write to sql: Transforms:HostResource/Shard data for Transforms:HostResource/Map" + 295 [label="ParMultiDo(Anonymous)"] + 291 -> 295 [style=solid label=""] + } + } + subgraph cluster_296 { + label = "Write to sql: Transforms:HostResource/Batch output by shard Transforms:HostResource" + subgraph cluster_297 { + label = "Write to sql: Transforms:HostResource/Batch output by shard Transforms:HostResource/ParDo(GroupIntoBatches)" + 298 [label="ParMultiDo(GroupIntoBatches)"] + 295 -> 298 [style=solid label=""] + } + } + subgraph cluster_299 { + label = "Write to sql: Transforms:HostResource/Write in batch for Transforms:HostResource" + 300 [label="ParMultiDo(SqlBatchWriter)"] + 298 -> 300 [style=solid label=""] + } + } + subgraph cluster_301 { + label = "Wait on Transforms:HostResource" + subgraph cluster_302 { + label = "Wait on Transforms:HostResource/To wait view 0" + subgraph cluster_303 { + label = "Wait on Transforms:HostResource/To wait view 0/Window.Into()" + 304 [label="Flatten.PCollections"] + 300 -> 304 [style=solid label=""] + } + subgraph cluster_305 { + label = "Wait on Transforms:HostResource/To wait view 0/ParDo(CollectWindows)" + 306 [label="ParMultiDo(CollectWindows)"] + 304 -> 306 [style=solid label=""] + } + subgraph cluster_307 { + label = "Wait on Transforms:HostResource/To wait view 0/Sample.Any" + subgraph cluster_308 { + label = "Wait on Transforms:HostResource/To wait view 0/Sample.Any/Combine.globally(SampleAny)" + subgraph cluster_309 { + label = "Wait on Transforms:HostResource/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys" + subgraph cluster_310 { + label = "Wait on Transforms:HostResource/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys" + subgraph cluster_311 { + label = "Wait on Transforms:HostResource/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys/Map" + 312 [label="ParMultiDo(Anonymous)"] + 306 -> 312 [style=solid label=""] + } + } + } + subgraph cluster_313 { + label = "Wait on Transforms:HostResource/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)" + 314 [label="GroupByKey"] + 312 -> 314 [style=solid label=""] + subgraph cluster_315 { + label = "Wait on Transforms:HostResource/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues" + subgraph cluster_316 { + label = "Wait on Transforms:HostResource/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues/ParDo(Anonymous)" + 317 [label="ParMultiDo(Anonymous)"] + 314 -> 317 [style=solid label=""] + } + } + } + subgraph cluster_318 { + label = "Wait on Transforms:HostResource/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values" + subgraph cluster_319 { + label = "Wait on Transforms:HostResource/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values" + subgraph cluster_320 { + label = "Wait on Transforms:HostResource/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values/Map" + 321 [label="ParMultiDo(Anonymous)"] + 317 -> 321 [style=solid label=""] + } + } + } + } + subgraph cluster_322 { + label = "Wait on Transforms:HostResource/To wait view 0/Sample.Any/Flatten.Iterables" + subgraph cluster_323 { + label = "Wait on Transforms:HostResource/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables" + subgraph cluster_324 { + label = "Wait on Transforms:HostResource/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables/FlatMap" + 325 [label="ParMultiDo(Anonymous)"] + 321 -> 325 [style=solid label=""] + } + } + } + } + subgraph cluster_326 { + label = "Wait on Transforms:HostResource/To wait view 0/View.AsList" + subgraph cluster_327 { + label = "Wait on Transforms:HostResource/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization" + subgraph cluster_328 { + label = "Wait on Transforms:HostResource/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)" + 329 [label="ParMultiDo(VoidKeyToMultimapMaterialization)"] + 325 -> 329 [style=solid label=""] + } + } + 330 [label="View.CreatePCollectionView"] + 329 -> 330 [style=solid label=""] + } + } + subgraph cluster_331 { + label = "Wait on Transforms:HostResource/Wait" + subgraph cluster_332 { + label = "Wait on Transforms:HostResource/Wait/Map" + 333 [label="ParMultiDo(Anonymous)"] + 79 -> 333 [style=solid label=""] + 329 -> 333 [style=dashed label=""] + } + } + } + subgraph cluster_334 { + label = "Write to sql: Transforms:HistoryEntry" + subgraph cluster_335 { + label = "Write to sql: Transforms:HistoryEntry/Shard data for Transforms:HistoryEntry" + subgraph cluster_336 { + label = "Write to sql: Transforms:HistoryEntry/Shard data for Transforms:HistoryEntry/Map" + 337 [label="ParMultiDo(Anonymous)"] + 333 -> 337 [style=solid label=""] + } + } + subgraph cluster_338 { + label = "Write to sql: Transforms:HistoryEntry/Batch output by shard Transforms:HistoryEntry" + subgraph cluster_339 { + label = "Write to sql: Transforms:HistoryEntry/Batch output by shard Transforms:HistoryEntry/ParDo(GroupIntoBatches)" + 340 [label="ParMultiDo(GroupIntoBatches)"] + 337 -> 340 [style=solid label=""] + } + } + subgraph cluster_341 { + label = "Write to sql: Transforms:HistoryEntry/Write in batch for Transforms:HistoryEntry" + 342 [label="ParMultiDo(SqlBatchWriter)"] + 340 -> 342 [style=solid label=""] + } + } + subgraph cluster_343 { + label = "Wait on Transforms:HistoryEntry" + subgraph cluster_344 { + label = "Wait on Transforms:HistoryEntry/To wait view 0" + subgraph cluster_345 { + label = "Wait on Transforms:HistoryEntry/To wait view 0/Window.Into()" + 346 [label="Flatten.PCollections"] + 342 -> 346 [style=solid label=""] + } + subgraph cluster_347 { + label = "Wait on Transforms:HistoryEntry/To wait view 0/ParDo(CollectWindows)" + 348 [label="ParMultiDo(CollectWindows)"] + 346 -> 348 [style=solid label=""] + } + subgraph cluster_349 { + label = "Wait on Transforms:HistoryEntry/To wait view 0/Sample.Any" + subgraph cluster_350 { + label = "Wait on Transforms:HistoryEntry/To wait view 0/Sample.Any/Combine.globally(SampleAny)" + subgraph cluster_351 { + label = "Wait on Transforms:HistoryEntry/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys" + subgraph cluster_352 { + label = "Wait on Transforms:HistoryEntry/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys" + subgraph cluster_353 { + label = "Wait on Transforms:HistoryEntry/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys/Map" + 354 [label="ParMultiDo(Anonymous)"] + 348 -> 354 [style=solid label=""] + } + } + } + subgraph cluster_355 { + label = "Wait on Transforms:HistoryEntry/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)" + 356 [label="GroupByKey"] + 354 -> 356 [style=solid label=""] + subgraph cluster_357 { + label = "Wait on Transforms:HistoryEntry/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues" + subgraph cluster_358 { + label = "Wait on Transforms:HistoryEntry/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues/ParDo(Anonymous)" + 359 [label="ParMultiDo(Anonymous)"] + 356 -> 359 [style=solid label=""] + } + } + } + subgraph cluster_360 { + label = "Wait on Transforms:HistoryEntry/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values" + subgraph cluster_361 { + label = "Wait on Transforms:HistoryEntry/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values" + subgraph cluster_362 { + label = "Wait on Transforms:HistoryEntry/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values/Map" + 363 [label="ParMultiDo(Anonymous)"] + 359 -> 363 [style=solid label=""] + } + } + } + } + subgraph cluster_364 { + label = "Wait on Transforms:HistoryEntry/To wait view 0/Sample.Any/Flatten.Iterables" + subgraph cluster_365 { + label = "Wait on Transforms:HistoryEntry/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables" + subgraph cluster_366 { + label = "Wait on Transforms:HistoryEntry/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables/FlatMap" + 367 [label="ParMultiDo(Anonymous)"] + 363 -> 367 [style=solid label=""] + } + } + } + } + subgraph cluster_368 { + label = "Wait on Transforms:HistoryEntry/To wait view 0/View.AsList" + subgraph cluster_369 { + label = "Wait on Transforms:HistoryEntry/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization" + subgraph cluster_370 { + label = "Wait on Transforms:HistoryEntry/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)" + 371 [label="ParMultiDo(VoidKeyToMultimapMaterialization)"] + 367 -> 371 [style=solid label=""] + } + } + 372 [label="View.CreatePCollectionView"] + 371 -> 372 [style=solid label=""] + } + } + subgraph cluster_373 { + label = "Wait on Transforms:HistoryEntry/Wait" + subgraph cluster_374 { + label = "Wait on Transforms:HistoryEntry/Wait/Map" + 375 [label="ParMultiDo(Anonymous)"] + 79 -> 375 [style=solid label=""] + 371 -> 375 [style=dashed label=""] + } + } + } + subgraph cluster_376 { + label = "Write to sql: Transforms:AllocationToken" + subgraph cluster_377 { + label = "Write to sql: Transforms:AllocationToken/Shard data for Transforms:AllocationToken" + subgraph cluster_378 { + label = "Write to sql: Transforms:AllocationToken/Shard data for Transforms:AllocationToken/Map" + 379 [label="ParMultiDo(Anonymous)"] + 375 -> 379 [style=solid label=""] + } + } + subgraph cluster_380 { + label = "Write to sql: Transforms:AllocationToken/Batch output by shard Transforms:AllocationToken" + subgraph cluster_381 { + label = "Write to sql: Transforms:AllocationToken/Batch output by shard Transforms:AllocationToken/ParDo(GroupIntoBatches)" + 382 [label="ParMultiDo(GroupIntoBatches)"] + 379 -> 382 [style=solid label=""] + } + } + subgraph cluster_383 { + label = "Write to sql: Transforms:AllocationToken/Write in batch for Transforms:AllocationToken" + 384 [label="ParMultiDo(SqlBatchWriter)"] + 382 -> 384 [style=solid label=""] + } + } + subgraph cluster_385 { + label = "Wait on Transforms:AllocationToken" + subgraph cluster_386 { + label = "Wait on Transforms:AllocationToken/To wait view 0" + subgraph cluster_387 { + label = "Wait on Transforms:AllocationToken/To wait view 0/Window.Into()" + 388 [label="Flatten.PCollections"] + 384 -> 388 [style=solid label=""] + } + subgraph cluster_389 { + label = "Wait on Transforms:AllocationToken/To wait view 0/ParDo(CollectWindows)" + 390 [label="ParMultiDo(CollectWindows)"] + 388 -> 390 [style=solid label=""] + } + subgraph cluster_391 { + label = "Wait on Transforms:AllocationToken/To wait view 0/Sample.Any" + subgraph cluster_392 { + label = "Wait on Transforms:AllocationToken/To wait view 0/Sample.Any/Combine.globally(SampleAny)" + subgraph cluster_393 { + label = "Wait on Transforms:AllocationToken/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys" + subgraph cluster_394 { + label = "Wait on Transforms:AllocationToken/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys" + subgraph cluster_395 { + label = "Wait on Transforms:AllocationToken/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys/Map" + 396 [label="ParMultiDo(Anonymous)"] + 390 -> 396 [style=solid label=""] + } + } + } + subgraph cluster_397 { + label = "Wait on Transforms:AllocationToken/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)" + 398 [label="GroupByKey"] + 396 -> 398 [style=solid label=""] + subgraph cluster_399 { + label = "Wait on Transforms:AllocationToken/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues" + subgraph cluster_400 { + label = "Wait on Transforms:AllocationToken/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues/ParDo(Anonymous)" + 401 [label="ParMultiDo(Anonymous)"] + 398 -> 401 [style=solid label=""] + } + } + } + subgraph cluster_402 { + label = "Wait on Transforms:AllocationToken/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values" + subgraph cluster_403 { + label = "Wait on Transforms:AllocationToken/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values" + subgraph cluster_404 { + label = "Wait on Transforms:AllocationToken/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values/Map" + 405 [label="ParMultiDo(Anonymous)"] + 401 -> 405 [style=solid label=""] + } + } + } + } + subgraph cluster_406 { + label = "Wait on Transforms:AllocationToken/To wait view 0/Sample.Any/Flatten.Iterables" + subgraph cluster_407 { + label = "Wait on Transforms:AllocationToken/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables" + subgraph cluster_408 { + label = "Wait on Transforms:AllocationToken/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables/FlatMap" + 409 [label="ParMultiDo(Anonymous)"] + 405 -> 409 [style=solid label=""] + } + } + } + } + subgraph cluster_410 { + label = "Wait on Transforms:AllocationToken/To wait view 0/View.AsList" + subgraph cluster_411 { + label = "Wait on Transforms:AllocationToken/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization" + subgraph cluster_412 { + label = "Wait on Transforms:AllocationToken/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)" + 413 [label="ParMultiDo(VoidKeyToMultimapMaterialization)"] + 409 -> 413 [style=solid label=""] + } + } + 414 [label="View.CreatePCollectionView"] + 413 -> 414 [style=solid label=""] + } + } + subgraph cluster_415 { + label = "Wait on Transforms:AllocationToken/Wait" + subgraph cluster_416 { + label = "Wait on Transforms:AllocationToken/Wait/Map" + 417 [label="ParMultiDo(Anonymous)"] + 79 -> 417 [style=solid label=""] + 413 -> 417 [style=dashed label=""] + } + } + } + subgraph cluster_418 { + label = "Write to sql: Transforms:Recurring" + subgraph cluster_419 { + label = "Write to sql: Transforms:Recurring/Shard data for Transforms:Recurring" + subgraph cluster_420 { + label = "Write to sql: Transforms:Recurring/Shard data for Transforms:Recurring/Map" + 421 [label="ParMultiDo(Anonymous)"] + 417 -> 421 [style=solid label=""] + } + } + subgraph cluster_422 { + label = "Write to sql: Transforms:Recurring/Batch output by shard Transforms:Recurring" + subgraph cluster_423 { + label = "Write to sql: Transforms:Recurring/Batch output by shard Transforms:Recurring/ParDo(GroupIntoBatches)" + 424 [label="ParMultiDo(GroupIntoBatches)"] + 421 -> 424 [style=solid label=""] + } + } + subgraph cluster_425 { + label = "Write to sql: Transforms:Recurring/Write in batch for Transforms:Recurring" + 426 [label="ParMultiDo(SqlBatchWriter)"] + 424 -> 426 [style=solid label=""] + } + } + subgraph cluster_427 { + label = "Wait on Transforms:Recurring" + subgraph cluster_428 { + label = "Wait on Transforms:Recurring/To wait view 0" + subgraph cluster_429 { + label = "Wait on Transforms:Recurring/To wait view 0/Window.Into()" + 430 [label="Flatten.PCollections"] + 426 -> 430 [style=solid label=""] + } + subgraph cluster_431 { + label = "Wait on Transforms:Recurring/To wait view 0/ParDo(CollectWindows)" + 432 [label="ParMultiDo(CollectWindows)"] + 430 -> 432 [style=solid label=""] + } + subgraph cluster_433 { + label = "Wait on Transforms:Recurring/To wait view 0/Sample.Any" + subgraph cluster_434 { + label = "Wait on Transforms:Recurring/To wait view 0/Sample.Any/Combine.globally(SampleAny)" + subgraph cluster_435 { + label = "Wait on Transforms:Recurring/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys" + subgraph cluster_436 { + label = "Wait on Transforms:Recurring/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys" + subgraph cluster_437 { + label = "Wait on Transforms:Recurring/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys/Map" + 438 [label="ParMultiDo(Anonymous)"] + 432 -> 438 [style=solid label=""] + } + } + } + subgraph cluster_439 { + label = "Wait on Transforms:Recurring/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)" + 440 [label="GroupByKey"] + 438 -> 440 [style=solid label=""] + subgraph cluster_441 { + label = "Wait on Transforms:Recurring/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues" + subgraph cluster_442 { + label = "Wait on Transforms:Recurring/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues/ParDo(Anonymous)" + 443 [label="ParMultiDo(Anonymous)"] + 440 -> 443 [style=solid label=""] + } + } + } + subgraph cluster_444 { + label = "Wait on Transforms:Recurring/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values" + subgraph cluster_445 { + label = "Wait on Transforms:Recurring/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values" + subgraph cluster_446 { + label = "Wait on Transforms:Recurring/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values/Map" + 447 [label="ParMultiDo(Anonymous)"] + 443 -> 447 [style=solid label=""] + } + } + } + } + subgraph cluster_448 { + label = "Wait on Transforms:Recurring/To wait view 0/Sample.Any/Flatten.Iterables" + subgraph cluster_449 { + label = "Wait on Transforms:Recurring/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables" + subgraph cluster_450 { + label = "Wait on Transforms:Recurring/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables/FlatMap" + 451 [label="ParMultiDo(Anonymous)"] + 447 -> 451 [style=solid label=""] + } + } + } + } + subgraph cluster_452 { + label = "Wait on Transforms:Recurring/To wait view 0/View.AsList" + subgraph cluster_453 { + label = "Wait on Transforms:Recurring/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization" + subgraph cluster_454 { + label = "Wait on Transforms:Recurring/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)" + 455 [label="ParMultiDo(VoidKeyToMultimapMaterialization)"] + 451 -> 455 [style=solid label=""] + } + } + 456 [label="View.CreatePCollectionView"] + 455 -> 456 [style=solid label=""] + } + } + subgraph cluster_457 { + label = "Wait on Transforms:Recurring/Wait" + subgraph cluster_458 { + label = "Wait on Transforms:Recurring/Wait/Map" + 459 [label="ParMultiDo(Anonymous)"] + 79 -> 459 [style=solid label=""] + 455 -> 459 [style=dashed label=""] + } + } + } + subgraph cluster_460 { + label = "Write to sql: Transforms:OneTime" + subgraph cluster_461 { + label = "Write to sql: Transforms:OneTime/Shard data for Transforms:OneTime" + subgraph cluster_462 { + label = "Write to sql: Transforms:OneTime/Shard data for Transforms:OneTime/Map" + 463 [label="ParMultiDo(Anonymous)"] + 459 -> 463 [style=solid label=""] + } + } + subgraph cluster_464 { + label = "Write to sql: Transforms:OneTime/Batch output by shard Transforms:OneTime" + subgraph cluster_465 { + label = "Write to sql: Transforms:OneTime/Batch output by shard Transforms:OneTime/ParDo(GroupIntoBatches)" + 466 [label="ParMultiDo(GroupIntoBatches)"] + 463 -> 466 [style=solid label=""] + } + } + subgraph cluster_467 { + label = "Write to sql: Transforms:OneTime/Write in batch for Transforms:OneTime" + 468 [label="ParMultiDo(SqlBatchWriter)"] + 466 -> 468 [style=solid label=""] + } + } + subgraph cluster_469 { + label = "Wait on Transforms:OneTime" + subgraph cluster_470 { + label = "Wait on Transforms:OneTime/To wait view 0" + subgraph cluster_471 { + label = "Wait on Transforms:OneTime/To wait view 0/Window.Into()" + 472 [label="Flatten.PCollections"] + 468 -> 472 [style=solid label=""] + } + subgraph cluster_473 { + label = "Wait on Transforms:OneTime/To wait view 0/ParDo(CollectWindows)" + 474 [label="ParMultiDo(CollectWindows)"] + 472 -> 474 [style=solid label=""] + } + subgraph cluster_475 { + label = "Wait on Transforms:OneTime/To wait view 0/Sample.Any" + subgraph cluster_476 { + label = "Wait on Transforms:OneTime/To wait view 0/Sample.Any/Combine.globally(SampleAny)" + subgraph cluster_477 { + label = "Wait on Transforms:OneTime/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys" + subgraph cluster_478 { + label = "Wait on Transforms:OneTime/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys" + subgraph cluster_479 { + label = "Wait on Transforms:OneTime/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys/Map" + 480 [label="ParMultiDo(Anonymous)"] + 474 -> 480 [style=solid label=""] + } + } + } + subgraph cluster_481 { + label = "Wait on Transforms:OneTime/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)" + 482 [label="GroupByKey"] + 480 -> 482 [style=solid label=""] + subgraph cluster_483 { + label = "Wait on Transforms:OneTime/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues" + subgraph cluster_484 { + label = "Wait on Transforms:OneTime/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues/ParDo(Anonymous)" + 485 [label="ParMultiDo(Anonymous)"] + 482 -> 485 [style=solid label=""] + } + } + } + subgraph cluster_486 { + label = "Wait on Transforms:OneTime/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values" + subgraph cluster_487 { + label = "Wait on Transforms:OneTime/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values" + subgraph cluster_488 { + label = "Wait on Transforms:OneTime/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values/Map" + 489 [label="ParMultiDo(Anonymous)"] + 485 -> 489 [style=solid label=""] + } + } + } + } + subgraph cluster_490 { + label = "Wait on Transforms:OneTime/To wait view 0/Sample.Any/Flatten.Iterables" + subgraph cluster_491 { + label = "Wait on Transforms:OneTime/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables" + subgraph cluster_492 { + label = "Wait on Transforms:OneTime/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables/FlatMap" + 493 [label="ParMultiDo(Anonymous)"] + 489 -> 493 [style=solid label=""] + } + } + } + } + subgraph cluster_494 { + label = "Wait on Transforms:OneTime/To wait view 0/View.AsList" + subgraph cluster_495 { + label = "Wait on Transforms:OneTime/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization" + subgraph cluster_496 { + label = "Wait on Transforms:OneTime/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)" + 497 [label="ParMultiDo(VoidKeyToMultimapMaterialization)"] + 493 -> 497 [style=solid label=""] + } + } + 498 [label="View.CreatePCollectionView"] + 497 -> 498 [style=solid label=""] + } + } + subgraph cluster_499 { + label = "Wait on Transforms:OneTime/Wait" + subgraph cluster_500 { + label = "Wait on Transforms:OneTime/Wait/Map" + 501 [label="ParMultiDo(Anonymous)"] + 79 -> 501 [style=solid label=""] + 497 -> 501 [style=dashed label=""] + } + } + } + subgraph cluster_502 { + label = "Write to sql: Transforms:Cancellation" + subgraph cluster_503 { + label = "Write to sql: Transforms:Cancellation/Shard data for Transforms:Cancellation" + subgraph cluster_504 { + label = "Write to sql: Transforms:Cancellation/Shard data for Transforms:Cancellation/Map" + 505 [label="ParMultiDo(Anonymous)"] + 501 -> 505 [style=solid label=""] + } + } + subgraph cluster_506 { + label = "Write to sql: Transforms:Cancellation/Batch output by shard Transforms:Cancellation" + subgraph cluster_507 { + label = "Write to sql: Transforms:Cancellation/Batch output by shard Transforms:Cancellation/ParDo(GroupIntoBatches)" + 508 [label="ParMultiDo(GroupIntoBatches)"] + 505 -> 508 [style=solid label=""] + } + } + subgraph cluster_509 { + label = "Write to sql: Transforms:Cancellation/Write in batch for Transforms:Cancellation" + 510 [label="ParMultiDo(SqlBatchWriter)"] + 508 -> 510 [style=solid label=""] + } + } + subgraph cluster_511 { + label = "Wait on Transforms:Cancellation" + subgraph cluster_512 { + label = "Wait on Transforms:Cancellation/To wait view 0" + subgraph cluster_513 { + label = "Wait on Transforms:Cancellation/To wait view 0/Window.Into()" + 514 [label="Flatten.PCollections"] + 510 -> 514 [style=solid label=""] + } + subgraph cluster_515 { + label = "Wait on Transforms:Cancellation/To wait view 0/ParDo(CollectWindows)" + 516 [label="ParMultiDo(CollectWindows)"] + 514 -> 516 [style=solid label=""] + } + subgraph cluster_517 { + label = "Wait on Transforms:Cancellation/To wait view 0/Sample.Any" + subgraph cluster_518 { + label = "Wait on Transforms:Cancellation/To wait view 0/Sample.Any/Combine.globally(SampleAny)" + subgraph cluster_519 { + label = "Wait on Transforms:Cancellation/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys" + subgraph cluster_520 { + label = "Wait on Transforms:Cancellation/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys" + subgraph cluster_521 { + label = "Wait on Transforms:Cancellation/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys/Map" + 522 [label="ParMultiDo(Anonymous)"] + 516 -> 522 [style=solid label=""] + } + } + } + subgraph cluster_523 { + label = "Wait on Transforms:Cancellation/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)" + 524 [label="GroupByKey"] + 522 -> 524 [style=solid label=""] + subgraph cluster_525 { + label = "Wait on Transforms:Cancellation/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues" + subgraph cluster_526 { + label = "Wait on Transforms:Cancellation/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues/ParDo(Anonymous)" + 527 [label="ParMultiDo(Anonymous)"] + 524 -> 527 [style=solid label=""] + } + } + } + subgraph cluster_528 { + label = "Wait on Transforms:Cancellation/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values" + subgraph cluster_529 { + label = "Wait on Transforms:Cancellation/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values" + subgraph cluster_530 { + label = "Wait on Transforms:Cancellation/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values/Map" + 531 [label="ParMultiDo(Anonymous)"] + 527 -> 531 [style=solid label=""] + } + } + } + } + subgraph cluster_532 { + label = "Wait on Transforms:Cancellation/To wait view 0/Sample.Any/Flatten.Iterables" + subgraph cluster_533 { + label = "Wait on Transforms:Cancellation/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables" + subgraph cluster_534 { + label = "Wait on Transforms:Cancellation/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables/FlatMap" + 535 [label="ParMultiDo(Anonymous)"] + 531 -> 535 [style=solid label=""] + } + } + } + } + subgraph cluster_536 { + label = "Wait on Transforms:Cancellation/To wait view 0/View.AsList" + subgraph cluster_537 { + label = "Wait on Transforms:Cancellation/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization" + subgraph cluster_538 { + label = "Wait on Transforms:Cancellation/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)" + 539 [label="ParMultiDo(VoidKeyToMultimapMaterialization)"] + 535 -> 539 [style=solid label=""] + } + } + 540 [label="View.CreatePCollectionView"] + 539 -> 540 [style=solid label=""] + } + } + subgraph cluster_541 { + label = "Wait on Transforms:Cancellation/Wait" + subgraph cluster_542 { + label = "Wait on Transforms:Cancellation/Wait/Map" + 543 [label="ParMultiDo(Anonymous)"] + 79 -> 543 [style=solid label=""] + 539 -> 543 [style=dashed label=""] + } + } + } + subgraph cluster_544 { + label = "Write to sql: Transforms:PollMessage" + subgraph cluster_545 { + label = "Write to sql: Transforms:PollMessage/Shard data for Transforms:PollMessage" + subgraph cluster_546 { + label = "Write to sql: Transforms:PollMessage/Shard data for Transforms:PollMessage/Map" + 547 [label="ParMultiDo(Anonymous)"] + 543 -> 547 [style=solid label=""] + } + } + subgraph cluster_548 { + label = "Write to sql: Transforms:PollMessage/Batch output by shard Transforms:PollMessage" + subgraph cluster_549 { + label = "Write to sql: Transforms:PollMessage/Batch output by shard Transforms:PollMessage/ParDo(GroupIntoBatches)" + 550 [label="ParMultiDo(GroupIntoBatches)"] + 547 -> 550 [style=solid label=""] + } + } + subgraph cluster_551 { + label = "Write to sql: Transforms:PollMessage/Write in batch for Transforms:PollMessage" + 552 [label="ParMultiDo(SqlBatchWriter)"] + 550 -> 552 [style=solid label=""] + } + } + subgraph cluster_553 { + label = "Wait on Transforms:PollMessage" + subgraph cluster_554 { + label = "Wait on Transforms:PollMessage/To wait view 0" + subgraph cluster_555 { + label = "Wait on Transforms:PollMessage/To wait view 0/Window.Into()" + 556 [label="Flatten.PCollections"] + 552 -> 556 [style=solid label=""] + } + subgraph cluster_557 { + label = "Wait on Transforms:PollMessage/To wait view 0/ParDo(CollectWindows)" + 558 [label="ParMultiDo(CollectWindows)"] + 556 -> 558 [style=solid label=""] + } + subgraph cluster_559 { + label = "Wait on Transforms:PollMessage/To wait view 0/Sample.Any" + subgraph cluster_560 { + label = "Wait on Transforms:PollMessage/To wait view 0/Sample.Any/Combine.globally(SampleAny)" + subgraph cluster_561 { + label = "Wait on Transforms:PollMessage/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys" + subgraph cluster_562 { + label = "Wait on Transforms:PollMessage/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys" + subgraph cluster_563 { + label = "Wait on Transforms:PollMessage/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys/Map" + 564 [label="ParMultiDo(Anonymous)"] + 558 -> 564 [style=solid label=""] + } + } + } + subgraph cluster_565 { + label = "Wait on Transforms:PollMessage/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)" + 566 [label="GroupByKey"] + 564 -> 566 [style=solid label=""] + subgraph cluster_567 { + label = "Wait on Transforms:PollMessage/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues" + subgraph cluster_568 { + label = "Wait on Transforms:PollMessage/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues/ParDo(Anonymous)" + 569 [label="ParMultiDo(Anonymous)"] + 566 -> 569 [style=solid label=""] + } + } + } + subgraph cluster_570 { + label = "Wait on Transforms:PollMessage/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values" + subgraph cluster_571 { + label = "Wait on Transforms:PollMessage/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values" + subgraph cluster_572 { + label = "Wait on Transforms:PollMessage/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values/Map" + 573 [label="ParMultiDo(Anonymous)"] + 569 -> 573 [style=solid label=""] + } + } + } + } + subgraph cluster_574 { + label = "Wait on Transforms:PollMessage/To wait view 0/Sample.Any/Flatten.Iterables" + subgraph cluster_575 { + label = "Wait on Transforms:PollMessage/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables" + subgraph cluster_576 { + label = "Wait on Transforms:PollMessage/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables/FlatMap" + 577 [label="ParMultiDo(Anonymous)"] + 573 -> 577 [style=solid label=""] + } + } + } + } + subgraph cluster_578 { + label = "Wait on Transforms:PollMessage/To wait view 0/View.AsList" + subgraph cluster_579 { + label = "Wait on Transforms:PollMessage/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization" + subgraph cluster_580 { + label = "Wait on Transforms:PollMessage/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)" + 581 [label="ParMultiDo(VoidKeyToMultimapMaterialization)"] + 577 -> 581 [style=solid label=""] + } + } + 582 [label="View.CreatePCollectionView"] + 581 -> 582 [style=solid label=""] + } + } + subgraph cluster_583 { + label = "Wait on Transforms:PollMessage/Wait" + subgraph cluster_584 { + label = "Wait on Transforms:PollMessage/Wait/Map" + 585 [label="ParMultiDo(Anonymous)"] + 79 -> 585 [style=solid label=""] + 581 -> 585 [style=dashed label=""] + } + } + } + subgraph cluster_586 { + label = "Write to sql: Transforms:DomainBase" + subgraph cluster_587 { + label = "Write to sql: Transforms:DomainBase/Shard data for Transforms:DomainBase" + subgraph cluster_588 { + label = "Write to sql: Transforms:DomainBase/Shard data for Transforms:DomainBase/Map" + 589 [label="ParMultiDo(Anonymous)"] + 585 -> 589 [style=solid label=""] + } + } + subgraph cluster_590 { + label = "Write to sql: Transforms:DomainBase/Batch output by shard Transforms:DomainBase" + subgraph cluster_591 { + label = "Write to sql: Transforms:DomainBase/Batch output by shard Transforms:DomainBase/ParDo(GroupIntoBatches)" + 592 [label="ParMultiDo(GroupIntoBatches)"] + 589 -> 592 [style=solid label=""] + } + } + subgraph cluster_593 { + label = "Write to sql: Transforms:DomainBase/Write in batch for Transforms:DomainBase" + 594 [label="ParMultiDo(SqlBatchWriter)"] + 592 -> 594 [style=solid label=""] + } + } } } diff --git a/core/src/test/resources/google/registry/beam/initsql/pipeline_golden.png b/core/src/test/resources/google/registry/beam/initsql/pipeline_golden.png index 40ebad90d..36d180232 100644 Binary files a/core/src/test/resources/google/registry/beam/initsql/pipeline_golden.png and b/core/src/test/resources/google/registry/beam/initsql/pipeline_golden.png differ