Generate detail reports from BigQuery via Beam

This establishes a fully functional pipeline that generates a detail report for each registrar_tld pair from BigQuery. The main features are:

1. Deserialization from Avro GenericRecord (from BigQuery) into BillingEvent, a POJO we control. This is especially valuable because it gives us type safety from the start of the pipeline.
2. Addition of .sql files containing the queries used to generate detail reports. These will later be templated to enable general usage.
3. Multi-file writing within a single TextIO transform, which writes BillingEvents to different files based on their registrar_tld key.
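
As a rough illustration of features 1 and 3, here is a minimal plain-Java sketch with no Beam or Avro dependency. It is not the code in this commit: a Map stands in for the Avro GenericRecord, an in-memory grouping stands in for the TextIO multi-file write, and the class name, field names (registrarId, tld, amount) and method names are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DetailReportSketch {

  /** Hypothetical, simplified stand-in for the BillingEvent POJO. */
  static final class BillingEvent {
    final String registrarId;
    final String tld;
    final double amount;

    BillingEvent(String registrarId, String tld, double amount) {
      this.registrarId = registrarId;
      this.tld = tld;
      this.amount = amount;
    }

    /** Converts an untyped record (stand-in for a GenericRecord) into a typed event. */
    static BillingEvent parseFromRecord(Map<String, Object> record) {
      return new BillingEvent(
          extractField(record, "registrarId"),
          extractField(record, "tld"),
          Double.parseDouble(extractField(record, "amount")));
    }

    /** Fails fast on a missing field so bad rows surface at the start of the pipeline. */
    private static String extractField(Map<String, Object> record, String name) {
      Object value = record.get(name);
      if (value == null) {
        throw new IllegalStateException("Missing field: " + name);
      }
      return String.valueOf(value);
    }

    /** Destination key: one output file per registrar_tld pair. */
    String toFilename() {
      return registrarId + "_" + tld;
    }

    String toCsv() {
      return String.join(",", registrarId, tld, String.valueOf(amount));
    }
  }

  /** Groups CSV lines by destination file, mimicking a keyed multi-file write. */
  static Map<String, List<String>> groupByDestination(List<Map<String, Object>> records) {
    Map<String, List<String>> files = new LinkedHashMap<>();
    for (Map<String, Object> record : records) {
      BillingEvent event = BillingEvent.parseFromRecord(record);
      files.computeIfAbsent(event.toFilename(), k -> new ArrayList<>()).add(event.toCsv());
    }
    return files;
  }
}
```

Parsing into the POJO first means everything downstream works with typed fields, and the destination key is derived from the event itself rather than from the raw record.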

This also upgrades the Beam core SDK referenced in repositories.bzl to 2.2.0 and returns the definitions to alphabetical order, to facilitate use of the check_bazel_deps.py script.

The final steps are:
- Converting this to a Nomulus command
- Templating the .sql queries
- @Injecting the @Config values for a given project

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=178124838
larryruili 2017-12-06 11:19:18 -08:00 committed by jianglai
parent d736f7f08d
commit 735112def6
3 changed files with 395 additions and 399 deletions

@@ -31,18 +31,14 @@ import google.registry.util.FormattingLogger;
 import java.io.IOException;
 import javax.inject.Inject;
 /**
- * Generates invoices for the month and stores them on GCS.
+ * Invokes the {@code InvoicingPipeline} beam template via the REST api.
  *
- * <p>Currently this is just a simplified runner that verifies we can deploy dataflow jobs from App
- * Engine.
+ * <p>This action runs the {@link google.registry.beam.InvoicingPipeline} beam template, staged at
+ * gs://<projectId>-beam/templates/invoicing. The pipeline then generates invoices for the month and
+ * stores them on GCS.
  */
-@Action(
-  path = GenerateInvoicesAction.PATH,
-  method = POST,
-  auth = Auth.AUTH_INTERNAL_ONLY
-)
+@Action(path = GenerateInvoicesAction.PATH, method = POST, auth = Auth.AUTH_INTERNAL_ONLY)
 public class GenerateInvoicesAction implements Runnable {
   private static final FormattingLogger logger = FormattingLogger.getLoggerForCallerClass();
@@ -61,7 +57,7 @@ public class GenerateInvoicesAction implements Runnable {
     try {
       LaunchTemplateParameters params =
           new LaunchTemplateParameters()
-              .setJobName("test-bigquerytemplate1")
+              .setJobName("test-invoicing")
               .setEnvironment(
                   new RuntimeEnvironment()
                       .setZone("us-east1-c")
@@ -71,7 +67,7 @@ public class GenerateInvoicesAction implements Runnable {
               .projects()
               .templates()
               .launch(projectId, params)
-              .setGcsPath(beamBucketUrl + "/templates/bigquery1")
+              .setGcsPath(beamBucketUrl + "/templates/invoicing")
               .execute();
       logger.infofmt("Got response: %s", launchResponse.getJob().toPrettyString());
     } catch (IOException e) {