Fix permission issue in Beam pipeline deployment (#170)

This commit is contained in:
Shicong Huang 2019-07-15 16:13:42 -04:00 committed by GitHub
parent 650f1fdd52
commit 633dd887f4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 83 additions and 56 deletions

View file

@ -14,11 +14,14 @@
package google.registry.beam.invoicing; package google.registry.beam.invoicing;
import com.google.auth.oauth2.GoogleCredentials;
import google.registry.beam.invoicing.BillingEvent.InvoiceGroupingKey; import google.registry.beam.invoicing.BillingEvent.InvoiceGroupingKey;
import google.registry.beam.invoicing.BillingEvent.InvoiceGroupingKey.InvoiceGroupingKeyCoder; import google.registry.beam.invoicing.BillingEvent.InvoiceGroupingKey.InvoiceGroupingKeyCoder;
import google.registry.config.CredentialModule.LocalCredential;
import google.registry.config.RegistryConfig.Config; import google.registry.config.RegistryConfig.Config;
import google.registry.reporting.billing.BillingModule; import google.registry.reporting.billing.BillingModule;
import google.registry.reporting.billing.GenerateInvoicesAction; import google.registry.reporting.billing.GenerateInvoicesAction;
import google.registry.util.GoogleCredentialsBundle;
import java.io.Serializable; import java.io.Serializable;
import javax.inject.Inject; import javax.inject.Inject;
import org.apache.beam.runners.dataflow.DataflowRunner; import org.apache.beam.runners.dataflow.DataflowRunner;
@ -55,32 +58,31 @@ import org.apache.beam.sdk.values.TypeDescriptors;
*/ */
public class InvoicingPipeline implements Serializable { public class InvoicingPipeline implements Serializable {
@Inject private final String projectId;
@Config("projectId") private final String beamBucketUrl;
String projectId; private final String invoiceTemplateUrl;
private final String beamStagingUrl;
private final String billingBucketUrl;
private final String invoiceFilePrefix;
private final GoogleCredentials googleCredentials;
@Inject @Inject
@Config("apacheBeamBucketUrl") public InvoicingPipeline(
String beamBucketUrl; @Config("projectId") String projectId,
@Config("apacheBeamBucketUrl") String beamBucketUrl,
@Inject @Config("invoiceTemplateUrl") String invoiceTemplateUrl,
@Config("invoiceTemplateUrl") @Config("beamStagingUrl") String beamStagingUrl,
String invoiceTemplateUrl; @Config("billingBucketUrl") String billingBucketUrl,
@Config("invoiceFilePrefix") String invoiceFilePrefix,
@Inject @LocalCredential GoogleCredentialsBundle googleCredentialsBundle) {
@Config("beamStagingUrl") this.projectId = projectId;
String beamStagingUrl; this.beamBucketUrl = beamBucketUrl;
this.invoiceTemplateUrl = invoiceTemplateUrl;
@Inject this.beamStagingUrl = beamStagingUrl;
@Config("billingBucketUrl") this.billingBucketUrl = billingBucketUrl;
String billingBucketUrl; this.invoiceFilePrefix = invoiceFilePrefix;
this.googleCredentials = googleCredentialsBundle.getGoogleCredentials();
@Inject }
@Config("invoiceFilePrefix")
String invoiceFilePrefix;
@Inject
InvoicingPipeline() {}
/** Custom options for running the invoicing pipeline. */ /** Custom options for running the invoicing pipeline. */
interface InvoicingPipelineOptions extends DataflowPipelineOptions { interface InvoicingPipelineOptions extends DataflowPipelineOptions {
@ -105,6 +107,10 @@ public class InvoicingPipeline implements Serializable {
// This causes p.run() to stage the pipeline as a template on GCS, as opposed to running it. // This causes p.run() to stage the pipeline as a template on GCS, as opposed to running it.
options.setTemplateLocation(invoiceTemplateUrl); options.setTemplateLocation(invoiceTemplateUrl);
options.setStagingLocation(beamStagingUrl); options.setStagingLocation(beamStagingUrl);
// This credential is used when Dataflow deploys the template to GCS in target GCP project.
// So, make sure the credential has write permission to GCS in that project.
options.setGcpCredential(googleCredentials);
Pipeline p = Pipeline.create(options); Pipeline p = Pipeline.create(options);
PCollection<BillingEvent> billingEvents = PCollection<BillingEvent> billingEvents =

View file

@ -17,9 +17,12 @@ package google.registry.beam.spec11;
import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
import static google.registry.beam.BeamUtils.getQueryFromFile; import static google.registry.beam.BeamUtils.getQueryFromFile;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.auto.value.AutoValue; import com.google.auto.value.AutoValue;
import google.registry.beam.spec11.SafeBrowsingTransforms.EvaluateSafeBrowsingFn; import google.registry.beam.spec11.SafeBrowsingTransforms.EvaluateSafeBrowsingFn;
import google.registry.config.CredentialModule.LocalCredential;
import google.registry.config.RegistryConfig.Config; import google.registry.config.RegistryConfig.Config;
import google.registry.util.GoogleCredentialsBundle;
import google.registry.util.Retrier; import google.registry.util.Retrier;
import google.registry.util.SqlTemplate; import google.registry.util.SqlTemplate;
import java.io.Serializable; import java.io.Serializable;
@ -77,26 +80,29 @@ public class Spec11Pipeline implements Serializable {
/** The JSON object field we put the threat match array for Spec11 reports. */ /** The JSON object field we put the threat match array for Spec11 reports. */
public static final String THREAT_MATCHES_FIELD = "threatMatches"; public static final String THREAT_MATCHES_FIELD = "threatMatches";
@Inject private final String projectId;
@Config("projectId") private final String beamStagingUrl;
String projectId; private final String spec11TemplateUrl;
private final String reportingBucketUrl;
private final GoogleCredentials googleCredentials;
private final Retrier retrier;
@Inject @Inject
@Config("beamStagingUrl") public Spec11Pipeline(
String beamStagingUrl; @Config("projectId") String projectId,
@Config("beamStagingUrl") String beamStagingUrl,
@Inject @Config("spec11TemplateUrl") String spec11TemplateUrl,
@Config("spec11TemplateUrl") @Config("reportingBucketUrl") String reportingBucketUrl,
String spec11TemplateUrl; @LocalCredential GoogleCredentialsBundle googleCredentialsBundle,
Retrier retrier
@Inject ) {
@Config("reportingBucketUrl") this.projectId = projectId;
String reportingBucketUrl; this.beamStagingUrl = beamStagingUrl;
this.spec11TemplateUrl = spec11TemplateUrl;
@Inject Retrier retrier; this.reportingBucketUrl = reportingBucketUrl;
this.googleCredentials = googleCredentialsBundle.getGoogleCredentials();
@Inject this.retrier = retrier;
Spec11Pipeline() {} }
/** Custom options for running the spec11 pipeline. */ /** Custom options for running the spec11 pipeline. */
interface Spec11PipelineOptions extends DataflowPipelineOptions { interface Spec11PipelineOptions extends DataflowPipelineOptions {
@ -134,6 +140,9 @@ public class Spec11Pipeline implements Serializable {
// This causes p.run() to stage the pipeline as a template on GCS, as opposed to running it. // This causes p.run() to stage the pipeline as a template on GCS, as opposed to running it.
options.setTemplateLocation(spec11TemplateUrl); options.setTemplateLocation(spec11TemplateUrl);
options.setStagingLocation(beamStagingUrl); options.setStagingLocation(beamStagingUrl);
// This credential is used when Dataflow deploys the template to GCS in target GCP project.
// So, make sure the credential has write permission to GCS in that project.
options.setGcpCredential(googleCredentials);
Pipeline p = Pipeline.create(options); Pipeline p = Pipeline.create(options);
PCollection<Subdomain> domains = PCollection<Subdomain> domains =

View file

@ -100,7 +100,7 @@ public class AuthModule {
AbstractDataStoreFactory dataStoreFactory) { AbstractDataStoreFactory dataStoreFactory) {
try { try {
return new GoogleAuthorizationCodeFlow.Builder( return new GoogleAuthorizationCodeFlow.Builder(
new NetHttpTransport(), jsonFactory, clientSecrets, requiredOauthScopes) new NetHttpTransport(), jsonFactory, clientSecrets, requiredOauthScopes)
.setDataStoreFactory(dataStoreFactory) .setDataStoreFactory(dataStoreFactory)
.build(); .build();
} catch (IOException ex) { } catch (IOException ex) {

View file

@ -16,8 +16,10 @@ package google.registry.beam.invoicing;
import static com.google.common.truth.Truth.assertThat; import static com.google.common.truth.Truth.assertThat;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import google.registry.util.GoogleCredentialsBundle;
import google.registry.util.ResourceUtils; import google.registry.util.ResourceUtils;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
@ -58,15 +60,17 @@ public class InvoicingPipelineTest {
@Before @Before
public void initializePipeline() throws IOException { public void initializePipeline() throws IOException {
invoicingPipeline = new InvoicingPipeline();
invoicingPipeline.projectId = "test-project";
File beamTempFolder = tempFolder.newFolder(); File beamTempFolder = tempFolder.newFolder();
invoicingPipeline.beamBucketUrl = beamTempFolder.getAbsolutePath(); String beamTempFolderPath = beamTempFolder.getAbsolutePath();
invoicingPipeline.invoiceFilePrefix = "REG-INV"; invoicingPipeline = new InvoicingPipeline(
invoicingPipeline.beamStagingUrl = beamTempFolder.getAbsolutePath() + "/staging"; "test-project",
invoicingPipeline.invoiceTemplateUrl = beamTempFolderPath,
beamTempFolder.getAbsolutePath() + "/templates/invoicing"; beamTempFolderPath + "/templates/invoicing",
invoicingPipeline.billingBucketUrl = tempFolder.getRoot().getAbsolutePath(); beamTempFolderPath + "/staging",
tempFolder.getRoot().getAbsolutePath(),
"REG-INV",
GoogleCredentialsBundle.create(GoogleCredentials.create(null))
);
} }
private ImmutableList<BillingEvent> getInputEvents() { private ImmutableList<BillingEvent> getInputEvents() {

View file

@ -21,11 +21,13 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static org.mockito.Mockito.withSettings; import static org.mockito.Mockito.withSettings;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.io.CharStreams; import com.google.common.io.CharStreams;
import google.registry.beam.spec11.SafeBrowsingTransforms.EvaluateSafeBrowsingFn; import google.registry.beam.spec11.SafeBrowsingTransforms.EvaluateSafeBrowsingFn;
import google.registry.testing.FakeClock; import google.registry.testing.FakeClock;
import google.registry.testing.FakeSleeper; import google.registry.testing.FakeSleeper;
import google.registry.util.GoogleCredentialsBundle;
import google.registry.util.ResourceUtils; import google.registry.util.ResourceUtils;
import google.registry.util.Retrier; import google.registry.util.Retrier;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
@ -50,6 +52,7 @@ import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.BasicHttpEntity; import org.apache.http.entity.BasicHttpEntity;
import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.message.BasicStatusLine; import org.apache.http.message.BasicStatusLine;
import org.joda.time.DateTime;
import org.json.JSONArray; import org.json.JSONArray;
import org.json.JSONException; import org.json.JSONException;
import org.json.JSONObject; import org.json.JSONObject;
@ -78,16 +81,21 @@ public class Spec11PipelineTest {
@Rule public final transient TestPipeline p = TestPipeline.fromOptions(pipelineOptions); @Rule public final transient TestPipeline p = TestPipeline.fromOptions(pipelineOptions);
@Rule public final TemporaryFolder tempFolder = new TemporaryFolder(); @Rule public final TemporaryFolder tempFolder = new TemporaryFolder();
private final Retrier retrier = new Retrier(
new FakeSleeper(new FakeClock(DateTime.parse("2019-07-15TZ"))), 1);
private Spec11Pipeline spec11Pipeline; private Spec11Pipeline spec11Pipeline;
@Before @Before
public void initializePipeline() throws IOException { public void initializePipeline() throws IOException {
spec11Pipeline = new Spec11Pipeline();
spec11Pipeline.projectId = "test-project";
spec11Pipeline.reportingBucketUrl = tempFolder.getRoot().getAbsolutePath();
File beamTempFolder = tempFolder.newFolder(); File beamTempFolder = tempFolder.newFolder();
spec11Pipeline.beamStagingUrl = beamTempFolder.getAbsolutePath() + "/staging"; spec11Pipeline = new Spec11Pipeline(
spec11Pipeline.spec11TemplateUrl = beamTempFolder.getAbsolutePath() + "/templates/invoicing"; "test-project",
beamTempFolder.getAbsolutePath() + "/staging",
beamTempFolder.getAbsolutePath() + "/templates/invoicing",
tempFolder.getRoot().getAbsolutePath(),
GoogleCredentialsBundle.create(GoogleCredentials.create(null)),
retrier
);
} }
private static final ImmutableList<String> BAD_DOMAINS = private static final ImmutableList<String> BAD_DOMAINS =