Remove Pipeline as a field in pipeline classes (#1119)

In tests we use a TestPipelineExtension that performs static
initialization which must not be repeated in the same JVM. Our
XXXPipeline classes used to store the Pipeline as a field, and the
lambdas we pass to the pipeline are effectively anonymous inner
classes, so they are bound to their enclosing instances. When the
lambdas are serialized during pipeline execution, their enclosing
instances are serialized with them. This may result in undefined
behavior when multiple lambdas from the same XXXPipeline run on the
same JVM (as they do in tests), because the static initialization may
then happen more than once if different class loaders are used. This
is very unlikely in practice, but as a best practice we remove the
Pipeline fields anyway.
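
The capture problem can be reproduced outside Beam. The sketch below is illustrative only (the class and method names are hypothetical, not code from this repository): a serializable lambda that reads an instance field captures `this` and therefore drags its enclosing instance into the serialized payload, while a lambda built from a local value serializes only itself.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

public class LambdaCaptureDemo implements Serializable {

  private final String prefix;

  LambdaCaptureDemo(String prefix) {
    this.prefix = prefix;
  }

  // Reads the instance field `prefix`, so the lambda captures `this`:
  // serializing the lambda also serializes the enclosing LambdaCaptureDemo.
  Function<String, String> capturingLambda() {
    return (Function<String, String> & Serializable) name -> prefix + name;
  }

  // Takes the value as a parameter instead of reading a field, so the lambda
  // captures only a String and the enclosing instance stays out of the
  // payload. This mirrors passing the Pipeline into setupPipeline() rather
  // than keeping it as a field.
  static Function<String, String> nonCapturingLambda(String prefix) {
    return (Function<String, String> & Serializable) name -> prefix + name;
  }

  public static void main(String[] args) throws IOException {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      // Succeeds only because LambdaCaptureDemo itself is Serializable; the
      // whole instance travels with the lambda's captured arguments.
      out.writeObject(new LambdaCaptureDemo("demo-").capturingLambda());
      // Writes just the lambda and the captured String.
      out.writeObject(nonCapturingLambda("demo-"));
    }
  }
}

With the Pipeline created locally in run() and handed to setupPipeline(Pipeline), as in the diffs below, the lambdas no longer need to reach through the enclosing XXXPipeline instance.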
Lai Jiang 2021-04-30 14:32:33 -04:00 committed by GitHub
parent fb4e6c9df0
commit 4bc7077e3d
6 changed files with 22 additions and 43 deletions


@@ -87,20 +87,18 @@ public class BulkDeleteDatastorePipeline {
 
   private final BulkDeletePipelineOptions options;
 
-  private final Pipeline pipeline;
-
   BulkDeleteDatastorePipeline(BulkDeletePipelineOptions options) {
     this.options = options;
-    pipeline = Pipeline.create(options);
   }
 
   public void run() {
-    setupPipeline();
+    Pipeline pipeline = Pipeline.create(options);
+    setupPipeline(pipeline);
     pipeline.run();
   }
 
   @SuppressWarnings("deprecation") // org.apache.beam.sdk.transforms.Reshuffle
-  private void setupPipeline() {
+  private void setupPipeline(Pipeline pipeline) {
     checkState(
         !FORBIDDEN_PROJECTS.contains(options.getProject()),
         "Bulk delete is forbidden in %s",


@@ -120,26 +120,22 @@ public class InitSqlPipeline implements Serializable {
   private final InitSqlPipelineOptions options;
 
-  private final Pipeline pipeline;
-
   InitSqlPipeline(InitSqlPipelineOptions options) {
     this.options = options;
-    pipeline = Pipeline.create(options);
+  }
+
+  PipelineResult run() {
+    return run(Pipeline.create(options));
   }
 
   @VisibleForTesting
-  InitSqlPipeline(InitSqlPipelineOptions options, Pipeline pipeline) {
-    this.options = options;
-    this.pipeline = pipeline;
-  }
-
-  public PipelineResult run() {
-    setupPipeline();
+  PipelineResult run(Pipeline pipeline) {
+    setupPipeline(pipeline);
     return pipeline.run();
   }
 
   @VisibleForTesting
-  void setupPipeline() {
+  void setupPipeline(Pipeline pipeline) {
     options.setIsolationOverride(TransactionIsolationLevel.TRANSACTION_READ_UNCOMMITTED);
 
     PCollectionTuple datastoreSnapshot =
         pipeline.apply(


@@ -17,7 +17,6 @@ package google.registry.beam.invoicing;
 import static google.registry.beam.BeamUtils.getQueryFromFile;
 import static org.apache.beam.sdk.values.TypeDescriptors.strings;
 
-import com.google.common.annotations.VisibleForTesting;
 import google.registry.beam.invoicing.BillingEvent.InvoiceGroupingKey;
 import google.registry.beam.invoicing.BillingEvent.InvoiceGroupingKey.InvoiceGroupingKeyCoder;
 import google.registry.reporting.billing.BillingModule;
@@ -60,24 +59,18 @@ public class InvoicingPipeline implements Serializable {
       DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
 
   private final InvoicingPipelineOptions options;
-  private final Pipeline pipeline;
-
-  @VisibleForTesting
-  InvoicingPipeline(InvoicingPipelineOptions options, Pipeline pipeline) {
-    this.options = options;
-    this.pipeline = pipeline;
-  }
 
   InvoicingPipeline(InvoicingPipelineOptions options) {
-    this(options, Pipeline.create(options));
+    this.options = options;
   }
 
   PipelineResult run() {
-    setupPipeline();
+    Pipeline pipeline = Pipeline.create(options);
+    setupPipeline(pipeline);
     return pipeline.run();
   }
 
-  void setupPipeline() {
+  void setupPipeline(Pipeline pipeline) {
     PCollection<BillingEvent> billingEvents =
         pipeline.apply(
             "Read BillingEvents from Bigquery",


@@ -18,7 +18,6 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static google.registry.beam.BeamUtils.getQueryFromFile;
 
 import com.google.auto.value.AutoValue;
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableSet;
 import dagger.Component;
 import dagger.Module;
@@ -84,26 +83,19 @@ public class Spec11Pipeline implements Serializable {
 
   private final Spec11PipelineOptions options;
   private final EvaluateSafeBrowsingFn safeBrowsingFn;
-  private final Pipeline pipeline;
-
-  @VisibleForTesting
-  Spec11Pipeline(
-      Spec11PipelineOptions options, EvaluateSafeBrowsingFn safeBrowsingFn, Pipeline pipeline) {
-    this.options = options;
-    this.safeBrowsingFn = safeBrowsingFn;
-    this.pipeline = pipeline;
-  }
 
   Spec11Pipeline(Spec11PipelineOptions options, EvaluateSafeBrowsingFn safeBrowsingFn) {
-    this(options, safeBrowsingFn, Pipeline.create(options));
+    this.options = options;
+    this.safeBrowsingFn = safeBrowsingFn;
   }
 
   PipelineResult run() {
-    setupPipeline();
+    Pipeline pipeline = Pipeline.create(options);
+    setupPipeline(pipeline);
     return pipeline.run();
   }
 
-  void setupPipeline() {
+  void setupPipeline(Pipeline pipeline) {
     PCollection<Subdomain> domains =
         pipeline.apply(
             "Read active domains from BigQuery",


@@ -52,7 +52,7 @@ class InitSqlPipelineGraphTest {
 
   @Test
   void createPipeline_compareGraph() throws IOException {
-    new InitSqlPipeline(options, testPipeline).setupPipeline();
+    new InitSqlPipeline(options).setupPipeline(testPipeline);
     String dotString = PipelineDotRenderer.toDotString(testPipeline);
     URL goldenDotUrl = Resources.getResource(InitSqlPipelineGraphTest.class, GOLDEN_DOT_FILE);
     File outputFile = new File(new File(goldenDotUrl.getFile()).getParent(), "pipeline_curr.dot");


@@ -317,8 +317,8 @@ class InitSqlPipelineTest {
                 "--commitLogDir=" + commitLogDir.getAbsolutePath())
             .withValidation()
             .as(InitSqlPipelineOptions.class);
-    InitSqlPipeline initSqlPipeline = new InitSqlPipeline(options, testPipeline);
-    initSqlPipeline.run().waitUntilFinish();
+    InitSqlPipeline initSqlPipeline = new InitSqlPipeline(options);
+    initSqlPipeline.run(testPipeline).waitUntilFinish();
     try (AppEngineEnvironment env = new AppEngineEnvironment("test")) {
       assertHostResourceEquals(
           jpaTm().transact(() -> jpaTm().loadByKey(hostResource.createVKey())), hostResource);