diff --git a/core/src/main/java/google/registry/beam/invoicing/InvoicingPipeline.java b/core/src/main/java/google/registry/beam/invoicing/InvoicingPipeline.java index 0f70ff631..2130a8b63 100644 --- a/core/src/main/java/google/registry/beam/invoicing/InvoicingPipeline.java +++ b/core/src/main/java/google/registry/beam/invoicing/InvoicingPipeline.java @@ -14,11 +14,14 @@ package google.registry.beam.invoicing; +import com.google.auth.oauth2.GoogleCredentials; import google.registry.beam.invoicing.BillingEvent.InvoiceGroupingKey; import google.registry.beam.invoicing.BillingEvent.InvoiceGroupingKey.InvoiceGroupingKeyCoder; +import google.registry.config.CredentialModule.LocalCredential; import google.registry.config.RegistryConfig.Config; import google.registry.reporting.billing.BillingModule; import google.registry.reporting.billing.GenerateInvoicesAction; +import google.registry.util.GoogleCredentialsBundle; import java.io.Serializable; import javax.inject.Inject; import org.apache.beam.runners.dataflow.DataflowRunner; @@ -55,32 +58,31 @@ import org.apache.beam.sdk.values.TypeDescriptors; */ public class InvoicingPipeline implements Serializable { - @Inject - @Config("projectId") - String projectId; + private final String projectId; + private final String beamBucketUrl; + private final String invoiceTemplateUrl; + private final String beamStagingUrl; + private final String billingBucketUrl; + private final String invoiceFilePrefix; + private final GoogleCredentials googleCredentials; @Inject - @Config("apacheBeamBucketUrl") - String beamBucketUrl; - - @Inject - @Config("invoiceTemplateUrl") - String invoiceTemplateUrl; - - @Inject - @Config("beamStagingUrl") - String beamStagingUrl; - - @Inject - @Config("billingBucketUrl") - String billingBucketUrl; - - @Inject - @Config("invoiceFilePrefix") - String invoiceFilePrefix; - - @Inject - InvoicingPipeline() {} + public InvoicingPipeline( + @Config("projectId") String projectId, + @Config("apacheBeamBucketUrl") String beamBucketUrl, + @Config("invoiceTemplateUrl") String invoiceTemplateUrl, + @Config("beamStagingUrl") String beamStagingUrl, + @Config("billingBucketUrl") String billingBucketUrl, + @Config("invoiceFilePrefix") String invoiceFilePrefix, + @LocalCredential GoogleCredentialsBundle googleCredentialsBundle) { + this.projectId = projectId; + this.beamBucketUrl = beamBucketUrl; + this.invoiceTemplateUrl = invoiceTemplateUrl; + this.beamStagingUrl = beamStagingUrl; + this.billingBucketUrl = billingBucketUrl; + this.invoiceFilePrefix = invoiceFilePrefix; + this.googleCredentials = googleCredentialsBundle.getGoogleCredentials(); + } /** Custom options for running the invoicing pipeline. */ interface InvoicingPipelineOptions extends DataflowPipelineOptions { @@ -105,6 +107,10 @@ public class InvoicingPipeline implements Serializable { // This causes p.run() to stage the pipeline as a template on GCS, as opposed to running it. options.setTemplateLocation(invoiceTemplateUrl); options.setStagingLocation(beamStagingUrl); + // This credential is used when Dataflow deploys the template to GCS in target GCP project. + // So, make sure the credential has write permission to GCS in that project. + options.setGcpCredential(googleCredentials); + Pipeline p = Pipeline.create(options); PCollection billingEvents = diff --git a/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java b/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java index 83fdeb63d..83af99de3 100644 --- a/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java +++ b/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java @@ -17,9 +17,12 @@ package google.registry.beam.spec11; import static com.google.common.base.Preconditions.checkArgument; import static google.registry.beam.BeamUtils.getQueryFromFile; +import com.google.auth.oauth2.GoogleCredentials; import com.google.auto.value.AutoValue; import google.registry.beam.spec11.SafeBrowsingTransforms.EvaluateSafeBrowsingFn; +import google.registry.config.CredentialModule.LocalCredential; import google.registry.config.RegistryConfig.Config; +import google.registry.util.GoogleCredentialsBundle; import google.registry.util.Retrier; import google.registry.util.SqlTemplate; import java.io.Serializable; @@ -77,26 +80,29 @@ public class Spec11Pipeline implements Serializable { /** The JSON object field we put the threat match array for Spec11 reports. */ public static final String THREAT_MATCHES_FIELD = "threatMatches"; - @Inject - @Config("projectId") - String projectId; + private final String projectId; + private final String beamStagingUrl; + private final String spec11TemplateUrl; + private final String reportingBucketUrl; + private final GoogleCredentials googleCredentials; + private final Retrier retrier; @Inject - @Config("beamStagingUrl") - String beamStagingUrl; - - @Inject - @Config("spec11TemplateUrl") - String spec11TemplateUrl; - - @Inject - @Config("reportingBucketUrl") - String reportingBucketUrl; - - @Inject Retrier retrier; - - @Inject - Spec11Pipeline() {} + public Spec11Pipeline( + @Config("projectId") String projectId, + @Config("beamStagingUrl") String beamStagingUrl, + @Config("spec11TemplateUrl") String spec11TemplateUrl, + @Config("reportingBucketUrl") String reportingBucketUrl, + @LocalCredential GoogleCredentialsBundle googleCredentialsBundle, + Retrier retrier + ) { + this.projectId = projectId; + this.beamStagingUrl = beamStagingUrl; + this.spec11TemplateUrl = spec11TemplateUrl; + this.reportingBucketUrl = reportingBucketUrl; + this.googleCredentials = googleCredentialsBundle.getGoogleCredentials(); + this.retrier = retrier; + } /** Custom options for running the spec11 pipeline. */ interface Spec11PipelineOptions extends DataflowPipelineOptions { @@ -134,6 +140,9 @@ public class Spec11Pipeline implements Serializable { // This causes p.run() to stage the pipeline as a template on GCS, as opposed to running it. options.setTemplateLocation(spec11TemplateUrl); options.setStagingLocation(beamStagingUrl); + // This credential is used when Dataflow deploys the template to GCS in target GCP project. + // So, make sure the credential has write permission to GCS in that project. + options.setGcpCredential(googleCredentials); Pipeline p = Pipeline.create(options); PCollection domains = diff --git a/core/src/main/java/google/registry/tools/AuthModule.java b/core/src/main/java/google/registry/tools/AuthModule.java index 28228b695..6fafe8050 100644 --- a/core/src/main/java/google/registry/tools/AuthModule.java +++ b/core/src/main/java/google/registry/tools/AuthModule.java @@ -100,7 +100,7 @@ public class AuthModule { AbstractDataStoreFactory dataStoreFactory) { try { return new GoogleAuthorizationCodeFlow.Builder( - new NetHttpTransport(), jsonFactory, clientSecrets, requiredOauthScopes) + new NetHttpTransport(), jsonFactory, clientSecrets, requiredOauthScopes) .setDataStoreFactory(dataStoreFactory) .build(); } catch (IOException ex) { diff --git a/core/src/test/java/google/registry/beam/invoicing/InvoicingPipelineTest.java b/core/src/test/java/google/registry/beam/invoicing/InvoicingPipelineTest.java index b2994a3f2..dcedf4903 100644 --- a/core/src/test/java/google/registry/beam/invoicing/InvoicingPipelineTest.java +++ b/core/src/test/java/google/registry/beam/invoicing/InvoicingPipelineTest.java @@ -16,8 +16,10 @@ package google.registry.beam.invoicing; import static com.google.common.truth.Truth.assertThat; +import com.google.auth.oauth2.GoogleCredentials; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import google.registry.util.GoogleCredentialsBundle; import google.registry.util.ResourceUtils; import java.io.File; import java.io.IOException; @@ -58,15 +60,17 @@ public class InvoicingPipelineTest { @Before public void initializePipeline() throws IOException { - invoicingPipeline = new InvoicingPipeline(); - invoicingPipeline.projectId = "test-project"; File beamTempFolder = tempFolder.newFolder(); - invoicingPipeline.beamBucketUrl = beamTempFolder.getAbsolutePath(); - invoicingPipeline.invoiceFilePrefix = "REG-INV"; - invoicingPipeline.beamStagingUrl = beamTempFolder.getAbsolutePath() + "/staging"; - invoicingPipeline.invoiceTemplateUrl = - beamTempFolder.getAbsolutePath() + "/templates/invoicing"; - invoicingPipeline.billingBucketUrl = tempFolder.getRoot().getAbsolutePath(); + String beamTempFolderPath = beamTempFolder.getAbsolutePath(); + invoicingPipeline = new InvoicingPipeline( + "test-project", + beamTempFolderPath, + beamTempFolderPath + "/templates/invoicing", + beamTempFolderPath + "/staging", + tempFolder.getRoot().getAbsolutePath(), + "REG-INV", + GoogleCredentialsBundle.create(GoogleCredentials.create(null)) + ); } private ImmutableList getInputEvents() { diff --git a/core/src/test/java/google/registry/beam/spec11/Spec11PipelineTest.java b/core/src/test/java/google/registry/beam/spec11/Spec11PipelineTest.java index ca8ea6bde..6bc25fef0 100644 --- a/core/src/test/java/google/registry/beam/spec11/Spec11PipelineTest.java +++ b/core/src/test/java/google/registry/beam/spec11/Spec11PipelineTest.java @@ -21,11 +21,13 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.mockito.Mockito.withSettings; +import com.google.auth.oauth2.GoogleCredentials; import com.google.common.collect.ImmutableList; import com.google.common.io.CharStreams; import google.registry.beam.spec11.SafeBrowsingTransforms.EvaluateSafeBrowsingFn; import google.registry.testing.FakeClock; import google.registry.testing.FakeSleeper; +import google.registry.util.GoogleCredentialsBundle; import google.registry.util.ResourceUtils; import google.registry.util.Retrier; import java.io.ByteArrayInputStream; @@ -50,6 +52,7 @@ import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.BasicHttpEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.message.BasicStatusLine; +import org.joda.time.DateTime; import org.json.JSONArray; import org.json.JSONException; import org.json.JSONObject; @@ -78,16 +81,21 @@ public class Spec11PipelineTest { @Rule public final transient TestPipeline p = TestPipeline.fromOptions(pipelineOptions); @Rule public final TemporaryFolder tempFolder = new TemporaryFolder(); + private final Retrier retrier = new Retrier( + new FakeSleeper(new FakeClock(DateTime.parse("2019-07-15TZ"))), 1); private Spec11Pipeline spec11Pipeline; @Before public void initializePipeline() throws IOException { - spec11Pipeline = new Spec11Pipeline(); - spec11Pipeline.projectId = "test-project"; - spec11Pipeline.reportingBucketUrl = tempFolder.getRoot().getAbsolutePath(); File beamTempFolder = tempFolder.newFolder(); - spec11Pipeline.beamStagingUrl = beamTempFolder.getAbsolutePath() + "/staging"; - spec11Pipeline.spec11TemplateUrl = beamTempFolder.getAbsolutePath() + "/templates/invoicing"; + spec11Pipeline = new Spec11Pipeline( + "test-project", + beamTempFolder.getAbsolutePath() + "/staging", + beamTempFolder.getAbsolutePath() + "/templates/invoicing", + tempFolder.getRoot().getAbsolutePath(), + GoogleCredentialsBundle.create(GoogleCredentials.create(null)), + retrier + ); } private static final ImmutableList BAD_DOMAINS =