Manually set the files to stage in Beam using the classpath (#760)

See https://issues.apache.org/jira/browse/BEAM-2530 for more details on
why this is necessary. In short, Beam assumes that the application class
loader is a URLClassLoader, which is no longer the case after Java 8, so
we have to manually specify the files to stage.

See https://stackoverflow.com/questions/48292491/java-dataflow-unable-to-use-classloader-to-detect-classpath-elements

Tested by building, deploying, and running the Spec11 pipeline on Alpha
using Java 11.
This commit is contained in:
gbrodman 2020-08-10 15:15:30 -04:00 committed by GitHub
parent 7b2f7c08e4
commit 4d92ba4b8e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 23 additions and 0 deletions

View file

@ -14,6 +14,8 @@
package google.registry.beam.invoicing;
import static com.google.common.collect.ImmutableList.toImmutableList;
import com.google.auth.oauth2.GoogleCredentials;
import google.registry.beam.invoicing.BillingEvent.InvoiceGroupingKey;
import google.registry.beam.invoicing.BillingEvent.InvoiceGroupingKey.InvoiceGroupingKeyCoder;
@ -22,7 +24,9 @@ import google.registry.config.RegistryConfig.Config;
import google.registry.reporting.billing.BillingModule;
import google.registry.reporting.billing.GenerateInvoicesAction;
import google.registry.util.GoogleCredentialsBundle;
import java.io.File;
import java.io.Serializable;
import java.util.Arrays;
import javax.inject.Inject;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
@ -111,6 +115,14 @@ public class InvoicingPipeline implements Serializable {
// So, make sure the credential has write permission to GCS in that project.
options.setGcpCredential(googleCredentials);
// The BEAM files-to-stage classpath loader is broken past Java 8, so we do this manually.
// See https://issues.apache.org/jira/browse/BEAM-2530
// Entries in java.class.path are delimited by File.pathSeparator (':' on Unix, ';' on
// Windows). File.separator is the directory separator inside a single path and must not be
// used here: it would chop every entry into its individual path components.
options.setFilesToStage(
    Arrays.stream(System.getProperty("java.class.path").split(File.pathSeparator))
        .map(File::new)
        .map(File::getPath)
        .collect(toImmutableList()));
Pipeline p = Pipeline.create(options);
PCollection<BillingEvent> billingEvents =

View file

@ -15,6 +15,7 @@
package google.registry.beam.spec11;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static google.registry.beam.BeamUtils.getQueryFromFile;
import com.google.auth.oauth2.GoogleCredentials;
@ -31,7 +32,9 @@ import google.registry.persistence.transaction.JpaTransactionManager;
import google.registry.util.GoogleCredentialsBundle;
import google.registry.util.Retrier;
import google.registry.util.SqlTemplate;
import java.io.File;
import java.io.Serializable;
import java.util.Arrays;
import javax.inject.Inject;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
@ -154,6 +157,14 @@ public class Spec11Pipeline implements Serializable {
// So, make sure the credential has write permission to GCS in that project.
options.setGcpCredential(googleCredentials);
// The BEAM files-to-stage classpath loader is broken past Java 8, so we do this manually.
// See https://issues.apache.org/jira/browse/BEAM-2530
// Entries in java.class.path are delimited by File.pathSeparator (':' on Unix, ';' on
// Windows). File.separator is the directory separator inside a single path and must not be
// used here: it would chop every entry into its individual path components.
options.setFilesToStage(
    Arrays.stream(System.getProperty("java.class.path").split(File.pathSeparator))
        .map(File::new)
        .map(File::getPath)
        .collect(toImmutableList()));
Pipeline p = Pipeline.create(options);
PCollection<Subdomain> domains =
p.apply(