diff --git a/java/google/registry/beam/BUILD b/java/google/registry/beam/BUILD
index 7dc73cbd6..e96d0117d 100644
--- a/java/google/registry/beam/BUILD
+++ b/java/google/registry/beam/BUILD
@@ -6,24 +6,4 @@ licenses(["notice"]) # Apache 2.0
java_library(
name = "beam",
- srcs = glob(["*.java"]),
- resources = glob(["sql/*"]),
- deps = [
- "//java/google/registry/config",
- "//java/google/registry/model",
- "//java/google/registry/reporting/billing",
- "//java/google/registry/util",
- "@com_google_apis_google_api_services_bigquery",
- "@com_google_auto_value",
- "@com_google_dagger",
- "@com_google_flogger",
- "@com_google_flogger_system_backend",
- "@com_google_guava",
- "@javax_inject",
- "@org_apache_avro",
- "@org_apache_beam_runners_direct_java",
- "@org_apache_beam_runners_google_cloud_dataflow_java",
- "@org_apache_beam_sdks_java_core",
- "@org_apache_beam_sdks_java_io_google_cloud_platform",
- ],
)
diff --git a/java/google/registry/beam/BigqueryTemplatePipeline.java b/java/google/registry/beam/BigqueryTemplatePipeline.java
deleted file mode 100644
index 29d0d5134..000000000
--- a/java/google/registry/beam/BigqueryTemplatePipeline.java
+++ /dev/null
@@ -1,97 +0,0 @@
-// Copyright 2017 The Nomulus Authors. All Rights Reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package google.registry.beam;
-
-import com.google.api.services.bigquery.model.TableRow;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TypeDescriptors;
-
-/**
- * Main class to stage a templated Dataflow pipeline which reads from Bigquery on GCS.
- *
- *
To stage this pipeline on GCS, run it with the following command line flags:
- *
- * - --runner=DataflowRunner
- *
- --project=[YOUR PROJECT ID]
- *
- --stagingLocation=gs://[WHERE PIPELINE JAR FILES SHOULD BE STAGED]
- *
- --templateLocation=gs://[WHERE TEMPLATE.txt FILE SHOULD BE STAGED]
- *
- *
- * Then, you can run the staged template via the API client library, gCloud or a raw REST call.
- *
- * @see Dataflow Templates
- */
-public class BigqueryTemplatePipeline {
-
- /** Custom command-line pipeline options for {@code BigqueryTemplatePipeline}. */
- public interface BigqueryTemplateOptions extends PipelineOptions {
- @Description("Bigquery query used to get the initial data for the pipeline.")
- @Default.String("SELECT * FROM `[YOUR_PROJECT].[DATASET].[TABLE]`")
- String getBigqueryQuery();
- void setBigqueryQuery(String value);
-
- @Description("The GCS bucket we output the result text file to.")
- @Default.String("[YOUR BUCKET HERE]")
- String getOutputBucket();
- void setOutputBucket(String value);
- }
-
- public static void main(String[] args) {
- // Parse standard arguments, as well as custom options
- BigqueryTemplateOptions options =
- PipelineOptionsFactory.fromArgs(args).withValidation().as(BigqueryTemplateOptions.class);
-
- // Create pipeline
- Pipeline p = Pipeline.create(options);
- p.apply(BigQueryIO.readTableRows().fromQuery(options.getBigqueryQuery()).usingStandardSql())
- .apply("Count request paths", new CountRequestPaths())
- .apply(TextIO.write().to(options.getOutputBucket()).withoutSharding().withHeader("HEADER"));
- p.run();
- }
-
- /** A composite {@code PTransform} that counts the number of times each request path appears. */
- static class CountRequestPaths extends PTransform, PCollection> {
- @Override
- public PCollection expand(PCollection input) {
- return input
- .apply("Extract paths", ParDo.of(new ExtractRequestPathFn()))
- .apply("Count paths", Count.perElement())
- .apply(
- "Format results",
- MapElements.into(TypeDescriptors.strings())
- .via(kv -> kv.getKey() + ": " + kv.getValue()));
- }
- }
-
- /** A {@code DoFn} that extracts the request path from a Bigquery {@code TableRow}. */
- static class ExtractRequestPathFn extends DoFn {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(String.valueOf(c.element().get("requestPath")));
- }
- }
-}
diff --git a/java/google/registry/beam/BillingEvent.java b/java/google/registry/beam/BillingEvent.java
deleted file mode 100644
index 9f612bb96..000000000
--- a/java/google/registry/beam/BillingEvent.java
+++ /dev/null
@@ -1,366 +0,0 @@
-// Copyright 2017 The Nomulus Authors. All Rights Reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package google.registry.beam;
-
-import com.google.auto.value.AutoValue;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableList;
-import com.google.common.flogger.FluentLogger;
-import google.registry.model.billing.BillingEvent.Flag;
-import google.registry.reporting.billing.BillingModule;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.text.DecimalFormat;
-import java.time.Instant;
-import java.time.ZoneId;
-import java.time.ZonedDateTime;
-import java.time.format.DateTimeFormatter;
-import java.util.stream.Collectors;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
-
-/**
- * A POJO representing a single billable event, parsed from a {@code SchemaAndRecord}.
- *
- * This is a trivially serializable class that allows Beam to transform the results of a Bigquery
- * query into a standard Java representation, giving us the type guarantees and ease of manipulation
- * Bigquery lacks, while localizing any Bigquery-side failures to the {@link #parseFromRecord}
- * function.
- */
-@AutoValue
-public abstract class BillingEvent implements Serializable {
-
- private static final FluentLogger logger = FluentLogger.forEnclosingClass();
-
- private static final DateTimeFormatter DATE_TIME_FORMATTER =
- DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss zzz");
-
-
- /** The amount we multiply the price for sunrise creates. This is currently a 15% discount. */
- private static final double SUNRISE_DISCOUNT_PRICE_MODIFIER = 0.85;
-
- private static final ImmutableList FIELD_NAMES =
- ImmutableList.of(
- "id",
- "billingTime",
- "eventTime",
- "registrarId",
- "billingId",
- "tld",
- "action",
- "domain",
- "repositoryId",
- "years",
- "currency",
- "amount",
- "flags");
-
- /** Returns the unique Objectify ID for the {@code OneTime} associated with this event. */
- abstract long id();
- /** Returns the UTC DateTime this event becomes billable. */
- abstract ZonedDateTime billingTime();
- /** Returns the UTC DateTime this event was generated. */
- abstract ZonedDateTime eventTime();
- /** Returns the billed registrar's name. */
- abstract String registrarId();
- /** Returns the billed registrar's billing account key. */
- abstract String billingId();
- /** Returns the tld this event was generated for. */
- abstract String tld();
- /** Returns the billable action this event was generated for (i.e. RENEW, CREATE, TRANSFER...) */
- abstract String action();
- /** Returns the fully qualified domain name this event was generated for. */
- abstract String domain();
- /** Returns the unique RepoID associated with the billed domain. */
- abstract String repositoryId();
- /** Returns the number of years this billing event is made out for. */
- abstract int years();
- /** Returns the 3-letter currency code for the billing event (i.e. USD or JPY.) */
- abstract String currency();
- /** Returns the cost associated with this billing event. */
- abstract double amount();
- /** Returns a list of space-delimited flags associated with the event. */
- abstract String flags();
-
- /**
- * Constructs a {@code BillingEvent} from a {@code SchemaAndRecord}.
- *
- * @see
- * Apache AVRO GenericRecord
- */
- static BillingEvent parseFromRecord(SchemaAndRecord schemaAndRecord) {
- checkFieldsNotNull(schemaAndRecord);
- GenericRecord record = schemaAndRecord.getRecord();
- String flags = extractField(record, "flags");
- double amount = getDiscountedAmount(Double.parseDouble(extractField(record, "amount")), flags);
- return create(
- // We need to chain parsers off extractField because GenericRecord only returns
- // Objects, which contain a string representation of their underlying types.
- Long.parseLong(extractField(record, "id")),
- // Bigquery provides UNIX timestamps with microsecond precision.
- Instant.ofEpochMilli(Long.parseLong(extractField(record, "billingTime")) / 1000)
- .atZone(ZoneId.of("UTC")),
- Instant.ofEpochMilli(Long.parseLong(extractField(record, "eventTime")) / 1000)
- .atZone(ZoneId.of("UTC")),
- extractField(record, "registrarId"),
- extractField(record, "billingId"),
- extractField(record, "tld"),
- extractField(record, "action"),
- extractField(record, "domain"),
- extractField(record, "repositoryId"),
- Integer.parseInt(extractField(record, "years")),
- extractField(record, "currency"),
- amount,
- flags);
- }
-
- /**
- * Applies a discount to sunrise creates and anchor tenant creates if applicable.
- *
- * Currently sunrise creates are discounted 15% and anchor tenant creates are free for 2 years.
- * All anchor tenant creates are enforced to be 2 years in
- * {@link google.registry.flows.domain.DomainCreateFlow#verifyAnchorTenantValidPeriod}.
- */
- private static double getDiscountedAmount(double amount, String flags) {
- // Apply a configurable discount to sunrise creates.
- if (flags.contains(Flag.SUNRISE.name())) {
- amount =
- Double.parseDouble(
- new DecimalFormat("#.##").format(amount * SUNRISE_DISCOUNT_PRICE_MODIFIER));
- }
- // Anchor tenant creates are free for the initial create. This is enforced to be a 2 year period
- // upon domain create.
- if (flags.contains(Flag.ANCHOR_TENANT.name())) {
- amount = 0;
- }
- return amount;
- }
-
- /**
- * Creates a concrete {@code BillingEvent}.
- *
- * This should only be used outside this class for testing- instances of {@code BillingEvent}
- * should otherwise come from {@link #parseFromRecord}.
- */
- @VisibleForTesting
- static BillingEvent create(
- long id,
- ZonedDateTime billingTime,
- ZonedDateTime eventTime,
- String registrarId,
- String billingId,
- String tld,
- String action,
- String domain,
- String repositoryId,
- int years,
- String currency,
- double amount,
- String flags) {
- return new AutoValue_BillingEvent(
- id,
- billingTime,
- eventTime,
- registrarId,
- billingId,
- tld,
- action,
- domain,
- repositoryId,
- years,
- currency,
- amount,
- flags);
- }
-
- static String getHeader() {
- return FIELD_NAMES.stream().collect(Collectors.joining(","));
- }
-
- /**
- * Generates the filename associated with this {@code BillingEvent}.
- *
- *
When modifying this function, take care to ensure that there's no way to generate an illegal
- * filepath with the arguments, such as "../sensitive_info".
- */
- String toFilename(String yearMonth) {
- return String.format(
- "%s_%s_%s_%s", BillingModule.DETAIL_REPORT_PREFIX, yearMonth, registrarId(), tld());
- }
-
- /** Generates a CSV representation of this {@code BillingEvent}. */
- String toCsv() {
- return Joiner.on(",")
- .join(
- ImmutableList.of(
- id(),
- DATE_TIME_FORMATTER.format(billingTime()),
- DATE_TIME_FORMATTER.format(eventTime()),
- registrarId(),
- billingId(),
- tld(),
- action(),
- domain(),
- repositoryId(),
- years(),
- currency(),
- String.format("%.2f", amount()),
- // Strip out the 'synthetic' flag, which is internal only.
- flags().replace("SYNTHETIC", "").trim()));
- }
-
- /** Returns the grouping key for this {@code BillingEvent}, to generate the overall invoice. */
- InvoiceGroupingKey getInvoiceGroupingKey() {
- return new AutoValue_BillingEvent_InvoiceGroupingKey(
- billingTime().toLocalDate().withDayOfMonth(1).toString(),
- billingTime().toLocalDate().withDayOfMonth(1).plusYears(years()).minusDays(1).toString(),
- billingId(),
- String.format("%s - %s", registrarId(), tld()),
- String.format("%s | TLD: %s | TERM: %d-year", action(), tld(), years()),
- amount(),
- currency(),
- "");
- }
-
- /** Key for each {@code BillingEvent}, when aggregating for the overall invoice. */
- @AutoValue
- abstract static class InvoiceGroupingKey implements Serializable {
-
- private static final ImmutableList INVOICE_HEADERS =
- ImmutableList.of(
- "StartDate",
- "EndDate",
- "ProductAccountKey",
- "Amount",
- "AmountCurrency",
- "BillingProductCode",
- "SalesChannel",
- "LineItemType",
- "UsageGroupingKey",
- "Quantity",
- "Description",
- "UnitPrice",
- "UnitPriceCurrency",
- "PONumber");
-
- /** Returns the first day this invoice is valid, in yyyy-MM-dd format. */
- abstract String startDate();
- /** Returns the last day this invoice is valid, in yyyy-MM-dd format. */
- abstract String endDate();
- /** Returns the billing account id, which is the {@code BillingEvent.billingId}. */
- abstract String productAccountKey();
- /** Returns the invoice grouping key, which is in the format "registrarId - tld". */
- abstract String usageGroupingKey();
- /** Returns a description of the item, formatted as "action | TLD: tld | TERM: n-year." */
- abstract String description();
- /** Returns the cost per invoice item. */
- abstract Double unitPrice();
- /** Returns the 3-digit currency code the unit price uses. */
- abstract String unitPriceCurrency();
- /** Returns the purchase order number for the item, blank for most registrars. */
- abstract String poNumber();
-
- /** Generates the CSV header for the overall invoice. */
- static String invoiceHeader() {
- return Joiner.on(",").join(INVOICE_HEADERS);
- }
-
- /** Generates a CSV representation of n aggregate billing events. */
- String toCsv(Long quantity) {
- double totalPrice = unitPrice() * quantity;
- return Joiner.on(",")
- .join(
- ImmutableList.of(
- startDate(),
- endDate(),
- productAccountKey(),
- String.format("%.2f", totalPrice),
- unitPriceCurrency(),
- "10125",
- "1",
- "PURCHASE",
- usageGroupingKey(),
- String.format("%d", quantity),
- description(),
- String.format("%.2f", unitPrice()),
- unitPriceCurrency(),
- poNumber()));
- }
-
- /** Coder that provides deterministic (de)serialization for {@code InvoiceGroupingKey}. */
- static class InvoiceGroupingKeyCoder extends AtomicCoder {
-
- @Override
- public void encode(InvoiceGroupingKey value, OutputStream outStream) throws IOException {
- Coder stringCoder = StringUtf8Coder.of();
- stringCoder.encode(value.startDate(), outStream);
- stringCoder.encode(value.endDate(), outStream);
- stringCoder.encode(value.productAccountKey(), outStream);
- stringCoder.encode(value.usageGroupingKey(), outStream);
- stringCoder.encode(value.description(), outStream);
- stringCoder.encode(String.valueOf(value.unitPrice()), outStream);
- stringCoder.encode(value.unitPriceCurrency(), outStream);
- stringCoder.encode(value.poNumber(), outStream);
- }
-
- @Override
- public InvoiceGroupingKey decode(InputStream inStream) throws IOException {
- Coder stringCoder = StringUtf8Coder.of();
- return new AutoValue_BillingEvent_InvoiceGroupingKey(
- stringCoder.decode(inStream),
- stringCoder.decode(inStream),
- stringCoder.decode(inStream),
- stringCoder.decode(inStream),
- stringCoder.decode(inStream),
- Double.parseDouble(stringCoder.decode(inStream)),
- stringCoder.decode(inStream),
- stringCoder.decode(inStream));
- }
- }
- }
-
- /** Extracts a string representation of a field in a {@code GenericRecord}. */
- private static String extractField(GenericRecord record, String fieldName) {
- return String.valueOf(record.get(fieldName));
- }
-
- /**
- * Checks that no expected fields in the record are missing.
- *
- * Note that this simply makes sure the field is not null; it may still generate a parse error
- * in {@code parseFromRecord}.
- */
- private static void checkFieldsNotNull(SchemaAndRecord schemaAndRecord) {
- GenericRecord record = schemaAndRecord.getRecord();
- ImmutableList nullFields =
- FIELD_NAMES
- .stream()
- .filter(fieldName -> record.get(fieldName) == null)
- .collect(ImmutableList.toImmutableList());
- if (!nullFields.isEmpty()) {
- logger.atSevere().log(
- "Found unexpected null value(s) in field(s) %s for record %s",
- Joiner.on(", ").join(nullFields), record);
- throw new IllegalStateException("Read null value from Bigquery query");
- }
- }
-}
diff --git a/java/google/registry/beam/InvoicingPipeline.java b/java/google/registry/beam/InvoicingPipeline.java
deleted file mode 100644
index 9b7f0c410..000000000
--- a/java/google/registry/beam/InvoicingPipeline.java
+++ /dev/null
@@ -1,188 +0,0 @@
-// Copyright 2017 The Nomulus Authors. All Rights Reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package google.registry.beam;
-
-import google.registry.beam.BillingEvent.InvoiceGroupingKey;
-import google.registry.beam.BillingEvent.InvoiceGroupingKey.InvoiceGroupingKeyCoder;
-import google.registry.config.RegistryConfig.Config;
-import google.registry.reporting.billing.BillingModule;
-import google.registry.reporting.billing.GenerateInvoicesAction;
-import java.io.Serializable;
-import javax.inject.Inject;
-import org.apache.beam.runners.dataflow.DataflowRunner;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
-import org.apache.beam.sdk.io.FileBasedSink;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import org.apache.beam.sdk.values.TypeDescriptors;
-
-/**
- * Definition of a Dataflow pipeline template, which generates a given month's invoices.
- *
- * To stage this template on GCS, run the {@link
- * google.registry.tools.DeployInvoicingPipelineCommand} Nomulus command.
- *
- *
Then, you can run the staged template via the API client library, gCloud or a raw REST call.
- * For an example using the API client library, see {@link GenerateInvoicesAction}.
- *
- * @see Dataflow Templates
- */
-public class InvoicingPipeline implements Serializable {
-
- @Inject
- @Config("projectId")
- String projectId;
-
- @Inject
- @Config("apacheBeamBucketUrl")
- String beamBucketUrl;
-
- @Inject
- @Config("invoiceTemplateUrl")
- String invoiceTemplateUrl;
-
- @Inject
- @Config("invoiceStagingUrl")
- String invoiceStagingUrl;
-
- @Inject
- @Config("billingBucketUrl")
- String billingBucketUrl;
-
- @Inject
- InvoicingPipeline() {}
-
- /** Custom options for running the invoicing pipeline. */
- interface InvoicingPipelineOptions extends DataflowPipelineOptions {
- /** Returns the yearMonth we're generating invoices for, in yyyy-MM format. */
- @Description("The yearMonth we generate invoices for, in yyyy-MM format.")
- ValueProvider getYearMonth();
- /**
- * Sets the yearMonth we generate invoices for.
- *
- * This is implicitly set when executing the Dataflow template, by specifying the 'yearMonth
- * parameter.
- */
- void setYearMonth(ValueProvider value);
- }
-
- /** Deploys the invoicing pipeline as a template on GCS, for a given projectID and GCS bucket. */
- public void deploy() {
- // We can't store options as a member variable due to serialization concerns.
- InvoicingPipelineOptions options = PipelineOptionsFactory.as(InvoicingPipelineOptions.class);
- options.setProject(projectId);
- options.setRunner(DataflowRunner.class);
- // This causes p.run() to stage the pipeline as a template on GCS, as opposed to running it.
- options.setTemplateLocation(invoiceTemplateUrl);
- options.setStagingLocation(invoiceStagingUrl);
- Pipeline p = Pipeline.create(options);
-
- PCollection billingEvents =
- p.apply(
- "Read BillingEvents from Bigquery",
- BigQueryIO.read(BillingEvent::parseFromRecord)
- .fromQuery(InvoicingUtils.makeQueryProvider(options.getYearMonth(), projectId))
- .withCoder(SerializableCoder.of(BillingEvent.class))
- .usingStandardSql()
- .withoutValidation()
- .withTemplateCompatibility());
- applyTerminalTransforms(billingEvents, options.getYearMonth());
- p.run();
- }
-
- /**
- * Applies output transforms to the {@code BillingEvent} source collection.
- *
- * This is factored out purely to facilitate testing.
- */
- void applyTerminalTransforms(
- PCollection billingEvents, ValueProvider yearMonthProvider) {
- billingEvents
- .apply("Generate overall invoice rows", new GenerateInvoiceRows())
- .apply("Write overall invoice to CSV", writeInvoice(yearMonthProvider));
-
- billingEvents.apply(
- "Write detail reports to separate CSVs keyed by registrarId_tld pair",
- writeDetailReports(yearMonthProvider));
- }
-
- /** Transform that converts a {@code BillingEvent} into an invoice CSV row. */
- private static class GenerateInvoiceRows
- extends PTransform, PCollection> {
- @Override
- public PCollection expand(PCollection input) {
- return input
- .apply(
- "Map to invoicing key",
- MapElements.into(TypeDescriptor.of(InvoiceGroupingKey.class))
- .via(BillingEvent::getInvoiceGroupingKey))
- .setCoder(new InvoiceGroupingKeyCoder())
- .apply("Count occurrences", Count.perElement())
- .apply(
- "Format as CSVs",
- MapElements.into(TypeDescriptors.strings())
- .via((KV kv) -> kv.getKey().toCsv(kv.getValue())));
- }
- }
-
- /** Returns an IO transform that writes the overall invoice to a single CSV file. */
- private TextIO.Write writeInvoice(ValueProvider yearMonthProvider) {
- return TextIO.write()
- .to(
- NestedValueProvider.of(
- yearMonthProvider,
- yearMonth ->
- String.format(
- "%s/%s/%s/%s-%s",
- billingBucketUrl,
- BillingModule.INVOICES_DIRECTORY,
- yearMonth,
- BillingModule.OVERALL_INVOICE_PREFIX,
- yearMonth)))
- .withHeader(InvoiceGroupingKey.invoiceHeader())
- .withoutSharding()
- .withSuffix(".csv");
- }
-
- /** Returns an IO transform that writes detail reports to registrar-tld keyed CSV files. */
- private TextIO.TypedWrite writeDetailReports(
- ValueProvider yearMonthProvider) {
- return TextIO.writeCustomType()
- .to(
- InvoicingUtils.makeDestinationFunction(
- String.format("%s/%s", billingBucketUrl, BillingModule.INVOICES_DIRECTORY),
- yearMonthProvider),
- InvoicingUtils.makeEmptyDestinationParams(billingBucketUrl + "/errors"))
- .withFormatFunction(BillingEvent::toCsv)
- .withoutSharding()
- .withTempDirectory(
- FileBasedSink.convertToFileResourceIfPossible(beamBucketUrl + "/temporary"))
- .withHeader(BillingEvent.getHeader())
- .withSuffix(".csv");
- }
-}
diff --git a/java/google/registry/beam/InvoicingUtils.java b/java/google/registry/beam/InvoicingUtils.java
deleted file mode 100644
index 1e52c0f26..000000000
--- a/java/google/registry/beam/InvoicingUtils.java
+++ /dev/null
@@ -1,112 +0,0 @@
-// Copyright 2017 The Nomulus Authors. All Rights Reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package google.registry.beam;
-
-import com.google.common.io.Resources;
-import google.registry.util.ResourceUtils;
-import google.registry.util.SqlTemplate;
-import java.time.LocalDateTime;
-import java.time.LocalTime;
-import java.time.YearMonth;
-import java.time.format.DateTimeFormatter;
-import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
-import org.apache.beam.sdk.io.FileBasedSink;
-import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-
-/** Pipeline helper functions used to generate invoices from instances of {@link BillingEvent}. */
-public class InvoicingUtils {
-
- private InvoicingUtils() {}
-
- private static final DateTimeFormatter TIMESTAMP_FORMATTER =
- DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
-
- /**
- * Returns a function mapping from {@code BillingEvent} to filename {@code Params}.
- *
- * Beam uses this to determine which file a given {@code BillingEvent} should get placed into.
- *
- * @param outputBucket the GCS bucket we're outputting reports to
- * @param yearMonthProvider a runtime provider for the yyyy-MM we're generating the invoice for
- */
- static SerializableFunction makeDestinationFunction(
- String outputBucket, ValueProvider yearMonthProvider) {
- return billingEvent ->
- new Params()
- .withShardTemplate("")
- .withSuffix(".csv")
- .withBaseFilename(
- NestedValueProvider.of(
- yearMonthProvider,
- yearMonth ->
- FileBasedSink.convertToFileResourceIfPossible(
- String.format(
- "%s/%s/%s",
- outputBucket, yearMonth, billingEvent.toFilename(yearMonth)))));
- }
-
- /**
- * Returns the default filename parameters for an unmappable {@code BillingEvent}.
- *
- * The "failed" file should only be populated when an error occurs, which warrants further
- * investigation.
- */
- static Params makeEmptyDestinationParams(String outputBucket) {
- return new Params()
- .withBaseFilename(
- FileBasedSink.convertToFileResourceIfPossible(
- String.format("%s/%s", outputBucket, "FAILURES")));
- }
-
- /**
- * Returns a provider that creates a Bigquery query for a given project and yearMonth at runtime.
- *
- *
We only know yearMonth at runtime, so this provider fills in the {@code
- * sql/billing_events.sql} template at runtime.
- *
- * @param yearMonthProvider a runtime provider that returns which month we're invoicing for.
- * @param projectId the projectId we're generating invoicing for.
- */
- static ValueProvider makeQueryProvider(
- ValueProvider yearMonthProvider, String projectId) {
- return NestedValueProvider.of(
- yearMonthProvider,
- (yearMonth) -> {
- // Get the timestamp endpoints capturing the entire month with microsecond precision
- YearMonth reportingMonth = YearMonth.parse(yearMonth);
- LocalDateTime firstMoment = reportingMonth.atDay(1).atTime(LocalTime.MIDNIGHT);
- LocalDateTime lastMoment = reportingMonth.atEndOfMonth().atTime(LocalTime.MAX);
- // Construct the month's query by filling in the billing_events.sql template
- return SqlTemplate.create(getQueryFromFile("billing_events.sql"))
- .put("FIRST_TIMESTAMP_OF_MONTH", firstMoment.format(TIMESTAMP_FORMATTER))
- .put("LAST_TIMESTAMP_OF_MONTH", lastMoment.format(TIMESTAMP_FORMATTER))
- .put("PROJECT_ID", projectId)
- .put("DATASTORE_EXPORT_DATA_SET", "latest_datastore_export")
- .put("ONETIME_TABLE", "OneTime")
- .put("REGISTRY_TABLE", "Registry")
- .put("REGISTRAR_TABLE", "Registrar")
- .put("CANCELLATION_TABLE", "Cancellation")
- .build();
- });
- }
-
- /** Returns the {@link String} contents for a file in the {@code beam/sql/} directory. */
- private static String getQueryFromFile(String filename) {
- return ResourceUtils.readResourceUtf8(
- Resources.getResource(InvoicingUtils.class, "sql/" + filename));
- }
-}
diff --git a/java/google/registry/beam/sql/billing_events.sql b/java/google/registry/beam/sql/billing_events.sql
deleted file mode 100644
index 2757fbe5f..000000000
--- a/java/google/registry/beam/sql/billing_events.sql
+++ /dev/null
@@ -1,100 +0,0 @@
-#standardSQL
- -- Copyright 2017 The Nomulus Authors. All Rights Reserved.
- --
- -- Licensed under the Apache License, Version 2.0 (the "License");
- -- you may not use this file except in compliance with the License.
- -- You may obtain a copy of the License at
- --
- -- http://www.apache.org/licenses/LICENSE-2.0
- --
- -- Unless required by applicable law or agreed to in writing, software
- -- distributed under the License is distributed on an "AS IS" BASIS,
- -- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- -- See the License for the specific language governing permissions and
- -- limitations under the License.
-
- -- This query gathers all non-canceled billing events for a given
- -- YEAR_MONTH in yyyy-MM format.
-
-SELECT
- __key__.id AS id,
- billingTime,
- eventTime,
- BillingEvent.clientId AS registrarId,
- RegistrarData.accountId AS billingId,
- tld,
- reason as action,
- targetId as domain,
- BillingEvent.domainRepoId as repositoryId,
- periodYears as years,
- BillingEvent.currency AS currency,
- BillingEvent.amount as amount,
- -- We'll strip out non-useful flags downstream
- ARRAY_TO_STRING(flags, " ") AS flags
-FROM (
- SELECT
- *,
- -- We store cost as "CURRENCY AMOUNT" such as "JPY 800" or "USD 20.00"
- SPLIT(cost, ' ')[OFFSET(0)] AS currency,
- SPLIT(cost, ' ')[OFFSET(1)] AS amount,
- -- Extract everything after the first dot in the domain as the TLD
- REGEXP_EXTRACT(targetId, r'[.](.+)') AS tld,
- -- __key__.path looks like '"DomainBase", "", ...'
- REGEXP_REPLACE(SPLIT(__key__.path, ', ')[OFFSET(1)], '"', '')
- AS domainRepoId,
- COALESCE(cancellationMatchingBillingEvent.path,
- __key__.path) AS cancellationMatchingPath
- FROM
- `%PROJECT_ID%.%DATASTORE_EXPORT_DATA_SET%.%ONETIME_TABLE%`
- -- Only include real TLDs (filter prober data)
- WHERE
- REGEXP_EXTRACT(targetId, r'[.](.+)') IN (
- SELECT
- tldStr
- FROM
- `%PROJECT_ID%.%DATASTORE_EXPORT_DATA_SET%.%REGISTRY_TABLE%`
- WHERE
- -- TODO(b/18092292): Add a filter for tldState (not PDT/PREDELEGATION)
- tldType = 'REAL') ) AS BillingEvent
- -- Gather billing ID from registrar table
- -- This is a 'JOIN' as opposed to 'LEFT JOIN' to filter out
- -- non-billable registrars
-JOIN (
- SELECT
- __key__.name AS clientId,
- billingIdentifier,
- r.billingAccountMap.currency[SAFE_OFFSET(index)] AS currency,
- r.billingAccountMap.accountId[SAFE_OFFSET(index)] AS accountId
- FROM
- `%PROJECT_ID%.%DATASTORE_EXPORT_DATA_SET%.%REGISTRAR_TABLE%` AS r,
- UNNEST(GENERATE_ARRAY(0, ARRAY_LENGTH(r.billingAccountMap.currency) - 1))
- AS index
- WHERE billingAccountMap IS NOT NULL
- AND type = 'REAL') AS RegistrarData
-ON
- BillingEvent.clientId = RegistrarData.clientId
- AND BillingEvent.currency = RegistrarData.currency
- -- Gather cancellations
-LEFT JOIN (
- SELECT __key__.id AS cancellationId,
- COALESCE(refOneTime.path, refRecurring.path) AS cancelledEventPath,
- eventTime as cancellationTime,
- billingTime as cancellationBillingTime
- FROM
- (SELECT
- *,
- -- Count everything after first dot as TLD (to support multi-part TLDs).
- REGEXP_EXTRACT(targetId, r'[.](.+)') AS tld
- FROM
- `%PROJECT_ID%.%DATASTORE_EXPORT_DATA_SET%.%CANCELLATION_TABLE%`)
-) AS Cancellation
-ON BillingEvent.cancellationMatchingPath = Cancellation.cancelledEventPath
-AND BillingEvent.billingTime = Cancellation.cancellationBillingTime
-WHERE billingTime BETWEEN TIMESTAMP('%FIRST_TIMESTAMP_OF_MONTH%')
- AND TIMESTAMP('%LAST_TIMESTAMP_OF_MONTH%')
--- Filter out canceled events
-AND Cancellation.cancellationId IS NULL
-ORDER BY
- billingTime DESC,
- id,
- tld
diff --git a/java/google/registry/config/RegistryConfig.java b/java/google/registry/config/RegistryConfig.java
index e13ad841c..428925a41 100644
--- a/java/google/registry/config/RegistryConfig.java
+++ b/java/google/registry/config/RegistryConfig.java
@@ -510,7 +510,7 @@ public final class RegistryConfig {
* Returns the URL of the GCS location for storing the monthly invoicing Beam template.
*
* @see google.registry.reporting.billing.GenerateInvoicesAction
- * @see google.registry.beam.InvoicingPipeline
+ * @see google.registry.beam.invoicing.InvoicingPipeline
*/
@Provides
@Config("invoiceTemplateUrl")
@@ -522,7 +522,7 @@ public final class RegistryConfig {
/**
* Returns the URL of the GCS location we store jar dependencies for the invoicing pipeline.
*
- * @see google.registry.beam.InvoicingPipeline
+ * @see google.registry.beam.invoicing.InvoicingPipeline
*/
@Provides
@Config("invoiceStagingUrl")
@@ -580,7 +580,7 @@ public final class RegistryConfig {
/**
* Returns the URL of the GCS bucket we store invoices and detail reports in.
*
- * @see google.registry.beam.InvoicingPipeline
+ * @see google.registry.beam.invoicing.InvoicingPipeline
*/
@Provides
@Config("billingBucketUrl")
diff --git a/java/google/registry/reporting/billing/GenerateInvoicesAction.java b/java/google/registry/reporting/billing/GenerateInvoicesAction.java
index a57c445c1..fc40e938f 100644
--- a/java/google/registry/reporting/billing/GenerateInvoicesAction.java
+++ b/java/google/registry/reporting/billing/GenerateInvoicesAction.java
@@ -43,9 +43,9 @@ import org.joda.time.YearMonth;
* Invokes the {@code InvoicingPipeline} beam template via the REST api, and enqueues the {@link
* PublishInvoicesAction} to publish the subsequent output.
*
- * This action runs the {@link google.registry.beam.InvoicingPipeline} beam template, staged at
- * gs://-beam/templates/invoicing. The pipeline then generates invoices for the month and
- * stores them on GCS.
+ * This action runs the {@link google.registry.beam.invoicing.InvoicingPipeline} beam template,
+ * staged at gs://-beam/templates/invoicing. The pipeline then generates invoices for the
+ * month and stores them on GCS.
*/
@Action(path = GenerateInvoicesAction.PATH, method = POST, auth = Auth.AUTH_INTERNAL_ONLY)
public class GenerateInvoicesAction implements Runnable {
diff --git a/java/google/registry/reporting/billing/PublishInvoicesAction.java b/java/google/registry/reporting/billing/PublishInvoicesAction.java
index 1d61d7e00..e170a9dd6 100644
--- a/java/google/registry/reporting/billing/PublishInvoicesAction.java
+++ b/java/google/registry/reporting/billing/PublishInvoicesAction.java
@@ -37,7 +37,7 @@ import javax.inject.Inject;
import org.joda.time.YearMonth;
/**
- * Uploads the results of the {@link google.registry.beam.InvoicingPipeline}.
+ * Uploads the results of the {@link google.registry.beam.invoicing.InvoicingPipeline}.
*
* This relies on the retry semantics in {@code queue.xml} to ensure proper upload, in spite of
* fluctuations in generation timing.
diff --git a/java/google/registry/tools/BUILD b/java/google/registry/tools/BUILD
index b5c3197c9..3def12767 100644
--- a/java/google/registry/tools/BUILD
+++ b/java/google/registry/tools/BUILD
@@ -35,7 +35,7 @@ java_library(
visibility = [":allowed-tools"],
deps = [
"//java/google/registry/backup",
- "//java/google/registry/beam",
+ "//java/google/registry/beam/invoicing",
"//java/google/registry/bigquery",
"//java/google/registry/config",
"//java/google/registry/dns",
diff --git a/java/google/registry/tools/DeployInvoicingPipelineCommand.java b/java/google/registry/tools/DeployInvoicingPipelineCommand.java
index 13e074f5d..d76829062 100644
--- a/java/google/registry/tools/DeployInvoicingPipelineCommand.java
+++ b/java/google/registry/tools/DeployInvoicingPipelineCommand.java
@@ -15,10 +15,10 @@
package google.registry.tools;
import com.beust.jcommander.Parameters;
-import google.registry.beam.InvoicingPipeline;
+import google.registry.beam.invoicing.InvoicingPipeline;
import javax.inject.Inject;
-/** Nomulus command that deploys the {@link google.registry.beam.InvoicingPipeline} template. */
+/** Nomulus command that deploys the {@link InvoicingPipeline} template. */
@Parameters(commandDescription = "Deploy the invoicing pipeline to GCS.")
public class DeployInvoicingPipelineCommand implements Command {
diff --git a/javatests/google/registry/beam/BUILD b/javatests/google/registry/beam/BUILD
index bd1f701c9..86750c423 100644
--- a/javatests/google/registry/beam/BUILD
+++ b/javatests/google/registry/beam/BUILD
@@ -9,31 +9,4 @@ load("//java/com/google/testing/builddefs:GenTestRules.bzl", "GenTestRules")
java_library(
name = "beam",
- srcs = glob(["*.java"]),
- resources = glob(["testdata/*"]),
- deps = [
- "//java/google/registry/beam",
- "//java/google/registry/util",
- "//javatests/google/registry/testing",
- "@com_google_apis_google_api_services_bigquery",
- "@com_google_dagger",
- "@com_google_guava",
- "@com_google_truth",
- "@com_google_truth_extensions_truth_java8_extension",
- "@junit",
- "@org_apache_avro",
- "@org_apache_beam_runners_direct_java",
- "@org_apache_beam_runners_google_cloud_dataflow_java",
- "@org_apache_beam_sdks_java_core",
- "@org_apache_beam_sdks_java_io_google_cloud_platform",
- "@org_mockito_all",
- ],
-)
-
-GenTestRules(
- name = "GeneratedTestRules",
- default_test_size = "small",
- medium_tests = ["InvoicingPipelineTest.java"],
- test_files = glob(["*Test.java"]),
- deps = [":beam"],
)
diff --git a/javatests/google/registry/beam/BigqueryTemplatePipelineTest.java b/javatests/google/registry/beam/BigqueryTemplatePipelineTest.java
deleted file mode 100644
index 5a441e77c..000000000
--- a/javatests/google/registry/beam/BigqueryTemplatePipelineTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-// Copyright 2017 The Nomulus Authors. All Rights Reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package google.registry.beam;
-
-import static com.google.common.truth.Truth.assertThat;
-
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.common.collect.ImmutableList;
-import google.registry.beam.BigqueryTemplatePipeline.CountRequestPaths;
-import google.registry.beam.BigqueryTemplatePipeline.ExtractRequestPathFn;
-import java.util.List;
-import org.apache.beam.runners.direct.DirectRunner;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFnTester;
-import org.apache.beam.sdk.values.PCollection;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Unit tests for {@link BigqueryTemplatePipeline}*/
-@RunWith(JUnit4.class)
-public class BigqueryTemplatePipelineTest {
-
- private static PipelineOptions pipelineOptions;
-
- @BeforeClass
- public static void initializePipelineOptions() {
- pipelineOptions = PipelineOptionsFactory.create();
- pipelineOptions.setRunner(DirectRunner.class);
- }
-
- @Rule public final transient TestPipeline p = TestPipeline.fromOptions(pipelineOptions);
-
- @Test
- public void testExtractRequestPathFn() throws Exception {
- ExtractRequestPathFn extractRequestPathFn = new ExtractRequestPathFn();
- DoFnTester fnTester = DoFnTester.of(extractRequestPathFn);
- TableRow emptyRow = new TableRow();
- TableRow hasRequestPathRow = new TableRow().set("requestPath", "a/path");
- TableRow hasOtherValueRow = new TableRow().set("anotherValue", "b/lah");
- List outputs = fnTester.processBundle(emptyRow, hasRequestPathRow, hasOtherValueRow);
- assertThat(outputs).containsExactly("null", "a/path", "null");
- }
-
- @Test
- public void testEndToEndPipeline() {
- ImmutableList inputRows =
- ImmutableList.of(
- new TableRow(),
- new TableRow().set("requestPath", "a/path"),
- new TableRow().set("requestPath", "b/path"),
- new TableRow().set("requestPath", "b/path"),
- new TableRow().set("anotherValue", "b/path"));
-
- PCollection input = p.apply(Create.of(inputRows));
- PCollection output = input.apply(new CountRequestPaths());
-
- ImmutableList outputStrings = new ImmutableList.Builder()
- .add("a/path: 1")
- .add("b/path: 2")
- .add("null: 2")
- .build();
- PAssert.that(output).containsInAnyOrder(outputStrings);
- p.run();
- }
-}
diff --git a/javatests/google/registry/beam/BillingEventTest.java b/javatests/google/registry/beam/BillingEventTest.java
deleted file mode 100644
index 25773188e..000000000
--- a/javatests/google/registry/beam/BillingEventTest.java
+++ /dev/null
@@ -1,188 +0,0 @@
-// Copyright 2017 The Nomulus Authors. All Rights Reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package google.registry.beam;
-
-import static com.google.common.truth.Truth.assertThat;
-import static google.registry.testing.JUnitBackports.assertThrows;
-
-import google.registry.beam.BillingEvent.InvoiceGroupingKey;
-import google.registry.beam.BillingEvent.InvoiceGroupingKey.InvoiceGroupingKeyCoder;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.time.ZoneId;
-import java.time.ZonedDateTime;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Unit tests for {@link BillingEvent} */
-@RunWith(JUnit4.class)
-public class BillingEventTest {
-
- private static final String BILLING_EVENT_SCHEMA =
- "{\"name\": \"BillingEvent\", "
- + "\"type\": \"record\", "
- + "\"fields\": ["
- + "{\"name\": \"id\", \"type\": \"long\"},"
- + "{\"name\": \"billingTime\", \"type\": \"string\"},"
- + "{\"name\": \"eventTime\", \"type\": \"string\"},"
- + "{\"name\": \"registrarId\", \"type\": \"string\"},"
- + "{\"name\": \"billingId\", \"type\": \"long\"},"
- + "{\"name\": \"tld\", \"type\": \"string\"},"
- + "{\"name\": \"action\", \"type\": \"string\"},"
- + "{\"name\": \"domain\", \"type\": \"string\"},"
- + "{\"name\": \"repositoryId\", \"type\": \"string\"},"
- + "{\"name\": \"years\", \"type\": \"int\"},"
- + "{\"name\": \"currency\", \"type\": \"string\"},"
- + "{\"name\": \"amount\", \"type\": \"float\"},"
- + "{\"name\": \"flags\", \"type\": \"string\"}"
- + "]}";
-
- private SchemaAndRecord schemaAndRecord;
-
- @Before
- public void initializeRecord() {
- // Create a record with a given JSON schema.
- GenericRecord record = new GenericData.Record(new Schema.Parser().parse(BILLING_EVENT_SCHEMA));
- record.put("id", "1");
- record.put("billingTime", 1508835963000000L);
- record.put("eventTime", 1484870383000000L);
- record.put("registrarId", "myRegistrar");
- record.put("billingId", "12345-CRRHELLO");
- record.put("tld", "test");
- record.put("action", "RENEW");
- record.put("domain", "example.test");
- record.put("repositoryId", "123456");
- record.put("years", 5);
- record.put("currency", "USD");
- record.put("amount", 20.5);
- record.put("flags", "AUTO_RENEW SYNTHETIC");
- schemaAndRecord = new SchemaAndRecord(record, null);
- }
-
- @Test
- public void testParseBillingEventFromRecord_success() {
- BillingEvent event = BillingEvent.parseFromRecord(schemaAndRecord);
- assertThat(event.id()).isEqualTo(1);
- assertThat(event.billingTime())
- .isEqualTo(ZonedDateTime.of(2017, 10, 24, 9, 6, 3, 0, ZoneId.of("UTC")));
- assertThat(event.eventTime())
- .isEqualTo(ZonedDateTime.of(2017, 1, 19, 23, 59, 43, 0, ZoneId.of("UTC")));
- assertThat(event.registrarId()).isEqualTo("myRegistrar");
- assertThat(event.billingId()).isEqualTo("12345-CRRHELLO");
- assertThat(event.tld()).isEqualTo("test");
- assertThat(event.action()).isEqualTo("RENEW");
- assertThat(event.domain()).isEqualTo("example.test");
- assertThat(event.repositoryId()).isEqualTo("123456");
- assertThat(event.years()).isEqualTo(5);
- assertThat(event.currency()).isEqualTo("USD");
- assertThat(event.amount()).isEqualTo(20.5);
- assertThat(event.flags()).isEqualTo("AUTO_RENEW SYNTHETIC");
- }
-
- @Test
- public void testParseBillingEventFromRecord_sunriseCreate_reducedPrice_success() {
- schemaAndRecord.getRecord().put("flags", "SUNRISE");
- BillingEvent event = BillingEvent.parseFromRecord(schemaAndRecord);
- assertThat(event.amount()).isEqualTo(17.43);
- assertThat(event.flags()).isEqualTo("SUNRISE");
- }
-
- @Test
- public void testParseBillingEventFromRecord_anchorTenant_zeroPrice_success() {
- schemaAndRecord.getRecord().put("flags", "SUNRISE ANCHOR_TENANT");
- BillingEvent event = BillingEvent.parseFromRecord(schemaAndRecord);
- assertThat(event.amount()).isZero();
- assertThat(event.flags()).isEqualTo("SUNRISE ANCHOR_TENANT");
- }
-
- @Test
- public void testParseBillingEventFromRecord_nullValue_throwsException() {
- schemaAndRecord.getRecord().put("tld", null);
- assertThrows(IllegalStateException.class, () -> BillingEvent.parseFromRecord(schemaAndRecord));
- }
-
- @Test
- public void testConvertBillingEvent_toCsv() {
- BillingEvent event = BillingEvent.parseFromRecord(schemaAndRecord);
- assertThat(event.toCsv())
- .isEqualTo("1,2017-10-24 09:06:03 UTC,2017-01-19 23:59:43 UTC,myRegistrar,"
- + "12345-CRRHELLO,test,RENEW,example.test,123456,5,USD,20.50,AUTO_RENEW");
- }
-
- @Test
- public void testGenerateBillingEventFilename() {
- BillingEvent event = BillingEvent.parseFromRecord(schemaAndRecord);
- assertThat(event.toFilename("2017-10")).isEqualTo("invoice_details_2017-10_myRegistrar_test");
- }
-
- @Test
- public void testGetInvoiceGroupingKey_fromBillingEvent() {
- BillingEvent event = BillingEvent.parseFromRecord(schemaAndRecord);
- InvoiceGroupingKey invoiceKey = event.getInvoiceGroupingKey();
- assertThat(invoiceKey.startDate()).isEqualTo("2017-10-01");
- assertThat(invoiceKey.endDate()).isEqualTo("2022-09-30");
- assertThat(invoiceKey.productAccountKey()).isEqualTo("12345-CRRHELLO");
- assertThat(invoiceKey.usageGroupingKey()).isEqualTo("myRegistrar - test");
- assertThat(invoiceKey.description()).isEqualTo("RENEW | TLD: test | TERM: 5-year");
- assertThat(invoiceKey.unitPrice()).isEqualTo(20.5);
- assertThat(invoiceKey.unitPriceCurrency()).isEqualTo("USD");
- assertThat(invoiceKey.poNumber()).isEmpty();
- }
-
- @Test
- public void testConvertInvoiceGroupingKey_toCsv() {
- BillingEvent event = BillingEvent.parseFromRecord(schemaAndRecord);
- InvoiceGroupingKey invoiceKey = event.getInvoiceGroupingKey();
- assertThat(invoiceKey.toCsv(3L))
- .isEqualTo(
- "2017-10-01,2022-09-30,12345-CRRHELLO,61.50,USD,10125,1,PURCHASE,"
- + "myRegistrar - test,3,RENEW | TLD: test | TERM: 5-year,20.50,USD,");
- }
-
- @Test
- public void testInvoiceGroupingKeyCoder_deterministicSerialization() throws IOException {
- InvoiceGroupingKey invoiceKey =
- BillingEvent.parseFromRecord(schemaAndRecord).getInvoiceGroupingKey();
- InvoiceGroupingKeyCoder coder = new InvoiceGroupingKeyCoder();
- ByteArrayOutputStream outStream = new ByteArrayOutputStream();
- coder.encode(invoiceKey, outStream);
- InputStream inStream = new ByteArrayInputStream(outStream.toByteArray());
- assertThat(coder.decode(inStream)).isEqualTo(invoiceKey);
- }
-
- @Test
- public void testGetDetailReportHeader() {
- assertThat(BillingEvent.getHeader())
- .isEqualTo(
- "id,billingTime,eventTime,registrarId,billingId,tld,action,"
- + "domain,repositoryId,years,currency,amount,flags");
- }
-
- @Test
- public void testGetOverallInvoiceHeader() {
- assertThat(InvoiceGroupingKey.invoiceHeader())
- .isEqualTo("StartDate,EndDate,ProductAccountKey,Amount,AmountCurrency,BillingProductCode,"
- + "SalesChannel,LineItemType,UsageGroupingKey,Quantity,Description,UnitPrice,"
- + "UnitPriceCurrency,PONumber");
- }
-}
diff --git a/javatests/google/registry/beam/InvoicingPipelineTest.java b/javatests/google/registry/beam/InvoicingPipelineTest.java
deleted file mode 100644
index 678a458be..000000000
--- a/javatests/google/registry/beam/InvoicingPipelineTest.java
+++ /dev/null
@@ -1,215 +0,0 @@
-// Copyright 2017 The Nomulus Authors. All Rights Reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package google.registry.beam;
-
-import static com.google.common.truth.Truth.assertThat;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import google.registry.util.ResourceUtils;
-import java.io.File;
-import java.io.IOException;
-import java.time.ZoneId;
-import java.time.ZonedDateTime;
-import java.util.Map.Entry;
-import org.apache.beam.runners.direct.DirectRunner;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.values.PCollection;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Unit tests for {@link InvoicingPipeline}. */
-@RunWith(JUnit4.class)
-public class InvoicingPipelineTest {
-
- private static PipelineOptions pipelineOptions;
-
- @BeforeClass
- public static void initializePipelineOptions() {
- pipelineOptions = PipelineOptionsFactory.create();
- pipelineOptions.setRunner(DirectRunner.class);
- }
-
- @Rule public final transient TestPipeline p = TestPipeline.fromOptions(pipelineOptions);
- @Rule public final TemporaryFolder tempFolder = new TemporaryFolder();
-
- private InvoicingPipeline invoicingPipeline;
-
- @Before
- public void initializePipeline() throws IOException {
- invoicingPipeline = new InvoicingPipeline();
- invoicingPipeline.projectId = "test-project";
- File beamTempFolder = tempFolder.newFolder();
- invoicingPipeline.beamBucketUrl = beamTempFolder.getAbsolutePath();
- invoicingPipeline.invoiceStagingUrl = beamTempFolder.getAbsolutePath() + "/staging";
- invoicingPipeline.invoiceTemplateUrl =
- beamTempFolder.getAbsolutePath() + "/templates/invoicing";
- invoicingPipeline.billingBucketUrl = tempFolder.getRoot().getAbsolutePath();
- }
-
- private ImmutableList getInputEvents() {
- return ImmutableList.of(
- BillingEvent.create(
- 1,
- ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
- ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
- "theRegistrar",
- "234",
- "test",
- "RENEW",
- "mydomain.test",
- "REPO-ID",
- 3,
- "USD",
- 20.5,
- ""),
- BillingEvent.create(
- 1,
- ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
- ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
- "theRegistrar",
- "234",
- "test",
- "RENEW",
- "mydomain2.test",
- "REPO-ID",
- 3,
- "USD",
- 20.5,
- ""),
- BillingEvent.create(
- 1,
- ZonedDateTime.of(2017, 10, 2, 0, 0, 0, 0, ZoneId.of("UTC")),
- ZonedDateTime.of(2017, 9, 29, 0, 0, 0, 0, ZoneId.of("UTC")),
- "theRegistrar",
- "234",
- "hello",
- "CREATE",
- "mydomain3.hello",
- "REPO-ID",
- 5,
- "JPY",
- 70.75,
- ""),
- BillingEvent.create(
- 1,
- ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
- ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
- "googledomains",
- "456",
- "test",
- "RENEW",
- "mydomain4.test",
- "REPO-ID",
- 1,
- "USD",
- 20.5,
- ""),
- BillingEvent.create(
- 1,
- ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
- ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
- "anotherRegistrar",
- "789",
- "test",
- "CREATE",
- "mydomain5.test",
- "REPO-ID",
- 1,
- "USD",
- 0,
- "SUNRISE ANCHOR_TENANT"));
- }
-
- /** Returns a map from filename to expected contents for detail reports. */
- private ImmutableMap> getExpectedDetailReportMap() {
- return ImmutableMap.of(
- "invoice_details_2017-10_theRegistrar_test.csv",
- ImmutableList.of(
- "1,2017-10-04 00:00:00 UTC,2017-10-04 00:00:00 UTC,theRegistrar,234,"
- + "test,RENEW,mydomain2.test,REPO-ID,3,USD,20.50,",
- "1,2017-10-04 00:00:00 UTC,2017-10-04 00:00:00 UTC,theRegistrar,234,"
- + "test,RENEW,mydomain.test,REPO-ID,3,USD,20.50,"),
- "invoice_details_2017-10_theRegistrar_hello.csv",
- ImmutableList.of(
- "1,2017-10-02 00:00:00 UTC,2017-09-29 00:00:00 UTC,theRegistrar,234,"
- + "hello,CREATE,mydomain3.hello,REPO-ID,5,JPY,70.75,"),
- "invoice_details_2017-10_googledomains_test.csv",
- ImmutableList.of(
- "1,2017-10-04 00:00:00 UTC,2017-10-04 00:00:00 UTC,googledomains,456,"
- + "test,RENEW,mydomain4.test,REPO-ID,1,USD,20.50,"),
- "invoice_details_2017-10_anotherRegistrar_test.csv",
- ImmutableList.of(
- "1,2017-10-04 00:00:00 UTC,2017-10-04 00:00:00 UTC,anotherRegistrar,789,"
- + "test,CREATE,mydomain5.test,REPO-ID,1,USD,0.00,SUNRISE ANCHOR_TENANT"));
- }
-
- private ImmutableList getExpectedInvoiceOutput() {
- return ImmutableList.of(
- "2017-10-01,2020-09-30,234,41.00,USD,10125,1,PURCHASE,theRegistrar - test,2,"
- + "RENEW | TLD: test | TERM: 3-year,20.50,USD,",
- "2017-10-01,2022-09-30,234,70.75,JPY,10125,1,PURCHASE,theRegistrar - hello,1,"
- + "CREATE | TLD: hello | TERM: 5-year,70.75,JPY,",
- "2017-10-01,2018-09-30,456,20.50,USD,10125,1,PURCHASE,googledomains - test,1,"
- + "RENEW | TLD: test | TERM: 1-year,20.50,USD,",
- "2017-10-01,2018-09-30,789,0.00,USD,10125,1,PURCHASE,anotherRegistrar - test,1,"
- + "CREATE | TLD: test | TERM: 1-year,0.00,USD,");
- }
-
- @Test
- public void testEndToEndPipeline_generatesExpectedFiles() throws Exception {
- ImmutableList inputRows = getInputEvents();
- PCollection input = p.apply(Create.of(inputRows));
- invoicingPipeline.applyTerminalTransforms(input, StaticValueProvider.of("2017-10"));
- p.run();
-
- for (Entry> entry : getExpectedDetailReportMap().entrySet()) {
- ImmutableList detailReport = resultFileContents(entry.getKey());
- assertThat(detailReport.get(0))
- .isEqualTo("id,billingTime,eventTime,registrarId,billingId,tld,action,"
- + "domain,repositoryId,years,currency,amount,flags");
- assertThat(detailReport.subList(1, detailReport.size()))
- .containsExactlyElementsIn(entry.getValue());
- }
-
- ImmutableList overallInvoice = resultFileContents("CRR-INV-2017-10.csv");
- assertThat(overallInvoice.get(0))
- .isEqualTo(
- "StartDate,EndDate,ProductAccountKey,Amount,AmountCurrency,BillingProductCode,"
- + "SalesChannel,LineItemType,UsageGroupingKey,Quantity,Description,UnitPrice,"
- + "UnitPriceCurrency,PONumber");
- assertThat(overallInvoice.subList(1, overallInvoice.size()))
- .containsExactlyElementsIn(getExpectedInvoiceOutput());
- }
-
- /** Returns the text contents of a file under the beamBucket/results directory. */
- private ImmutableList resultFileContents(String filename) throws Exception {
- File resultFile =
- new File(
- String.format(
- "%s/invoices/2017-10/%s", tempFolder.getRoot().getAbsolutePath(), filename));
- return ImmutableList.copyOf(
- ResourceUtils.readResourceUtf8(resultFile.toURI().toURL()).split("\n"));
- }
-}
diff --git a/javatests/google/registry/beam/InvoicingUtilsTest.java b/javatests/google/registry/beam/InvoicingUtilsTest.java
deleted file mode 100644
index 4b19c173d..000000000
--- a/javatests/google/registry/beam/InvoicingUtilsTest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-// Copyright 2017 The Nomulus Authors. All Rights Reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package google.registry.beam;
-
-import static com.google.common.truth.Truth.assertThat;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import google.registry.testing.TestDataHelper;
-import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
-import org.apache.beam.sdk.io.FileBasedSink;
-import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Unit tests for {@link InvoicingUtils}. */
-@RunWith(JUnit4.class)
-public class InvoicingUtilsTest {
-
- @Test
- public void testDestinationFunction_generatesProperFileParams() {
- SerializableFunction destinationFunction =
- InvoicingUtils.makeDestinationFunction("my/directory", StaticValueProvider.of("2017-10"));
-
- BillingEvent billingEvent = mock(BillingEvent.class);
- // We mock BillingEvent to make the test independent of the implementation of toFilename()
- when(billingEvent.toFilename(any())).thenReturn("invoice_details_2017-10_registrar_tld");
-
- assertThat(destinationFunction.apply(billingEvent))
- .isEqualTo(
- new Params()
- .withShardTemplate("")
- .withSuffix(".csv")
- .withBaseFilename(
- FileBasedSink.convertToFileResourceIfPossible(
- "my/directory/2017-10/invoice_details_2017-10_registrar_tld")));
- }
-
- @Test
- public void testEmptyDestinationParams() {
- assertThat(InvoicingUtils.makeEmptyDestinationParams("my/directory"))
- .isEqualTo(
- new Params()
- .withBaseFilename(
- FileBasedSink.convertToFileResourceIfPossible("my/directory/FAILURES")));
- }
-
- /** Asserts that the instantiated sql template matches a golden expected file. */
- @Test
- public void testMakeQueryProvider() {
- ValueProvider queryProvider =
- InvoicingUtils.makeQueryProvider(StaticValueProvider.of("2017-10"), "my-project-id");
- assertThat(queryProvider.get()).isEqualTo(loadFile("billing_events_test.sql"));
- }
-
- /** Returns a {@link String} from a file in the {@code billing/testdata/} directory. */
- private static String loadFile(String filename) {
- return TestDataHelper.loadFile(InvoicingUtilsTest.class, filename);
- }
-}
diff --git a/javatests/google/registry/beam/testdata/billing_events_test.sql b/javatests/google/registry/beam/testdata/billing_events_test.sql
deleted file mode 100644
index 4a1b32783..000000000
--- a/javatests/google/registry/beam/testdata/billing_events_test.sql
+++ /dev/null
@@ -1,100 +0,0 @@
-#standardSQL
- -- Copyright 2017 The Nomulus Authors. All Rights Reserved.
- --
- -- Licensed under the Apache License, Version 2.0 (the "License");
- -- you may not use this file except in compliance with the License.
- -- You may obtain a copy of the License at
- --
- -- http://www.apache.org/licenses/LICENSE-2.0
- --
- -- Unless required by applicable law or agreed to in writing, software
- -- distributed under the License is distributed on an "AS IS" BASIS,
- -- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- -- See the License for the specific language governing permissions and
- -- limitations under the License.
-
- -- This query gathers all non-canceled billing events for a given
- -- YEAR_MONTH in yyyy-MM format.
-
-SELECT
- __key__.id AS id,
- billingTime,
- eventTime,
- BillingEvent.clientId AS registrarId,
- RegistrarData.accountId AS billingId,
- tld,
- reason as action,
- targetId as domain,
- BillingEvent.domainRepoId as repositoryId,
- periodYears as years,
- BillingEvent.currency AS currency,
- BillingEvent.amount as amount,
- -- We'll strip out non-useful flags downstream
- ARRAY_TO_STRING(flags, " ") AS flags
-FROM (
- SELECT
- *,
- -- We store cost as "CURRENCY AMOUNT" such as "JPY 800" or "USD 20.00"
- SPLIT(cost, ' ')[OFFSET(0)] AS currency,
- SPLIT(cost, ' ')[OFFSET(1)] AS amount,
- -- Extract everything after the first dot in the domain as the TLD
- REGEXP_EXTRACT(targetId, r'[.](.+)') AS tld,
- -- __key__.path looks like '"DomainBase", "", ...'
- REGEXP_REPLACE(SPLIT(__key__.path, ', ')[OFFSET(1)], '"', '')
- AS domainRepoId,
- COALESCE(cancellationMatchingBillingEvent.path,
- __key__.path) AS cancellationMatchingPath
- FROM
- `my-project-id.latest_datastore_export.OneTime`
- -- Only include real TLDs (filter prober data)
- WHERE
- REGEXP_EXTRACT(targetId, r'[.](.+)') IN (
- SELECT
- tldStr
- FROM
- `my-project-id.latest_datastore_export.Registry`
- WHERE
- -- TODO(b/18092292): Add a filter for tldState (not PDT/PREDELEGATION)
- tldType = 'REAL') ) AS BillingEvent
- -- Gather billing ID from registrar table
- -- This is a 'JOIN' as opposed to 'LEFT JOIN' to filter out
- -- non-billable registrars
-JOIN (
- SELECT
- __key__.name AS clientId,
- billingIdentifier,
- r.billingAccountMap.currency[SAFE_OFFSET(index)] AS currency,
- r.billingAccountMap.accountId[SAFE_OFFSET(index)] AS accountId
- FROM
- `my-project-id.latest_datastore_export.Registrar` AS r,
- UNNEST(GENERATE_ARRAY(0, ARRAY_LENGTH(r.billingAccountMap.currency) - 1))
- AS index
- WHERE billingAccountMap IS NOT NULL
- AND type = 'REAL') AS RegistrarData
-ON
- BillingEvent.clientId = RegistrarData.clientId
- AND BillingEvent.currency = RegistrarData.currency
- -- Gather cancellations
-LEFT JOIN (
- SELECT __key__.id AS cancellationId,
- COALESCE(refOneTime.path, refRecurring.path) AS cancelledEventPath,
- eventTime as cancellationTime,
- billingTime as cancellationBillingTime
- FROM
- (SELECT
- *,
- -- Count everything after first dot as TLD (to support multi-part TLDs).
- REGEXP_EXTRACT(targetId, r'[.](.+)') AS tld
- FROM
- `my-project-id.latest_datastore_export.Cancellation`)
-) AS Cancellation
-ON BillingEvent.cancellationMatchingPath = Cancellation.cancelledEventPath
-AND BillingEvent.billingTime = Cancellation.cancellationBillingTime
-WHERE billingTime BETWEEN TIMESTAMP('2017-10-01 00:00:00.000000')
- AND TIMESTAMP('2017-10-31 23:59:59.999999')
--- Filter out canceled events
-AND Cancellation.cancellationId IS NULL
-ORDER BY
- billingTime DESC,
- id,
- tld