diff --git a/core/src/main/java/google/registry/beam/datastore/BulkDeleteDatastorePipeline.java b/core/src/main/java/google/registry/beam/datastore/BulkDeleteDatastorePipeline.java index 88e7b8ad8..e6500e13e 100644 --- a/core/src/main/java/google/registry/beam/datastore/BulkDeleteDatastorePipeline.java +++ b/core/src/main/java/google/registry/beam/datastore/BulkDeleteDatastorePipeline.java @@ -87,20 +87,18 @@ public class BulkDeleteDatastorePipeline { private final BulkDeletePipelineOptions options; - private final Pipeline pipeline; - BulkDeleteDatastorePipeline(BulkDeletePipelineOptions options) { this.options = options; - pipeline = Pipeline.create(options); } public void run() { - setupPipeline(); + Pipeline pipeline = Pipeline.create(options); + setupPipeline(pipeline); pipeline.run(); } @SuppressWarnings("deprecation") // org.apache.beam.sdk.transforms.Reshuffle - private void setupPipeline() { + private void setupPipeline(Pipeline pipeline) { checkState( !FORBIDDEN_PROJECTS.contains(options.getProject()), "Bulk delete is forbidden in %s", diff --git a/core/src/main/java/google/registry/beam/initsql/InitSqlPipeline.java b/core/src/main/java/google/registry/beam/initsql/InitSqlPipeline.java index 7294593ca..8b94cb5dc 100644 --- a/core/src/main/java/google/registry/beam/initsql/InitSqlPipeline.java +++ b/core/src/main/java/google/registry/beam/initsql/InitSqlPipeline.java @@ -120,26 +120,22 @@ public class InitSqlPipeline implements Serializable { private final InitSqlPipelineOptions options; - private final Pipeline pipeline; - InitSqlPipeline(InitSqlPipelineOptions options) { this.options = options; - pipeline = Pipeline.create(options); + } + + PipelineResult run() { + return run(Pipeline.create(options)); } @VisibleForTesting - InitSqlPipeline(InitSqlPipelineOptions options, Pipeline pipeline) { - this.options = options; - this.pipeline = pipeline; - } - - public PipelineResult run() { - setupPipeline(); + PipelineResult run(Pipeline pipeline) { + setupPipeline(pipeline); return pipeline.run(); } @VisibleForTesting - void setupPipeline() { + void setupPipeline(Pipeline pipeline) { options.setIsolationOverride(TransactionIsolationLevel.TRANSACTION_READ_UNCOMMITTED); PCollectionTuple datastoreSnapshot = pipeline.apply( diff --git a/core/src/main/java/google/registry/beam/invoicing/InvoicingPipeline.java b/core/src/main/java/google/registry/beam/invoicing/InvoicingPipeline.java index f9ae7d294..7806caa51 100644 --- a/core/src/main/java/google/registry/beam/invoicing/InvoicingPipeline.java +++ b/core/src/main/java/google/registry/beam/invoicing/InvoicingPipeline.java @@ -17,7 +17,6 @@ package google.registry.beam.invoicing; import static google.registry.beam.BeamUtils.getQueryFromFile; import static org.apache.beam.sdk.values.TypeDescriptors.strings; -import com.google.common.annotations.VisibleForTesting; import google.registry.beam.invoicing.BillingEvent.InvoiceGroupingKey; import google.registry.beam.invoicing.BillingEvent.InvoiceGroupingKey.InvoiceGroupingKeyCoder; import google.registry.reporting.billing.BillingModule; @@ -60,24 +59,18 @@ public class InvoicingPipeline implements Serializable { DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"); private final InvoicingPipelineOptions options; - private final Pipeline pipeline; - - @VisibleForTesting - InvoicingPipeline(InvoicingPipelineOptions options, Pipeline pipeline) { - this.options = options; - this.pipeline = pipeline; - } InvoicingPipeline(InvoicingPipelineOptions options) { - this(options, Pipeline.create(options)); + this.options = options; } PipelineResult run() { - setupPipeline(); + Pipeline pipeline = Pipeline.create(options); + setupPipeline(pipeline); return pipeline.run(); } - void setupPipeline() { + void setupPipeline(Pipeline pipeline) { PCollection billingEvents = pipeline.apply( "Read BillingEvents from Bigquery", diff --git a/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java b/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java index 674e9d22e..c9cef2d5a 100644 --- a/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java +++ b/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java @@ -18,7 +18,6 @@ import static com.google.common.base.Preconditions.checkArgument; import static google.registry.beam.BeamUtils.getQueryFromFile; import com.google.auto.value.AutoValue; -import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSet; import dagger.Component; import dagger.Module; @@ -84,26 +83,19 @@ public class Spec11Pipeline implements Serializable { private final Spec11PipelineOptions options; private final EvaluateSafeBrowsingFn safeBrowsingFn; - private final Pipeline pipeline; - - @VisibleForTesting - Spec11Pipeline( - Spec11PipelineOptions options, EvaluateSafeBrowsingFn safeBrowsingFn, Pipeline pipeline) { - this.options = options; - this.safeBrowsingFn = safeBrowsingFn; - this.pipeline = pipeline; - } Spec11Pipeline(Spec11PipelineOptions options, EvaluateSafeBrowsingFn safeBrowsingFn) { - this(options, safeBrowsingFn, Pipeline.create(options)); + this.options = options; + this.safeBrowsingFn = safeBrowsingFn; } PipelineResult run() { - setupPipeline(); + Pipeline pipeline = Pipeline.create(options); + setupPipeline(pipeline); return pipeline.run(); } - void setupPipeline() { + void setupPipeline(Pipeline pipeline) { PCollection domains = pipeline.apply( "Read active domains from BigQuery", diff --git a/core/src/test/java/google/registry/beam/initsql/InitSqlPipelineGraphTest.java b/core/src/test/java/google/registry/beam/initsql/InitSqlPipelineGraphTest.java index a44e2c2a2..7459c6f6a 100644 --- a/core/src/test/java/google/registry/beam/initsql/InitSqlPipelineGraphTest.java +++ b/core/src/test/java/google/registry/beam/initsql/InitSqlPipelineGraphTest.java @@ -52,7 +52,7 @@ class InitSqlPipelineGraphTest { @Test void createPipeline_compareGraph() throws IOException { - new InitSqlPipeline(options, testPipeline).setupPipeline(); + new InitSqlPipeline(options).setupPipeline(testPipeline); String dotString = PipelineDotRenderer.toDotString(testPipeline); URL goldenDotUrl = Resources.getResource(InitSqlPipelineGraphTest.class, GOLDEN_DOT_FILE); File outputFile = new File(new File(goldenDotUrl.getFile()).getParent(), "pipeline_curr.dot"); diff --git a/core/src/test/java/google/registry/beam/initsql/InitSqlPipelineTest.java b/core/src/test/java/google/registry/beam/initsql/InitSqlPipelineTest.java index 4f835f004..9800c93d1 100644 --- a/core/src/test/java/google/registry/beam/initsql/InitSqlPipelineTest.java +++ b/core/src/test/java/google/registry/beam/initsql/InitSqlPipelineTest.java @@ -317,8 +317,8 @@ class InitSqlPipelineTest { "--commitLogDir=" + commitLogDir.getAbsolutePath()) .withValidation() .as(InitSqlPipelineOptions.class); - InitSqlPipeline initSqlPipeline = new InitSqlPipeline(options, testPipeline); - initSqlPipeline.run().waitUntilFinish(); + InitSqlPipeline initSqlPipeline = new InitSqlPipeline(options); + initSqlPipeline.run(testPipeline).waitUntilFinish(); try (AppEngineEnvironment env = new AppEngineEnvironment("test")) { assertHostResourceEquals( jpaTm().transact(() -> jpaTm().loadByKey(hostResource.createVKey())), hostResource);