diff --git a/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java b/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java new file mode 100644 index 000000000..dbeb14843 --- /dev/null +++ b/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java @@ -0,0 +1,229 @@ +// Copyright 2020 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.beam.common; + +import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; +import static org.apache.beam.sdk.values.TypeDescriptors.integers; + +import com.google.auto.value.AutoValue; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Streams; +import google.registry.backup.AppEngineEnvironment; +import google.registry.model.ofy.ObjectifyService; +import google.registry.persistence.transaction.JpaTransactionManager; +import google.registry.persistence.transaction.TransactionManagerFactory; +import java.util.Objects; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupIntoBatches; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.util.ShardedKey; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; + +/** + * Contains IO {@link PTransform transforms} for a BEAM pipeline that interacts with a single + * database through a {@link JpaTransactionManager}. + * + *

The {@code JpaTransactionManager} is instantiated once on each pipeline worker VM (through + * {@link RegistryPipelineWorkerInitializer}), made available through the static method {@link + * TransactionManagerFactory#jpaTm()}, and is shared by all threads on the VM. Configuration is + * through {@link RegistryPipelineOptions}. + */ +public final class RegistryJpaIO { + + private RegistryJpaIO() {} + + public static Write write() { + return Write.builder().build(); + } + + /** + * A {@link PTransform transform} that writes a PCollection of entities to the SQL database using + * the {@link JpaTransactionManager}. + * + *

Unlike typical BEAM {@link Write} transforms, the output type of this transform is {@code + * PCollection} instead of {@link org.apache.beam.sdk.values.PDone}. This deviation allows + * the sequencing of multiple {@code PCollections}: we have use cases where one collection of data + * must be completely written before another can start (due to foreign key constraints in the + * latter). + * + * @param type of the entities to be written + */ + @AutoValue + public abstract static class Write extends PTransform, PCollection> { + + public static final String DEFAULT_NAME = "RegistryJpaIO.Write"; + + public static final int DEFAULT_BATCH_SIZE = 1; + + /** The default number of write shard. Please refer to {@link #shards} for more information. */ + public static final int DEFAULT_SHARDS = 1; + + public abstract String name(); + + /** Number of elements to be written in one call. */ + public abstract int batchSize(); + + /** + * The number of shards the output should be split into. + * + *

This value is a hint to the pipeline runner on the level of parallelism, and should be + * significantly greater than the number of threads working on this transformation (see next + * paragraph for more information). On the other hand, it should not be too large to the point + * that the number of elements per shard is lower than {@link #batchSize()}. As a rule of thumb, + * the following constraint should hold: {@code shards * batchSize * nThreads <= + * inputElementCount}. Although it is not always possible to determine the number of threads + * working on this transform, when the pipeline run is IO-bound, it most likely is close to the + * total number of threads in the pipeline, which is explained below. + * + *

With Cloud Dataflow runner, the total number of worker threads in a batch pipeline (which + * includes all existing Registry pipelines) is the number of vCPUs used by the pipeline, and + * can be set by the {@code --maxNumWorkers} and {@code --workerMachineType} parameters. The + * number of worker threads in a streaming pipeline can be set by the {@code --maxNumWorkers} + * and {@code --numberOfWorkerHarnessThreads} parameters. + * + *

Note that connections on the database server are a limited resource, therefore the number + * of threads that interact with the database should be set to an appropriate limit. Again, we + * cannot control this number, but can influence it by controlling the total number of threads. + */ + public abstract int shards(); + + public abstract SerializableFunction jpaConverter(); + + public Write withName(String name) { + return toBuilder().name(name).build(); + } + + public Write withBatchSize(int batchSize) { + return toBuilder().batchSize(batchSize).build(); + } + + public Write withShards(int shards) { + return toBuilder().shards(shards).build(); + } + + /** + * An optional function that converts the input entities to a form that can be written into the + * database. + */ + public Write withJpaConverter(SerializableFunction jpaConverter) { + return toBuilder().jpaConverter(jpaConverter).build(); + } + + abstract Builder toBuilder(); + + @Override + public PCollection expand(PCollection input) { + return input + .apply( + "Shard data " + name(), + WithKeys.of(e -> ThreadLocalRandom.current().nextInt(shards())) + .withKeyType(integers())) + // The call to withShardedKey() is performance critical. The resulting transform ensures + // that data is spread evenly across all worker threads. + .apply( + "Group into batches " + name(), + GroupIntoBatches.ofSize(batchSize()).withShardedKey()) + .apply( + "Write in batch for " + name(), + ParDo.of(new SqlBatchWriter<>(name(), jpaConverter()))); + } + + static Builder builder() { + return new AutoValue_RegistryJpaIO_Write.Builder() + .name(DEFAULT_NAME) + .batchSize(DEFAULT_BATCH_SIZE) + .shards(DEFAULT_SHARDS) + .jpaConverter(x -> x); + } + + @AutoValue.Builder + abstract static class Builder { + + abstract Builder name(String name); + + abstract Builder batchSize(int batchSize); + + abstract Builder shards(int jdbcNumConnsHint); + + abstract Builder jpaConverter(SerializableFunction jpaConverter); + + abstract Write build(); + } + } + + /** Writes a batch of entities to a SQL database through a {@link JpaTransactionManager}. */ + private static class SqlBatchWriter extends DoFn, Iterable>, Void> { + private final Counter counter; + private final SerializableFunction jpaConverter; + + SqlBatchWriter(String type, SerializableFunction jpaConverter) { + counter = Metrics.counter("SQL_WRITE", type); + this.jpaConverter = jpaConverter; + } + + @Setup + public void setup() { + // Below is needed as long as Objectify keys are still involved in the handling of SQL + // entities (e.g., in VKeys). + try (AppEngineEnvironment env = new AppEngineEnvironment()) { + ObjectifyService.initOfy(); + } + } + + @ProcessElement + public void processElement(@Element KV, Iterable> kv) { + try (AppEngineEnvironment env = new AppEngineEnvironment()) { + ImmutableList ofyEntities = + Streams.stream(kv.getValue()) + .map(this.jpaConverter::apply) + // TODO(b/177340730): post migration delete the line below. + .filter(Objects::nonNull) + .collect(ImmutableList.toImmutableList()); + try { + jpaTm().transact(() -> jpaTm().putAll(ofyEntities)); + counter.inc(ofyEntities.size()); + } catch (RuntimeException e) { + processSingly(ofyEntities); + } + } + } + + /** + * Writes entities in a failed batch one by one to identify the first bad entity and throws a + * {@link RuntimeException} on it. + */ + private void processSingly(ImmutableList ofyEntities) { + for (Object ofyEntity : ofyEntities) { + try { + jpaTm().transact(() -> jpaTm().put(ofyEntity)); + counter.inc(); + } catch (RuntimeException e) { + throw new RuntimeException(toOfyKey(ofyEntity).toString(), e); + } + } + } + + private com.googlecode.objectify.Key toOfyKey(Object ofyEntity) { + return com.googlecode.objectify.Key.create(ofyEntity); + } + } +} diff --git a/core/src/main/java/google/registry/beam/common/RegistryPipelineComponent.java b/core/src/main/java/google/registry/beam/common/RegistryPipelineComponent.java index bc7a90644..8e43ceb67 100644 --- a/core/src/main/java/google/registry/beam/common/RegistryPipelineComponent.java +++ b/core/src/main/java/google/registry/beam/common/RegistryPipelineComponent.java @@ -14,6 +14,7 @@ package google.registry.beam.common; +import dagger.BindsInstance; import dagger.Component; import dagger.Lazy; import google.registry.config.CredentialModule; @@ -21,9 +22,11 @@ import google.registry.config.RegistryConfig.Config; import google.registry.config.RegistryConfig.ConfigModule; import google.registry.persistence.PersistenceModule; import google.registry.persistence.PersistenceModule.BeamJpaTm; +import google.registry.persistence.PersistenceModule.TransactionIsolationLevel; import google.registry.persistence.transaction.JpaTransactionManager; import google.registry.privileges.secretmanager.SecretManagerModule; import google.registry.util.UtilsModule; +import javax.annotation.Nullable; import javax.inject.Singleton; /** Component that provides everything needed on a Pipeline worker. */ @@ -44,4 +47,18 @@ public interface RegistryPipelineComponent { @BeamJpaTm Lazy getJpaTransactionManager(); + + @Component.Builder + interface Builder { + + /** + * Optionally overrides the default transaction isolation level. This applies to ALL + * transactions executed in the pipeline. + */ + @BindsInstance + Builder isolationOverride( + @Nullable @Config("beamIsolationOverride") TransactionIsolationLevel isolationOverride); + + RegistryPipelineComponent build(); + } } diff --git a/core/src/main/java/google/registry/beam/common/RegistryPipelineOptions.java b/core/src/main/java/google/registry/beam/common/RegistryPipelineOptions.java index 5a4715c73..cda1e33d6 100644 --- a/core/src/main/java/google/registry/beam/common/RegistryPipelineOptions.java +++ b/core/src/main/java/google/registry/beam/common/RegistryPipelineOptions.java @@ -14,13 +14,23 @@ package google.registry.beam.common; +import google.registry.beam.common.RegistryJpaIO.Write; import google.registry.config.RegistryEnvironment; +import google.registry.persistence.PersistenceModule.TransactionIsolationLevel; import java.util.Objects; import javax.annotation.Nullable; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; -/** Defines Nomulus-specific pipeline options. */ +/** + * Defines Nomulus-specific pipeline options, e.g. JPA configurations. + * + *

When using the Cloud Dataflow runner, users are recommended to set an upper bound on active + * database connections by setting the pipeline worker options including {@code --maxNumWorkers}, + * {@code workerMachineType}, and {@code numberOfWorkerHarnessThreads}. Please refer to {@link + * Write#shards()} for more information. + */ public interface RegistryPipelineOptions extends GcpOptions { @Description("The Registry environment.") @@ -29,6 +39,32 @@ public interface RegistryPipelineOptions extends GcpOptions { void setRegistryEnvironment(RegistryEnvironment environment); + @Description("The desired SQL transaction isolation level.") + @Nullable + TransactionIsolationLevel getIsolationOverride(); + + void setIsolationOverride(TransactionIsolationLevel isolationOverride); + + @Description("The number of entities to write to the SQL database in one operation.") + @Default.Integer(20) + int getSqlWriteBatchSize(); + + void setSqlWriteBatchSize(int sqlWriteBatchSize); + + @Description( + "Number of shards to create out of the data before writing to the SQL database. Please refer " + + "to the Javadoc of RegistryJpaIO.Write.shards() for how to choose this value.") + @Default.Integer(100) + int getSqlWriteShards(); + + void setSqlWriteShards(int maxConcurrentSqlWriters); + + static RegistryPipelineComponent toRegistryPipelineComponent(RegistryPipelineOptions options) { + return DaggerRegistryPipelineComponent.builder() + .isolationOverride(options.getIsolationOverride()) + .build(); + } + /** * Validates the GCP project and Registry environment settings in {@code option}. If project is * undefined, it is set according to the Registry environment; if project is defined but @@ -38,18 +74,18 @@ public interface RegistryPipelineOptions extends GcpOptions { * in {@link RegistryEnvironment}). Tests calling this method must restore the original * environment on completion. */ - static void validateRegistryPipelineOptions(RegistryPipelineOptions option) { - RegistryEnvironment environment = option.getRegistryEnvironment(); + static void validateRegistryPipelineOptions(RegistryPipelineOptions options) { + RegistryEnvironment environment = options.getRegistryEnvironment(); if (environment == null) { return; } environment.setup(); - String projectByEnv = DaggerRegistryPipelineComponent.create().getProjectId(); - if (Objects.equals(option.getProject(), projectByEnv)) { + String projectByEnv = toRegistryPipelineComponent(options).getProjectId(); + if (Objects.equals(options.getProject(), projectByEnv)) { return; } - if (option.getProject() == null) { - option.setProject(projectByEnv); + if (options.getProject() == null) { + options.setProject(projectByEnv); return; } throw new IllegalArgumentException( diff --git a/core/src/main/java/google/registry/beam/common/RegistryPipelineWorkerInitializer.java b/core/src/main/java/google/registry/beam/common/RegistryPipelineWorkerInitializer.java index 7682a5978..c89c70710 100644 --- a/core/src/main/java/google/registry/beam/common/RegistryPipelineWorkerInitializer.java +++ b/core/src/main/java/google/registry/beam/common/RegistryPipelineWorkerInitializer.java @@ -14,6 +14,8 @@ package google.registry.beam.common; +import static google.registry.beam.common.RegistryPipelineOptions.toRegistryPipelineComponent; + import com.google.auto.service.AutoService; import com.google.common.flogger.FluentLogger; import dagger.Lazy; @@ -24,7 +26,8 @@ import org.apache.beam.sdk.harness.JvmInitializer; import org.apache.beam.sdk.options.PipelineOptions; /** - * Sets up Nomulus environment and initializes JPA on each pipeline worker. + * Sets up Nomulus environment and initializes JPA on each pipeline worker. It is assumed that the + * pipeline only works with one SQL database. * *

This class only takes effect in portable beam pipeline runners (including the Cloud Dataflow * runner). It is not invoked in test pipelines. @@ -35,15 +38,15 @@ public class RegistryPipelineWorkerInitializer implements JvmInitializer { @Override public void beforeProcessing(PipelineOptions options) { - RegistryEnvironment environment = - options.as(RegistryPipelineOptions.class).getRegistryEnvironment(); + RegistryPipelineOptions registryOptions = options.as(RegistryPipelineOptions.class); + RegistryEnvironment environment = registryOptions.getRegistryEnvironment(); if (environment == null || environment.equals(RegistryEnvironment.UNITTEST)) { return; } logger.atInfo().log("Setting up RegistryEnvironment: %s", environment); environment.setup(); Lazy transactionManagerLazy = - DaggerRegistryPipelineComponent.create().getJpaTransactionManager(); + toRegistryPipelineComponent(registryOptions).getJpaTransactionManager(); TransactionManagerFactory.setJpaTmOnBeamWorker(transactionManagerLazy::get); } } diff --git a/core/src/main/java/google/registry/beam/initsql/InitSqlPipeline.java b/core/src/main/java/google/registry/beam/initsql/InitSqlPipeline.java index 970414fa4..7294593ca 100644 --- a/core/src/main/java/google/registry/beam/initsql/InitSqlPipeline.java +++ b/core/src/main/java/google/registry/beam/initsql/InitSqlPipeline.java @@ -22,7 +22,7 @@ import com.google.common.collect.ImmutableSet; import com.googlecode.objectify.Key; import google.registry.backup.AppEngineEnvironment; import google.registry.backup.VersionedEntity; -import google.registry.beam.initsql.BeamJpaModule.JpaTransactionManagerComponent; +import google.registry.beam.common.RegistryJpaIO; import google.registry.beam.initsql.Transforms.RemoveDomainBaseForeignKeys; import google.registry.model.billing.BillingEvent; import google.registry.model.contact.ContactResource; @@ -35,7 +35,6 @@ import google.registry.model.registrar.RegistrarContact; import google.registry.model.registry.Registry; import google.registry.model.reporting.HistoryEntry; import google.registry.persistence.PersistenceModule.TransactionIsolationLevel; -import google.registry.persistence.transaction.JpaTransactionManager; import java.io.Serializable; import java.util.Collection; import java.util.Optional; @@ -43,7 +42,6 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.Wait; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; @@ -124,20 +122,15 @@ public class InitSqlPipeline implements Serializable { private final Pipeline pipeline; - private final SerializableFunction - jpaGetter; - InitSqlPipeline(InitSqlPipelineOptions options) { this.options = options; pipeline = Pipeline.create(options); - jpaGetter = JpaTransactionManagerComponent::cloudSqlJpaTransactionManager; } @VisibleForTesting InitSqlPipeline(InitSqlPipelineOptions options, Pipeline pipeline) { this.options = options; this.pipeline = pipeline; - jpaGetter = JpaTransactionManagerComponent::localDbJpaTransactionManager; } public PipelineResult run() { @@ -147,6 +140,7 @@ public class InitSqlPipeline implements Serializable { @VisibleForTesting void setupPipeline() { + options.setIsolationOverride(TransactionIsolationLevel.TRANSACTION_READ_UNCOMMITTED); PCollectionTuple datastoreSnapshot = pipeline.apply( "Load Datastore snapshot", @@ -223,22 +217,13 @@ public class InitSqlPipeline implements Serializable { } private PCollection writeToSql(String transformId, PCollection data) { - String credentialFileUrl = - options.getSqlCredentialUrlOverride() != null - ? options.getSqlCredentialUrlOverride() - : BackupPaths.getCloudSQLCredentialFilePatterns(options.getEnvironment()).get(0); - return data.apply( - "Write to sql: " + transformId, - Transforms.writeToSql( - transformId, - options.getMaxConcurrentSqlWriters(), - options.getSqlWriteBatchSize(), - new JpaSupplierFactory( - credentialFileUrl, - options.getCloudKmsProjectId(), - jpaGetter, - TransactionIsolationLevel.TRANSACTION_READ_UNCOMMITTED))); + "Write to Sql: " + transformId, + RegistryJpaIO.write() + .withName(transformId) + .withBatchSize(options.getSqlWriteBatchSize()) + .withShards(options.getSqlWriteShards()) + .withJpaConverter(Transforms::convertVersionedEntityToSqlEntity)); } private static ImmutableList toKindStrings(Collection> entityClasses) { diff --git a/core/src/main/java/google/registry/beam/initsql/InitSqlPipelineOptions.java b/core/src/main/java/google/registry/beam/initsql/InitSqlPipelineOptions.java index ebf1e85c5..83756c52e 100644 --- a/core/src/main/java/google/registry/beam/initsql/InitSqlPipelineOptions.java +++ b/core/src/main/java/google/registry/beam/initsql/InitSqlPipelineOptions.java @@ -14,21 +14,12 @@ package google.registry.beam.initsql; -import javax.annotation.Nullable; -import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; -import org.apache.beam.sdk.options.Default; +import google.registry.beam.common.RegistryPipelineOptions; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.Validation; /** Pipeline options for {@link InitSqlPipeline} */ -public interface InitSqlPipelineOptions extends GcpOptions { - - @Description( - "Overrides the URL to the SQL credential file. " + "Required if environment is not provided.") - @Nullable - String getSqlCredentialUrlOverride(); - - void setSqlCredentialUrlOverride(String credentialUrlOverride); +public interface InitSqlPipelineOptions extends RegistryPipelineOptions { @Description("The root directory of the export to load.") String getDatastoreExportDir(); @@ -51,41 +42,4 @@ public interface InitSqlPipelineOptions extends GcpOptions { String getCommitLogEndTimestamp(); void setCommitLogEndTimestamp(String commitLogEndTimestamp); - - @Description( - "The deployed environment, alpha, crash, sandbox, or production. " - + "Not required only if sqlCredentialUrlOverride is provided.") - @Nullable - String getEnvironment(); - - void setEnvironment(String environment); - - @Description( - "The GCP project that contains the keyring used for decrypting the " + "SQL credential file.") - @Nullable - String getCloudKmsProjectId(); - - void setCloudKmsProjectId(String cloudKmsProjectId); - - @Description( - "The maximum JDBC connection pool size on a VM. " - + "This value should be equal to or greater than the number of cores on the VM.") - @Default.Integer(4) - int getJdbcMaxPoolSize(); - - void setJdbcMaxPoolSize(int jdbcMaxPoolSize); - - @Description( - "A hint to the pipeline runner of the maximum number of concurrent SQL writers to create. " - + "Note that multiple writers may run on the same VM and share the connection pool.") - @Default.Integer(4) - int getMaxConcurrentSqlWriters(); - - void setMaxConcurrentSqlWriters(int maxConcurrentSqlWriters); - - @Description("The number of entities to be written to the SQL database in one transaction.") - @Default.Integer(20) - int getSqlWriteBatchSize(); - - void setSqlWriteBatchSize(int sqlWriteBatchSize); } diff --git a/core/src/main/java/google/registry/beam/initsql/Transforms.java b/core/src/main/java/google/registry/beam/initsql/Transforms.java index 68fad974c..00eb3cc23 100644 --- a/core/src/main/java/google/registry/beam/initsql/Transforms.java +++ b/core/src/main/java/google/registry/beam/initsql/Transforms.java @@ -368,7 +368,7 @@ public final class Transforms { * to make Optional work with BEAM) */ @Nullable - private static Object convertVersionedEntityToSqlEntity(VersionedEntity dsEntity) { + public static Object convertVersionedEntityToSqlEntity(VersionedEntity dsEntity) { return dsEntity .getEntity() .filter(Transforms::isMigratable) diff --git a/core/src/main/java/google/registry/persistence/PersistenceModule.java b/core/src/main/java/google/registry/persistence/PersistenceModule.java index 342eb290a..5335628e7 100644 --- a/core/src/main/java/google/registry/persistence/PersistenceModule.java +++ b/core/src/main/java/google/registry/persistence/PersistenceModule.java @@ -124,18 +124,6 @@ public abstract class PersistenceModule { @Config("beamIsolationOverride") abstract TransactionIsolationLevel bindBeamIsolationOverride(); - /** - * Optionally overrides the maximum connection pool size for JPA. - * - *

If present, this binding overrides the {@code HIKARI_MAXIMUM_POOL_SIZE} value set in {@link - * #provideDefaultDatabaseConfigs()}. The default value is tuned for the Registry server on - * AppEngine. Other applications such as the Nomulus tool and the BEAM pipeline, should override - * it. - */ - @BindsOptionalOf - @Config("jpaMaxPoolSizeOverride") - abstract Integer bindJpaMaxPoolSizeOverride(); - /** * Optionally overrides the Cloud SQL database instance's connection name. * @@ -201,7 +189,6 @@ public abstract class PersistenceModule { SqlCredentialStore credentialStore, @Config("instanceConnectionNameOverride") Optional> instanceConnectionNameOverride, - @Config("jpaMaxPoolSizeOverride") Optional jpaMaxConnectionPoolSizeOverride, @Config("beamIsolationOverride") Optional> isolationOverride, @PartialCloudSqlConfigs ImmutableMap cloudSqlConfigs, @@ -211,13 +198,21 @@ public abstract class PersistenceModule { SqlCredential credential = credentialStore.getCredential(new RobotUser(RobotId.NOMULUS)); overrides.put(Environment.USER, credential.login()); overrides.put(Environment.PASS, credential.password()); + // Override the default minimum which is tuned for the Registry server. A worker VM should + // release all connections if it no longer interacts with the database. + overrides.put(HIKARI_MINIMUM_IDLE, "0"); + /** + * Disable Hikari's maxPoolSize limit check by setting it to an absurdly large number. The + * effective (and desirable) limit is the number of pipeline threads on the pipeline worker, + * which can be configured using pipeline options. See {@link RegistryPipelineOptions} for more + * information. + */ + overrides.put(HIKARI_MAXIMUM_POOL_SIZE, String.valueOf(Integer.MAX_VALUE)); instanceConnectionNameOverride .map(Provider::get) .ifPresent( instanceConnectionName -> overrides.put(HIKARI_DS_CLOUD_SQL_INSTANCE, instanceConnectionName)); - jpaMaxConnectionPoolSizeOverride.ifPresent( - maxPoolSize -> overrides.put(HIKARI_MAXIMUM_POOL_SIZE, String.valueOf(maxPoolSize))); isolationOverride .map(Provider::get) .ifPresent(isolation -> overrides.put(Environment.ISOLATION, isolation.name())); diff --git a/core/src/test/java/google/registry/beam/common/RegistryJpaWriteTest.java b/core/src/test/java/google/registry/beam/common/RegistryJpaWriteTest.java new file mode 100644 index 000000000..c14cdc343 --- /dev/null +++ b/core/src/test/java/google/registry/beam/common/RegistryJpaWriteTest.java @@ -0,0 +1,124 @@ +// Copyright 2020 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.beam.common; + +import static com.google.common.truth.Truth.assertThat; +import static google.registry.model.ImmutableObjectSubject.immutableObjectCorrespondence; +import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; + +import com.google.appengine.api.datastore.Entity; +import com.google.common.collect.ImmutableList; +import google.registry.backup.VersionedEntity; +import google.registry.beam.TestPipelineExtension; +import google.registry.beam.initsql.BackupTestStore; +import google.registry.beam.initsql.InitSqlTestUtils; +import google.registry.beam.initsql.Transforms; +import google.registry.model.ImmutableObject; +import google.registry.model.contact.ContactResource; +import google.registry.model.ofy.Ofy; +import google.registry.model.registrar.Registrar; +import google.registry.persistence.transaction.JpaTestRules; +import google.registry.persistence.transaction.JpaTestRules.JpaIntegrationTestExtension; +import google.registry.testing.AppEngineExtension; +import google.registry.testing.DatabaseHelper; +import google.registry.testing.DatastoreEntityExtension; +import google.registry.testing.FakeClock; +import google.registry.testing.InjectExtension; +import java.io.Serializable; +import java.nio.file.Path; +import java.util.stream.Collectors; +import org.apache.beam.sdk.transforms.Create; +import org.joda.time.DateTime; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; + +/** Unit test for {@link RegistryJpaIO.Write}. */ +class RegistryJpaWriteTest implements Serializable { + + private static final DateTime START_TIME = DateTime.parse("2000-01-01T00:00:00.0Z"); + + private final FakeClock fakeClock = new FakeClock(START_TIME); + + @RegisterExtension + @Order(Order.DEFAULT - 1) + final transient DatastoreEntityExtension datastore = new DatastoreEntityExtension(); + + @RegisterExtension final transient InjectExtension injectRule = new InjectExtension(); + + @RegisterExtension + final transient JpaIntegrationTestExtension database = + new JpaTestRules.Builder().withClock(fakeClock).buildIntegrationTestRule(); + + @SuppressWarnings("WeakerAccess") + @TempDir + transient Path tmpDir; + + @RegisterExtension + final transient TestPipelineExtension testPipeline = + TestPipelineExtension.create().enableAbandonedNodeEnforcement(true); + + private ImmutableList contacts; + + @BeforeEach + void beforeEach() throws Exception { + try (BackupTestStore store = new BackupTestStore(fakeClock)) { + injectRule.setStaticField(Ofy.class, "clock", fakeClock); + + // Required for contacts created below. + Registrar ofyRegistrar = AppEngineExtension.makeRegistrar2(); + store.insertOrUpdate(ofyRegistrar); + jpaTm().transact(() -> jpaTm().put(store.loadAsOfyEntity(ofyRegistrar))); + + ImmutableList.Builder builder = new ImmutableList.Builder<>(); + + for (int i = 0; i < 3; i++) { + ContactResource contact = DatabaseHelper.newContactResource("contact_" + i); + store.insertOrUpdate(contact); + builder.add(store.loadAsDatastoreEntity(contact)); + } + contacts = builder.build(); + } + } + + @Test + void writeToSql_twoWriters() { + testPipeline + .apply( + Create.of( + contacts.stream() + .map(InitSqlTestUtils::entityToBytes) + .map(bytes -> VersionedEntity.from(0L, bytes)) + .collect(Collectors.toList()))) + .apply( + RegistryJpaIO.write() + .withName("ContactResource") + .withBatchSize(4) + .withShards(2) + .withJpaConverter(Transforms::convertVersionedEntityToSqlEntity)); + testPipeline.run().waitUntilFinish(); + + ImmutableList sqlContacts = jpaTm().transact(() -> jpaTm().loadAllOf(ContactResource.class)); + assertThat(sqlContacts) + .comparingElementsUsing(immutableObjectCorrespondence("revisions", "updateTimestamp")) + .containsExactlyElementsIn( + contacts.stream() + .map(InitSqlTestUtils::datastoreToOfyEntity) + .map(ImmutableObject.class::cast) + .collect(ImmutableList.toImmutableList())); + } +} diff --git a/core/src/test/java/google/registry/beam/common/RegistryPipelineOptionsTest.java b/core/src/test/java/google/registry/beam/common/RegistryPipelineOptionsTest.java index f77bd2310..6c6586fa6 100644 --- a/core/src/test/java/google/registry/beam/common/RegistryPipelineOptionsTest.java +++ b/core/src/test/java/google/registry/beam/common/RegistryPipelineOptionsTest.java @@ -19,6 +19,7 @@ import static google.registry.beam.common.RegistryPipelineOptions.validateRegist import static org.junit.jupiter.api.Assertions.assertThrows; import google.registry.config.RegistryEnvironment; +import google.registry.persistence.PersistenceModule.TransactionIsolationLevel; import google.registry.testing.SystemPropertyExtension; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.junit.jupiter.api.BeforeEach; @@ -43,29 +44,42 @@ class RegistryPipelineOptionsTest { @Test void environment_fromArgs() { - assertThat( - PipelineOptionsFactory.fromArgs("--registryEnvironment=ALPHA") - .as(RegistryPipelineOptions.class) - .getRegistryEnvironment()) - .isSameInstanceAs(RegistryEnvironment.ALPHA); + RegistryPipelineOptions options = + PipelineOptionsFactory.fromArgs( + "--registryEnvironment=ALPHA", "--isolationOverride=TRANSACTION_SERIALIZABLE") + .withValidation() + .as(RegistryPipelineOptions.class); + assertThat(options.getRegistryEnvironment()).isSameInstanceAs(RegistryEnvironment.ALPHA); + assertThat(options.getIsolationOverride()) + .isSameInstanceAs(TransactionIsolationLevel.TRANSACTION_SERIALIZABLE); } @Test - void environment_invalid() { + void environment_invalidEnvironment() { assertThrows( IllegalArgumentException.class, () -> PipelineOptionsFactory.fromArgs("--registryEnvironment=alpha") + .withValidation() + .as(RegistryPipelineOptions.class)); + } + + @Test + void environment_invalidIsolation() { + assertThrows( + IllegalArgumentException.class, + () -> + PipelineOptionsFactory.fromArgs("--isolationOverride=something_wrong") + .withValidation() .as(RegistryPipelineOptions.class)); } @Test void environment_undefined() { - assertThat( - PipelineOptionsFactory.create() - .as(RegistryPipelineOptions.class) - .getRegistryEnvironment()) - .isNull(); + RegistryPipelineOptions options = + PipelineOptionsFactory.fromArgs().withValidation().as(RegistryPipelineOptions.class); + assertThat(options.getRegistryEnvironment()).isNull(); + assertThat(options.getIsolationOverride()).isNull(); } @Test diff --git a/core/src/test/java/google/registry/beam/initsql/BackupTestStore.java b/core/src/test/java/google/registry/beam/initsql/BackupTestStore.java index 7a6a9910e..fdcb22717 100644 --- a/core/src/test/java/google/registry/beam/initsql/BackupTestStore.java +++ b/core/src/test/java/google/registry/beam/initsql/BackupTestStore.java @@ -47,7 +47,7 @@ import org.joda.time.format.DateTimeFormatter; * every transaction is invoked on this store, ensuring strictly increasing timestamps on causally * dependent transactions. In production, the same ordering is ensured by sleep and retry. */ -class BackupTestStore implements AutoCloseable { +public final class BackupTestStore implements AutoCloseable { private static final DateTimeFormatter EXPORT_TIMESTAMP_FORMAT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss_SSS"); @@ -59,7 +59,7 @@ class BackupTestStore implements AutoCloseable { private CommitLogCheckpoint prevCommitLogCheckpoint; - BackupTestStore(FakeClock fakeClock) throws Exception { + public BackupTestStore(FakeClock fakeClock) throws Exception { this.fakeClock = fakeClock; this.appEngine = new AppEngineExtension.Builder() @@ -88,7 +88,7 @@ class BackupTestStore implements AutoCloseable { * transaction. */ @SafeVarargs - final long insertOrUpdate(Object... entities) { + public final long insertOrUpdate(Object... entities) { long timestamp = fakeClock.nowUtc().getMillis(); tm().transact(() -> ofy().save().entities(entities).now()); fakeClock.advanceOneMilli(); @@ -97,7 +97,7 @@ class BackupTestStore implements AutoCloseable { /** Deletes {@code entities} from the Datastore and returns the timestamp of this transaction. */ @SafeVarargs - final long delete(Object... entities) { + public final long delete(Object... entities) { long timestamp = fakeClock.nowUtc().getMillis(); tm().transact(() -> ofy().delete().entities(entities).now()); fakeClock.advanceOneMilli(); @@ -111,7 +111,7 @@ class BackupTestStore implements AutoCloseable { * Objectify entity and want to find out the values of certain assign-on-persist properties. See * {@link VersionedEntity} for more information. */ - Entity loadAsDatastoreEntity(Object ofyEntity) { + public Entity loadAsDatastoreEntity(Object ofyEntity) { try { return datastoreService.get(Key.create(ofyEntity).getRaw()); } catch (EntityNotFoundException e) { @@ -124,7 +124,7 @@ class BackupTestStore implements AutoCloseable { * *

See {@link #loadAsDatastoreEntity} and {@link VersionedEntity} for more information. */ - Object loadAsOfyEntity(Object ofyEntity) { + public Object loadAsOfyEntity(Object ofyEntity) { try { return ofy().load().fromEntity(datastoreService.get(Key.create(ofyEntity).getRaw())); } catch (EntityNotFoundException e) { diff --git a/core/src/test/java/google/registry/beam/initsql/InitSqlPipelineGraphTest.java b/core/src/test/java/google/registry/beam/initsql/InitSqlPipelineGraphTest.java index 0e62a09c5..a44e2c2a2 100644 --- a/core/src/test/java/google/registry/beam/initsql/InitSqlPipelineGraphTest.java +++ b/core/src/test/java/google/registry/beam/initsql/InitSqlPipelineGraphTest.java @@ -38,7 +38,7 @@ class InitSqlPipelineGraphTest { "--commitLogEndTimestamp=2000-01-02TZ", "--datastoreExportDir=/somedir", "--commitLogDir=/someotherdir", - "--environment=alpha" + "--registryEnvironment=ALPHA" }; private static final transient InitSqlPipelineOptions options = diff --git a/core/src/test/java/google/registry/beam/initsql/InitSqlPipelineTest.java b/core/src/test/java/google/registry/beam/initsql/InitSqlPipelineTest.java index 779cc3773..0ec85c17a 100644 --- a/core/src/test/java/google/registry/beam/initsql/InitSqlPipelineTest.java +++ b/core/src/test/java/google/registry/beam/initsql/InitSqlPipelineTest.java @@ -317,8 +317,6 @@ class InitSqlPipelineTest { void runPipeline() { InitSqlPipelineOptions options = PipelineOptionsFactory.fromArgs( - "--sqlCredentialUrlOverride=" - + beamJpaExtension.getCredentialFile().getAbsolutePath(), "--commitLogStartTimestamp=" + START_TIME, "--commitLogEndTimestamp=" + fakeClock.nowUtc().plusMillis(1), "--datastoreExportDir=" + exportDir.getAbsolutePath(), diff --git a/core/src/test/resources/google/registry/beam/initsql/pipeline_golden.dot b/core/src/test/resources/google/registry/beam/initsql/pipeline_golden.dot index 7e81d37cb..0711ac4ad 100644 --- a/core/src/test/resources/google/registry/beam/initsql/pipeline_golden.dot +++ b/core/src/test/resources/google/registry/beam/initsql/pipeline_golden.dot @@ -236,1400 +236,1543 @@ digraph { 88 -> 89 [style=solid label=""] } subgraph cluster_90 { - label = "Write to sql: Transforms:Registry" + label = "Write to Sql: Transforms:Registry" subgraph cluster_91 { - label = "Write to sql: Transforms:Registry/Shard data for Transforms:Registry" + label = "Write to Sql: Transforms:Registry/Shard data Transforms:Registry" subgraph cluster_92 { - label = "Write to sql: Transforms:Registry/Shard data for Transforms:Registry/Map" - 93 [label="ParMultiDo(Anonymous)"] - 89 -> 93 [style=solid label=""] + label = "Write to Sql: Transforms:Registry/Shard data Transforms:Registry/AddKeys" + subgraph cluster_93 { + label = "Write to Sql: Transforms:Registry/Shard data Transforms:Registry/AddKeys/Map" + 94 [label="ParMultiDo(Anonymous)"] + 89 -> 94 [style=solid label=""] + } } } - subgraph cluster_94 { - label = "Write to sql: Transforms:Registry/Batch output by shard Transforms:Registry" - subgraph cluster_95 { - label = "Write to sql: Transforms:Registry/Batch output by shard Transforms:Registry/ParDo(GroupIntoBatches)" - 96 [label="ParMultiDo(GroupIntoBatches)"] - 93 -> 96 [style=solid label=""] + subgraph cluster_95 { + label = "Write to Sql: Transforms:Registry/Group into batches Transforms:Registry" + subgraph cluster_96 { + label = "Write to Sql: Transforms:Registry/Group into batches Transforms:Registry/MapElements" + subgraph cluster_97 { + label = "Write to Sql: Transforms:Registry/Group into batches Transforms:Registry/MapElements/Map" + 98 [label="ParMultiDo(Anonymous)"] + 94 -> 98 [style=solid label=""] + } + } + subgraph cluster_99 { + label = "Write to Sql: Transforms:Registry/Group into batches Transforms:Registry/ParDo(GroupIntoBatches)" + 100 [label="ParMultiDo(GroupIntoBatches)"] + 98 -> 100 [style=solid label=""] } } - subgraph cluster_97 { - label = "Write to sql: Transforms:Registry/Write in batch for Transforms:Registry" - 98 [label="ParMultiDo(SqlBatchWriter)"] - 96 -> 98 [style=solid label=""] + subgraph cluster_101 { + label = "Write to Sql: Transforms:Registry/Write in batch for Transforms:Registry" + 102 [label="ParMultiDo(SqlBatchWriter)"] + 100 -> 102 [style=solid label=""] } } - subgraph cluster_99 { + subgraph cluster_103 { label = "Wait on Transforms:Registry" - subgraph cluster_100 { + subgraph cluster_104 { label = "Wait on Transforms:Registry/To wait view 0" - subgraph cluster_101 { - label = "Wait on Transforms:Registry/To wait view 0/Window.Into()" - 102 [label="Flatten.PCollections"] - 98 -> 102 [style=solid label=""] - } - subgraph cluster_103 { - label = "Wait on Transforms:Registry/To wait view 0/ParDo(CollectWindows)" - 104 [label="ParMultiDo(CollectWindows)"] - 102 -> 104 [style=solid label=""] - } subgraph cluster_105 { + label = "Wait on Transforms:Registry/To wait view 0/Window.Into()" + 106 [label="Flatten.PCollections"] + 102 -> 106 [style=solid label=""] + } + subgraph cluster_107 { + label = "Wait on Transforms:Registry/To wait view 0/ParDo(CollectWindows)" + 108 [label="ParMultiDo(CollectWindows)"] + 106 -> 108 [style=solid label=""] + } + subgraph cluster_109 { label = "Wait on Transforms:Registry/To wait view 0/Sample.Any" - subgraph cluster_106 { + subgraph cluster_110 { label = "Wait on Transforms:Registry/To wait view 0/Sample.Any/Combine.globally(SampleAny)" - subgraph cluster_107 { - label = "Wait on Transforms:Registry/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys" - subgraph cluster_108 { - label = "Wait on Transforms:Registry/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys" - subgraph cluster_109 { - label = "Wait on Transforms:Registry/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys/Map" - 110 [label="ParMultiDo(Anonymous)"] - 104 -> 110 [style=solid label=""] - } - } - } subgraph cluster_111 { + label = "Wait on Transforms:Registry/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys" + subgraph cluster_112 { + label = "Wait on Transforms:Registry/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys" + subgraph cluster_113 { + label = "Wait on Transforms:Registry/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys/Map" + 114 [label="ParMultiDo(Anonymous)"] + 108 -> 114 [style=solid label=""] + } + } + } + subgraph cluster_115 { label = "Wait on Transforms:Registry/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)" - 112 [label="GroupByKey"] - 110 -> 112 [style=solid label=""] - subgraph cluster_113 { - label = "Wait on Transforms:Registry/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues" - subgraph cluster_114 { - label = "Wait on Transforms:Registry/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues/ParDo(Anonymous)" - 115 [label="ParMultiDo(Anonymous)"] - 112 -> 115 [style=solid label=""] - } - } - } - subgraph cluster_116 { - label = "Wait on Transforms:Registry/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values" + 116 [label="GroupByKey"] + 114 -> 116 [style=solid label=""] subgraph cluster_117 { - label = "Wait on Transforms:Registry/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values" + label = "Wait on Transforms:Registry/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues" subgraph cluster_118 { - label = "Wait on Transforms:Registry/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values/Map" + label = "Wait on Transforms:Registry/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues/ParDo(Anonymous)" 119 [label="ParMultiDo(Anonymous)"] - 115 -> 119 [style=solid label=""] + 116 -> 119 [style=solid label=""] + } + } + } + subgraph cluster_120 { + label = "Wait on Transforms:Registry/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values" + subgraph cluster_121 { + label = "Wait on Transforms:Registry/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values" + subgraph cluster_122 { + label = "Wait on Transforms:Registry/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values/Map" + 123 [label="ParMultiDo(Anonymous)"] + 119 -> 123 [style=solid label=""] } } } } - subgraph cluster_120 { + subgraph cluster_124 { label = "Wait on Transforms:Registry/To wait view 0/Sample.Any/Flatten.Iterables" - subgraph cluster_121 { + subgraph cluster_125 { label = "Wait on Transforms:Registry/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables" - subgraph cluster_122 { + subgraph cluster_126 { label = "Wait on Transforms:Registry/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables/FlatMap" - 123 [label="ParMultiDo(Anonymous)"] - 119 -> 123 [style=solid label=""] + 127 [label="ParMultiDo(Anonymous)"] + 123 -> 127 [style=solid label=""] } } } } - subgraph cluster_124 { + subgraph cluster_128 { label = "Wait on Transforms:Registry/To wait view 0/View.AsList" - subgraph cluster_125 { + subgraph cluster_129 { label = "Wait on Transforms:Registry/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization" - subgraph cluster_126 { + subgraph cluster_130 { label = "Wait on Transforms:Registry/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)" - 127 [label="ParMultiDo(VoidKeyToMultimapMaterialization)"] - 123 -> 127 [style=solid label=""] + 131 [label="ParMultiDo(VoidKeyToMultimapMaterialization)"] + 127 -> 131 [style=solid label=""] } } - 128 [label="View.CreatePCollectionView"] - 127 -> 128 [style=solid label=""] + 132 [label="View.CreatePCollectionView"] + 131 -> 132 [style=solid label=""] } } - subgraph cluster_129 { - label = "Wait on Transforms:Registry/Wait" - subgraph cluster_130 { - label = "Wait on Transforms:Registry/Wait/Map" - 131 [label="ParMultiDo(Anonymous)"] - 89 -> 131 [style=solid label=""] - 127 -> 131 [style=dashed label=""] - } - } - } - subgraph cluster_132 { - label = "Write to sql: Transforms:Registrar" subgraph cluster_133 { - label = "Write to sql: Transforms:Registrar/Shard data for Transforms:Registrar" + label = "Wait on Transforms:Registry/Wait" subgraph cluster_134 { - label = "Write to sql: Transforms:Registrar/Shard data for Transforms:Registrar/Map" + label = "Wait on Transforms:Registry/Wait/Map" 135 [label="ParMultiDo(Anonymous)"] - 131 -> 135 [style=solid label=""] + 89 -> 135 [style=solid label=""] + 131 -> 135 [style=dashed label=""] } } - subgraph cluster_136 { - label = "Write to sql: Transforms:Registrar/Batch output by shard Transforms:Registrar" - subgraph cluster_137 { - label = "Write to sql: Transforms:Registrar/Batch output by shard Transforms:Registrar/ParDo(GroupIntoBatches)" - 138 [label="ParMultiDo(GroupIntoBatches)"] - 135 -> 138 [style=solid label=""] - } - } - subgraph cluster_139 { - label = "Write to sql: Transforms:Registrar/Write in batch for Transforms:Registrar" - 140 [label="ParMultiDo(SqlBatchWriter)"] - 138 -> 140 [style=solid label=""] - } } - subgraph cluster_141 { - label = "Wait on Transforms:Registrar" - subgraph cluster_142 { - label = "Wait on Transforms:Registrar/To wait view 0" - subgraph cluster_143 { - label = "Wait on Transforms:Registrar/To wait view 0/Window.Into()" - 144 [label="Flatten.PCollections"] - 140 -> 144 [style=solid label=""] + subgraph cluster_136 { + label = "Write to Sql: Transforms:Registrar" + subgraph cluster_137 { + label = "Write to Sql: Transforms:Registrar/Shard data Transforms:Registrar" + subgraph cluster_138 { + label = "Write to Sql: Transforms:Registrar/Shard data Transforms:Registrar/AddKeys" + subgraph cluster_139 { + label = "Write to Sql: Transforms:Registrar/Shard data Transforms:Registrar/AddKeys/Map" + 140 [label="ParMultiDo(Anonymous)"] + 135 -> 140 [style=solid label=""] + } + } + } + subgraph cluster_141 { + label = "Write to Sql: Transforms:Registrar/Group into batches Transforms:Registrar" + subgraph cluster_142 { + label = "Write to Sql: Transforms:Registrar/Group into batches Transforms:Registrar/MapElements" + subgraph cluster_143 { + label = "Write to Sql: Transforms:Registrar/Group into batches Transforms:Registrar/MapElements/Map" + 144 [label="ParMultiDo(Anonymous)"] + 140 -> 144 [style=solid label=""] + } } subgraph cluster_145 { - label = "Wait on Transforms:Registrar/To wait view 0/ParDo(CollectWindows)" - 146 [label="ParMultiDo(CollectWindows)"] + label = "Write to Sql: Transforms:Registrar/Group into batches Transforms:Registrar/ParDo(GroupIntoBatches)" + 146 [label="ParMultiDo(GroupIntoBatches)"] 144 -> 146 [style=solid label=""] } - subgraph cluster_147 { + } + subgraph cluster_147 { + label = "Write to Sql: Transforms:Registrar/Write in batch for Transforms:Registrar" + 148 [label="ParMultiDo(SqlBatchWriter)"] + 146 -> 148 [style=solid label=""] + } + } + subgraph cluster_149 { + label = "Wait on Transforms:Registrar" + subgraph cluster_150 { + label = "Wait on Transforms:Registrar/To wait view 0" + subgraph cluster_151 { + label = "Wait on Transforms:Registrar/To wait view 0/Window.Into()" + 152 [label="Flatten.PCollections"] + 148 -> 152 [style=solid label=""] + } + subgraph cluster_153 { + label = "Wait on Transforms:Registrar/To wait view 0/ParDo(CollectWindows)" + 154 [label="ParMultiDo(CollectWindows)"] + 152 -> 154 [style=solid label=""] + } + subgraph cluster_155 { label = "Wait on Transforms:Registrar/To wait view 0/Sample.Any" - subgraph cluster_148 { + subgraph cluster_156 { label = "Wait on Transforms:Registrar/To wait view 0/Sample.Any/Combine.globally(SampleAny)" - subgraph cluster_149 { + subgraph cluster_157 { label = "Wait on Transforms:Registrar/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys" - subgraph cluster_150 { + subgraph cluster_158 { label = "Wait on Transforms:Registrar/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys" - subgraph cluster_151 { + subgraph cluster_159 { label = "Wait on Transforms:Registrar/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys/Map" - 152 [label="ParMultiDo(Anonymous)"] - 146 -> 152 [style=solid label=""] + 160 [label="ParMultiDo(Anonymous)"] + 154 -> 160 [style=solid label=""] } } } - subgraph cluster_153 { + subgraph cluster_161 { label = "Wait on Transforms:Registrar/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)" - 154 [label="GroupByKey"] - 152 -> 154 [style=solid label=""] - subgraph cluster_155 { + 162 [label="GroupByKey"] + 160 -> 162 [style=solid label=""] + subgraph cluster_163 { label = "Wait on Transforms:Registrar/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues" - subgraph cluster_156 { + subgraph cluster_164 { label = "Wait on Transforms:Registrar/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues/ParDo(Anonymous)" - 157 [label="ParMultiDo(Anonymous)"] - 154 -> 157 [style=solid label=""] + 165 [label="ParMultiDo(Anonymous)"] + 162 -> 165 [style=solid label=""] } } } - subgraph cluster_158 { + subgraph cluster_166 { label = "Wait on Transforms:Registrar/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values" - subgraph cluster_159 { + subgraph cluster_167 { label = "Wait on Transforms:Registrar/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values" - subgraph cluster_160 { + subgraph cluster_168 { label = "Wait on Transforms:Registrar/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values/Map" - 161 [label="ParMultiDo(Anonymous)"] - 157 -> 161 [style=solid label=""] + 169 [label="ParMultiDo(Anonymous)"] + 165 -> 169 [style=solid label=""] } } } } - subgraph cluster_162 { + subgraph cluster_170 { label = "Wait on Transforms:Registrar/To wait view 0/Sample.Any/Flatten.Iterables" - subgraph cluster_163 { + subgraph cluster_171 { label = "Wait on Transforms:Registrar/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables" - subgraph cluster_164 { + subgraph cluster_172 { label = "Wait on Transforms:Registrar/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables/FlatMap" - 165 [label="ParMultiDo(Anonymous)"] - 161 -> 165 [style=solid label=""] + 173 [label="ParMultiDo(Anonymous)"] + 169 -> 173 [style=solid label=""] } } } } - subgraph cluster_166 { + subgraph cluster_174 { label = "Wait on Transforms:Registrar/To wait view 0/View.AsList" - subgraph cluster_167 { + subgraph cluster_175 { label = "Wait on Transforms:Registrar/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization" - subgraph cluster_168 { + subgraph cluster_176 { label = "Wait on Transforms:Registrar/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)" - 169 [label="ParMultiDo(VoidKeyToMultimapMaterialization)"] - 165 -> 169 [style=solid label=""] + 177 [label="ParMultiDo(VoidKeyToMultimapMaterialization)"] + 173 -> 177 [style=solid label=""] } } - 170 [label="View.CreatePCollectionView"] - 169 -> 170 [style=solid label=""] + 178 [label="View.CreatePCollectionView"] + 177 -> 178 [style=solid label=""] } } - subgraph cluster_171 { + subgraph cluster_179 { label = "Wait on Transforms:Registrar/Wait" - subgraph cluster_172 { + subgraph cluster_180 { label = "Wait on Transforms:Registrar/Wait/Map" - 173 [label="ParMultiDo(Anonymous)"] - 89 -> 173 [style=solid label=""] - 169 -> 173 [style=dashed label=""] + 181 [label="ParMultiDo(Anonymous)"] + 89 -> 181 [style=solid label=""] + 177 -> 181 [style=dashed label=""] } } } - subgraph cluster_174 { - label = "Write to sql: Transforms:ContactResource" - subgraph cluster_175 { - label = "Write to sql: Transforms:ContactResource/Shard data for Transforms:ContactResource" - subgraph cluster_176 { - label = "Write to sql: Transforms:ContactResource/Shard data for Transforms:ContactResource/Map" - 177 [label="ParMultiDo(Anonymous)"] - 173 -> 177 [style=solid label=""] + subgraph cluster_182 { + label = "Write to Sql: Transforms:ContactResource" + subgraph cluster_183 { + label = "Write to Sql: Transforms:ContactResource/Shard data Transforms:ContactResource" + subgraph cluster_184 { + label = "Write to Sql: Transforms:ContactResource/Shard data Transforms:ContactResource/AddKeys" + subgraph cluster_185 { + label = "Write to Sql: Transforms:ContactResource/Shard data Transforms:ContactResource/AddKeys/Map" + 186 [label="ParMultiDo(Anonymous)"] + 181 -> 186 [style=solid label=""] + } } } - subgraph cluster_178 { - label = "Write to sql: Transforms:ContactResource/Batch output by shard Transforms:ContactResource" - subgraph cluster_179 { - label = "Write to sql: Transforms:ContactResource/Batch output by shard Transforms:ContactResource/ParDo(GroupIntoBatches)" - 180 [label="ParMultiDo(GroupIntoBatches)"] - 177 -> 180 [style=solid label=""] + subgraph cluster_187 { + label = "Write to Sql: Transforms:ContactResource/Group into batches Transforms:ContactResource" + subgraph cluster_188 { + label = "Write to Sql: Transforms:ContactResource/Group into batches Transforms:ContactResource/MapElements" + subgraph cluster_189 { + label = "Write to Sql: Transforms:ContactResource/Group into batches Transforms:ContactResource/MapElements/Map" + 190 [label="ParMultiDo(Anonymous)"] + 186 -> 190 [style=solid label=""] + } + } + subgraph cluster_191 { + label = "Write to Sql: Transforms:ContactResource/Group into batches Transforms:ContactResource/ParDo(GroupIntoBatches)" + 192 [label="ParMultiDo(GroupIntoBatches)"] + 190 -> 192 [style=solid label=""] } } - subgraph cluster_181 { - label = "Write to sql: Transforms:ContactResource/Write in batch for Transforms:ContactResource" - 182 [label="ParMultiDo(SqlBatchWriter)"] - 180 -> 182 [style=solid label=""] + subgraph cluster_193 { + label = "Write to Sql: Transforms:ContactResource/Write in batch for Transforms:ContactResource" + 194 [label="ParMultiDo(SqlBatchWriter)"] + 192 -> 194 [style=solid label=""] } } - subgraph cluster_183 { + subgraph cluster_195 { label = "Wait on Transforms:ContactResource" - subgraph cluster_184 { + subgraph cluster_196 { label = "Wait on Transforms:ContactResource/To wait view 0" - subgraph cluster_185 { + subgraph cluster_197 { label = "Wait on Transforms:ContactResource/To wait view 0/Window.Into()" - 186 [label="Flatten.PCollections"] - 182 -> 186 [style=solid label=""] + 198 [label="Flatten.PCollections"] + 194 -> 198 [style=solid label=""] } - subgraph cluster_187 { + subgraph cluster_199 { label = "Wait on Transforms:ContactResource/To wait view 0/ParDo(CollectWindows)" - 188 [label="ParMultiDo(CollectWindows)"] - 186 -> 188 [style=solid label=""] + 200 [label="ParMultiDo(CollectWindows)"] + 198 -> 200 [style=solid label=""] } - subgraph cluster_189 { + subgraph cluster_201 { label = "Wait on Transforms:ContactResource/To wait view 0/Sample.Any" - subgraph cluster_190 { + subgraph cluster_202 { label = "Wait on Transforms:ContactResource/To wait view 0/Sample.Any/Combine.globally(SampleAny)" - subgraph cluster_191 { + subgraph cluster_203 { label = "Wait on Transforms:ContactResource/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys" - subgraph cluster_192 { + subgraph cluster_204 { label = "Wait on Transforms:ContactResource/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys" - subgraph cluster_193 { + subgraph cluster_205 { label = "Wait on Transforms:ContactResource/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys/Map" - 194 [label="ParMultiDo(Anonymous)"] - 188 -> 194 [style=solid label=""] + 206 [label="ParMultiDo(Anonymous)"] + 200 -> 206 [style=solid label=""] } } } - subgraph cluster_195 { + subgraph cluster_207 { label = "Wait on Transforms:ContactResource/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)" - 196 [label="GroupByKey"] - 194 -> 196 [style=solid label=""] - subgraph cluster_197 { + 208 [label="GroupByKey"] + 206 -> 208 [style=solid label=""] + subgraph cluster_209 { label = "Wait on Transforms:ContactResource/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues" - subgraph cluster_198 { + subgraph cluster_210 { label = "Wait on Transforms:ContactResource/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues/ParDo(Anonymous)" - 199 [label="ParMultiDo(Anonymous)"] - 196 -> 199 [style=solid label=""] + 211 [label="ParMultiDo(Anonymous)"] + 208 -> 211 [style=solid label=""] } } } - subgraph cluster_200 { + subgraph cluster_212 { label = "Wait on Transforms:ContactResource/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values" - subgraph cluster_201 { + subgraph cluster_213 { label = "Wait on Transforms:ContactResource/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values" - subgraph cluster_202 { + subgraph cluster_214 { label = "Wait on Transforms:ContactResource/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values/Map" - 203 [label="ParMultiDo(Anonymous)"] - 199 -> 203 [style=solid label=""] + 215 [label="ParMultiDo(Anonymous)"] + 211 -> 215 [style=solid label=""] } } } } - subgraph cluster_204 { + subgraph cluster_216 { label = "Wait on Transforms:ContactResource/To wait view 0/Sample.Any/Flatten.Iterables" - subgraph cluster_205 { + subgraph cluster_217 { label = "Wait on Transforms:ContactResource/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables" - subgraph cluster_206 { + subgraph cluster_218 { label = "Wait on Transforms:ContactResource/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables/FlatMap" - 207 [label="ParMultiDo(Anonymous)"] - 203 -> 207 [style=solid label=""] + 219 [label="ParMultiDo(Anonymous)"] + 215 -> 219 [style=solid label=""] } } } } - subgraph cluster_208 { + subgraph cluster_220 { label = "Wait on Transforms:ContactResource/To wait view 0/View.AsList" - subgraph cluster_209 { + subgraph cluster_221 { label = "Wait on Transforms:ContactResource/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization" - subgraph cluster_210 { + subgraph cluster_222 { label = "Wait on Transforms:ContactResource/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)" - 211 [label="ParMultiDo(VoidKeyToMultimapMaterialization)"] - 207 -> 211 [style=solid label=""] + 223 [label="ParMultiDo(VoidKeyToMultimapMaterialization)"] + 219 -> 223 [style=solid label=""] } } - 212 [label="View.CreatePCollectionView"] - 211 -> 212 [style=solid label=""] + 224 [label="View.CreatePCollectionView"] + 223 -> 224 [style=solid label=""] } } - subgraph cluster_213 { + subgraph cluster_225 { label = "Wait on Transforms:ContactResource/Wait" - subgraph cluster_214 { + subgraph cluster_226 { label = "Wait on Transforms:ContactResource/Wait/Map" - 215 [label="ParMultiDo(Anonymous)"] - 89 -> 215 [style=solid label=""] - 211 -> 215 [style=dashed label=""] + 227 [label="ParMultiDo(Anonymous)"] + 89 -> 227 [style=solid label=""] + 223 -> 227 [style=dashed label=""] } } } - subgraph cluster_216 { - label = "Write to sql: Transforms:RegistrarContact" - subgraph cluster_217 { - label = "Write to sql: Transforms:RegistrarContact/Shard data for Transforms:RegistrarContact" - subgraph cluster_218 { - label = "Write to sql: Transforms:RegistrarContact/Shard data for Transforms:RegistrarContact/Map" - 219 [label="ParMultiDo(Anonymous)"] - 215 -> 219 [style=solid label=""] + subgraph cluster_228 { + label = "Write to Sql: Transforms:RegistrarContact" + subgraph cluster_229 { + label = "Write to Sql: Transforms:RegistrarContact/Shard data Transforms:RegistrarContact" + subgraph cluster_230 { + label = "Write to Sql: Transforms:RegistrarContact/Shard data Transforms:RegistrarContact/AddKeys" + subgraph cluster_231 { + label = "Write to Sql: Transforms:RegistrarContact/Shard data Transforms:RegistrarContact/AddKeys/Map" + 232 [label="ParMultiDo(Anonymous)"] + 227 -> 232 [style=solid label=""] + } } } - subgraph cluster_220 { - label = "Write to sql: Transforms:RegistrarContact/Batch output by shard Transforms:RegistrarContact" - subgraph cluster_221 { - label = "Write to sql: Transforms:RegistrarContact/Batch output by shard Transforms:RegistrarContact/ParDo(GroupIntoBatches)" - 222 [label="ParMultiDo(GroupIntoBatches)"] - 219 -> 222 [style=solid label=""] + subgraph cluster_233 { + label = "Write to Sql: Transforms:RegistrarContact/Group into batches Transforms:RegistrarContact" + subgraph cluster_234 { + label = "Write to Sql: Transforms:RegistrarContact/Group into batches Transforms:RegistrarContact/MapElements" + subgraph cluster_235 { + label = "Write to Sql: Transforms:RegistrarContact/Group into batches Transforms:RegistrarContact/MapElements/Map" + 236 [label="ParMultiDo(Anonymous)"] + 232 -> 236 [style=solid label=""] + } + } + subgraph cluster_237 { + label = "Write to Sql: Transforms:RegistrarContact/Group into batches Transforms:RegistrarContact/ParDo(GroupIntoBatches)" + 238 [label="ParMultiDo(GroupIntoBatches)"] + 236 -> 238 [style=solid label=""] } } - subgraph cluster_223 { - label = "Write to sql: Transforms:RegistrarContact/Write in batch for Transforms:RegistrarContact" - 224 [label="ParMultiDo(SqlBatchWriter)"] - 222 -> 224 [style=solid label=""] + subgraph cluster_239 { + label = "Write to Sql: Transforms:RegistrarContact/Write in batch for Transforms:RegistrarContact" + 240 [label="ParMultiDo(SqlBatchWriter)"] + 238 -> 240 [style=solid label=""] } } - subgraph cluster_225 { + subgraph cluster_241 { label = "Remove circular foreign keys from DomainBase" - 226 [label="ParMultiDo(RemoveDomainBaseForeignKeys)"] - 89 -> 226 [style=solid label=""] + 242 [label="ParMultiDo(RemoveDomainBaseForeignKeys)"] + 89 -> 242 [style=solid label=""] } - subgraph cluster_227 { + subgraph cluster_243 { label = "Wait on phase one" - subgraph cluster_228 { + subgraph cluster_244 { label = "Wait on phase one/To wait view 0" - subgraph cluster_229 { + subgraph cluster_245 { label = "Wait on phase one/To wait view 0/Window.Into()" - 230 [label="Flatten.PCollections"] - 224 -> 230 [style=solid label=""] + 246 [label="Flatten.PCollections"] + 240 -> 246 [style=solid label=""] } - subgraph cluster_231 { + subgraph cluster_247 { label = "Wait on phase one/To wait view 0/ParDo(CollectWindows)" - 232 [label="ParMultiDo(CollectWindows)"] - 230 -> 232 [style=solid label=""] + 248 [label="ParMultiDo(CollectWindows)"] + 246 -> 248 [style=solid label=""] } - subgraph cluster_233 { + subgraph cluster_249 { label = "Wait on phase one/To wait view 0/Sample.Any" - subgraph cluster_234 { + subgraph cluster_250 { label = "Wait on phase one/To wait view 0/Sample.Any/Combine.globally(SampleAny)" - subgraph cluster_235 { + subgraph cluster_251 { label = "Wait on phase one/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys" - subgraph cluster_236 { + subgraph cluster_252 { label = "Wait on phase one/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys" - subgraph cluster_237 { + subgraph cluster_253 { label = "Wait on phase one/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys/Map" - 238 [label="ParMultiDo(Anonymous)"] - 232 -> 238 [style=solid label=""] + 254 [label="ParMultiDo(Anonymous)"] + 248 -> 254 [style=solid label=""] } } } - subgraph cluster_239 { + subgraph cluster_255 { label = "Wait on phase one/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)" - 240 [label="GroupByKey"] - 238 -> 240 [style=solid label=""] - subgraph cluster_241 { + 256 [label="GroupByKey"] + 254 -> 256 [style=solid label=""] + subgraph cluster_257 { label = "Wait on phase one/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues" - subgraph cluster_242 { + subgraph cluster_258 { label = "Wait on phase one/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues/ParDo(Anonymous)" - 243 [label="ParMultiDo(Anonymous)"] - 240 -> 243 [style=solid label=""] + 259 [label="ParMultiDo(Anonymous)"] + 256 -> 259 [style=solid label=""] } } } - subgraph cluster_244 { + subgraph cluster_260 { label = "Wait on phase one/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values" - subgraph cluster_245 { + subgraph cluster_261 { label = "Wait on phase one/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values" - subgraph cluster_246 { + subgraph cluster_262 { label = "Wait on phase one/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values/Map" - 247 [label="ParMultiDo(Anonymous)"] - 243 -> 247 [style=solid label=""] + 263 [label="ParMultiDo(Anonymous)"] + 259 -> 263 [style=solid label=""] } } } } - subgraph cluster_248 { + subgraph cluster_264 { label = "Wait on phase one/To wait view 0/Sample.Any/Flatten.Iterables" - subgraph cluster_249 { + subgraph cluster_265 { label = "Wait on phase one/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables" - subgraph cluster_250 { + subgraph cluster_266 { label = "Wait on phase one/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables/FlatMap" - 251 [label="ParMultiDo(Anonymous)"] - 247 -> 251 [style=solid label=""] + 267 [label="ParMultiDo(Anonymous)"] + 263 -> 267 [style=solid label=""] } } } } - subgraph cluster_252 { + subgraph cluster_268 { label = "Wait on phase one/To wait view 0/View.AsList" - subgraph cluster_253 { + subgraph cluster_269 { label = "Wait on phase one/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization" - subgraph cluster_254 { + subgraph cluster_270 { label = "Wait on phase one/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)" - 255 [label="ParMultiDo(VoidKeyToMultimapMaterialization)"] - 251 -> 255 [style=solid label=""] + 271 [label="ParMultiDo(VoidKeyToMultimapMaterialization)"] + 267 -> 271 [style=solid label=""] } } - 256 [label="View.CreatePCollectionView"] - 255 -> 256 [style=solid label=""] + 272 [label="View.CreatePCollectionView"] + 271 -> 272 [style=solid label=""] } } - subgraph cluster_257 { + subgraph cluster_273 { label = "Wait on phase one/Wait" - subgraph cluster_258 { + subgraph cluster_274 { label = "Wait on phase one/Wait/Map" - 259 [label="ParMultiDo(Anonymous)"] - 226 -> 259 [style=solid label=""] - 255 -> 259 [style=dashed label=""] + 275 [label="ParMultiDo(Anonymous)"] + 242 -> 275 [style=solid label=""] + 271 -> 275 [style=dashed label=""] } } } - subgraph cluster_260 { - label = "Write to sql: DomainBase without circular foreign keys" - subgraph cluster_261 { - label = "Write to sql: DomainBase without circular foreign keys/Shard data for DomainBase without circular foreign keys" - subgraph cluster_262 { - label = "Write to sql: DomainBase without circular foreign keys/Shard data for DomainBase without circular foreign keys/Map" - 263 [label="ParMultiDo(Anonymous)"] - 259 -> 263 [style=solid label=""] + subgraph cluster_276 { + label = "Write to Sql: DomainBase without circular foreign keys" + subgraph cluster_277 { + label = "Write to Sql: DomainBase without circular foreign keys/Shard data DomainBase without circular foreign keys" + subgraph cluster_278 { + label = "Write to Sql: DomainBase without circular foreign keys/Shard data DomainBase without circular foreign keys/AddKeys" + subgraph cluster_279 { + label = "Write to Sql: DomainBase without circular foreign keys/Shard data DomainBase without circular foreign keys/AddKeys/Map" + 280 [label="ParMultiDo(Anonymous)"] + 275 -> 280 [style=solid label=""] + } } } - subgraph cluster_264 { - label = "Write to sql: DomainBase without circular foreign keys/Batch output by shard DomainBase without circular foreign keys" - subgraph cluster_265 { - label = "Write to sql: DomainBase without circular foreign keys/Batch output by shard DomainBase without circular foreign keys/ParDo(GroupIntoBatches)" - 266 [label="ParMultiDo(GroupIntoBatches)"] - 263 -> 266 [style=solid label=""] + subgraph cluster_281 { + label = "Write to Sql: DomainBase without circular foreign keys/Group into batches DomainBase without circular foreign keys" + subgraph cluster_282 { + label = "Write to Sql: DomainBase without circular foreign keys/Group into batches DomainBase without circular foreign keys/MapElements" + subgraph cluster_283 { + label = "Write to Sql: DomainBase without circular foreign keys/Group into batches DomainBase without circular foreign keys/MapElements/Map" + 284 [label="ParMultiDo(Anonymous)"] + 280 -> 284 [style=solid label=""] + } + } + subgraph cluster_285 { + label = "Write to Sql: DomainBase without circular foreign keys/Group into batches DomainBase without circular foreign keys/ParDo(GroupIntoBatches)" + 286 [label="ParMultiDo(GroupIntoBatches)"] + 284 -> 286 [style=solid label=""] } } - subgraph cluster_267 { - label = "Write to sql: DomainBase without circular foreign keys/Write in batch for DomainBase without circular foreign keys" - 268 [label="ParMultiDo(SqlBatchWriter)"] - 266 -> 268 [style=solid label=""] + subgraph cluster_287 { + label = "Write to Sql: DomainBase without circular foreign keys/Write in batch for DomainBase without circular foreign keys" + 288 [label="ParMultiDo(SqlBatchWriter)"] + 286 -> 288 [style=solid label=""] } } - subgraph cluster_269 { + subgraph cluster_289 { label = "Wait on DomainBaseNoFkeys" - subgraph cluster_270 { + subgraph cluster_290 { label = "Wait on DomainBaseNoFkeys/To wait view 0" - subgraph cluster_271 { + subgraph cluster_291 { label = "Wait on DomainBaseNoFkeys/To wait view 0/Window.Into()" - 272 [label="Flatten.PCollections"] - 268 -> 272 [style=solid label=""] + 292 [label="Flatten.PCollections"] + 288 -> 292 [style=solid label=""] } - subgraph cluster_273 { + subgraph cluster_293 { label = "Wait on DomainBaseNoFkeys/To wait view 0/ParDo(CollectWindows)" - 274 [label="ParMultiDo(CollectWindows)"] - 272 -> 274 [style=solid label=""] + 294 [label="ParMultiDo(CollectWindows)"] + 292 -> 294 [style=solid label=""] } - subgraph cluster_275 { + subgraph cluster_295 { label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any" - subgraph cluster_276 { + subgraph cluster_296 { label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Combine.globally(SampleAny)" - subgraph cluster_277 { + subgraph cluster_297 { label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys" - subgraph cluster_278 { + subgraph cluster_298 { label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys" - subgraph cluster_279 { + subgraph cluster_299 { label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys/Map" - 280 [label="ParMultiDo(Anonymous)"] - 274 -> 280 [style=solid label=""] + 300 [label="ParMultiDo(Anonymous)"] + 294 -> 300 [style=solid label=""] } } } - subgraph cluster_281 { + subgraph cluster_301 { label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)" - 282 [label="GroupByKey"] - 280 -> 282 [style=solid label=""] - subgraph cluster_283 { + 302 [label="GroupByKey"] + 300 -> 302 [style=solid label=""] + subgraph cluster_303 { label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues" - subgraph cluster_284 { + subgraph cluster_304 { label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues/ParDo(Anonymous)" - 285 [label="ParMultiDo(Anonymous)"] - 282 -> 285 [style=solid label=""] + 305 [label="ParMultiDo(Anonymous)"] + 302 -> 305 [style=solid label=""] } } } - subgraph cluster_286 { + subgraph cluster_306 { label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values" - subgraph cluster_287 { + subgraph cluster_307 { label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values" - subgraph cluster_288 { + subgraph cluster_308 { label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values/Map" - 289 [label="ParMultiDo(Anonymous)"] - 285 -> 289 [style=solid label=""] + 309 [label="ParMultiDo(Anonymous)"] + 305 -> 309 [style=solid label=""] } } } } - subgraph cluster_290 { + subgraph cluster_310 { label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Flatten.Iterables" - subgraph cluster_291 { + subgraph cluster_311 { label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables" - subgraph cluster_292 { + subgraph cluster_312 { label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables/FlatMap" - 293 [label="ParMultiDo(Anonymous)"] - 289 -> 293 [style=solid label=""] + 313 [label="ParMultiDo(Anonymous)"] + 309 -> 313 [style=solid label=""] } } } } - subgraph cluster_294 { + subgraph cluster_314 { label = "Wait on DomainBaseNoFkeys/To wait view 0/View.AsList" - subgraph cluster_295 { + subgraph cluster_315 { label = "Wait on DomainBaseNoFkeys/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization" - subgraph cluster_296 { + subgraph cluster_316 { label = "Wait on DomainBaseNoFkeys/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)" - 297 [label="ParMultiDo(VoidKeyToMultimapMaterialization)"] - 293 -> 297 [style=solid label=""] + 317 [label="ParMultiDo(VoidKeyToMultimapMaterialization)"] + 313 -> 317 [style=solid label=""] } } - 298 [label="View.CreatePCollectionView"] - 297 -> 298 [style=solid label=""] + 318 [label="View.CreatePCollectionView"] + 317 -> 318 [style=solid label=""] } } - subgraph cluster_299 { + subgraph cluster_319 { label = "Wait on DomainBaseNoFkeys/Wait" - subgraph cluster_300 { + subgraph cluster_320 { label = "Wait on DomainBaseNoFkeys/Wait/Map" - 301 [label="ParMultiDo(Anonymous)"] - 89 -> 301 [style=solid label=""] - 297 -> 301 [style=dashed label=""] + 321 [label="ParMultiDo(Anonymous)"] + 89 -> 321 [style=solid label=""] + 317 -> 321 [style=dashed label=""] } } } - subgraph cluster_302 { - label = "Write to sql: Transforms:HostResource" - subgraph cluster_303 { - label = "Write to sql: Transforms:HostResource/Shard data for Transforms:HostResource" - subgraph cluster_304 { - label = "Write to sql: Transforms:HostResource/Shard data for Transforms:HostResource/Map" - 305 [label="ParMultiDo(Anonymous)"] - 301 -> 305 [style=solid label=""] + subgraph cluster_322 { + label = "Write to Sql: Transforms:HostResource" + subgraph cluster_323 { + label = "Write to Sql: Transforms:HostResource/Shard data Transforms:HostResource" + subgraph cluster_324 { + label = "Write to Sql: Transforms:HostResource/Shard data Transforms:HostResource/AddKeys" + subgraph cluster_325 { + label = "Write to Sql: Transforms:HostResource/Shard data Transforms:HostResource/AddKeys/Map" + 326 [label="ParMultiDo(Anonymous)"] + 321 -> 326 [style=solid label=""] + } } } - subgraph cluster_306 { - label = "Write to sql: Transforms:HostResource/Batch output by shard Transforms:HostResource" - subgraph cluster_307 { - label = "Write to sql: Transforms:HostResource/Batch output by shard Transforms:HostResource/ParDo(GroupIntoBatches)" - 308 [label="ParMultiDo(GroupIntoBatches)"] - 305 -> 308 [style=solid label=""] + subgraph cluster_327 { + label = "Write to Sql: Transforms:HostResource/Group into batches Transforms:HostResource" + subgraph cluster_328 { + label = "Write to Sql: Transforms:HostResource/Group into batches Transforms:HostResource/MapElements" + subgraph cluster_329 { + label = "Write to Sql: Transforms:HostResource/Group into batches Transforms:HostResource/MapElements/Map" + 330 [label="ParMultiDo(Anonymous)"] + 326 -> 330 [style=solid label=""] + } + } + subgraph cluster_331 { + label = "Write to Sql: Transforms:HostResource/Group into batches Transforms:HostResource/ParDo(GroupIntoBatches)" + 332 [label="ParMultiDo(GroupIntoBatches)"] + 330 -> 332 [style=solid label=""] } } - subgraph cluster_309 { - label = "Write to sql: Transforms:HostResource/Write in batch for Transforms:HostResource" - 310 [label="ParMultiDo(SqlBatchWriter)"] - 308 -> 310 [style=solid label=""] + subgraph cluster_333 { + label = "Write to Sql: Transforms:HostResource/Write in batch for Transforms:HostResource" + 334 [label="ParMultiDo(SqlBatchWriter)"] + 332 -> 334 [style=solid label=""] } } - subgraph cluster_311 { + subgraph cluster_335 { label = "Wait on Transforms:HostResource" - subgraph cluster_312 { + subgraph cluster_336 { label = "Wait on Transforms:HostResource/To wait view 0" - subgraph cluster_313 { + subgraph cluster_337 { label = "Wait on Transforms:HostResource/To wait view 0/Window.Into()" - 314 [label="Flatten.PCollections"] - 310 -> 314 [style=solid label=""] + 338 [label="Flatten.PCollections"] + 334 -> 338 [style=solid label=""] } - subgraph cluster_315 { + subgraph cluster_339 { label = "Wait on Transforms:HostResource/To wait view 0/ParDo(CollectWindows)" - 316 [label="ParMultiDo(CollectWindows)"] - 314 -> 316 [style=solid label=""] + 340 [label="ParMultiDo(CollectWindows)"] + 338 -> 340 [style=solid label=""] } - subgraph cluster_317 { + subgraph cluster_341 { label = "Wait on Transforms:HostResource/To wait view 0/Sample.Any" - subgraph cluster_318 { + subgraph cluster_342 { label = "Wait on Transforms:HostResource/To wait view 0/Sample.Any/Combine.globally(SampleAny)" - subgraph cluster_319 { + subgraph cluster_343 { label = "Wait on Transforms:HostResource/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys" - subgraph cluster_320 { + subgraph cluster_344 { label = "Wait on Transforms:HostResource/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys" - subgraph cluster_321 { + subgraph cluster_345 { label = "Wait on Transforms:HostResource/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys/Map" - 322 [label="ParMultiDo(Anonymous)"] - 316 -> 322 [style=solid label=""] + 346 [label="ParMultiDo(Anonymous)"] + 340 -> 346 [style=solid label=""] } } } - subgraph cluster_323 { + subgraph cluster_347 { label = "Wait on Transforms:HostResource/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)" - 324 [label="GroupByKey"] - 322 -> 324 [style=solid label=""] - subgraph cluster_325 { + 348 [label="GroupByKey"] + 346 -> 348 [style=solid label=""] + subgraph cluster_349 { label = "Wait on Transforms:HostResource/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues" - subgraph cluster_326 { + subgraph cluster_350 { label = "Wait on Transforms:HostResource/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues/ParDo(Anonymous)" - 327 [label="ParMultiDo(Anonymous)"] - 324 -> 327 [style=solid label=""] + 351 [label="ParMultiDo(Anonymous)"] + 348 -> 351 [style=solid label=""] } } } - subgraph cluster_328 { + subgraph cluster_352 { label = "Wait on Transforms:HostResource/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values" - subgraph cluster_329 { + subgraph cluster_353 { label = "Wait on Transforms:HostResource/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values" - subgraph cluster_330 { + subgraph cluster_354 { label = "Wait on Transforms:HostResource/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values/Map" - 331 [label="ParMultiDo(Anonymous)"] - 327 -> 331 [style=solid label=""] + 355 [label="ParMultiDo(Anonymous)"] + 351 -> 355 [style=solid label=""] } } } } - subgraph cluster_332 { + subgraph cluster_356 { label = "Wait on Transforms:HostResource/To wait view 0/Sample.Any/Flatten.Iterables" - subgraph cluster_333 { + subgraph cluster_357 { label = "Wait on Transforms:HostResource/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables" - subgraph cluster_334 { + subgraph cluster_358 { label = "Wait on Transforms:HostResource/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables/FlatMap" - 335 [label="ParMultiDo(Anonymous)"] - 331 -> 335 [style=solid label=""] + 359 [label="ParMultiDo(Anonymous)"] + 355 -> 359 [style=solid label=""] } } } } - subgraph cluster_336 { + subgraph cluster_360 { label = "Wait on Transforms:HostResource/To wait view 0/View.AsList" - subgraph cluster_337 { + subgraph cluster_361 { label = "Wait on Transforms:HostResource/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization" - subgraph cluster_338 { + subgraph cluster_362 { label = "Wait on Transforms:HostResource/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)" - 339 [label="ParMultiDo(VoidKeyToMultimapMaterialization)"] - 335 -> 339 [style=solid label=""] + 363 [label="ParMultiDo(VoidKeyToMultimapMaterialization)"] + 359 -> 363 [style=solid label=""] } } - 340 [label="View.CreatePCollectionView"] - 339 -> 340 [style=solid label=""] + 364 [label="View.CreatePCollectionView"] + 363 -> 364 [style=solid label=""] } } - subgraph cluster_341 { + subgraph cluster_365 { label = "Wait on Transforms:HostResource/Wait" - subgraph cluster_342 { + subgraph cluster_366 { label = "Wait on Transforms:HostResource/Wait/Map" - 343 [label="ParMultiDo(Anonymous)"] - 89 -> 343 [style=solid label=""] - 339 -> 343 [style=dashed label=""] + 367 [label="ParMultiDo(Anonymous)"] + 89 -> 367 [style=solid label=""] + 363 -> 367 [style=dashed label=""] } } } - subgraph cluster_344 { - label = "Write to sql: Transforms:HistoryEntry" - subgraph cluster_345 { - label = "Write to sql: Transforms:HistoryEntry/Shard data for Transforms:HistoryEntry" - subgraph cluster_346 { - label = "Write to sql: Transforms:HistoryEntry/Shard data for Transforms:HistoryEntry/Map" - 347 [label="ParMultiDo(Anonymous)"] - 343 -> 347 [style=solid label=""] + subgraph cluster_368 { + label = "Write to Sql: Transforms:HistoryEntry" + subgraph cluster_369 { + label = "Write to Sql: Transforms:HistoryEntry/Shard data Transforms:HistoryEntry" + subgraph cluster_370 { + label = "Write to Sql: Transforms:HistoryEntry/Shard data Transforms:HistoryEntry/AddKeys" + subgraph cluster_371 { + label = "Write to Sql: Transforms:HistoryEntry/Shard data Transforms:HistoryEntry/AddKeys/Map" + 372 [label="ParMultiDo(Anonymous)"] + 367 -> 372 [style=solid label=""] + } } } - subgraph cluster_348 { - label = "Write to sql: Transforms:HistoryEntry/Batch output by shard Transforms:HistoryEntry" - subgraph cluster_349 { - label = "Write to sql: Transforms:HistoryEntry/Batch output by shard Transforms:HistoryEntry/ParDo(GroupIntoBatches)" - 350 [label="ParMultiDo(GroupIntoBatches)"] - 347 -> 350 [style=solid label=""] + subgraph cluster_373 { + label = "Write to Sql: Transforms:HistoryEntry/Group into batches Transforms:HistoryEntry" + subgraph cluster_374 { + label = "Write to Sql: Transforms:HistoryEntry/Group into batches Transforms:HistoryEntry/MapElements" + subgraph cluster_375 { + label = "Write to Sql: Transforms:HistoryEntry/Group into batches Transforms:HistoryEntry/MapElements/Map" + 376 [label="ParMultiDo(Anonymous)"] + 372 -> 376 [style=solid label=""] + } + } + subgraph cluster_377 { + label = "Write to Sql: Transforms:HistoryEntry/Group into batches Transforms:HistoryEntry/ParDo(GroupIntoBatches)" + 378 [label="ParMultiDo(GroupIntoBatches)"] + 376 -> 378 [style=solid label=""] } } - subgraph cluster_351 { - label = "Write to sql: Transforms:HistoryEntry/Write in batch for Transforms:HistoryEntry" - 352 [label="ParMultiDo(SqlBatchWriter)"] - 350 -> 352 [style=solid label=""] + subgraph cluster_379 { + label = "Write to Sql: Transforms:HistoryEntry/Write in batch for Transforms:HistoryEntry" + 380 [label="ParMultiDo(SqlBatchWriter)"] + 378 -> 380 [style=solid label=""] } } - subgraph cluster_353 { + subgraph cluster_381 { label = "Wait on Transforms:HistoryEntry" - subgraph cluster_354 { + subgraph cluster_382 { label = "Wait on Transforms:HistoryEntry/To wait view 0" - subgraph cluster_355 { + subgraph cluster_383 { label = "Wait on Transforms:HistoryEntry/To wait view 0/Window.Into()" - 356 [label="Flatten.PCollections"] - 352 -> 356 [style=solid label=""] + 384 [label="Flatten.PCollections"] + 380 -> 384 [style=solid label=""] } - subgraph cluster_357 { + subgraph cluster_385 { label = "Wait on Transforms:HistoryEntry/To wait view 0/ParDo(CollectWindows)" - 358 [label="ParMultiDo(CollectWindows)"] - 356 -> 358 [style=solid label=""] + 386 [label="ParMultiDo(CollectWindows)"] + 384 -> 386 [style=solid label=""] } - subgraph cluster_359 { + subgraph cluster_387 { label = "Wait on Transforms:HistoryEntry/To wait view 0/Sample.Any" - subgraph cluster_360 { + subgraph cluster_388 { label = "Wait on Transforms:HistoryEntry/To wait view 0/Sample.Any/Combine.globally(SampleAny)" - subgraph cluster_361 { + subgraph cluster_389 { label = "Wait on Transforms:HistoryEntry/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys" - subgraph cluster_362 { + subgraph cluster_390 { label = "Wait on Transforms:HistoryEntry/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys" - subgraph cluster_363 { + subgraph cluster_391 { label = "Wait on Transforms:HistoryEntry/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys/Map" - 364 [label="ParMultiDo(Anonymous)"] - 358 -> 364 [style=solid label=""] + 392 [label="ParMultiDo(Anonymous)"] + 386 -> 392 [style=solid label=""] } } } - subgraph cluster_365 { + subgraph cluster_393 { label = "Wait on Transforms:HistoryEntry/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)" - 366 [label="GroupByKey"] - 364 -> 366 [style=solid label=""] - subgraph cluster_367 { + 394 [label="GroupByKey"] + 392 -> 394 [style=solid label=""] + subgraph cluster_395 { label = "Wait on Transforms:HistoryEntry/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues" - subgraph cluster_368 { + subgraph cluster_396 { label = "Wait on Transforms:HistoryEntry/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues/ParDo(Anonymous)" - 369 [label="ParMultiDo(Anonymous)"] - 366 -> 369 [style=solid label=""] + 397 [label="ParMultiDo(Anonymous)"] + 394 -> 397 [style=solid label=""] } } } - subgraph cluster_370 { + subgraph cluster_398 { label = "Wait on Transforms:HistoryEntry/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values" - subgraph cluster_371 { + subgraph cluster_399 { label = "Wait on Transforms:HistoryEntry/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values" - subgraph cluster_372 { + subgraph cluster_400 { label = "Wait on Transforms:HistoryEntry/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values/Map" - 373 [label="ParMultiDo(Anonymous)"] - 369 -> 373 [style=solid label=""] + 401 [label="ParMultiDo(Anonymous)"] + 397 -> 401 [style=solid label=""] } } } } - subgraph cluster_374 { - label = "Wait on Transforms:HistoryEntry/To wait view 0/Sample.Any/Flatten.Iterables" - subgraph cluster_375 { - label = "Wait on Transforms:HistoryEntry/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables" - subgraph cluster_376 { - label = "Wait on Transforms:HistoryEntry/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables/FlatMap" - 377 [label="ParMultiDo(Anonymous)"] - 373 -> 377 [style=solid label=""] - } - } - } - } - subgraph cluster_378 { - label = "Wait on Transforms:HistoryEntry/To wait view 0/View.AsList" - subgraph cluster_379 { - label = "Wait on Transforms:HistoryEntry/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization" - subgraph cluster_380 { - label = "Wait on Transforms:HistoryEntry/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)" - 381 [label="ParMultiDo(VoidKeyToMultimapMaterialization)"] - 377 -> 381 [style=solid label=""] - } - } - 382 [label="View.CreatePCollectionView"] - 381 -> 382 [style=solid label=""] - } - } - subgraph cluster_383 { - label = "Wait on Transforms:HistoryEntry/Wait" - subgraph cluster_384 { - label = "Wait on Transforms:HistoryEntry/Wait/Map" - 385 [label="ParMultiDo(Anonymous)"] - 89 -> 385 [style=solid label=""] - 381 -> 385 [style=dashed label=""] - } - } - } - subgraph cluster_386 { - label = "Write to sql: Transforms:AllocationToken" - subgraph cluster_387 { - label = "Write to sql: Transforms:AllocationToken/Shard data for Transforms:AllocationToken" - subgraph cluster_388 { - label = "Write to sql: Transforms:AllocationToken/Shard data for Transforms:AllocationToken/Map" - 389 [label="ParMultiDo(Anonymous)"] - 385 -> 389 [style=solid label=""] - } - } - subgraph cluster_390 { - label = "Write to sql: Transforms:AllocationToken/Batch output by shard Transforms:AllocationToken" - subgraph cluster_391 { - label = "Write to sql: Transforms:AllocationToken/Batch output by shard Transforms:AllocationToken/ParDo(GroupIntoBatches)" - 392 [label="ParMultiDo(GroupIntoBatches)"] - 389 -> 392 [style=solid label=""] - } - } - subgraph cluster_393 { - label = "Write to sql: Transforms:AllocationToken/Write in batch for Transforms:AllocationToken" - 394 [label="ParMultiDo(SqlBatchWriter)"] - 392 -> 394 [style=solid label=""] - } - } - subgraph cluster_395 { - label = "Wait on Transforms:AllocationToken" - subgraph cluster_396 { - label = "Wait on Transforms:AllocationToken/To wait view 0" - subgraph cluster_397 { - label = "Wait on Transforms:AllocationToken/To wait view 0/Window.Into()" - 398 [label="Flatten.PCollections"] - 394 -> 398 [style=solid label=""] - } - subgraph cluster_399 { - label = "Wait on Transforms:AllocationToken/To wait view 0/ParDo(CollectWindows)" - 400 [label="ParMultiDo(CollectWindows)"] - 398 -> 400 [style=solid label=""] - } - subgraph cluster_401 { - label = "Wait on Transforms:AllocationToken/To wait view 0/Sample.Any" subgraph cluster_402 { - label = "Wait on Transforms:AllocationToken/To wait view 0/Sample.Any/Combine.globally(SampleAny)" + label = "Wait on Transforms:HistoryEntry/To wait view 0/Sample.Any/Flatten.Iterables" subgraph cluster_403 { - label = "Wait on Transforms:AllocationToken/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys" + label = "Wait on Transforms:HistoryEntry/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables" subgraph cluster_404 { - label = "Wait on Transforms:AllocationToken/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys" - subgraph cluster_405 { - label = "Wait on Transforms:AllocationToken/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys/Map" - 406 [label="ParMultiDo(Anonymous)"] - 400 -> 406 [style=solid label=""] - } - } - } - subgraph cluster_407 { - label = "Wait on Transforms:AllocationToken/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)" - 408 [label="GroupByKey"] - 406 -> 408 [style=solid label=""] - subgraph cluster_409 { - label = "Wait on Transforms:AllocationToken/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues" - subgraph cluster_410 { - label = "Wait on Transforms:AllocationToken/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues/ParDo(Anonymous)" - 411 [label="ParMultiDo(Anonymous)"] - 408 -> 411 [style=solid label=""] - } - } - } - subgraph cluster_412 { - label = "Wait on Transforms:AllocationToken/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values" - subgraph cluster_413 { - label = "Wait on Transforms:AllocationToken/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values" - subgraph cluster_414 { - label = "Wait on Transforms:AllocationToken/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values/Map" - 415 [label="ParMultiDo(Anonymous)"] - 411 -> 415 [style=solid label=""] - } - } - } - } - subgraph cluster_416 { - label = "Wait on Transforms:AllocationToken/To wait view 0/Sample.Any/Flatten.Iterables" - subgraph cluster_417 { - label = "Wait on Transforms:AllocationToken/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables" - subgraph cluster_418 { - label = "Wait on Transforms:AllocationToken/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables/FlatMap" - 419 [label="ParMultiDo(Anonymous)"] - 415 -> 419 [style=solid label=""] + label = "Wait on Transforms:HistoryEntry/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables/FlatMap" + 405 [label="ParMultiDo(Anonymous)"] + 401 -> 405 [style=solid label=""] } } } } - subgraph cluster_420 { - label = "Wait on Transforms:AllocationToken/To wait view 0/View.AsList" - subgraph cluster_421 { - label = "Wait on Transforms:AllocationToken/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization" - subgraph cluster_422 { - label = "Wait on Transforms:AllocationToken/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)" - 423 [label="ParMultiDo(VoidKeyToMultimapMaterialization)"] - 419 -> 423 [style=solid label=""] + subgraph cluster_406 { + label = "Wait on Transforms:HistoryEntry/To wait view 0/View.AsList" + subgraph cluster_407 { + label = "Wait on Transforms:HistoryEntry/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization" + subgraph cluster_408 { + label = "Wait on Transforms:HistoryEntry/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)" + 409 [label="ParMultiDo(VoidKeyToMultimapMaterialization)"] + 405 -> 409 [style=solid label=""] } } - 424 [label="View.CreatePCollectionView"] - 423 -> 424 [style=solid label=""] + 410 [label="View.CreatePCollectionView"] + 409 -> 410 [style=solid label=""] + } + } + subgraph cluster_411 { + label = "Wait on Transforms:HistoryEntry/Wait" + subgraph cluster_412 { + label = "Wait on Transforms:HistoryEntry/Wait/Map" + 413 [label="ParMultiDo(Anonymous)"] + 89 -> 413 [style=solid label=""] + 409 -> 413 [style=dashed label=""] + } + } + } + subgraph cluster_414 { + label = "Write to Sql: Transforms:AllocationToken" + subgraph cluster_415 { + label = "Write to Sql: Transforms:AllocationToken/Shard data Transforms:AllocationToken" + subgraph cluster_416 { + label = "Write to Sql: Transforms:AllocationToken/Shard data Transforms:AllocationToken/AddKeys" + subgraph cluster_417 { + label = "Write to Sql: Transforms:AllocationToken/Shard data Transforms:AllocationToken/AddKeys/Map" + 418 [label="ParMultiDo(Anonymous)"] + 413 -> 418 [style=solid label=""] + } + } + } + subgraph cluster_419 { + label = "Write to Sql: Transforms:AllocationToken/Group into batches Transforms:AllocationToken" + subgraph cluster_420 { + label = "Write to Sql: Transforms:AllocationToken/Group into batches Transforms:AllocationToken/MapElements" + subgraph cluster_421 { + label = "Write to Sql: Transforms:AllocationToken/Group into batches Transforms:AllocationToken/MapElements/Map" + 422 [label="ParMultiDo(Anonymous)"] + 418 -> 422 [style=solid label=""] + } + } + subgraph cluster_423 { + label = "Write to Sql: Transforms:AllocationToken/Group into batches Transforms:AllocationToken/ParDo(GroupIntoBatches)" + 424 [label="ParMultiDo(GroupIntoBatches)"] + 422 -> 424 [style=solid label=""] } } subgraph cluster_425 { - label = "Wait on Transforms:AllocationToken/Wait" - subgraph cluster_426 { - label = "Wait on Transforms:AllocationToken/Wait/Map" - 427 [label="ParMultiDo(Anonymous)"] - 89 -> 427 [style=solid label=""] - 423 -> 427 [style=dashed label=""] - } + label = "Write to Sql: Transforms:AllocationToken/Write in batch for Transforms:AllocationToken" + 426 [label="ParMultiDo(SqlBatchWriter)"] + 424 -> 426 [style=solid label=""] } } - subgraph cluster_428 { - label = "Write to sql: Transforms:Recurring" - subgraph cluster_429 { - label = "Write to sql: Transforms:Recurring/Shard data for Transforms:Recurring" - subgraph cluster_430 { - label = "Write to sql: Transforms:Recurring/Shard data for Transforms:Recurring/Map" - 431 [label="ParMultiDo(Anonymous)"] - 427 -> 431 [style=solid label=""] + subgraph cluster_427 { + label = "Wait on Transforms:AllocationToken" + subgraph cluster_428 { + label = "Wait on Transforms:AllocationToken/To wait view 0" + subgraph cluster_429 { + label = "Wait on Transforms:AllocationToken/To wait view 0/Window.Into()" + 430 [label="Flatten.PCollections"] + 426 -> 430 [style=solid label=""] + } + subgraph cluster_431 { + label = "Wait on Transforms:AllocationToken/To wait view 0/ParDo(CollectWindows)" + 432 [label="ParMultiDo(CollectWindows)"] + 430 -> 432 [style=solid label=""] } - } - subgraph cluster_432 { - label = "Write to sql: Transforms:Recurring/Batch output by shard Transforms:Recurring" subgraph cluster_433 { - label = "Write to sql: Transforms:Recurring/Batch output by shard Transforms:Recurring/ParDo(GroupIntoBatches)" - 434 [label="ParMultiDo(GroupIntoBatches)"] - 431 -> 434 [style=solid label=""] - } - } - subgraph cluster_435 { - label = "Write to sql: Transforms:Recurring/Write in batch for Transforms:Recurring" - 436 [label="ParMultiDo(SqlBatchWriter)"] - 434 -> 436 [style=solid label=""] - } - } - subgraph cluster_437 { - label = "Wait on Transforms:Recurring" - subgraph cluster_438 { - label = "Wait on Transforms:Recurring/To wait view 0" - subgraph cluster_439 { - label = "Wait on Transforms:Recurring/To wait view 0/Window.Into()" - 440 [label="Flatten.PCollections"] - 436 -> 440 [style=solid label=""] - } - subgraph cluster_441 { - label = "Wait on Transforms:Recurring/To wait view 0/ParDo(CollectWindows)" - 442 [label="ParMultiDo(CollectWindows)"] - 440 -> 442 [style=solid label=""] - } - subgraph cluster_443 { - label = "Wait on Transforms:Recurring/To wait view 0/Sample.Any" - subgraph cluster_444 { - label = "Wait on Transforms:Recurring/To wait view 0/Sample.Any/Combine.globally(SampleAny)" - subgraph cluster_445 { - label = "Wait on Transforms:Recurring/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys" - subgraph cluster_446 { - label = "Wait on Transforms:Recurring/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys" - subgraph cluster_447 { - label = "Wait on Transforms:Recurring/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys/Map" - 448 [label="ParMultiDo(Anonymous)"] - 442 -> 448 [style=solid label=""] + label = "Wait on Transforms:AllocationToken/To wait view 0/Sample.Any" + subgraph cluster_434 { + label = "Wait on Transforms:AllocationToken/To wait view 0/Sample.Any/Combine.globally(SampleAny)" + subgraph cluster_435 { + label = "Wait on Transforms:AllocationToken/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys" + subgraph cluster_436 { + label = "Wait on Transforms:AllocationToken/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys" + subgraph cluster_437 { + label = "Wait on Transforms:AllocationToken/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys/Map" + 438 [label="ParMultiDo(Anonymous)"] + 432 -> 438 [style=solid label=""] } } } + subgraph cluster_439 { + label = "Wait on Transforms:AllocationToken/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)" + 440 [label="GroupByKey"] + 438 -> 440 [style=solid label=""] + subgraph cluster_441 { + label = "Wait on Transforms:AllocationToken/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues" + subgraph cluster_442 { + label = "Wait on Transforms:AllocationToken/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues/ParDo(Anonymous)" + 443 [label="ParMultiDo(Anonymous)"] + 440 -> 443 [style=solid label=""] + } + } + } + subgraph cluster_444 { + label = "Wait on Transforms:AllocationToken/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values" + subgraph cluster_445 { + label = "Wait on Transforms:AllocationToken/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values" + subgraph cluster_446 { + label = "Wait on Transforms:AllocationToken/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values/Map" + 447 [label="ParMultiDo(Anonymous)"] + 443 -> 447 [style=solid label=""] + } + } + } + } + subgraph cluster_448 { + label = "Wait on Transforms:AllocationToken/To wait view 0/Sample.Any/Flatten.Iterables" subgraph cluster_449 { - label = "Wait on Transforms:Recurring/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)" - 450 [label="GroupByKey"] - 448 -> 450 [style=solid label=""] - subgraph cluster_451 { - label = "Wait on Transforms:Recurring/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues" - subgraph cluster_452 { - label = "Wait on Transforms:Recurring/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues/ParDo(Anonymous)" - 453 [label="ParMultiDo(Anonymous)"] - 450 -> 453 [style=solid label=""] - } + label = "Wait on Transforms:AllocationToken/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables" + subgraph cluster_450 { + label = "Wait on Transforms:AllocationToken/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables/FlatMap" + 451 [label="ParMultiDo(Anonymous)"] + 447 -> 451 [style=solid label=""] } } + } + } + subgraph cluster_452 { + label = "Wait on Transforms:AllocationToken/To wait view 0/View.AsList" + subgraph cluster_453 { + label = "Wait on Transforms:AllocationToken/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization" subgraph cluster_454 { - label = "Wait on Transforms:Recurring/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values" - subgraph cluster_455 { - label = "Wait on Transforms:Recurring/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values" - subgraph cluster_456 { - label = "Wait on Transforms:Recurring/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values/Map" - 457 [label="ParMultiDo(Anonymous)"] - 453 -> 457 [style=solid label=""] - } - } - } - } - subgraph cluster_458 { - label = "Wait on Transforms:Recurring/To wait view 0/Sample.Any/Flatten.Iterables" - subgraph cluster_459 { - label = "Wait on Transforms:Recurring/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables" - subgraph cluster_460 { - label = "Wait on Transforms:Recurring/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables/FlatMap" - 461 [label="ParMultiDo(Anonymous)"] - 457 -> 461 [style=solid label=""] - } + label = "Wait on Transforms:AllocationToken/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)" + 455 [label="ParMultiDo(VoidKeyToMultimapMaterialization)"] + 451 -> 455 [style=solid label=""] } } + 456 [label="View.CreatePCollectionView"] + 455 -> 456 [style=solid label=""] } + } + subgraph cluster_457 { + label = "Wait on Transforms:AllocationToken/Wait" + subgraph cluster_458 { + label = "Wait on Transforms:AllocationToken/Wait/Map" + 459 [label="ParMultiDo(Anonymous)"] + 89 -> 459 [style=solid label=""] + 455 -> 459 [style=dashed label=""] + } + } + } + subgraph cluster_460 { + label = "Write to Sql: Transforms:Recurring" + subgraph cluster_461 { + label = "Write to Sql: Transforms:Recurring/Shard data Transforms:Recurring" subgraph cluster_462 { - label = "Wait on Transforms:Recurring/To wait view 0/View.AsList" + label = "Write to Sql: Transforms:Recurring/Shard data Transforms:Recurring/AddKeys" subgraph cluster_463 { - label = "Wait on Transforms:Recurring/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization" - subgraph cluster_464 { - label = "Wait on Transforms:Recurring/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)" - 465 [label="ParMultiDo(VoidKeyToMultimapMaterialization)"] - 461 -> 465 [style=solid label=""] - } + label = "Write to Sql: Transforms:Recurring/Shard data Transforms:Recurring/AddKeys/Map" + 464 [label="ParMultiDo(Anonymous)"] + 459 -> 464 [style=solid label=""] } - 466 [label="View.CreatePCollectionView"] - 465 -> 466 [style=solid label=""] } } - subgraph cluster_467 { - label = "Wait on Transforms:Recurring/Wait" - subgraph cluster_468 { - label = "Wait on Transforms:Recurring/Wait/Map" - 469 [label="ParMultiDo(Anonymous)"] - 89 -> 469 [style=solid label=""] - 465 -> 469 [style=dashed label=""] + subgraph cluster_465 { + label = "Write to Sql: Transforms:Recurring/Group into batches Transforms:Recurring" + subgraph cluster_466 { + label = "Write to Sql: Transforms:Recurring/Group into batches Transforms:Recurring/MapElements" + subgraph cluster_467 { + label = "Write to Sql: Transforms:Recurring/Group into batches Transforms:Recurring/MapElements/Map" + 468 [label="ParMultiDo(Anonymous)"] + 464 -> 468 [style=solid label=""] + } + } + subgraph cluster_469 { + label = "Write to Sql: Transforms:Recurring/Group into batches Transforms:Recurring/ParDo(GroupIntoBatches)" + 470 [label="ParMultiDo(GroupIntoBatches)"] + 468 -> 470 [style=solid label=""] } } - } - subgraph cluster_470 { - label = "Write to sql: Transforms:OneTime" subgraph cluster_471 { - label = "Write to sql: Transforms:OneTime/Shard data for Transforms:OneTime" - subgraph cluster_472 { - label = "Write to sql: Transforms:OneTime/Shard data for Transforms:OneTime/Map" - 473 [label="ParMultiDo(Anonymous)"] - 469 -> 473 [style=solid label=""] - } + label = "Write to Sql: Transforms:Recurring/Write in batch for Transforms:Recurring" + 472 [label="ParMultiDo(SqlBatchWriter)"] + 470 -> 472 [style=solid label=""] } + } + subgraph cluster_473 { + label = "Wait on Transforms:Recurring" subgraph cluster_474 { - label = "Write to sql: Transforms:OneTime/Batch output by shard Transforms:OneTime" + label = "Wait on Transforms:Recurring/To wait view 0" subgraph cluster_475 { - label = "Write to sql: Transforms:OneTime/Batch output by shard Transforms:OneTime/ParDo(GroupIntoBatches)" - 476 [label="ParMultiDo(GroupIntoBatches)"] - 473 -> 476 [style=solid label=""] + label = "Wait on Transforms:Recurring/To wait view 0/Window.Into()" + 476 [label="Flatten.PCollections"] + 472 -> 476 [style=solid label=""] } - } - subgraph cluster_477 { - label = "Write to sql: Transforms:OneTime/Write in batch for Transforms:OneTime" - 478 [label="ParMultiDo(SqlBatchWriter)"] - 476 -> 478 [style=solid label=""] - } - } - subgraph cluster_479 { - label = "Wait on Transforms:OneTime" - subgraph cluster_480 { - label = "Wait on Transforms:OneTime/To wait view 0" - subgraph cluster_481 { - label = "Wait on Transforms:OneTime/To wait view 0/Window.Into()" - 482 [label="Flatten.PCollections"] - 478 -> 482 [style=solid label=""] + subgraph cluster_477 { + label = "Wait on Transforms:Recurring/To wait view 0/ParDo(CollectWindows)" + 478 [label="ParMultiDo(CollectWindows)"] + 476 -> 478 [style=solid label=""] } - subgraph cluster_483 { - label = "Wait on Transforms:OneTime/To wait view 0/ParDo(CollectWindows)" - 484 [label="ParMultiDo(CollectWindows)"] - 482 -> 484 [style=solid label=""] - } - subgraph cluster_485 { - label = "Wait on Transforms:OneTime/To wait view 0/Sample.Any" - subgraph cluster_486 { - label = "Wait on Transforms:OneTime/To wait view 0/Sample.Any/Combine.globally(SampleAny)" - subgraph cluster_487 { - label = "Wait on Transforms:OneTime/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys" - subgraph cluster_488 { - label = "Wait on Transforms:OneTime/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys" - subgraph cluster_489 { - label = "Wait on Transforms:OneTime/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys/Map" - 490 [label="ParMultiDo(Anonymous)"] - 484 -> 490 [style=solid label=""] + subgraph cluster_479 { + label = "Wait on Transforms:Recurring/To wait view 0/Sample.Any" + subgraph cluster_480 { + label = "Wait on Transforms:Recurring/To wait view 0/Sample.Any/Combine.globally(SampleAny)" + subgraph cluster_481 { + label = "Wait on Transforms:Recurring/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys" + subgraph cluster_482 { + label = "Wait on Transforms:Recurring/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys" + subgraph cluster_483 { + label = "Wait on Transforms:Recurring/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys/Map" + 484 [label="ParMultiDo(Anonymous)"] + 478 -> 484 [style=solid label=""] } } } - subgraph cluster_491 { - label = "Wait on Transforms:OneTime/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)" - 492 [label="GroupByKey"] - 490 -> 492 [style=solid label=""] - subgraph cluster_493 { - label = "Wait on Transforms:OneTime/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues" - subgraph cluster_494 { - label = "Wait on Transforms:OneTime/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues/ParDo(Anonymous)" - 495 [label="ParMultiDo(Anonymous)"] - 492 -> 495 [style=solid label=""] + subgraph cluster_485 { + label = "Wait on Transforms:Recurring/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)" + 486 [label="GroupByKey"] + 484 -> 486 [style=solid label=""] + subgraph cluster_487 { + label = "Wait on Transforms:Recurring/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues" + subgraph cluster_488 { + label = "Wait on Transforms:Recurring/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues/ParDo(Anonymous)" + 489 [label="ParMultiDo(Anonymous)"] + 486 -> 489 [style=solid label=""] } } } - subgraph cluster_496 { - label = "Wait on Transforms:OneTime/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values" - subgraph cluster_497 { - label = "Wait on Transforms:OneTime/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values" - subgraph cluster_498 { - label = "Wait on Transforms:OneTime/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values/Map" - 499 [label="ParMultiDo(Anonymous)"] - 495 -> 499 [style=solid label=""] + subgraph cluster_490 { + label = "Wait on Transforms:Recurring/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values" + subgraph cluster_491 { + label = "Wait on Transforms:Recurring/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values" + subgraph cluster_492 { + label = "Wait on Transforms:Recurring/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values/Map" + 493 [label="ParMultiDo(Anonymous)"] + 489 -> 493 [style=solid label=""] } } } } - subgraph cluster_500 { - label = "Wait on Transforms:OneTime/To wait view 0/Sample.Any/Flatten.Iterables" - subgraph cluster_501 { - label = "Wait on Transforms:OneTime/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables" - subgraph cluster_502 { - label = "Wait on Transforms:OneTime/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables/FlatMap" - 503 [label="ParMultiDo(Anonymous)"] - 499 -> 503 [style=solid label=""] + subgraph cluster_494 { + label = "Wait on Transforms:Recurring/To wait view 0/Sample.Any/Flatten.Iterables" + subgraph cluster_495 { + label = "Wait on Transforms:Recurring/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables" + subgraph cluster_496 { + label = "Wait on Transforms:Recurring/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables/FlatMap" + 497 [label="ParMultiDo(Anonymous)"] + 493 -> 497 [style=solid label=""] } } } } + subgraph cluster_498 { + label = "Wait on Transforms:Recurring/To wait view 0/View.AsList" + subgraph cluster_499 { + label = "Wait on Transforms:Recurring/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization" + subgraph cluster_500 { + label = "Wait on Transforms:Recurring/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)" + 501 [label="ParMultiDo(VoidKeyToMultimapMaterialization)"] + 497 -> 501 [style=solid label=""] + } + } + 502 [label="View.CreatePCollectionView"] + 501 -> 502 [style=solid label=""] + } + } + subgraph cluster_503 { + label = "Wait on Transforms:Recurring/Wait" subgraph cluster_504 { - label = "Wait on Transforms:OneTime/To wait view 0/View.AsList" - subgraph cluster_505 { - label = "Wait on Transforms:OneTime/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization" - subgraph cluster_506 { - label = "Wait on Transforms:OneTime/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)" - 507 [label="ParMultiDo(VoidKeyToMultimapMaterialization)"] - 503 -> 507 [style=solid label=""] - } + label = "Wait on Transforms:Recurring/Wait/Map" + 505 [label="ParMultiDo(Anonymous)"] + 89 -> 505 [style=solid label=""] + 501 -> 505 [style=dashed label=""] + } + } + } + subgraph cluster_506 { + label = "Write to Sql: Transforms:OneTime" + subgraph cluster_507 { + label = "Write to Sql: Transforms:OneTime/Shard data Transforms:OneTime" + subgraph cluster_508 { + label = "Write to Sql: Transforms:OneTime/Shard data Transforms:OneTime/AddKeys" + subgraph cluster_509 { + label = "Write to Sql: Transforms:OneTime/Shard data Transforms:OneTime/AddKeys/Map" + 510 [label="ParMultiDo(Anonymous)"] + 505 -> 510 [style=solid label=""] } - 508 [label="View.CreatePCollectionView"] - 507 -> 508 [style=solid label=""] } } - subgraph cluster_509 { - label = "Wait on Transforms:OneTime/Wait" - subgraph cluster_510 { - label = "Wait on Transforms:OneTime/Wait/Map" - 511 [label="ParMultiDo(Anonymous)"] - 89 -> 511 [style=solid label=""] - 507 -> 511 [style=dashed label=""] + subgraph cluster_511 { + label = "Write to Sql: Transforms:OneTime/Group into batches Transforms:OneTime" + subgraph cluster_512 { + label = "Write to Sql: Transforms:OneTime/Group into batches Transforms:OneTime/MapElements" + subgraph cluster_513 { + label = "Write to Sql: Transforms:OneTime/Group into batches Transforms:OneTime/MapElements/Map" + 514 [label="ParMultiDo(Anonymous)"] + 510 -> 514 [style=solid label=""] + } } + subgraph cluster_515 { + label = "Write to Sql: Transforms:OneTime/Group into batches Transforms:OneTime/ParDo(GroupIntoBatches)" + 516 [label="ParMultiDo(GroupIntoBatches)"] + 514 -> 516 [style=solid label=""] + } + } + subgraph cluster_517 { + label = "Write to Sql: Transforms:OneTime/Write in batch for Transforms:OneTime" + 518 [label="ParMultiDo(SqlBatchWriter)"] + 516 -> 518 [style=solid label=""] } } - subgraph cluster_512 { - label = "Write to sql: Transforms:Cancellation" - subgraph cluster_513 { - label = "Write to sql: Transforms:Cancellation/Shard data for Transforms:Cancellation" - subgraph cluster_514 { - label = "Write to sql: Transforms:Cancellation/Shard data for Transforms:Cancellation/Map" - 515 [label="ParMultiDo(Anonymous)"] - 511 -> 515 [style=solid label=""] + subgraph cluster_519 { + label = "Wait on Transforms:OneTime" + subgraph cluster_520 { + label = "Wait on Transforms:OneTime/To wait view 0" + subgraph cluster_521 { + label = "Wait on Transforms:OneTime/To wait view 0/Window.Into()" + 522 [label="Flatten.PCollections"] + 518 -> 522 [style=solid label=""] } - } - subgraph cluster_516 { - label = "Write to sql: Transforms:Cancellation/Batch output by shard Transforms:Cancellation" - subgraph cluster_517 { - label = "Write to sql: Transforms:Cancellation/Batch output by shard Transforms:Cancellation/ParDo(GroupIntoBatches)" - 518 [label="ParMultiDo(GroupIntoBatches)"] - 515 -> 518 [style=solid label=""] - } - } - subgraph cluster_519 { - label = "Write to sql: Transforms:Cancellation/Write in batch for Transforms:Cancellation" - 520 [label="ParMultiDo(SqlBatchWriter)"] - 518 -> 520 [style=solid label=""] - } - } - subgraph cluster_521 { - label = "Wait on Transforms:Cancellation" - subgraph cluster_522 { - label = "Wait on Transforms:Cancellation/To wait view 0" subgraph cluster_523 { - label = "Wait on Transforms:Cancellation/To wait view 0/Window.Into()" - 524 [label="Flatten.PCollections"] - 520 -> 524 [style=solid label=""] + label = "Wait on Transforms:OneTime/To wait view 0/ParDo(CollectWindows)" + 524 [label="ParMultiDo(CollectWindows)"] + 522 -> 524 [style=solid label=""] } subgraph cluster_525 { - label = "Wait on Transforms:Cancellation/To wait view 0/ParDo(CollectWindows)" - 526 [label="ParMultiDo(CollectWindows)"] - 524 -> 526 [style=solid label=""] - } - subgraph cluster_527 { - label = "Wait on Transforms:Cancellation/To wait view 0/Sample.Any" - subgraph cluster_528 { - label = "Wait on Transforms:Cancellation/To wait view 0/Sample.Any/Combine.globally(SampleAny)" - subgraph cluster_529 { - label = "Wait on Transforms:Cancellation/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys" - subgraph cluster_530 { - label = "Wait on Transforms:Cancellation/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys" - subgraph cluster_531 { - label = "Wait on Transforms:Cancellation/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys/Map" - 532 [label="ParMultiDo(Anonymous)"] - 526 -> 532 [style=solid label=""] + label = "Wait on Transforms:OneTime/To wait view 0/Sample.Any" + subgraph cluster_526 { + label = "Wait on Transforms:OneTime/To wait view 0/Sample.Any/Combine.globally(SampleAny)" + subgraph cluster_527 { + label = "Wait on Transforms:OneTime/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys" + subgraph cluster_528 { + label = "Wait on Transforms:OneTime/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys" + subgraph cluster_529 { + label = "Wait on Transforms:OneTime/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys/Map" + 530 [label="ParMultiDo(Anonymous)"] + 524 -> 530 [style=solid label=""] } } } - subgraph cluster_533 { - label = "Wait on Transforms:Cancellation/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)" - 534 [label="GroupByKey"] - 532 -> 534 [style=solid label=""] - subgraph cluster_535 { - label = "Wait on Transforms:Cancellation/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues" - subgraph cluster_536 { - label = "Wait on Transforms:Cancellation/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues/ParDo(Anonymous)" - 537 [label="ParMultiDo(Anonymous)"] - 534 -> 537 [style=solid label=""] + subgraph cluster_531 { + label = "Wait on Transforms:OneTime/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)" + 532 [label="GroupByKey"] + 530 -> 532 [style=solid label=""] + subgraph cluster_533 { + label = "Wait on Transforms:OneTime/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues" + subgraph cluster_534 { + label = "Wait on Transforms:OneTime/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues/ParDo(Anonymous)" + 535 [label="ParMultiDo(Anonymous)"] + 532 -> 535 [style=solid label=""] } } } - subgraph cluster_538 { - label = "Wait on Transforms:Cancellation/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values" - subgraph cluster_539 { - label = "Wait on Transforms:Cancellation/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values" - subgraph cluster_540 { - label = "Wait on Transforms:Cancellation/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values/Map" - 541 [label="ParMultiDo(Anonymous)"] - 537 -> 541 [style=solid label=""] + subgraph cluster_536 { + label = "Wait on Transforms:OneTime/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values" + subgraph cluster_537 { + label = "Wait on Transforms:OneTime/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values" + subgraph cluster_538 { + label = "Wait on Transforms:OneTime/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values/Map" + 539 [label="ParMultiDo(Anonymous)"] + 535 -> 539 [style=solid label=""] } } } } - subgraph cluster_542 { - label = "Wait on Transforms:Cancellation/To wait view 0/Sample.Any/Flatten.Iterables" - subgraph cluster_543 { - label = "Wait on Transforms:Cancellation/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables" - subgraph cluster_544 { - label = "Wait on Transforms:Cancellation/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables/FlatMap" - 545 [label="ParMultiDo(Anonymous)"] - 541 -> 545 [style=solid label=""] + subgraph cluster_540 { + label = "Wait on Transforms:OneTime/To wait view 0/Sample.Any/Flatten.Iterables" + subgraph cluster_541 { + label = "Wait on Transforms:OneTime/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables" + subgraph cluster_542 { + label = "Wait on Transforms:OneTime/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables/FlatMap" + 543 [label="ParMultiDo(Anonymous)"] + 539 -> 543 [style=solid label=""] } } } } - subgraph cluster_546 { - label = "Wait on Transforms:Cancellation/To wait view 0/View.AsList" - subgraph cluster_547 { - label = "Wait on Transforms:Cancellation/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization" - subgraph cluster_548 { - label = "Wait on Transforms:Cancellation/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)" - 549 [label="ParMultiDo(VoidKeyToMultimapMaterialization)"] - 545 -> 549 [style=solid label=""] + subgraph cluster_544 { + label = "Wait on Transforms:OneTime/To wait view 0/View.AsList" + subgraph cluster_545 { + label = "Wait on Transforms:OneTime/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization" + subgraph cluster_546 { + label = "Wait on Transforms:OneTime/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)" + 547 [label="ParMultiDo(VoidKeyToMultimapMaterialization)"] + 543 -> 547 [style=solid label=""] } } - 550 [label="View.CreatePCollectionView"] - 549 -> 550 [style=solid label=""] + 548 [label="View.CreatePCollectionView"] + 547 -> 548 [style=solid label=""] } } - subgraph cluster_551 { - label = "Wait on Transforms:Cancellation/Wait" - subgraph cluster_552 { - label = "Wait on Transforms:Cancellation/Wait/Map" - 553 [label="ParMultiDo(Anonymous)"] - 89 -> 553 [style=solid label=""] - 549 -> 553 [style=dashed label=""] + subgraph cluster_549 { + label = "Wait on Transforms:OneTime/Wait" + subgraph cluster_550 { + label = "Wait on Transforms:OneTime/Wait/Map" + 551 [label="ParMultiDo(Anonymous)"] + 89 -> 551 [style=solid label=""] + 547 -> 551 [style=dashed label=""] } } } - subgraph cluster_554 { - label = "Write to sql: Transforms:PollMessage" - subgraph cluster_555 { - label = "Write to sql: Transforms:PollMessage/Shard data for Transforms:PollMessage" - subgraph cluster_556 { - label = "Write to sql: Transforms:PollMessage/Shard data for Transforms:PollMessage/Map" - 557 [label="ParMultiDo(Anonymous)"] - 553 -> 557 [style=solid label=""] + subgraph cluster_552 { + label = "Write to Sql: Transforms:Cancellation" + subgraph cluster_553 { + label = "Write to Sql: Transforms:Cancellation/Shard data Transforms:Cancellation" + subgraph cluster_554 { + label = "Write to Sql: Transforms:Cancellation/Shard data Transforms:Cancellation/AddKeys" + subgraph cluster_555 { + label = "Write to Sql: Transforms:Cancellation/Shard data Transforms:Cancellation/AddKeys/Map" + 556 [label="ParMultiDo(Anonymous)"] + 551 -> 556 [style=solid label=""] + } } } - subgraph cluster_558 { - label = "Write to sql: Transforms:PollMessage/Batch output by shard Transforms:PollMessage" - subgraph cluster_559 { - label = "Write to sql: Transforms:PollMessage/Batch output by shard Transforms:PollMessage/ParDo(GroupIntoBatches)" - 560 [label="ParMultiDo(GroupIntoBatches)"] - 557 -> 560 [style=solid label=""] + subgraph cluster_557 { + label = "Write to Sql: Transforms:Cancellation/Group into batches Transforms:Cancellation" + subgraph cluster_558 { + label = "Write to Sql: Transforms:Cancellation/Group into batches Transforms:Cancellation/MapElements" + subgraph cluster_559 { + label = "Write to Sql: Transforms:Cancellation/Group into batches Transforms:Cancellation/MapElements/Map" + 560 [label="ParMultiDo(Anonymous)"] + 556 -> 560 [style=solid label=""] + } + } + subgraph cluster_561 { + label = "Write to Sql: Transforms:Cancellation/Group into batches Transforms:Cancellation/ParDo(GroupIntoBatches)" + 562 [label="ParMultiDo(GroupIntoBatches)"] + 560 -> 562 [style=solid label=""] } } - subgraph cluster_561 { - label = "Write to sql: Transforms:PollMessage/Write in batch for Transforms:PollMessage" - 562 [label="ParMultiDo(SqlBatchWriter)"] - 560 -> 562 [style=solid label=""] + subgraph cluster_563 { + label = "Write to Sql: Transforms:Cancellation/Write in batch for Transforms:Cancellation" + 564 [label="ParMultiDo(SqlBatchWriter)"] + 562 -> 564 [style=solid label=""] } } - subgraph cluster_563 { - label = "Wait on Transforms:PollMessage" - subgraph cluster_564 { - label = "Wait on Transforms:PollMessage/To wait view 0" - subgraph cluster_565 { - label = "Wait on Transforms:PollMessage/To wait view 0/Window.Into()" - 566 [label="Flatten.PCollections"] - 562 -> 566 [style=solid label=""] - } + subgraph cluster_565 { + label = "Wait on Transforms:Cancellation" + subgraph cluster_566 { + label = "Wait on Transforms:Cancellation/To wait view 0" subgraph cluster_567 { - label = "Wait on Transforms:PollMessage/To wait view 0/ParDo(CollectWindows)" - 568 [label="ParMultiDo(CollectWindows)"] - 566 -> 568 [style=solid label=""] + label = "Wait on Transforms:Cancellation/To wait view 0/Window.Into()" + 568 [label="Flatten.PCollections"] + 564 -> 568 [style=solid label=""] } subgraph cluster_569 { - label = "Wait on Transforms:PollMessage/To wait view 0/Sample.Any" - subgraph cluster_570 { - label = "Wait on Transforms:PollMessage/To wait view 0/Sample.Any/Combine.globally(SampleAny)" - subgraph cluster_571 { - label = "Wait on Transforms:PollMessage/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys" - subgraph cluster_572 { - label = "Wait on Transforms:PollMessage/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys" - subgraph cluster_573 { - label = "Wait on Transforms:PollMessage/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys/Map" - 574 [label="ParMultiDo(Anonymous)"] - 568 -> 574 [style=solid label=""] + label = "Wait on Transforms:Cancellation/To wait view 0/ParDo(CollectWindows)" + 570 [label="ParMultiDo(CollectWindows)"] + 568 -> 570 [style=solid label=""] + } + subgraph cluster_571 { + label = "Wait on Transforms:Cancellation/To wait view 0/Sample.Any" + subgraph cluster_572 { + label = "Wait on Transforms:Cancellation/To wait view 0/Sample.Any/Combine.globally(SampleAny)" + subgraph cluster_573 { + label = "Wait on Transforms:Cancellation/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys" + subgraph cluster_574 { + label = "Wait on Transforms:Cancellation/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys" + subgraph cluster_575 { + label = "Wait on Transforms:Cancellation/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys/Map" + 576 [label="ParMultiDo(Anonymous)"] + 570 -> 576 [style=solid label=""] } } } - subgraph cluster_575 { - label = "Wait on Transforms:PollMessage/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)" - 576 [label="GroupByKey"] - 574 -> 576 [style=solid label=""] - subgraph cluster_577 { - label = "Wait on Transforms:PollMessage/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues" - subgraph cluster_578 { - label = "Wait on Transforms:PollMessage/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues/ParDo(Anonymous)" - 579 [label="ParMultiDo(Anonymous)"] - 576 -> 579 [style=solid label=""] + subgraph cluster_577 { + label = "Wait on Transforms:Cancellation/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)" + 578 [label="GroupByKey"] + 576 -> 578 [style=solid label=""] + subgraph cluster_579 { + label = "Wait on Transforms:Cancellation/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues" + subgraph cluster_580 { + label = "Wait on Transforms:Cancellation/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues/ParDo(Anonymous)" + 581 [label="ParMultiDo(Anonymous)"] + 578 -> 581 [style=solid label=""] } } } - subgraph cluster_580 { - label = "Wait on Transforms:PollMessage/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values" - subgraph cluster_581 { - label = "Wait on Transforms:PollMessage/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values" - subgraph cluster_582 { - label = "Wait on Transforms:PollMessage/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values/Map" - 583 [label="ParMultiDo(Anonymous)"] - 579 -> 583 [style=solid label=""] + subgraph cluster_582 { + label = "Wait on Transforms:Cancellation/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values" + subgraph cluster_583 { + label = "Wait on Transforms:Cancellation/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values" + subgraph cluster_584 { + label = "Wait on Transforms:Cancellation/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values/Map" + 585 [label="ParMultiDo(Anonymous)"] + 581 -> 585 [style=solid label=""] } } } } - subgraph cluster_584 { - label = "Wait on Transforms:PollMessage/To wait view 0/Sample.Any/Flatten.Iterables" - subgraph cluster_585 { - label = "Wait on Transforms:PollMessage/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables" - subgraph cluster_586 { - label = "Wait on Transforms:PollMessage/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables/FlatMap" - 587 [label="ParMultiDo(Anonymous)"] - 583 -> 587 [style=solid label=""] + subgraph cluster_586 { + label = "Wait on Transforms:Cancellation/To wait view 0/Sample.Any/Flatten.Iterables" + subgraph cluster_587 { + label = "Wait on Transforms:Cancellation/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables" + subgraph cluster_588 { + label = "Wait on Transforms:Cancellation/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables/FlatMap" + 589 [label="ParMultiDo(Anonymous)"] + 585 -> 589 [style=solid label=""] } } } } - subgraph cluster_588 { - label = "Wait on Transforms:PollMessage/To wait view 0/View.AsList" - subgraph cluster_589 { - label = "Wait on Transforms:PollMessage/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization" - subgraph cluster_590 { - label = "Wait on Transforms:PollMessage/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)" - 591 [label="ParMultiDo(VoidKeyToMultimapMaterialization)"] - 587 -> 591 [style=solid label=""] + subgraph cluster_590 { + label = "Wait on Transforms:Cancellation/To wait view 0/View.AsList" + subgraph cluster_591 { + label = "Wait on Transforms:Cancellation/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization" + subgraph cluster_592 { + label = "Wait on Transforms:Cancellation/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)" + 593 [label="ParMultiDo(VoidKeyToMultimapMaterialization)"] + 589 -> 593 [style=solid label=""] } } - 592 [label="View.CreatePCollectionView"] - 591 -> 592 [style=solid label=""] + 594 [label="View.CreatePCollectionView"] + 593 -> 594 [style=solid label=""] } } - subgraph cluster_593 { - label = "Wait on Transforms:PollMessage/Wait" - subgraph cluster_594 { - label = "Wait on Transforms:PollMessage/Wait/Map" - 595 [label="ParMultiDo(Anonymous)"] - 89 -> 595 [style=solid label=""] - 591 -> 595 [style=dashed label=""] + subgraph cluster_595 { + label = "Wait on Transforms:Cancellation/Wait" + subgraph cluster_596 { + label = "Wait on Transforms:Cancellation/Wait/Map" + 597 [label="ParMultiDo(Anonymous)"] + 89 -> 597 [style=solid label=""] + 593 -> 597 [style=dashed label=""] } } } - subgraph cluster_596 { - label = "Write to sql: Transforms:DomainBase" - subgraph cluster_597 { - label = "Write to sql: Transforms:DomainBase/Shard data for Transforms:DomainBase" - subgraph cluster_598 { - label = "Write to sql: Transforms:DomainBase/Shard data for Transforms:DomainBase/Map" - 599 [label="ParMultiDo(Anonymous)"] - 595 -> 599 [style=solid label=""] - } - } - subgraph cluster_600 { - label = "Write to sql: Transforms:DomainBase/Batch output by shard Transforms:DomainBase" - subgraph cluster_601 { - label = "Write to sql: Transforms:DomainBase/Batch output by shard Transforms:DomainBase/ParDo(GroupIntoBatches)" - 602 [label="ParMultiDo(GroupIntoBatches)"] - 599 -> 602 [style=solid label=""] + subgraph cluster_598 { + label = "Write to Sql: Transforms:PollMessage" + subgraph cluster_599 { + label = "Write to Sql: Transforms:PollMessage/Shard data Transforms:PollMessage" + subgraph cluster_600 { + label = "Write to Sql: Transforms:PollMessage/Shard data Transforms:PollMessage/AddKeys" + subgraph cluster_601 { + label = "Write to Sql: Transforms:PollMessage/Shard data Transforms:PollMessage/AddKeys/Map" + 602 [label="ParMultiDo(Anonymous)"] + 597 -> 602 [style=solid label=""] + } } } subgraph cluster_603 { - label = "Write to sql: Transforms:DomainBase/Write in batch for Transforms:DomainBase" - 604 [label="ParMultiDo(SqlBatchWriter)"] - 602 -> 604 [style=solid label=""] + label = "Write to Sql: Transforms:PollMessage/Group into batches Transforms:PollMessage" + subgraph cluster_604 { + label = "Write to Sql: Transforms:PollMessage/Group into batches Transforms:PollMessage/MapElements" + subgraph cluster_605 { + label = "Write to Sql: Transforms:PollMessage/Group into batches Transforms:PollMessage/MapElements/Map" + 606 [label="ParMultiDo(Anonymous)"] + 602 -> 606 [style=solid label=""] + } + } + subgraph cluster_607 { + label = "Write to Sql: Transforms:PollMessage/Group into batches Transforms:PollMessage/ParDo(GroupIntoBatches)" + 608 [label="ParMultiDo(GroupIntoBatches)"] + 606 -> 608 [style=solid label=""] + } + } + subgraph cluster_609 { + label = "Write to Sql: Transforms:PollMessage/Write in batch for Transforms:PollMessage" + 610 [label="ParMultiDo(SqlBatchWriter)"] + 608 -> 610 [style=solid label=""] + } + } + subgraph cluster_611 { + label = "Wait on Transforms:PollMessage" + subgraph cluster_612 { + label = "Wait on Transforms:PollMessage/To wait view 0" + subgraph cluster_613 { + label = "Wait on Transforms:PollMessage/To wait view 0/Window.Into()" + 614 [label="Flatten.PCollections"] + 610 -> 614 [style=solid label=""] + } + subgraph cluster_615 { + label = "Wait on Transforms:PollMessage/To wait view 0/ParDo(CollectWindows)" + 616 [label="ParMultiDo(CollectWindows)"] + 614 -> 616 [style=solid label=""] + } + subgraph cluster_617 { + label = "Wait on Transforms:PollMessage/To wait view 0/Sample.Any" + subgraph cluster_618 { + label = "Wait on Transforms:PollMessage/To wait view 0/Sample.Any/Combine.globally(SampleAny)" + subgraph cluster_619 { + label = "Wait on Transforms:PollMessage/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys" + subgraph cluster_620 { + label = "Wait on Transforms:PollMessage/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys" + subgraph cluster_621 { + label = "Wait on Transforms:PollMessage/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys/Map" + 622 [label="ParMultiDo(Anonymous)"] + 616 -> 622 [style=solid label=""] + } + } + } + subgraph cluster_623 { + label = "Wait on Transforms:PollMessage/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)" + 624 [label="GroupByKey"] + 622 -> 624 [style=solid label=""] + subgraph cluster_625 { + label = "Wait on Transforms:PollMessage/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues" + subgraph cluster_626 { + label = "Wait on Transforms:PollMessage/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues/ParDo(Anonymous)" + 627 [label="ParMultiDo(Anonymous)"] + 624 -> 627 [style=solid label=""] + } + } + } + subgraph cluster_628 { + label = "Wait on Transforms:PollMessage/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values" + subgraph cluster_629 { + label = "Wait on Transforms:PollMessage/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values" + subgraph cluster_630 { + label = "Wait on Transforms:PollMessage/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values/Map" + 631 [label="ParMultiDo(Anonymous)"] + 627 -> 631 [style=solid label=""] + } + } + } + } + subgraph cluster_632 { + label = "Wait on Transforms:PollMessage/To wait view 0/Sample.Any/Flatten.Iterables" + subgraph cluster_633 { + label = "Wait on Transforms:PollMessage/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables" + subgraph cluster_634 { + label = "Wait on Transforms:PollMessage/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables/FlatMap" + 635 [label="ParMultiDo(Anonymous)"] + 631 -> 635 [style=solid label=""] + } + } + } + } + subgraph cluster_636 { + label = "Wait on Transforms:PollMessage/To wait view 0/View.AsList" + subgraph cluster_637 { + label = "Wait on Transforms:PollMessage/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization" + subgraph cluster_638 { + label = "Wait on Transforms:PollMessage/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)" + 639 [label="ParMultiDo(VoidKeyToMultimapMaterialization)"] + 635 -> 639 [style=solid label=""] + } + } + 640 [label="View.CreatePCollectionView"] + 639 -> 640 [style=solid label=""] + } + } + subgraph cluster_641 { + label = "Wait on Transforms:PollMessage/Wait" + subgraph cluster_642 { + label = "Wait on Transforms:PollMessage/Wait/Map" + 643 [label="ParMultiDo(Anonymous)"] + 89 -> 643 [style=solid label=""] + 639 -> 643 [style=dashed label=""] + } + } + } + subgraph cluster_644 { + label = "Write to Sql: Transforms:DomainBase" + subgraph cluster_645 { + label = "Write to Sql: Transforms:DomainBase/Shard data Transforms:DomainBase" + subgraph cluster_646 { + label = "Write to Sql: Transforms:DomainBase/Shard data Transforms:DomainBase/AddKeys" + subgraph cluster_647 { + label = "Write to Sql: Transforms:DomainBase/Shard data Transforms:DomainBase/AddKeys/Map" + 648 [label="ParMultiDo(Anonymous)"] + 643 -> 648 [style=solid label=""] + } + } + } + subgraph cluster_649 { + label = "Write to Sql: Transforms:DomainBase/Group into batches Transforms:DomainBase" + subgraph cluster_650 { + label = "Write to Sql: Transforms:DomainBase/Group into batches Transforms:DomainBase/MapElements" + subgraph cluster_651 { + label = "Write to Sql: Transforms:DomainBase/Group into batches Transforms:DomainBase/MapElements/Map" + 652 [label="ParMultiDo(Anonymous)"] + 648 -> 652 [style=solid label=""] + } + } + subgraph cluster_653 { + label = "Write to Sql: Transforms:DomainBase/Group into batches Transforms:DomainBase/ParDo(GroupIntoBatches)" + 654 [label="ParMultiDo(GroupIntoBatches)"] + 652 -> 654 [style=solid label=""] + } + } + subgraph cluster_655 { + label = "Write to Sql: Transforms:DomainBase/Write in batch for Transforms:DomainBase" + 656 [label="ParMultiDo(SqlBatchWriter)"] + 654 -> 656 [style=solid label=""] } } } diff --git a/core/src/test/resources/google/registry/beam/initsql/pipeline_golden.png b/core/src/test/resources/google/registry/beam/initsql/pipeline_golden.png index fa7cc713f..35e80945b 100644 Binary files a/core/src/test/resources/google/registry/beam/initsql/pipeline_golden.png and b/core/src/test/resources/google/registry/beam/initsql/pipeline_golden.png differ