From 4d92ba4b8e1214b245fa08d90d69540e16e36223 Mon Sep 17 00:00:00 2001 From: gbrodman Date: Mon, 10 Aug 2020 15:15:30 -0400 Subject: [PATCH] Manually set the files to stage in Beam using the classpath (#760) See https://issues.apache.org/jira/browse/BEAM-2530 for more details on why this is necessary, but basically Beam assumes that we are using a URLClassLoader which is no longer the case post-Java-8. This means that we have to manually specify the files to stage. See https://stackoverflow.com/questions/48292491/java-dataflow-unable-to-use-classloader-to-detect-classpath-elements Tested by building, deploying, and running the Spec11 pipeline on Alpha using Java 11. --- .../registry/beam/invoicing/InvoicingPipeline.java | 12 ++++++++++++ .../google/registry/beam/spec11/Spec11Pipeline.java | 11 +++++++++++ 2 files changed, 23 insertions(+) diff --git a/core/src/main/java/google/registry/beam/invoicing/InvoicingPipeline.java b/core/src/main/java/google/registry/beam/invoicing/InvoicingPipeline.java index 24c6e327f..234111b39 100644 --- a/core/src/main/java/google/registry/beam/invoicing/InvoicingPipeline.java +++ b/core/src/main/java/google/registry/beam/invoicing/InvoicingPipeline.java @@ -14,6 +14,8 @@ package google.registry.beam.invoicing; +import static com.google.common.collect.ImmutableList.toImmutableList; + import com.google.auth.oauth2.GoogleCredentials; import google.registry.beam.invoicing.BillingEvent.InvoiceGroupingKey; import google.registry.beam.invoicing.BillingEvent.InvoiceGroupingKey.InvoiceGroupingKeyCoder; @@ -22,7 +24,9 @@ import google.registry.config.RegistryConfig.Config; import google.registry.reporting.billing.BillingModule; import google.registry.reporting.billing.GenerateInvoicesAction; import google.registry.util.GoogleCredentialsBundle; +import java.io.File; import java.io.Serializable; +import java.util.Arrays; import javax.inject.Inject; import org.apache.beam.runners.dataflow.DataflowRunner; import 
org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; @@ -111,6 +115,14 @@ public class InvoicingPipeline implements Serializable { // So, make sure the credential has write permission to GCS in that project. options.setGcpCredential(googleCredentials); + // The BEAM files-to-stage classpath loader is broken past Java 8, so we do this manually. + // See https://issues.apache.org/jira/browse/BEAM-2530 + options.setFilesToStage( + Arrays.stream(System.getProperty("java.class.path").split(File.pathSeparator)) + .map(File::new) + .map(File::getPath) + .collect(toImmutableList())); + Pipeline p = Pipeline.create(options); PCollection billingEvents = diff --git a/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java b/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java index 5f6208d6b..3b83878ae 100644 --- a/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java +++ b/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java @@ -15,6 +15,7 @@ package google.registry.beam.spec11; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.collect.ImmutableList.toImmutableList; import static google.registry.beam.BeamUtils.getQueryFromFile; import com.google.auth.oauth2.GoogleCredentials; @@ -31,7 +32,9 @@ import google.registry.persistence.transaction.JpaTransactionManager; import google.registry.util.GoogleCredentialsBundle; import google.registry.util.Retrier; import google.registry.util.SqlTemplate; +import java.io.File; import java.io.Serializable; +import java.util.Arrays; import javax.inject.Inject; import org.apache.beam.runners.dataflow.DataflowRunner; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; @@ -154,6 +157,14 @@ public class Spec11Pipeline implements Serializable { // So, make sure the credential has write permission to GCS in that project. 
options.setGcpCredential(googleCredentials); + // The BEAM files-to-stage classpath loader is broken past Java 8, so we do this manually. + // See https://issues.apache.org/jira/browse/BEAM-2530 + options.setFilesToStage( + Arrays.stream(System.getProperty("java.class.path").split(File.pathSeparator)) + .map(File::new) + .map(File::getPath) + .collect(toImmutableList())); + Pipeline p = Pipeline.create(options); PCollection domains = p.apply(