diff --git a/java/google/registry/beam/BUILD b/java/google/registry/beam/BUILD
new file mode 100644
index 000000000..757553669
--- /dev/null
+++ b/java/google/registry/beam/BUILD
@@ -0,0 +1,24 @@
+package(
+ default_visibility = ["//visibility:public"],
+)
+
+licenses(["notice"]) # Apache 2.0
+
+java_library(
+ name = "beam",
+ srcs = glob(["*.java"]),
+ resources = glob(["sql/*"]), # query templates that InvoicingUtils loads via Resources.getResource
+ deps = [
+ "//java/google/registry/config",
+ "//java/google/registry/util",
+ "@com_google_apis_google_api_services_bigquery",
+ "@com_google_auto_value",
+ "@com_google_dagger",
+ "@com_google_guava",
+ "@org_apache_avro",
+ "@org_apache_beam_runners_direct_java",
+ "@org_apache_beam_runners_google_cloud_dataflow_java",
+ "@org_apache_beam_sdks_java_core",
+ "@org_apache_beam_sdks_java_io_google_cloud_platform",
+ ],
+)
diff --git a/java/google/registry/beam/BigqueryTemplatePipeline.java b/java/google/registry/beam/BigqueryTemplatePipeline.java
new file mode 100644
index 000000000..29d0d5134
--- /dev/null
+++ b/java/google/registry/beam/BigqueryTemplatePipeline.java
@@ -0,0 +1,97 @@
+// Copyright 2017 The Nomulus Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package google.registry.beam;
+
+import com.google.api.services.bigquery.model.TableRow;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptors;
+
+/**
+ * Main class to stage a templated Dataflow pipeline which reads from Bigquery and writes to GCS.
+ *
+ * <p>To stage this pipeline on GCS, run it with the following command line flags:
+ * <ul>
+ *   <li>--runner=DataflowRunner
+ *   <li>--project=[YOUR PROJECT ID]
+ *   <li>--stagingLocation=gs://[WHERE PIPELINE JAR FILES SHOULD BE STAGED]
+ *   <li>--templateLocation=gs://[WHERE TEMPLATE.txt FILE SHOULD BE STAGED]
+ * </ul>
+ *
+ * <p>Then, you can run the staged template via the API client library, gCloud or a raw REST call.
+ *
+ * @see <a href="https://cloud.google.com/dataflow/docs/templates/overview">Dataflow Templates</a>
+ */
+public class BigqueryTemplatePipeline {
+
+  /** Custom command-line pipeline options for {@code BigqueryTemplatePipeline}. */
+  public interface BigqueryTemplateOptions extends PipelineOptions {
+    @Description("Bigquery query used to get the initial data for the pipeline.")
+    @Default.String("SELECT * FROM `[YOUR_PROJECT].[DATASET].[TABLE]`")
+    String getBigqueryQuery();
+    void setBigqueryQuery(String value);
+
+    @Description("The GCS bucket we output the result text file to.")
+    @Default.String("[YOUR BUCKET HERE]")
+    String getOutputBucket();
+    void setOutputBucket(String value);
+  }
+
+  public static void main(String[] args) {
+    // Parse the standard Beam arguments together with the custom options declared above.
+    BigqueryTemplateOptions options =
+        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigqueryTemplateOptions.class);
+
+    // Build the pipeline: read rows from Bigquery, count request paths, write one unsharded file.
+    Pipeline p = Pipeline.create(options);
+    p.apply(BigQueryIO.readTableRows().fromQuery(options.getBigqueryQuery()).usingStandardSql())
+        .apply("Count request paths", new CountRequestPaths())
+        .apply(TextIO.write().to(options.getOutputBucket()).withoutSharding().withHeader("HEADER"));
+    p.run();
+  }
+
+  /** A composite {@code PTransform} that counts the number of times each request path appears. */
+  static class CountRequestPaths extends PTransform<PCollection<TableRow>, PCollection<String>> {
+    @Override
+    public PCollection<String> expand(PCollection<TableRow> input) {
+      return input
+          .apply("Extract paths", ParDo.of(new ExtractRequestPathFn()))
+          .apply("Count paths", Count.perElement())
+          .apply(
+              "Format results",
+              MapElements.into(TypeDescriptors.strings())
+                  .via(kv -> kv.getKey() + ": " + kv.getValue()));
+    }
+  }
+
+  /** A {@code DoFn} that extracts the request path from a Bigquery {@code TableRow}. */
+  static class ExtractRequestPathFn extends DoFn<TableRow, String> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      c.output(String.valueOf(c.element().get("requestPath")));
+    }
+  }
+}
diff --git a/java/google/registry/beam/BillingEvent.java b/java/google/registry/beam/BillingEvent.java
new file mode 100644
index 000000000..30879b2fa
--- /dev/null
+++ b/java/google/registry/beam/BillingEvent.java
@@ -0,0 +1,334 @@
+// Copyright 2017 The Nomulus Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package google.registry.beam;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import google.registry.util.FormattingLogger;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.stream.Collectors;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
+
+/**
+ * A POJO representing a single billable event, parsed from a {@code SchemaAndRecord}.
+ *
+ * <p>This is a trivially serializable class that allows Beam to transform the results of a
+ * Bigquery query into a standard Java representation, giving us the type guarantees and ease of
+ * manipulation Bigquery lacks, while localizing any Bigquery-side failures to the
+ * {@link #parseFromRecord} function.
+ */
+@AutoValue
+public abstract class BillingEvent implements Serializable {
+
+  private static final FormattingLogger logger = FormattingLogger.getLoggerForCallerClass();
+
+  private static final DateTimeFormatter DATE_TIME_FORMATTER =
+      DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss zzz");
+
+  private static final ImmutableList<String> FIELD_NAMES =
+      ImmutableList.of(
+          "id",
+          "billingTime",
+          "eventTime",
+          "registrarId",
+          "billingId",
+          "tld",
+          "action",
+          "domain",
+          "repositoryId",
+          "years",
+          "currency",
+          "amount",
+          "flags");
+
+  /** Returns the unique Objectify ID for the {@code OneTime} associated with this event. */
+  abstract long id();
+  /** Returns the UTC DateTime this event becomes billable. */
+  abstract ZonedDateTime billingTime();
+  /** Returns the UTC DateTime this event was generated. */
+  abstract ZonedDateTime eventTime();
+  /** Returns the billed registrar's client ID. */
+  abstract String registrarId();
+  /** Returns the billed registrar's billing account key. */
+  abstract String billingId();
+  /** Returns the tld this event was generated for. */
+  abstract String tld();
+  /** Returns the billable action this event was generated for (e.g. RENEW, CREATE, TRANSFER...) */
+  abstract String action();
+  /** Returns the fully qualified domain name this event was generated for. */
+  abstract String domain();
+  /** Returns the unique RepoID associated with the billed domain. */
+  abstract String repositoryId();
+  /** Returns the number of years this billing event is made out for. */
+  abstract int years();
+  /** Returns the 3-letter currency code for the billing event (e.g. USD or JPY.) */
+  abstract String currency();
+  /** Returns the cost associated with this billing event. */
+  abstract double amount();
+  /** Returns a list of space-delimited flags associated with the event. */
+  abstract String flags();
+
+  /**
+   * Constructs a {@code BillingEvent} from a {@code SchemaAndRecord}.
+   *
+   * @throws IllegalStateException if any of the expected fields is null in the record
+   * @see org.apache.avro.generic.GenericRecord
+   */
+  static BillingEvent parseFromRecord(SchemaAndRecord schemaAndRecord) {
+    checkFieldsNotNull(schemaAndRecord);
+    GenericRecord record = schemaAndRecord.getRecord();
+    return create(
+        // We need to chain parsers off extractField because GenericRecord only returns
+        // Objects, which contain a string representation of their underlying types.
+        Long.parseLong(extractField(record, "id")),
+        // Bigquery provides UNIX timestamps in microseconds; divide by 1000 to get milliseconds.
+        Instant.ofEpochMilli(Long.parseLong(extractField(record, "billingTime")) / 1000)
+            .atZone(ZoneId.of("UTC")),
+        Instant.ofEpochMilli(Long.parseLong(extractField(record, "eventTime")) / 1000)
+            .atZone(ZoneId.of("UTC")),
+        extractField(record, "registrarId"),
+        extractField(record, "billingId"),
+        extractField(record, "tld"),
+        extractField(record, "action"),
+        extractField(record, "domain"),
+        extractField(record, "repositoryId"),
+        Integer.parseInt(extractField(record, "years")),
+        extractField(record, "currency"),
+        Double.parseDouble(extractField(record, "amount")),
+        extractField(record, "flags"));
+  }
+
+  /**
+   * Creates a concrete {@code BillingEvent}.
+   *
+   * <p>This should only be used outside this class for testing- instances of {@code BillingEvent}
+   * should otherwise come from {@link #parseFromRecord}.
+   */
+  @VisibleForTesting
+  static BillingEvent create(
+      long id,
+      ZonedDateTime billingTime,
+      ZonedDateTime eventTime,
+      String registrarId,
+      String billingId,
+      String tld,
+      String action,
+      String domain,
+      String repositoryId,
+      int years,
+      String currency,
+      double amount,
+      String flags) {
+    return new AutoValue_BillingEvent(
+        id,
+        billingTime,
+        eventTime,
+        registrarId,
+        billingId,
+        tld,
+        action,
+        domain,
+        repositoryId,
+        years,
+        currency,
+        amount,
+        flags);
+  }
+
+  /** Generates the CSV header, in the same column order as {@link #toCsv()}. */
+  static String getHeader() {
+    return FIELD_NAMES.stream().collect(Collectors.joining(","));
+  }
+
+  /**
+   * Generates the filename associated with this {@code BillingEvent}.
+   *
+   * <p>When modifying this function, take care to ensure that there's no way to generate an illegal
+   * filepath with the arguments, such as "../sensitive_info".
+   */
+  String toFilename() {
+    return String.format("%s_%s", registrarId(), tld());
+  }
+
+  /** Generates a CSV representation of this {@code BillingEvent}. */
+  String toCsv() {
+    return Joiner.on(",")
+        .join(
+            ImmutableList.of(
+                id(),
+                DATE_TIME_FORMATTER.format(billingTime()),
+                DATE_TIME_FORMATTER.format(eventTime()),
+                registrarId(),
+                billingId(),
+                tld(),
+                action(),
+                domain(),
+                repositoryId(),
+                years(),
+                currency(),
+                String.format("%.2f", amount()),
+                // Strip out the 'synthetic' flag, which is internal only.
+                flags().replace("SYNTHETIC", "").trim()));
+  }
+
+  /** Returns the grouping key for this {@code BillingEvent}, to generate the overall invoice. */
+  InvoiceGroupingKey getInvoiceGroupingKey() {
+    return new AutoValue_BillingEvent_InvoiceGroupingKey(
+        billingTime().toLocalDate().withDayOfMonth(1).toString(),
+        billingTime().toLocalDate().withDayOfMonth(1).plusYears(years()).minusDays(1).toString(),
+        billingId(),
+        String.format("%s - %s", registrarId(), tld()),
+        String.format("%s | TLD: %s | TERM: %d-year", action(), tld(), years()),
+        amount(),
+        currency(),
+        "");
+  }
+
+  /** Key for each {@code BillingEvent}, when aggregating for the overall invoice. */
+  @AutoValue
+  abstract static class InvoiceGroupingKey implements Serializable {
+
+    private static final ImmutableList<String> INVOICE_HEADERS =
+        ImmutableList.of(
+            "StartDate",
+            "EndDate",
+            "ProductAccountKey",
+            "Amount",
+            "AmountCurrency",
+            "BillingProductCode",
+            "SalesChannel",
+            "LineItemType",
+            "UsageGroupingKey",
+            "Quantity",
+            "Description",
+            "UnitPrice",
+            "UnitPriceCurrency",
+            "PONumber");
+
+    /** Returns the first day this invoice is valid, in yyyy-MM-dd format. */
+    abstract String startDate();
+    /** Returns the last day this invoice is valid, in yyyy-MM-dd format. */
+    abstract String endDate();
+    /** Returns the billing account id, which is the {@code BillingEvent.billingId}. */
+    abstract String productAccountKey();
+    /** Returns the invoice grouping key, which is in the format "registrarId - tld". */
+    abstract String usageGroupingKey();
+    /** Returns a description of the item, formatted as "action | TLD: tld | TERM: n-year." */
+    abstract String description();
+    /** Returns the cost per invoice item. */
+    abstract Double unitPrice();
+    /** Returns the 3-letter currency code the unit price uses. */
+    abstract String unitPriceCurrency();
+    /** Returns the purchase order number for the item, blank for most registrars. */
+    abstract String poNumber();
+
+    /** Generates the CSV header for the overall invoice. */
+    static String invoiceHeader() {
+      return Joiner.on(",").join(INVOICE_HEADERS);
+    }
+
+    /** Generates a CSV representation of n aggregate billing events. */
+    String toCsv(Long quantity) {
+      double totalPrice = unitPrice() * quantity;
+      return Joiner.on(",")
+          .join(
+              ImmutableList.of(
+                  startDate(),
+                  endDate(),
+                  productAccountKey(),
+                  String.format("%.2f", totalPrice),
+                  unitPriceCurrency(),
+                  "10125",
+                  "1",
+                  "PURCHASE",
+                  usageGroupingKey(),
+                  String.format("%d", quantity),
+                  description(),
+                  String.format("%.2f", unitPrice()),
+                  unitPriceCurrency(),
+                  poNumber()));
+    }
+
+    /** Coder that provides deterministic (de)serialization for {@code InvoiceGroupingKey}. */
+    static class InvoiceGroupingKeyCoder extends AtomicCoder<InvoiceGroupingKey> {
+
+      @Override
+      public void encode(InvoiceGroupingKey value, OutputStream outStream) throws IOException {
+        Coder<String> stringCoder = StringUtf8Coder.of();
+        stringCoder.encode(value.startDate(), outStream);
+        stringCoder.encode(value.endDate(), outStream);
+        stringCoder.encode(value.productAccountKey(), outStream);
+        stringCoder.encode(value.usageGroupingKey(), outStream);
+        stringCoder.encode(value.description(), outStream);
+        stringCoder.encode(String.valueOf(value.unitPrice()), outStream);
+        stringCoder.encode(value.unitPriceCurrency(), outStream);
+        stringCoder.encode(value.poNumber(), outStream);
+      }
+
+      @Override
+      public InvoiceGroupingKey decode(InputStream inStream) throws IOException {
+        Coder<String> stringCoder = StringUtf8Coder.of();
+        return new AutoValue_BillingEvent_InvoiceGroupingKey(
+            stringCoder.decode(inStream),
+            stringCoder.decode(inStream),
+            stringCoder.decode(inStream),
+            stringCoder.decode(inStream),
+            stringCoder.decode(inStream),
+            Double.parseDouble(stringCoder.decode(inStream)),
+            stringCoder.decode(inStream),
+            stringCoder.decode(inStream));
+      }
+    }
+  }
+
+  /** Extracts a string representation of a field in a {@code GenericRecord}. */
+  private static String extractField(GenericRecord record, String fieldName) {
+    return String.valueOf(record.get(fieldName));
+  }
+
+  /**
+   * Checks that no expected fields in the record are missing.
+   *
+   * <p>Note that this simply makes sure the field is not null; it may still generate a parse error
+   * in {@code parseFromRecord}.
+   */
+  private static void checkFieldsNotNull(SchemaAndRecord schemaAndRecord) {
+    GenericRecord record = schemaAndRecord.getRecord();
+    ImmutableList<String> nullFields =
+        FIELD_NAMES
+            .stream()
+            .filter(fieldName -> record.get(fieldName) == null)
+            .collect(ImmutableList.toImmutableList());
+    if (!nullFields.isEmpty()) {
+      logger.severefmt(
+          "Found unexpected null value(s) in field(s) %s for record %s",
+          Joiner.on(", ").join(nullFields), record.toString());
+      throw new IllegalStateException("Read null value from Bigquery query");
+    }
+  }
+}
diff --git a/java/google/registry/beam/InvoicingPipeline.java b/java/google/registry/beam/InvoicingPipeline.java
new file mode 100644
index 000000000..cc1804399
--- /dev/null
+++ b/java/google/registry/beam/InvoicingPipeline.java
@@ -0,0 +1,128 @@
+// Copyright 2017 The Nomulus Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package google.registry.beam;
+
+import google.registry.beam.BillingEvent.InvoiceGroupingKey;
+import google.registry.beam.BillingEvent.InvoiceGroupingKey.InvoiceGroupingKeyCoder;
+import google.registry.config.RegistryConfig.Config;
+import javax.inject.Inject;
+import org.apache.beam.runners.dataflow.DataflowRunner;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.io.FileBasedSink;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
+
+/**
+ * Definition of a Dataflow pipeline template, which generates a given month's invoices.
+ *
+ * <p>To stage this template on GCS, run the {@link
+ * google.registry.tools.DeployInvoicingPipelineCommand} Nomulus command.
+ *
+ * <p>Then, you can run the staged template via the API client library, gCloud or a raw REST call.
+ * For an example using the API client library, see {@link
+ * google.registry.billing.GenerateInvoicesAction}.
+ *
+ * @see <a href="https://cloud.google.com/dataflow/docs/templates/overview">Dataflow Templates</a>
+ */
+public class InvoicingPipeline {
+
+  @Inject @Config("projectId") String projectId;
+  @Inject @Config("apacheBeamBucketUrl") String beamBucket;
+  @Inject InvoicingPipeline() {}
+
+  /** Custom options for running the invoicing pipeline. */
+  interface InvoicingPipelineOptions extends DataflowPipelineOptions {
+    /** Returns the yearMonth we're generating invoices for, in yyyy-MM format. */
+    @Description("The yearMonth we generate invoices for, in yyyy-MM format.")
+    ValueProvider<String> getYearMonth();
+    /** Sets the yearMonth we generate invoices for. */
+    void setYearMonth(ValueProvider<String> value);
+  }
+
+  /** Deploys the invoicing pipeline as a template on GCS, for a given projectID and GCS bucket. */
+  public void deploy() {
+    InvoicingPipelineOptions options = PipelineOptionsFactory.as(InvoicingPipelineOptions.class);
+    options.setProject(projectId);
+    options.setRunner(DataflowRunner.class);
+    options.setStagingLocation(beamBucket + "/staging");
+    options.setTemplateLocation(beamBucket + "/templates/invoicing");
+
+    Pipeline p = Pipeline.create(options);
+
+    PCollection<BillingEvent> billingEvents =
+        p.apply(
+            "Read BillingEvents from Bigquery",
+            BigQueryIO.read(BillingEvent::parseFromRecord)
+                .fromQuery(InvoicingUtils.makeQueryProvider(options.getYearMonth(), projectId))
+                .withCoder(SerializableCoder.of(BillingEvent.class))
+                .usingStandardSql()
+                .withoutValidation());
+
+    billingEvents.apply(
+        "Write events to separate CSVs keyed by registrarId_tld pair",
+        TextIO.<BillingEvent>writeCustomType()
+            .to(
+                InvoicingUtils.makeDestinationFunction(beamBucket + "/results"),
+                InvoicingUtils.makeEmptyDestinationParams(beamBucket + "/results"))
+            .withFormatFunction(BillingEvent::toCsv)
+            .withoutSharding()
+            .withTempDirectory(
+                FileBasedSink.convertToFileResourceIfPossible(beamBucket + "/temporary"))
+            .withHeader(BillingEvent.getHeader()));
+
+    billingEvents
+        .apply("Generate overall invoice rows", new GenerateInvoiceRows())
+        .apply(
+            "Write overall invoice to CSV",
+            TextIO.write()
+                .to(beamBucket + "/results/overall_invoice")
+                .withHeader(InvoiceGroupingKey.invoiceHeader())
+                .withoutSharding()
+                .withSuffix(".csv"));
+
+    p.run();
+  }
+
+  /** Transform that aggregates {@code BillingEvent}s by grouping key into invoice CSV rows. */
+  static class GenerateInvoiceRows
+      extends PTransform<PCollection<BillingEvent>, PCollection<String>> {
+    @Override
+    public PCollection<String> expand(PCollection<BillingEvent> input) {
+      return input
+          .apply(
+              "Map to invoicing key",
+              MapElements.into(TypeDescriptor.of(InvoiceGroupingKey.class))
+                  .via(BillingEvent::getInvoiceGroupingKey))
+          .setCoder(new InvoiceGroupingKeyCoder())
+          .apply("Count occurrences", Count.perElement())
+          .apply(
+              "Format as CSVs",
+              MapElements.into(TypeDescriptors.strings())
+                  .via((KV<InvoiceGroupingKey, Long> kv) -> kv.getKey().toCsv(kv.getValue())));
+    }
+  }
+}
diff --git a/java/google/registry/beam/InvoicingUtils.java b/java/google/registry/beam/InvoicingUtils.java
new file mode 100644
index 000000000..2293d109e
--- /dev/null
+++ b/java/google/registry/beam/InvoicingUtils.java
@@ -0,0 +1,103 @@
+// Copyright 2017 The Nomulus Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package google.registry.beam;
+
+import com.google.common.io.Resources;
+import google.registry.util.ResourceUtils;
+import google.registry.util.SqlTemplate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.YearMonth;
+import java.time.format.DateTimeFormatter;
+import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
+import org.apache.beam.sdk.io.FileBasedSink;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+
+/** Pipeline helper functions used to generate invoices from instances of {@link BillingEvent}. */
+public final class InvoicingUtils {
+
+  private InvoicingUtils() {}
+
+  private static final DateTimeFormatter TIMESTAMP_FORMATTER =
+      DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
+
+  /**
+   * Returns a function mapping from {@code BillingEvent} to filename {@code Params}.
+   *
+   * <p>Beam uses this to determine which file a given {@code BillingEvent} should get placed into.
+   */
+  static SerializableFunction<BillingEvent, Params> makeDestinationFunction(String outputBucket) {
+    return billingEvent ->
+        new Params()
+            .withShardTemplate("")
+            .withSuffix(".csv")
+            .withBaseFilename(
+                FileBasedSink.convertToFileResourceIfPossible(
+                    String.format("%s/%s", outputBucket, billingEvent.toFilename())));
+  }
+
+  /**
+   * Returns the default filename parameters for an unmappable {@code BillingEvent}.
+   *
+   * <p>The "failed" file should only be populated when an error occurs, which warrants further
+   * investigation.
+   */
+  static Params makeEmptyDestinationParams(String outputBucket) {
+    return new Params()
+        .withBaseFilename(
+            FileBasedSink.convertToFileResourceIfPossible(
+                String.format("%s/%s", outputBucket, "failed")));
+  }
+
+  /**
+   * Returns a provider that creates a Bigquery query for a given project and yearMonth at runtime.
+   *
+   * <p>We only know yearMonth at runtime, so this provider fills in the {@code
+   * sql/billing_events.sql} template at runtime.
+   *
+   * @param yearMonthProvider a runtime provider that returns which month we're invoicing for.
+   * @param projectId the projectId we're generating invoicing for.
+   */
+  static ValueProvider<String> makeQueryProvider(
+      ValueProvider<String> yearMonthProvider, String projectId) {
+    return NestedValueProvider.of(
+        yearMonthProvider,
+        (yearMonth) -> {
+          // Get the timestamp endpoints capturing the entire month with microsecond precision
+          YearMonth reportingMonth = YearMonth.parse(yearMonth);
+          LocalDateTime firstMoment = reportingMonth.atDay(1).atTime(LocalTime.MIDNIGHT);
+          LocalDateTime lastMoment = reportingMonth.atEndOfMonth().atTime(LocalTime.MAX);
+          // Construct the month's query by filling in the billing_events.sql template
+          return SqlTemplate.create(getQueryFromFile("billing_events.sql"))
+              .put("FIRST_TIMESTAMP_OF_MONTH", firstMoment.format(TIMESTAMP_FORMATTER))
+              .put("LAST_TIMESTAMP_OF_MONTH", lastMoment.format(TIMESTAMP_FORMATTER))
+              .put("PROJECT_ID", projectId)
+              .put("DATASTORE_EXPORT_DATA_SET", "latest_datastore_export")
+              .put("ONETIME_TABLE", "OneTime")
+              .put("REGISTRY_TABLE", "Registry")
+              .put("REGISTRAR_TABLE", "Registrar")
+              .put("CANCELLATION_TABLE", "Cancellation")
+              .build();
+        });
+  }
+
+  /** Returns the {@link String} contents for a file in the {@code beam/sql/} directory. */
+  private static String getQueryFromFile(String filename) {
+    return ResourceUtils.readResourceUtf8(
+        Resources.getResource(InvoicingUtils.class, "sql/" + filename));
+  }
+}
diff --git a/java/google/registry/beam/sql/billing_events.sql b/java/google/registry/beam/sql/billing_events.sql
new file mode 100644
index 000000000..a0c403830
--- /dev/null
+++ b/java/google/registry/beam/sql/billing_events.sql
@@ -0,0 +1,99 @@
+#standardSQL
+ -- Copyright 2017 The Nomulus Authors. All Rights Reserved.
+ --
+ -- Licensed under the Apache License, Version 2.0 (the "License");
+ -- you may not use this file except in compliance with the License.
+ -- You may obtain a copy of the License at
+ --
+ -- http://www.apache.org/licenses/LICENSE-2.0
+ --
+ -- Unless required by applicable law or agreed to in writing, software
+ -- distributed under the License is distributed on an "AS IS" BASIS,
+ -- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ -- See the License for the specific language governing permissions and
+ -- limitations under the License.
+
+ -- This query gathers all non-canceled billing events whose billingTime falls
+ -- between FIRST_TIMESTAMP_OF_MONTH and LAST_TIMESTAMP_OF_MONTH, inclusive.
+
+SELECT
+ __key__.id AS id,
+ billingTime,
+ eventTime,
+ BillingEvent.clientId AS registrarId,
+ RegistrarData.accountId AS billingId,
+ tld,
+ reason as action,
+ targetId as domain,
+ BillingEvent.domainRepoId as repositoryId,
+ periodYears as years,
+ BillingEvent.currency AS currency,
+ BillingEvent.amount as amount,
+ -- Flags are space-joined here; the internal SYNTHETIC flag is stripped downstream in Java
+ ARRAY_TO_STRING(flags, " ") AS flags
+FROM (
+ SELECT
+ *,
+ -- We store cost as "CURRENCY AMOUNT" such as "JPY 800" or "USD 20.00"
+ SPLIT(cost, ' ')[OFFSET(0)] AS currency,
+ SPLIT(cost, ' ')[OFFSET(1)] AS amount,
+ -- Extract everything after the first dot in the domain as the TLD
+ REGEXP_EXTRACT(targetId, r'[.](.+)') AS tld,
+ -- __key__.path looks like '"DomainBase", "<repoId>", ...'; take the second element, unquoted
+ REGEXP_REPLACE(SPLIT(__key__.path, ', ')[OFFSET(1)], '"', '')
+ AS domainRepoId,
+ COALESCE(cancellationMatchingBillingEvent.path,
+ __key__.path) AS cancellationMatchingPath
+ FROM
+ `%PROJECT_ID%.%DATASTORE_EXPORT_DATA_SET%.%ONETIME_TABLE%`
+ -- Only include real TLDs (filter prober data)
+ WHERE
+ REGEXP_EXTRACT(targetId, r'[.](.+)') IN (
+ SELECT
+ tldStr
+ FROM
+ `%PROJECT_ID%.%DATASTORE_EXPORT_DATA_SET%.%REGISTRY_TABLE%`
+ WHERE
+ tldType = 'REAL') ) AS BillingEvent
+ -- Gather billing ID from registrar table
+ -- This is a 'JOIN' as opposed to 'LEFT JOIN' to filter out
+ -- non-billable registrars
+JOIN (
+ SELECT
+ __key__.name AS clientId,
+ billingIdentifier,
+ r.billingAccountMap.currency[SAFE_OFFSET(index)] AS currency,
+ r.billingAccountMap.accountId[SAFE_OFFSET(index)] AS accountId
+ FROM
+ `%PROJECT_ID%.%DATASTORE_EXPORT_DATA_SET%.%REGISTRAR_TABLE%` AS r,
+ UNNEST(GENERATE_ARRAY(0, ARRAY_LENGTH(r.billingAccountMap.currency) - 1))
+ AS index
+ WHERE billingAccountMap IS NOT NULL
+ AND type = 'REAL') AS RegistrarData
+ON
+ BillingEvent.clientId = RegistrarData.clientId
+ AND BillingEvent.currency = RegistrarData.currency
+ -- Gather cancellations
+LEFT JOIN (
+ SELECT __key__.id AS cancellationId,
+ COALESCE(refOneTime.path, refRecurring.path) AS cancelledEventPath,
+ eventTime as cancellationTime,
+ billingTime as cancellationBillingTime
+ FROM
+ (SELECT
+ *,
+ -- Count everything after first dot as TLD (to support multi-part TLDs).
+ REGEXP_EXTRACT(targetId, r'[.](.+)') AS tld
+ FROM
+ `%PROJECT_ID%.%DATASTORE_EXPORT_DATA_SET%.%CANCELLATION_TABLE%`)
+) AS Cancellation
+ON BillingEvent.cancellationMatchingPath = Cancellation.cancelledEventPath
+AND BillingEvent.billingTime = Cancellation.cancellationBillingTime
+WHERE billingTime BETWEEN TIMESTAMP('%FIRST_TIMESTAMP_OF_MONTH%')
+ AND TIMESTAMP('%LAST_TIMESTAMP_OF_MONTH%')
+-- Filter out canceled events
+AND Cancellation.cancellationId IS NULL
+ORDER BY
+ billingTime DESC,
+ id,
+ tld
diff --git a/java/google/registry/repositories.bzl b/java/google/registry/repositories.bzl
index 6eac6a908..35352d135 100644
--- a/java/google/registry/repositories.bzl
+++ b/java/google/registry/repositories.bzl
@@ -115,6 +115,7 @@ def domain_registry_repositories(
omit_joda_time=False,
omit_junit=False,
omit_org_apache_avro=False,
+ omit_org_apache_beam_runners_direct_java=False,
omit_org_apache_beam_runners_google_cloud_dataflow_java=False,
omit_org_apache_beam_sdks_common_runner_api=False,
omit_org_apache_beam_sdks_java_core=False,
@@ -337,6 +338,8 @@ def domain_registry_repositories(
junit()
if not omit_org_apache_avro:
org_apache_avro()
+ if not omit_org_apache_beam_runners_direct_java:
+ org_apache_beam_runners_direct_java()
if not omit_org_apache_beam_runners_google_cloud_dataflow_java:
org_apache_beam_runners_google_cloud_dataflow_java()
if not omit_org_apache_beam_sdks_common_runner_api:
@@ -1843,6 +1846,24 @@ def org_apache_avro():
],
)
+def org_apache_beam_runners_direct_java(): # Apache Beam direct runner, version 2.2.0 (see jar_urls)
+ java_import_external(
+ name = "org_apache_beam_runners_direct_java",
+ licenses = ["notice"], # Apache License, Version 2.0
+ jar_sha256 = "f394ad1577c2af67417af27305c9efd50de268d23629171fd2c7813f8d385713",
+ jar_urls = [ # project GCS mirror listed first, then Maven Central
+ "http://domain-registry-maven.storage.googleapis.com/repo1.maven.org/maven2/org/apache/beam/beam-runners-direct-java/2.2.0/beam-runners-direct-java-2.2.0.jar",
+ "http://repo1.maven.org/maven2/org/apache/beam/beam-runners-direct-java/2.2.0/beam-runners-direct-java-2.2.0.jar",
+ ],
+ deps = [
+ "@org_apache_beam_sdks_java_core",
+ "@joda_time",
+ "@org_slf4j_api",
+ "@com_google_auto_value",
+ "@org_hamcrest_all",
+ ],
+ )
+
def org_apache_beam_runners_google_cloud_dataflow_java():
java_import_external(
name = "org_apache_beam_runners_google_cloud_dataflow_java",
@@ -1948,10 +1969,10 @@ def org_apache_beam_sdks_java_io_google_cloud_platform():
java_import_external(
name = "org_apache_beam_sdks_java_io_google_cloud_platform",
licenses = ["notice"], # Apache License, Version 2.0
- jar_sha256 = "a1a502fd7b960859d4ec50de1f4b55927b32a5e9583a7b40371110c3d6a4f297",
+ jar_sha256 = "7b94b19c5ff79e7a0cccf8ae3556b728643fc7b6c23a6fa21806795bbc69ce9a",
jar_urls = [
- "http://domain-registry-maven.storage.googleapis.com/repo1.maven.org/maven2/org/apache/beam/beam-sdks-java-io-google-cloud-platform/2.1.0/beam-sdks-java-io-google-cloud-platform-2.1.0.jar",
- "http://repo1.maven.org/maven2/org/apache/beam/beam-sdks-java-io-google-cloud-platform/2.1.0/beam-sdks-java-io-google-cloud-platform-2.1.0.jar",
+ "http://domain-registry-maven.storage.googleapis.com/repo1.maven.org/maven2/org/apache/beam/beam-sdks-java-io-google-cloud-platform/2.2.0/beam-sdks-java-io-google-cloud-platform-2.2.0.jar",
+ "http://repo1.maven.org/maven2/org/apache/beam/beam-sdks-java-io-google-cloud-platform/2.2.0/beam-sdks-java-io-google-cloud-platform-2.2.0.jar",
],
deps = [
"@org_apache_beam_sdks_java_extensions_google_cloud_platform_core",
diff --git a/javatests/google/registry/beam/BUILD b/javatests/google/registry/beam/BUILD
new file mode 100644
index 000000000..ac0d923cf
--- /dev/null
+++ b/javatests/google/registry/beam/BUILD
@@ -0,0 +1,38 @@
+package(
+ default_testonly = 1, # targets in this package default to testonly
+ default_visibility = ["//java/google/registry:registry_project"],
+)
+
+licenses(["notice"]) # Apache 2.0
+
+load("//java/com/google/testing/builddefs:GenTestRules.bzl", "GenTestRules")
+
+java_library(
+ name = "beam",
+ srcs = glob(["*.java"]), # every Java file in this directory, the *Test.java files included
+ resources = glob(["testdata/*"]), # e.g. testdata/billing_events_test.sql
+ deps = [
+ "//java/google/registry/beam", # the code under test
+ "//javatests/google/registry/testing",
+ "@com_google_apis_google_api_services_bigquery",
+ "@com_google_dagger",
+ "@com_google_guava",
+ "@com_google_truth",
+ "@com_google_truth_extensions_truth_java8_extension",
+ "@joda_time",
+ "@junit",
+ "@org_apache_avro",
+ "@org_apache_beam_runners_direct_java",
+ "@org_apache_beam_runners_google_cloud_dataflow_java",
+ "@org_apache_beam_sdks_java_core",
+ "@org_apache_beam_sdks_java_io_google_cloud_platform",
+ "@org_mockito_all",
+ ],
+)
+
+GenTestRules(
+ name = "GeneratedTestRules",
+ default_test_size = "small",
+ test_files = glob(["*Test.java"]), # presumably one test target per match; confirm in GenTestRules.bzl
+ deps = [":beam"], # the java_library defined above
+)
diff --git a/javatests/google/registry/beam/BigqueryTemplatePipelineTest.java b/javatests/google/registry/beam/BigqueryTemplatePipelineTest.java
new file mode 100644
index 000000000..41ac386b9
--- /dev/null
+++ b/javatests/google/registry/beam/BigqueryTemplatePipelineTest.java
@@ -0,0 +1,84 @@
+// Copyright 2017 The Nomulus Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package google.registry.beam;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.common.collect.ImmutableList;
+import google.registry.beam.BigqueryTemplatePipeline.CountRequestPaths;
+import google.registry.beam.BigqueryTemplatePipeline.ExtractRequestPathFn;
+import java.util.List;
+import org.apache.beam.runners.direct.DirectRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFnTester;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link BigqueryTemplatePipeline}. */
+@RunWith(JUnit4.class)
+public class BigqueryTemplatePipelineTest {
+
+  private static PipelineOptions pipelineOptions;
+
+  /** Configures the pipelines in this class to execute on the {@link DirectRunner}. */
+  @BeforeClass
+  public static void initializePipelineOptions() {
+    pipelineOptions = PipelineOptionsFactory.create();
+    pipelineOptions.setRunner(DirectRunner.class);
+  }
+
+  @Rule public final transient TestPipeline p = TestPipeline.fromOptions(pipelineOptions);
+
+  /** A row without a {@code requestPath} field is emitted as the string {@code "null"}. */
+  @Test
+  public void testExtractRequestPathFn() throws Exception {
+    DoFnTester<TableRow, String> fnTester = DoFnTester.of(new ExtractRequestPathFn());
+    TableRow emptyRow = new TableRow();
+    TableRow hasRequestPathRow = new TableRow().set("requestPath", "a/path");
+    TableRow hasOtherValueRow = new TableRow().set("anotherValue", "b/lah");
+    List<String> outputs = fnTester.processBundle(emptyRow, hasRequestPathRow, hasOtherValueRow);
+    assertThat(outputs).containsExactly("null", "a/path", "null");
+  }
+
+  /** Runs {@link CountRequestPaths} on in-memory rows and checks the per-path counts. */
+  @Test
+  public void testEndToEndPipeline() throws Exception {
+    ImmutableList<TableRow> inputRows =
+        ImmutableList.of(
+            new TableRow(),
+            new TableRow().set("requestPath", "a/path"),
+            new TableRow().set("requestPath", "b/path"),
+            new TableRow().set("requestPath", "b/path"),
+            new TableRow().set("anotherValue", "b/path"));
+
+    PCollection<TableRow> input = p.apply(Create.of(inputRows));
+    PCollection<String> output = input.apply(new CountRequestPaths());
+
+    // The two rows without a requestPath are counted together under "null".
+    ImmutableList<String> outputStrings =
+        ImmutableList.of("a/path: 1", "b/path: 2", "null: 2");
+    PAssert.that(output).containsInAnyOrder(outputStrings);
+    p.run();
+  }
+}
diff --git a/javatests/google/registry/beam/BillingEventTest.java b/javatests/google/registry/beam/BillingEventTest.java
new file mode 100644
index 000000000..2b210ef8f
--- /dev/null
+++ b/javatests/google/registry/beam/BillingEventTest.java
@@ -0,0 +1,172 @@
+// Copyright 2017 The Nomulus Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package google.registry.beam;
+
+import static com.google.common.truth.Truth.assertThat;
+import static google.registry.testing.JUnitBackports.assertThrows;
+
+import google.registry.beam.BillingEvent.InvoiceGroupingKey;
+import google.registry.beam.BillingEvent.InvoiceGroupingKey.InvoiceGroupingKeyCoder;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link BillingEvent}. */
+@RunWith(JUnit4.class)
+public class BillingEventTest {
+
+  private static final ZoneId UTC = ZoneId.of("UTC");
+  private static final String BILLING_EVENT_SCHEMA =
+      "{\"name\": \"BillingEvent\", "
+          + "\"type\": \"record\", "
+          + "\"fields\": ["
+          + "{\"name\": \"id\", \"type\": \"long\"},"
+          + "{\"name\": \"billingTime\", \"type\": \"string\"},"
+          + "{\"name\": \"eventTime\", \"type\": \"string\"},"
+          + "{\"name\": \"registrarId\", \"type\": \"string\"},"
+          + "{\"name\": \"billingId\", \"type\": \"long\"},"
+          + "{\"name\": \"tld\", \"type\": \"string\"},"
+          + "{\"name\": \"action\", \"type\": \"string\"},"
+          + "{\"name\": \"domain\", \"type\": \"string\"},"
+          + "{\"name\": \"repositoryId\", \"type\": \"string\"},"
+          + "{\"name\": \"years\", \"type\": \"int\"},"
+          + "{\"name\": \"currency\", \"type\": \"string\"},"
+          + "{\"name\": \"amount\", \"type\": \"float\"},"
+          + "{\"name\": \"flags\", \"type\": \"string\"}"
+          + "]}";
+
+  private SchemaAndRecord schemaAndRecord;
+
+  @Before
+  public void initializeRecord() {
+    // Build a record against BILLING_EVENT_SCHEMA.
+    // NOTE(review): several values (e.g. "id", "billingId") do not match their declared schema
+    // types; presumably parseFromRecord() only relies on their string forms -- confirm.
+    GenericRecord row = new GenericData.Record(new Schema.Parser().parse(BILLING_EVENT_SCHEMA));
+    row.put("id", "1");
+    row.put("billingTime", 1508835963000000L);
+    row.put("eventTime", 1484870383000000L);
+    row.put("registrarId", "myRegistrar");
+    row.put("billingId", "12345-CRRHELLO");
+    row.put("tld", "test");
+    row.put("action", "RENEW");
+    row.put("domain", "example.test");
+    row.put("repositoryId", "123456");
+    row.put("years", 5);
+    row.put("currency", "USD");
+    row.put("amount", 20.5);
+    row.put("flags", "AUTO_RENEW SYNTHETIC");
+    schemaAndRecord = new SchemaAndRecord(row, null);
+  }
+
+  @Test
+  public void testParseBillingEventFromRecord_success() {
+    BillingEvent event = BillingEvent.parseFromRecord(schemaAndRecord);
+    assertThat(event.id()).isEqualTo(1);
+    assertThat(event.billingTime()).isEqualTo(ZonedDateTime.of(2017, 10, 24, 9, 6, 3, 0, UTC));
+    assertThat(event.eventTime()).isEqualTo(ZonedDateTime.of(2017, 1, 19, 23, 59, 43, 0, UTC));
+    assertThat(event.registrarId()).isEqualTo("myRegistrar");
+    assertThat(event.billingId()).isEqualTo("12345-CRRHELLO");
+    assertThat(event.tld()).isEqualTo("test");
+    assertThat(event.action()).isEqualTo("RENEW");
+    assertThat(event.domain()).isEqualTo("example.test");
+    assertThat(event.repositoryId()).isEqualTo("123456");
+    assertThat(event.years()).isEqualTo(5);
+    assertThat(event.currency()).isEqualTo("USD");
+    assertThat(event.amount()).isEqualTo(20.5);
+    assertThat(event.flags()).isEqualTo("AUTO_RENEW SYNTHETIC");
+  }
+
+  /** Parsing a record that has a null field value throws {@link IllegalStateException}. */
+  @Test
+  public void testParseBillingEventFromRecord_nullValue_throwsException() {
+    schemaAndRecord.getRecord().put("tld", null);
+    assertThrows(IllegalStateException.class, () -> BillingEvent.parseFromRecord(schemaAndRecord));
+  }
+
+  @Test
+  public void testConvertBillingEvent_toCsv() {
+    BillingEvent event = BillingEvent.parseFromRecord(schemaAndRecord);
+    // The SYNTHETIC flag set on the record is expected to be dropped from the CSV output.
+    assertThat(event.toCsv())
+        .isEqualTo("1,2017-10-24 09:06:03 UTC,2017-01-19 23:59:43 UTC,myRegistrar,"
+            + "12345-CRRHELLO,test,RENEW,example.test,123456,5,USD,20.50,AUTO_RENEW");
+  }
+
+  @Test
+  public void testGenerateBillingEventFilename() {
+    BillingEvent event = BillingEvent.parseFromRecord(schemaAndRecord);
+    assertThat(event.toFilename()).isEqualTo("myRegistrar_test");
+  }
+
+  @Test
+  public void testGetInvoiceGroupingKey_fromBillingEvent() {
+    InvoiceGroupingKey key = BillingEvent.parseFromRecord(schemaAndRecord).getInvoiceGroupingKey();
+    assertThat(key.startDate()).isEqualTo("2017-10-01");
+    assertThat(key.endDate()).isEqualTo("2022-09-30");
+    assertThat(key.productAccountKey()).isEqualTo("12345-CRRHELLO");
+    assertThat(key.usageGroupingKey()).isEqualTo("myRegistrar - test");
+    assertThat(key.description()).isEqualTo("RENEW | TLD: test | TERM: 5-year");
+    assertThat(key.unitPrice()).isEqualTo(20.5);
+    assertThat(key.unitPriceCurrency()).isEqualTo("USD");
+    assertThat(key.poNumber()).isEmpty();
+  }
+
+  @Test
+  public void testConvertInvoiceGroupingKey_toCsv() {
+    InvoiceGroupingKey key = BillingEvent.parseFromRecord(schemaAndRecord).getInvoiceGroupingKey();
+    // The expected amount (61.50) is the 20.50 unit price times the quantity of 3.
+    assertThat(key.toCsv(3L))
+        .isEqualTo("2017-10-01,2022-09-30,12345-CRRHELLO,61.50,USD,10125,1,PURCHASE,"
+            + "myRegistrar - test,3,RENEW | TLD: test | TERM: 5-year,20.50,USD,");
+  }
+
+  /** Encodes a key with the coder and checks that decoding the bytes yields an equal key. */
+  @Test
+  public void testInvoiceGroupingKeyCoder_deterministicSerialization() throws IOException {
+    InvoiceGroupingKey key = BillingEvent.parseFromRecord(schemaAndRecord).getInvoiceGroupingKey();
+    InvoiceGroupingKeyCoder coder = new InvoiceGroupingKeyCoder();
+    ByteArrayOutputStream encodedBytes = new ByteArrayOutputStream();
+    coder.encode(key, encodedBytes);
+    InputStream encodedInput = new ByteArrayInputStream(encodedBytes.toByteArray());
+    assertThat(coder.decode(encodedInput)).isEqualTo(key);
+  }
+
+  @Test
+  public void testGetDetailReportHeader() {
+    assertThat(BillingEvent.getHeader())
+        .isEqualTo("id,billingTime,eventTime,registrarId,billingId,tld,action,"
+            + "domain,repositoryId,years,currency,amount,flags");
+  }
+
+  @Test
+  public void testGetOverallInvoiceHeader() {
+    assertThat(InvoiceGroupingKey.invoiceHeader())
+        .isEqualTo("StartDate,EndDate,ProductAccountKey,Amount,AmountCurrency,BillingProductCode,"
+            + "SalesChannel,LineItemType,UsageGroupingKey,Quantity,Description,UnitPrice,"
+            + "UnitPriceCurrency,PONumber");
+  }
+}
diff --git a/javatests/google/registry/beam/InvoicingPipelineTest.java b/javatests/google/registry/beam/InvoicingPipelineTest.java
new file mode 100644
index 000000000..55302dccc
--- /dev/null
+++ b/javatests/google/registry/beam/InvoicingPipelineTest.java
@@ -0,0 +1,122 @@
+// Copyright 2017 The Nomulus Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package google.registry.beam;
+
+import com.google.common.collect.ImmutableList;
+import google.registry.beam.InvoicingPipeline.GenerateInvoiceRows;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import org.apache.beam.runners.direct.DirectRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link InvoicingPipeline}. */
+@RunWith(JUnit4.class)
+public class InvoicingPipelineTest {
+
+  private static PipelineOptions pipelineOptions;
+
+  @BeforeClass
+  public static void initializePipelineOptions() {
+    pipelineOptions = PipelineOptionsFactory.create();
+    pipelineOptions.setRunner(DirectRunner.class);
+  }
+
+  @Rule public final transient TestPipeline p = TestPipeline.fromOptions(pipelineOptions);
+
+  /** Events that differ only by domain should be merged into one invoice row (quantity 2). */
+  @Test
+  public void testGenerateInvoiceRowsFn() throws Exception {
+    ImmutableList<BillingEvent> inputRows =
+        ImmutableList.of(
+            BillingEvent.create(
+                1,
+                ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
+                ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
+                "theRegistrar",
+                "234",
+                "test",
+                "RENEW",
+                "mydomain.test",
+                "REPO-ID",
+                3,
+                "USD",
+                20.5,
+                ""),
+            BillingEvent.create(
+                1,
+                ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
+                ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
+                "theRegistrar",
+                "234",
+                "test",
+                "RENEW",
+                "mydomain2.test",
+                "REPO-ID",
+                3,
+                "USD",
+                20.5,
+                ""),
+            BillingEvent.create(
+                1,
+                ZonedDateTime.of(2017, 10, 2, 0, 0, 0, 0, ZoneId.of("UTC")),
+                ZonedDateTime.of(2017, 9, 29, 0, 0, 0, 0, ZoneId.of("UTC")),
+                "theRegistrar",
+                "234",
+                "hello",
+                "CREATE",
+                "mydomain3.hello",
+                "REPO-ID",
+                5,
+                "JPY",
+                70.75,
+                ""),
+            BillingEvent.create(
+                1,
+                ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
+                ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
+                "googledomains",
+                "456",
+                "test",
+                "RENEW",
+                "mydomain4.test",
+                "REPO-ID",
+                1,
+                "USD",
+                20.5,
+                ""));
+
+    PCollection<String> output = p.apply(Create.of(inputRows)).apply(new GenerateInvoiceRows());
+
+    ImmutableList<String> outputStrings = ImmutableList.of(
+        "2017-10-01,2020-09-30,234,41.00,USD,10125,1,PURCHASE,theRegistrar - test,2,"
+            + "RENEW | TLD: test | TERM: 3-year,20.50,USD,",
+        "2017-10-01,2022-09-30,234,70.75,JPY,10125,1,PURCHASE,theRegistrar - hello,1,"
+            + "CREATE | TLD: hello | TERM: 5-year,70.75,JPY,",
+        "2017-10-01,2018-09-30,456,20.50,USD,10125,1,PURCHASE,googledomains - test,1,"
+            + "RENEW | TLD: test | TERM: 1-year,20.50,USD,");
+    PAssert.that(output).containsInAnyOrder(outputStrings);
+    p.run();
+  }
+}
diff --git a/javatests/google/registry/beam/InvoicingUtilsTest.java b/javatests/google/registry/beam/InvoicingUtilsTest.java
new file mode 100644
index 000000000..7100c36a8
--- /dev/null
+++ b/javatests/google/registry/beam/InvoicingUtilsTest.java
@@ -0,0 +1,74 @@
+// Copyright 2017 The Nomulus Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package google.registry.beam;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import google.registry.testing.TestDataHelper;
+import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
+import org.apache.beam.sdk.io.FileBasedSink;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link InvoicingUtils}. */
+@RunWith(JUnit4.class)
+public class InvoicingUtilsTest {
+
+  @Test
+  public void testDestinationFunction_generatesProperFileParams() {
+    SerializableFunction<BillingEvent, Params> destinationFunction =
+        InvoicingUtils.makeDestinationFunction("my/directory");
+
+    BillingEvent billingEvent = mock(BillingEvent.class);
+    // We mock BillingEvent to make the test independent of the implementation of toFilename()
+    when(billingEvent.toFilename()).thenReturn("registrar_tld");
+
+    assertThat(destinationFunction.apply(billingEvent))
+        .isEqualTo(
+            new Params()
+                .withShardTemplate("")
+                .withSuffix(".csv")
+                .withBaseFilename(
+                    FileBasedSink.convertToFileResourceIfPossible("my/directory/registrar_tld")));
+  }
+
+  /** The empty-destination params should point at the {@code failed} file in the directory. */
+  @Test
+  public void testEmptyDestinationParams() {
+    assertThat(InvoicingUtils.makeEmptyDestinationParams("my/directory"))
+        .isEqualTo(
+            new Params().withBaseFilename(
+                FileBasedSink.convertToFileResourceIfPossible("my/directory/failed")));
+  }
+
+  /** Asserts that the instantiated sql template matches a golden expected file. */
+  @Test
+  public void testMakeQueryProvider() {
+    ValueProvider<String> queryProvider =
+        InvoicingUtils.makeQueryProvider(StaticValueProvider.of("2017-10"), "my-project-id");
+    assertThat(queryProvider.get()).isEqualTo(loadFile("billing_events_test.sql"));
+  }
+
+  /** Returns a {@link String} from a file in the {@code beam/testdata/} directory. */
+  private static String loadFile(String filename) {
+    return TestDataHelper.loadFile(InvoicingUtilsTest.class, filename);
+  }
+}
diff --git a/javatests/google/registry/beam/testdata/billing_events_test.sql b/javatests/google/registry/beam/testdata/billing_events_test.sql
new file mode 100644
index 000000000..da6330411
--- /dev/null
+++ b/javatests/google/registry/beam/testdata/billing_events_test.sql
@@ -0,0 +1,99 @@
+#standardSQL
+ -- Copyright 2017 The Nomulus Authors. All Rights Reserved.
+ --
+ -- Licensed under the Apache License, Version 2.0 (the "License");
+ -- you may not use this file except in compliance with the License.
+ -- You may obtain a copy of the License at
+ --
+ -- http://www.apache.org/licenses/LICENSE-2.0
+ --
+ -- Unless required by applicable law or agreed to in writing, software
+ -- distributed under the License is distributed on an "AS IS" BASIS,
+ -- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ -- See the License for the specific language governing permissions and
+ -- limitations under the License.
+
+ -- This query gathers all non-canceled billing events for a given
+ -- YEAR_MONTH in yyyy-MM format.
+
+SELECT
+ __key__.id AS id,
+ billingTime,
+ eventTime,
+ BillingEvent.clientId AS registrarId,
+ RegistrarData.accountId AS billingId,
+ tld,
+ reason as action,
+ targetId as domain,
+ BillingEvent.domainRepoId as repositoryId,
+ periodYears as years,
+ BillingEvent.currency AS currency,
+ BillingEvent.amount as amount,
+ -- We'll strip out non-useful flags downstream
+ ARRAY_TO_STRING(flags, " ") AS flags
+FROM (
+ SELECT
+ *,
+ -- We store cost as "CURRENCY AMOUNT" such as "JPY 800" or "USD 20.00"
+ SPLIT(cost, ' ')[OFFSET(0)] AS currency,
+ SPLIT(cost, ' ')[OFFSET(1)] AS amount,
+ -- Extract everything after the first dot in the domain as the TLD
+ REGEXP_EXTRACT(targetId, r'[.](.+)') AS tld,
+ -- __key__.path looks like '"DomainBase", "", ...'
+ REGEXP_REPLACE(SPLIT(__key__.path, ', ')[OFFSET(1)], '"', '')
+ AS domainRepoId,
+ COALESCE(cancellationMatchingBillingEvent.path,
+ __key__.path) AS cancellationMatchingPath
+ FROM
+ `my-project-id.latest_datastore_export.OneTime`
+ -- Only include real TLDs (filter prober data)
+ WHERE
+ REGEXP_EXTRACT(targetId, r'[.](.+)') IN (
+ SELECT
+ tldStr
+ FROM
+ `my-project-id.latest_datastore_export.Registry`
+ WHERE
+ tldType = 'REAL') ) AS BillingEvent
+ -- Gather billing ID from registrar table
+ -- This is a 'JOIN' as opposed to 'LEFT JOIN' to filter out
+ -- non-billable registrars
+JOIN (
+ SELECT
+ __key__.name AS clientId,
+ billingIdentifier,
+ r.billingAccountMap.currency[SAFE_OFFSET(index)] AS currency,
+ r.billingAccountMap.accountId[SAFE_OFFSET(index)] AS accountId
+ FROM
+ `my-project-id.latest_datastore_export.Registrar` AS r,
+ UNNEST(GENERATE_ARRAY(0, ARRAY_LENGTH(r.billingAccountMap.currency) - 1))
+ AS index
+ WHERE billingAccountMap IS NOT NULL
+ AND type = 'REAL') AS RegistrarData
+ON
+ BillingEvent.clientId = RegistrarData.clientId
+ AND BillingEvent.currency = RegistrarData.currency
+ -- Gather cancellations
+LEFT JOIN (
+ SELECT __key__.id AS cancellationId,
+ COALESCE(refOneTime.path, refRecurring.path) AS cancelledEventPath,
+ eventTime as cancellationTime,
+ billingTime as cancellationBillingTime
+ FROM
+ (SELECT
+ *,
+ -- Count everything after first dot as TLD (to support multi-part TLDs).
+ REGEXP_EXTRACT(targetId, r'[.](.+)') AS tld
+ FROM
+ `my-project-id.latest_datastore_export.Cancellation`)
+) AS Cancellation
+ON BillingEvent.cancellationMatchingPath = Cancellation.cancelledEventPath
+AND BillingEvent.billingTime = Cancellation.cancellationBillingTime
+WHERE billingTime BETWEEN TIMESTAMP('2017-10-01 00:00:00.000000')
+ AND TIMESTAMP('2017-10-31 23:59:59.999999')
+-- Filter out canceled events
+AND Cancellation.cancellationId IS NULL
+ORDER BY
+ billingTime DESC,
+ id,
+ tld