Update BEAM SDK to work with Java 11 (#762)

* Update BEAM SDK to work with Java 11

Upgraded BEAM dependencies to 2.23.0.

Updated Spec11 and invoice pipelines:
- Added the required region parameter.
- Removed the workaround code for staging.

Verified that staging is successful in alpha:
./nom_build :core:registryTool --args='-e alpha --sql_access_info "gs://..." deploy_spec11_pipeline --project domain-registry-alpha'

and

./nom_build :core:registryTool --args='-e alpha --sql_access_info "gs://..." deploy_invoicing_pipeline'
This commit is contained in:
Weimin Yu 2020-08-10 20:56:08 -04:00 committed by GitHub
parent a7e1bd800b
commit 8dbfbb0f33
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
70 changed files with 6555 additions and 6137 deletions

View file

@ -14,8 +14,6 @@
package google.registry.beam.invoicing;
import static com.google.common.collect.ImmutableList.toImmutableList;
import com.google.auth.oauth2.GoogleCredentials;
import google.registry.beam.invoicing.BillingEvent.InvoiceGroupingKey;
import google.registry.beam.invoicing.BillingEvent.InvoiceGroupingKey.InvoiceGroupingKeyCoder;
@ -24,9 +22,7 @@ import google.registry.config.RegistryConfig.Config;
import google.registry.reporting.billing.BillingModule;
import google.registry.reporting.billing.GenerateInvoicesAction;
import google.registry.util.GoogleCredentialsBundle;
import java.io.File;
import java.io.Serializable;
import java.util.Arrays;
import javax.inject.Inject;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
@ -63,6 +59,7 @@ import org.apache.beam.sdk.values.TypeDescriptors;
public class InvoicingPipeline implements Serializable {
private final String projectId;
private final String beamJobRegion;
private final String beamBucketUrl;
private final String invoiceTemplateUrl;
private final String beamStagingUrl;
@ -73,6 +70,7 @@ public class InvoicingPipeline implements Serializable {
@Inject
public InvoicingPipeline(
@Config("projectId") String projectId,
@Config("defaultJobRegion") String beamJobRegion,
@Config("apacheBeamBucketUrl") String beamBucketUrl,
@Config("invoiceTemplateUrl") String invoiceTemplateUrl,
@Config("beamStagingUrl") String beamStagingUrl,
@ -80,6 +78,7 @@ public class InvoicingPipeline implements Serializable {
@Config("invoiceFilePrefix") String invoiceFilePrefix,
@LocalCredential GoogleCredentialsBundle googleCredentialsBundle) {
this.projectId = projectId;
this.beamJobRegion = beamJobRegion;
this.beamBucketUrl = beamBucketUrl;
this.invoiceTemplateUrl = invoiceTemplateUrl;
this.beamStagingUrl = beamStagingUrl;
@ -107,6 +106,7 @@ public class InvoicingPipeline implements Serializable {
// We can't store options as a member variable due to serialization concerns.
InvoicingPipelineOptions options = PipelineOptionsFactory.as(InvoicingPipelineOptions.class);
options.setProject(projectId);
options.setRegion(beamJobRegion);
options.setRunner(DataflowRunner.class);
// This causes p.run() to stage the pipeline as a template on GCS, as opposed to running it.
options.setTemplateLocation(invoiceTemplateUrl);
@ -115,14 +115,6 @@ public class InvoicingPipeline implements Serializable {
// So, make sure the credential has write permission to GCS in that project.
options.setGcpCredential(googleCredentials);
// The BEAM files-to-stage classpath loader is broken past Java 8, so we do this manually.
// See https://issues.apache.org/jira/browse/BEAM-2530
options.setFilesToStage(
Arrays.stream(System.getProperty("java.class.path").split(File.separator))
.map(File::new)
.map(File::getPath)
.collect(toImmutableList()));
Pipeline p = Pipeline.create(options);
PCollection<BillingEvent> billingEvents =

View file

@ -15,7 +15,6 @@
package google.registry.beam.spec11;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static google.registry.beam.BeamUtils.getQueryFromFile;
import com.google.auth.oauth2.GoogleCredentials;
@ -32,9 +31,7 @@ import google.registry.persistence.transaction.JpaTransactionManager;
import google.registry.util.GoogleCredentialsBundle;
import google.registry.util.Retrier;
import google.registry.util.SqlTemplate;
import java.io.File;
import java.io.Serializable;
import java.util.Arrays;
import javax.inject.Inject;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
@ -92,6 +89,7 @@ public class Spec11Pipeline implements Serializable {
public static final String THREAT_MATCHES_FIELD = "threatMatches";
private final String projectId;
private final String beamJobRegion;
private final String beamStagingUrl;
private final String spec11TemplateUrl;
private final String reportingBucketUrl;
@ -102,6 +100,7 @@ public class Spec11Pipeline implements Serializable {
@Inject
public Spec11Pipeline(
@Config("projectId") String projectId,
@Config("defaultJobRegion") String beamJobRegion,
@Config("beamStagingUrl") String beamStagingUrl,
@Config("spec11TemplateUrl") String spec11TemplateUrl,
@Config("reportingBucketUrl") String reportingBucketUrl,
@ -109,6 +108,7 @@ public class Spec11Pipeline implements Serializable {
@LocalCredential GoogleCredentialsBundle googleCredentialsBundle,
Retrier retrier) {
this.projectId = projectId;
this.beamJobRegion = beamJobRegion;
this.beamStagingUrl = beamStagingUrl;
this.spec11TemplateUrl = spec11TemplateUrl;
this.reportingBucketUrl = reportingBucketUrl;
@ -149,6 +149,7 @@ public class Spec11Pipeline implements Serializable {
// We can't store options as a member variable due to serialization concerns.
Spec11PipelineOptions options = PipelineOptionsFactory.as(Spec11PipelineOptions.class);
options.setProject(projectId);
options.setRegion(beamJobRegion);
options.setRunner(DataflowRunner.class);
// This causes p.run() to stage the pipeline as a template on GCS, as opposed to running it.
options.setTemplateLocation(spec11TemplateUrl);
@ -157,14 +158,6 @@ public class Spec11Pipeline implements Serializable {
// So, make sure the credential has write permission to GCS in that project.
options.setGcpCredential(googleCredentials);
// The BEAM files-to-stage classpath loader is broken past Java 8, so we do this manually.
// See https://issues.apache.org/jira/browse/BEAM-2530
options.setFilesToStage(
Arrays.stream(System.getProperty("java.class.path").split(File.separator))
.map(File::new)
.map(File::getPath)
.collect(toImmutableList()));
Pipeline p = Pipeline.create(options);
PCollection<Subdomain> domains =
p.apply(

View file

@ -615,6 +615,18 @@ public final class RegistryConfig {
return config.registryPolicy.requireSslCertificates;
}
/**
* Returns the default job region to run Apache Beam (Cloud Dataflow) jobs in.
*
* @see google.registry.beam.invoicing.InvoicingPipeline
* @see google.registry.beam.spec11.Spec11Pipeline
*/
@Provides
@Config("defaultJobRegion")
public static String provideDefaultJobRegion(RegistryConfigSettings config) {
return config.beam.defaultJobRegion;
}
/**
* Returns the default job zone to run Apache Beam (Cloud Dataflow) jobs in.
*

View file

@ -127,6 +127,7 @@ public class RegistryConfigSettings {
/** Configuration for Apache Beam (Cloud Dataflow). */
public static class Beam {
public String defaultJobRegion;
public String defaultJobZone;
}

View file

@ -419,7 +419,10 @@ misc:
transientFailureRetries: 12
beam:
# The default region to run Apache Beam (Cloud Dataflow) jobs in.
defaultJobRegion: us-east1
# The default zone to run Apache Beam (Cloud Dataflow) jobs in.
# TODO(weiminyu): consider dropping zone config. No obvious needs for this.
defaultJobZone: us-east1-c
keyring:

View file

@ -34,6 +34,10 @@ public class DeploySpec11PipelineCommand implements Command {
@Config("projectId")
String projectId;
@Inject
@Config("defaultJobRegion")
String beamJobRegion;
@Parameter(
names = {"-p", "--project"},
description = "Cloud KMS project ID",
@ -71,6 +75,7 @@ public class DeploySpec11PipelineCommand implements Command {
Spec11Pipeline pipeline =
new Spec11Pipeline(
projectId,
beamJobRegion,
beamStagingUrl,
spec11TemplateUrl,
reportingBucketUrl,

View file

@ -69,6 +69,7 @@ class InvoicingPipelineTest {
invoicingPipeline =
new InvoicingPipeline(
"test-project",
"region",
beamTempFolder,
beamTempFolder + "/templates/invoicing",
beamTempFolder + "/staging",

View file

@ -124,6 +124,7 @@ class Spec11PipelineTest {
spec11Pipeline =
new Spec11Pipeline(
"test-project",
"region",
beamTempFolder + "/staging",
beamTempFolder + "/templates/invoicing",
tmpDir.toAbsolutePath().toString(),