Add a CloudBuild task to deploy Beam pipeline

This commit is contained in:
Shicong Huang 2019-06-13 13:59:14 -04:00
parent 8ae67ed731
commit ece6ae1d78
3 changed files with 84 additions and 13 deletions

View file

@ -14,17 +14,13 @@
package google.registry.beam.invoicing;
import static java.nio.charset.StandardCharsets.UTF_8;
import com.google.auth.oauth2.GoogleCredentials;
import google.registry.beam.invoicing.BillingEvent.InvoiceGroupingKey;
import google.registry.beam.invoicing.BillingEvent.InvoiceGroupingKey.InvoiceGroupingKeyCoder;
import google.registry.config.CredentialModule.LocalCredentialJson;
import google.registry.config.RegistryConfig.Config;
import google.registry.reporting.billing.BillingModule;
import google.registry.reporting.billing.GenerateInvoicesAction;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import google.registry.tools.AuthModule.LocalOAuth2Credentials;
import java.io.Serializable;
import javax.inject.Inject;
import org.apache.beam.runners.dataflow.DataflowRunner;
@ -85,7 +81,8 @@ public class InvoicingPipeline implements Serializable {
@Config("invoiceFilePrefix")
String invoiceFilePrefix;
@Inject @LocalCredentialJson String credentialJson;
@Inject @LocalOAuth2Credentials
GoogleCredentials credentials;
@Inject
InvoicingPipeline() {}
@ -108,13 +105,7 @@ public class InvoicingPipeline implements Serializable {
public void deploy() {
// We can't store options as a member variable due to serialization concerns.
InvoicingPipelineOptions options = PipelineOptionsFactory.as(InvoicingPipelineOptions.class);
try {
options.setGcpCredential(
GoogleCredentials.fromStream(new ByteArrayInputStream(credentialJson.getBytes(UTF_8))));
} catch (IOException e) {
throw new RuntimeException(
"Cannot obtain local credential to deploy the invoicing pipeline", e);
}
options.setGcpCredential(credentials);
options.setProject(projectId);
options.setRunner(DataflowRunner.class);
// This causes p.run() to stage the pipeline as a template on GCS, as opposed to running it.

View file

@ -27,6 +27,7 @@ import com.google.api.client.http.javanet.NetHttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.util.store.AbstractDataStoreFactory;
import com.google.api.client.util.store.FileDataStoreFactory;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@ -43,6 +44,7 @@ import google.registry.config.RegistryConfig.Config;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
@ -101,6 +103,23 @@ public class AuthModule {
}
}
@Provides
@LocalOAuth2Credentials
public static GoogleCredentials provideLocalOAuth2Credentials(
@LocalCredentialJson String credentialJson,
@Config("localCredentialOauthScopes") ImmutableList<String> scopes) {
try {
GoogleCredentials credentials =
GoogleCredentials.fromStream(new ByteArrayInputStream(credentialJson.getBytes(UTF_8)));
if (credentials.createScopedRequired()) {
credentials = credentials.createScoped(scopes);
}
return credentials;
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
@Provides
public static GoogleAuthorizationCodeFlow provideAuthorizationCodeFlow(
JsonFactory jsonFactory,
@ -215,4 +234,10 @@ public class AuthModule {
@Documented
@Retention(RetentionPolicy.RUNTIME)
@interface OAuthClientId {}
/** Dagger qualifier for the local OAuth2 Credentials. */
@Qualifier
@Documented
@Retention(RetentionPolicy.RUNTIME)
public @interface LocalOAuth2Credentials {}
}