From eb6a1fe1edbf35a1c24f18efd2ba5e2f8a6f7323 Mon Sep 17 00:00:00 2001 From: Lai Jiang Date: Fri, 30 Apr 2021 14:32:33 -0400 Subject: [PATCH] Remove Pipeline as a field in pipeline classes (#1119) In tests we use a TestPipelineExtension which does some static initialization that should not be repeated in the same JVM. In our XXXPipeline classes we save the pipeline as a field and usually write lambdas that are passed to the pipeline. Because lambdas are effectively anonymous inner classes they are bound to their enclosing instances. When they get serialized during pipeline execution, their enclosing instances do too. This might result in undefined behavior when multiple lambdas in the same XXXPipeline are used on the same JVM (such as in tests) where the static initialization may be done multiple times if different class loaders are used. This is very unlikely to happen but as a best practice we still remove them as fields. --- .../datastore/BulkDeleteDatastorePipeline.java | 8 +++----- .../registry/beam/initsql/InitSqlPipeline.java | 18 +++++++----------- .../beam/invoicing/InvoicingPipeline.java | 15 ++++----------- .../registry/beam/spec11/Spec11Pipeline.java | 18 +++++------------- .../beam/initsql/InitSqlPipelineGraphTest.java | 2 +- .../beam/initsql/InitSqlPipelineTest.java | 4 ++-- 6 files changed, 22 insertions(+), 43 deletions(-) diff --git a/core/src/main/java/google/registry/beam/datastore/BulkDeleteDatastorePipeline.java b/core/src/main/java/google/registry/beam/datastore/BulkDeleteDatastorePipeline.java index 88e7b8ad8..e6500e13e 100644 --- a/core/src/main/java/google/registry/beam/datastore/BulkDeleteDatastorePipeline.java +++ b/core/src/main/java/google/registry/beam/datastore/BulkDeleteDatastorePipeline.java @@ -87,20 +87,18 @@ public class BulkDeleteDatastorePipeline { private final BulkDeletePipelineOptions options; - private final Pipeline pipeline; - BulkDeleteDatastorePipeline(BulkDeletePipelineOptions options) { this.options = options; - pipeline = 
Pipeline.create(options); } public void run() { - setupPipeline(); + Pipeline pipeline = Pipeline.create(options); + setupPipeline(pipeline); pipeline.run(); } @SuppressWarnings("deprecation") // org.apache.beam.sdk.transforms.Reshuffle - private void setupPipeline() { + private void setupPipeline(Pipeline pipeline) { checkState( !FORBIDDEN_PROJECTS.contains(options.getProject()), "Bulk delete is forbidden in %s", diff --git a/core/src/main/java/google/registry/beam/initsql/InitSqlPipeline.java b/core/src/main/java/google/registry/beam/initsql/InitSqlPipeline.java index 7294593ca..8b94cb5dc 100644 --- a/core/src/main/java/google/registry/beam/initsql/InitSqlPipeline.java +++ b/core/src/main/java/google/registry/beam/initsql/InitSqlPipeline.java @@ -120,26 +120,22 @@ public class InitSqlPipeline implements Serializable { private final InitSqlPipelineOptions options; - private final Pipeline pipeline; - InitSqlPipeline(InitSqlPipelineOptions options) { this.options = options; - pipeline = Pipeline.create(options); + } + + PipelineResult run() { + return run(Pipeline.create(options)); } @VisibleForTesting - InitSqlPipeline(InitSqlPipelineOptions options, Pipeline pipeline) { - this.options = options; - this.pipeline = pipeline; - } - - public PipelineResult run() { - setupPipeline(); + PipelineResult run(Pipeline pipeline) { + setupPipeline(pipeline); return pipeline.run(); } @VisibleForTesting - void setupPipeline() { + void setupPipeline(Pipeline pipeline) { options.setIsolationOverride(TransactionIsolationLevel.TRANSACTION_READ_UNCOMMITTED); PCollectionTuple datastoreSnapshot = pipeline.apply( diff --git a/core/src/main/java/google/registry/beam/invoicing/InvoicingPipeline.java b/core/src/main/java/google/registry/beam/invoicing/InvoicingPipeline.java index f9ae7d294..7806caa51 100644 --- a/core/src/main/java/google/registry/beam/invoicing/InvoicingPipeline.java +++ b/core/src/main/java/google/registry/beam/invoicing/InvoicingPipeline.java @@ -17,7 +17,6 @@ package 
google.registry.beam.invoicing; import static google.registry.beam.BeamUtils.getQueryFromFile; import static org.apache.beam.sdk.values.TypeDescriptors.strings; -import com.google.common.annotations.VisibleForTesting; import google.registry.beam.invoicing.BillingEvent.InvoiceGroupingKey; import google.registry.beam.invoicing.BillingEvent.InvoiceGroupingKey.InvoiceGroupingKeyCoder; import google.registry.reporting.billing.BillingModule; @@ -60,24 +59,18 @@ public class InvoicingPipeline implements Serializable { DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"); private final InvoicingPipelineOptions options; - private final Pipeline pipeline; - - @VisibleForTesting - InvoicingPipeline(InvoicingPipelineOptions options, Pipeline pipeline) { - this.options = options; - this.pipeline = pipeline; - } InvoicingPipeline(InvoicingPipelineOptions options) { - this(options, Pipeline.create(options)); + this.options = options; } PipelineResult run() { - setupPipeline(); + Pipeline pipeline = Pipeline.create(options); + setupPipeline(pipeline); return pipeline.run(); } - void setupPipeline() { + void setupPipeline(Pipeline pipeline) { PCollection billingEvents = pipeline.apply( "Read BillingEvents from Bigquery", diff --git a/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java b/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java index 674e9d22e..c9cef2d5a 100644 --- a/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java +++ b/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java @@ -18,7 +18,6 @@ import static com.google.common.base.Preconditions.checkArgument; import static google.registry.beam.BeamUtils.getQueryFromFile; import com.google.auto.value.AutoValue; -import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSet; import dagger.Component; import dagger.Module; @@ -84,26 +83,19 @@ public class Spec11Pipeline implements Serializable { private final 
Spec11PipelineOptions options; private final EvaluateSafeBrowsingFn safeBrowsingFn; - private final Pipeline pipeline; - - @VisibleForTesting - Spec11Pipeline( - Spec11PipelineOptions options, EvaluateSafeBrowsingFn safeBrowsingFn, Pipeline pipeline) { - this.options = options; - this.safeBrowsingFn = safeBrowsingFn; - this.pipeline = pipeline; - } Spec11Pipeline(Spec11PipelineOptions options, EvaluateSafeBrowsingFn safeBrowsingFn) { - this(options, safeBrowsingFn, Pipeline.create(options)); + this.options = options; + this.safeBrowsingFn = safeBrowsingFn; } PipelineResult run() { - setupPipeline(); + Pipeline pipeline = Pipeline.create(options); + setupPipeline(pipeline); return pipeline.run(); } - void setupPipeline() { + void setupPipeline(Pipeline pipeline) { PCollection domains = pipeline.apply( "Read active domains from BigQuery", diff --git a/core/src/test/java/google/registry/beam/initsql/InitSqlPipelineGraphTest.java b/core/src/test/java/google/registry/beam/initsql/InitSqlPipelineGraphTest.java index a44e2c2a2..7459c6f6a 100644 --- a/core/src/test/java/google/registry/beam/initsql/InitSqlPipelineGraphTest.java +++ b/core/src/test/java/google/registry/beam/initsql/InitSqlPipelineGraphTest.java @@ -52,7 +52,7 @@ class InitSqlPipelineGraphTest { @Test void createPipeline_compareGraph() throws IOException { - new InitSqlPipeline(options, testPipeline).setupPipeline(); + new InitSqlPipeline(options).setupPipeline(testPipeline); String dotString = PipelineDotRenderer.toDotString(testPipeline); URL goldenDotUrl = Resources.getResource(InitSqlPipelineGraphTest.class, GOLDEN_DOT_FILE); File outputFile = new File(new File(goldenDotUrl.getFile()).getParent(), "pipeline_curr.dot"); diff --git a/core/src/test/java/google/registry/beam/initsql/InitSqlPipelineTest.java b/core/src/test/java/google/registry/beam/initsql/InitSqlPipelineTest.java index 4f835f004..9800c93d1 100644 --- a/core/src/test/java/google/registry/beam/initsql/InitSqlPipelineTest.java +++ 
b/core/src/test/java/google/registry/beam/initsql/InitSqlPipelineTest.java @@ -317,8 +317,8 @@ class InitSqlPipelineTest { "--commitLogDir=" + commitLogDir.getAbsolutePath()) .withValidation() .as(InitSqlPipelineOptions.class); - InitSqlPipeline initSqlPipeline = new InitSqlPipeline(options, testPipeline); - initSqlPipeline.run().waitUntilFinish(); + InitSqlPipeline initSqlPipeline = new InitSqlPipeline(options); + initSqlPipeline.run(testPipeline).waitUntilFinish(); try (AppEngineEnvironment env = new AppEngineEnvironment("test")) { assertHostResourceEquals( jpaTm().transact(() -> jpaTm().loadByKey(hostResource.createVKey())), hostResource);