diff --git a/java/google/registry/beam/spec11/BUILD b/java/google/registry/beam/spec11/BUILD
new file mode 100644
index 000000000..8cc68eee6
--- /dev/null
+++ b/java/google/registry/beam/spec11/BUILD
@@ -0,0 +1,26 @@
+package(
+ default_visibility = ["//visibility:public"],
+)
+
+licenses(["notice"]) # Apache 2.0
+
+java_library(
+ name = "spec11",
+ srcs = glob(["*.java"]),
+ resources = glob(["sql/*"]),
+ deps = [
+ "//java/google/registry/beam",
+ "//java/google/registry/config",
+ "@com_google_auto_value",
+ "@com_google_dagger",
+ "@com_google_flogger",
+ "@com_google_flogger_system_backend",
+ "@com_google_guava",
+ "@javax_inject",
+ "@org_apache_avro",
+ "@org_apache_beam_runners_direct_java",
+ "@org_apache_beam_runners_google_cloud_dataflow_java",
+ "@org_apache_beam_sdks_java_core",
+ "@org_apache_beam_sdks_java_io_google_cloud_platform",
+ ],
+)
diff --git a/java/google/registry/beam/spec11/Spec11Pipeline.java b/java/google/registry/beam/spec11/Spec11Pipeline.java
new file mode 100644
index 000000000..781d8427e
--- /dev/null
+++ b/java/google/registry/beam/spec11/Spec11Pipeline.java
@@ -0,0 +1,118 @@
+// Copyright 2018 The Nomulus Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package google.registry.beam.spec11;
+
+import google.registry.config.RegistryConfig.Config;
+import java.io.Serializable;
+import javax.inject.Inject;
+import org.apache.beam.runners.dataflow.DataflowRunner;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.ToString;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Definition of a Dataflow pipeline template, which generates a given month's spec11 report.
+ *
+ *
To stage this template on GCS, run the {@link
+ * google.registry.tools.DeploySpec11PipelineCommand} Nomulus command.
+ *
+ *
Then, you can run the staged template via the API client library, gCloud or a raw REST call.
+ *
+ * @see Dataflow Templates
+ */
+public class Spec11Pipeline implements Serializable {
+
+ @Inject
+ @Config("projectId")
+ String projectId;
+
+ @Inject
+ @Config("beamStagingUrl")
+ String beamStagingUrl;
+
+ @Inject
+ @Config("spec11TemplateUrl")
+ String spec11TemplateUrl;
+
+ @Inject
+ @Config("spec11BucketUrl")
+ String spec11BucketUrl;
+
+ @Inject
+ Spec11Pipeline() {}
+
+ /** Custom options for running the spec11 pipeline. */
+ interface Spec11PipelineOptions extends DataflowPipelineOptions {
+ /** Returns the yearMonth we're generating the report for, in yyyy-MM format. */
+ @Description("The yearMonth we generate the report for, in yyyy-MM format.")
+ ValueProvider getYearMonth();
+
+ /**
+ * Sets the yearMonth we generate invoices for.
+ *
+ * This is implicitly set when executing the Dataflow template, by specifying the 'yearMonth
+ * parameter.
+ */
+ void setYearMonth(ValueProvider value);
+ }
+
+ /** Deploys the spec11 pipeline as a template on GCS, for a given projectID and GCS bucket. */
+ public void deploy() {
+ // We can't store options as a member variable due to serialization concerns.
+ Spec11PipelineOptions options = PipelineOptionsFactory.as(Spec11PipelineOptions.class);
+ options.setProject(projectId);
+ options.setRunner(DataflowRunner.class);
+ // This causes p.run() to stage the pipeline as a template on GCS, as opposed to running it.
+ options.setTemplateLocation(spec11TemplateUrl);
+ options.setStagingLocation(beamStagingUrl);
+ Pipeline p = Pipeline.create(options);
+ PCollection domains =
+ p.apply(
+ "Read active domains from BigQuery",
+ BigQueryIO.read(Subdomain::parseFromRecord)
+ .fromQuery(
+ // This query must be customized for your own use.
+ "SELECT * FROM YOUR_TABLE_HERE")
+ .withCoder(SerializableCoder.of(Subdomain.class))
+ .usingStandardSql()
+ .withoutValidation()
+ .withTemplateCompatibility());
+ countDomainsAndOutputResults(domains);
+ p.run();
+ }
+
+ /** Globally count the number of elements and output the results to GCS. */
+ void countDomainsAndOutputResults(PCollection domains) {
+ // TODO(b/111545355): Actually process each domain with the SafeBrowsing API
+ domains
+ .apply("Count number of subdomains", Count.globally())
+ .apply("Convert global count to string", ToString.elements())
+ .apply(
+ "Output to text file",
+ TextIO.write()
+ // TODO(b/111545355): Replace this with a templated directory based on yearMonth
+ .to(spec11BucketUrl)
+ .withoutSharding()
+ .withHeader("HELLO WORLD"));
+ }
+}
diff --git a/java/google/registry/beam/spec11/Subdomain.java b/java/google/registry/beam/spec11/Subdomain.java
new file mode 100644
index 000000000..367401c0b
--- /dev/null
+++ b/java/google/registry/beam/spec11/Subdomain.java
@@ -0,0 +1,84 @@
+// Copyright 2018 The Nomulus Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package google.registry.beam.spec11;
+
+import static google.registry.beam.BeamUtils.checkFieldsNotNull;
+import static google.registry.beam.BeamUtils.extractField;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.flogger.FluentLogger;
+import java.io.Serializable;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
+
+/**
+ * A POJO representing a single subdomain, parsed from a {@code SchemaAndRecord}.
+ *
+ * This is a trivially serializable class that allows Beam to transform the results of a Bigquery
+ * query into a standard Java representation, giving us the type guarantees and ease of manipulation
+ * Bigquery lacks, while localizing any Bigquery-side failures to the {@link #parseFromRecord}
+ * function.
+ */
+@AutoValue
+public abstract class Subdomain implements Serializable {
+
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+ private static final ImmutableList FIELD_NAMES =
+ ImmutableList.of("fullyQualifiedDomainName", "statuses", "creationTime");
+
+ /** Returns the fully qualified domain name. */
+ abstract String fullyQualifiedDomainName();
+ /** Returns the UTC DateTime this domain was created. */
+ abstract ZonedDateTime creationTime();
+ /** Returns the space-delimited list of statuses on this domain. */
+ abstract String statuses();
+
+ /**
+ * Constructs a {@link Subdomain} from an Apache Avro {@code SchemaAndRecord}.
+ *
+ * @see
+ * Apache AVRO GenericRecord
+ */
+ static Subdomain parseFromRecord(SchemaAndRecord schemaAndRecord) {
+ checkFieldsNotNull(FIELD_NAMES, schemaAndRecord);
+ GenericRecord record = schemaAndRecord.getRecord();
+ return create(
+ extractField(record, "fullyQualifiedDomainName"),
+ // Bigquery provides UNIX timestamps with microsecond precision.
+ Instant.ofEpochMilli(Long.parseLong(extractField(record, "creationTime")) / 1000)
+ .atZone(ZoneId.of("UTC")),
+ extractField(record, "statuses"));
+ }
+
+ /**
+ * Creates a concrete {@link Subdomain}.
+ *
+ * This should only be used outside this class for testing- instances of {@link Subdomain}
+ * should otherwise come from {@link #parseFromRecord}.
+ */
+ @VisibleForTesting
+ static Subdomain create(
+ String fullyQualifiedDomainName, ZonedDateTime creationTime, String statuses) {
+ return new AutoValue_Subdomain(fullyQualifiedDomainName, creationTime, statuses);
+ }
+}
+
diff --git a/javatests/google/registry/beam/invoicing/BUILD b/javatests/google/registry/beam/invoicing/BUILD
new file mode 100644
index 000000000..31cd2bcac
--- /dev/null
+++ b/javatests/google/registry/beam/invoicing/BUILD
@@ -0,0 +1,39 @@
+package(
+ default_testonly = 1,
+ default_visibility = ["//java/google/registry:registry_project"],
+)
+
+licenses(["notice"]) # Apache 2.0
+
+load("//java/com/google/testing/builddefs:GenTestRules.bzl", "GenTestRules")
+
+java_library(
+ name = "invoicing",
+ srcs = glob(["*.java"]),
+ resources = glob(["testdata/*"]),
+ deps = [
+ "//java/google/registry/beam/invoicing",
+ "//java/google/registry/util",
+ "//javatests/google/registry/testing",
+ "@com_google_apis_google_api_services_bigquery",
+ "@com_google_dagger",
+ "@com_google_guava",
+ "@com_google_truth",
+ "@com_google_truth_extensions_truth_java8_extension",
+ "@junit",
+ "@org_apache_avro",
+ "@org_apache_beam_runners_direct_java",
+ "@org_apache_beam_runners_google_cloud_dataflow_java",
+ "@org_apache_beam_sdks_java_core",
+ "@org_apache_beam_sdks_java_io_google_cloud_platform",
+ "@org_mockito_all",
+ ],
+)
+
+GenTestRules(
+ name = "GeneratedTestRules",
+ default_test_size = "small",
+ medium_tests = ["InvoicingPipelineTest.java"],
+ test_files = glob(["*Test.java"]),
+ deps = [":invoicing"],
+)
diff --git a/javatests/google/registry/beam/invoicing/BillingEventTest.java b/javatests/google/registry/beam/invoicing/BillingEventTest.java
new file mode 100644
index 000000000..822b6ea19
--- /dev/null
+++ b/javatests/google/registry/beam/invoicing/BillingEventTest.java
@@ -0,0 +1,188 @@
+// Copyright 2018 The Nomulus Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package google.registry.beam.invoicing;
+
+import static com.google.common.truth.Truth.assertThat;
+import static google.registry.testing.JUnitBackports.assertThrows;
+
+import google.registry.beam.invoicing.BillingEvent.InvoiceGroupingKey;
+import google.registry.beam.invoicing.BillingEvent.InvoiceGroupingKey.InvoiceGroupingKeyCoder;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link BillingEvent} */
+@RunWith(JUnit4.class)
+public class BillingEventTest {
+
+ private static final String BILLING_EVENT_SCHEMA =
+ "{\"name\": \"BillingEvent\", "
+ + "\"type\": \"record\", "
+ + "\"fields\": ["
+ + "{\"name\": \"id\", \"type\": \"long\"},"
+ + "{\"name\": \"billingTime\", \"type\": \"string\"},"
+ + "{\"name\": \"eventTime\", \"type\": \"string\"},"
+ + "{\"name\": \"registrarId\", \"type\": \"string\"},"
+ + "{\"name\": \"billingId\", \"type\": \"long\"},"
+ + "{\"name\": \"tld\", \"type\": \"string\"},"
+ + "{\"name\": \"action\", \"type\": \"string\"},"
+ + "{\"name\": \"domain\", \"type\": \"string\"},"
+ + "{\"name\": \"repositoryId\", \"type\": \"string\"},"
+ + "{\"name\": \"years\", \"type\": \"int\"},"
+ + "{\"name\": \"currency\", \"type\": \"string\"},"
+ + "{\"name\": \"amount\", \"type\": \"float\"},"
+ + "{\"name\": \"flags\", \"type\": \"string\"}"
+ + "]}";
+
+ private SchemaAndRecord schemaAndRecord;
+
+ @Before
+ public void initializeRecord() {
+ // Create a record with a given JSON schema.
+ GenericRecord record = new GenericData.Record(new Schema.Parser().parse(BILLING_EVENT_SCHEMA));
+ record.put("id", "1");
+ record.put("billingTime", 1508835963000000L);
+ record.put("eventTime", 1484870383000000L);
+ record.put("registrarId", "myRegistrar");
+ record.put("billingId", "12345-CRRHELLO");
+ record.put("tld", "test");
+ record.put("action", "RENEW");
+ record.put("domain", "example.test");
+ record.put("repositoryId", "123456");
+ record.put("years", 5);
+ record.put("currency", "USD");
+ record.put("amount", 20.5);
+ record.put("flags", "AUTO_RENEW SYNTHETIC");
+ schemaAndRecord = new SchemaAndRecord(record, null);
+ }
+
+ @Test
+ public void testParseBillingEventFromRecord_success() {
+ BillingEvent event = BillingEvent.parseFromRecord(schemaAndRecord);
+ assertThat(event.id()).isEqualTo(1);
+ assertThat(event.billingTime())
+ .isEqualTo(ZonedDateTime.of(2017, 10, 24, 9, 6, 3, 0, ZoneId.of("UTC")));
+ assertThat(event.eventTime())
+ .isEqualTo(ZonedDateTime.of(2017, 1, 19, 23, 59, 43, 0, ZoneId.of("UTC")));
+ assertThat(event.registrarId()).isEqualTo("myRegistrar");
+ assertThat(event.billingId()).isEqualTo("12345-CRRHELLO");
+ assertThat(event.tld()).isEqualTo("test");
+ assertThat(event.action()).isEqualTo("RENEW");
+ assertThat(event.domain()).isEqualTo("example.test");
+ assertThat(event.repositoryId()).isEqualTo("123456");
+ assertThat(event.years()).isEqualTo(5);
+ assertThat(event.currency()).isEqualTo("USD");
+ assertThat(event.amount()).isEqualTo(20.5);
+ assertThat(event.flags()).isEqualTo("AUTO_RENEW SYNTHETIC");
+ }
+
+ @Test
+ public void testParseBillingEventFromRecord_sunriseCreate_reducedPrice_success() {
+ schemaAndRecord.getRecord().put("flags", "SUNRISE");
+ BillingEvent event = BillingEvent.parseFromRecord(schemaAndRecord);
+ assertThat(event.amount()).isEqualTo(17.43);
+ assertThat(event.flags()).isEqualTo("SUNRISE");
+ }
+
+ @Test
+ public void testParseBillingEventFromRecord_anchorTenant_zeroPrice_success() {
+ schemaAndRecord.getRecord().put("flags", "SUNRISE ANCHOR_TENANT");
+ BillingEvent event = BillingEvent.parseFromRecord(schemaAndRecord);
+ assertThat(event.amount()).isZero();
+ assertThat(event.flags()).isEqualTo("SUNRISE ANCHOR_TENANT");
+ }
+
+ @Test
+ public void testParseBillingEventFromRecord_nullValue_throwsException() {
+ schemaAndRecord.getRecord().put("tld", null);
+ assertThrows(IllegalStateException.class, () -> BillingEvent.parseFromRecord(schemaAndRecord));
+ }
+
+ @Test
+ public void testConvertBillingEvent_toCsv() {
+ BillingEvent event = BillingEvent.parseFromRecord(schemaAndRecord);
+ assertThat(event.toCsv())
+ .isEqualTo("1,2017-10-24 09:06:03 UTC,2017-01-19 23:59:43 UTC,myRegistrar,"
+ + "12345-CRRHELLO,test,RENEW,example.test,123456,5,USD,20.50,AUTO_RENEW");
+ }
+
+ @Test
+ public void testGenerateBillingEventFilename() {
+ BillingEvent event = BillingEvent.parseFromRecord(schemaAndRecord);
+ assertThat(event.toFilename("2017-10")).isEqualTo("invoice_details_2017-10_myRegistrar_test");
+ }
+
+ @Test
+ public void testGetInvoiceGroupingKey_fromBillingEvent() {
+ BillingEvent event = BillingEvent.parseFromRecord(schemaAndRecord);
+ InvoiceGroupingKey invoiceKey = event.getInvoiceGroupingKey();
+ assertThat(invoiceKey.startDate()).isEqualTo("2017-10-01");
+ assertThat(invoiceKey.endDate()).isEqualTo("2022-09-30");
+ assertThat(invoiceKey.productAccountKey()).isEqualTo("12345-CRRHELLO");
+ assertThat(invoiceKey.usageGroupingKey()).isEqualTo("myRegistrar - test");
+ assertThat(invoiceKey.description()).isEqualTo("RENEW | TLD: test | TERM: 5-year");
+ assertThat(invoiceKey.unitPrice()).isEqualTo(20.5);
+ assertThat(invoiceKey.unitPriceCurrency()).isEqualTo("USD");
+ assertThat(invoiceKey.poNumber()).isEmpty();
+ }
+
+ @Test
+ public void testConvertInvoiceGroupingKey_toCsv() {
+ BillingEvent event = BillingEvent.parseFromRecord(schemaAndRecord);
+ InvoiceGroupingKey invoiceKey = event.getInvoiceGroupingKey();
+ assertThat(invoiceKey.toCsv(3L))
+ .isEqualTo(
+ "2017-10-01,2022-09-30,12345-CRRHELLO,61.50,USD,10125,1,PURCHASE,"
+ + "myRegistrar - test,3,RENEW | TLD: test | TERM: 5-year,20.50,USD,");
+ }
+
+ @Test
+ public void testInvoiceGroupingKeyCoder_deterministicSerialization() throws IOException {
+ InvoiceGroupingKey invoiceKey =
+ BillingEvent.parseFromRecord(schemaAndRecord).getInvoiceGroupingKey();
+ InvoiceGroupingKeyCoder coder = new InvoiceGroupingKeyCoder();
+ ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+ coder.encode(invoiceKey, outStream);
+ InputStream inStream = new ByteArrayInputStream(outStream.toByteArray());
+ assertThat(coder.decode(inStream)).isEqualTo(invoiceKey);
+ }
+
+ @Test
+ public void testGetDetailReportHeader() {
+ assertThat(BillingEvent.getHeader())
+ .isEqualTo(
+ "id,billingTime,eventTime,registrarId,billingId,tld,action,"
+ + "domain,repositoryId,years,currency,amount,flags");
+ }
+
+ @Test
+ public void testGetOverallInvoiceHeader() {
+ assertThat(InvoiceGroupingKey.invoiceHeader())
+ .isEqualTo("StartDate,EndDate,ProductAccountKey,Amount,AmountCurrency,BillingProductCode,"
+ + "SalesChannel,LineItemType,UsageGroupingKey,Quantity,Description,UnitPrice,"
+ + "UnitPriceCurrency,PONumber");
+ }
+}
diff --git a/javatests/google/registry/beam/invoicing/InvoicingPipelineTest.java b/javatests/google/registry/beam/invoicing/InvoicingPipelineTest.java
new file mode 100644
index 000000000..c0658e483
--- /dev/null
+++ b/javatests/google/registry/beam/invoicing/InvoicingPipelineTest.java
@@ -0,0 +1,215 @@
+// Copyright 2018 The Nomulus Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package google.registry.beam.invoicing;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import google.registry.util.ResourceUtils;
+import java.io.File;
+import java.io.IOException;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.util.Map.Entry;
+import org.apache.beam.runners.direct.DirectRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link InvoicingPipeline}. */
+@RunWith(JUnit4.class)
+public class InvoicingPipelineTest {
+
+ private static PipelineOptions pipelineOptions;
+
+ @BeforeClass
+ public static void initializePipelineOptions() {
+ pipelineOptions = PipelineOptionsFactory.create();
+ pipelineOptions.setRunner(DirectRunner.class);
+ }
+
+ @Rule public final transient TestPipeline p = TestPipeline.fromOptions(pipelineOptions);
+ @Rule public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+ private InvoicingPipeline invoicingPipeline;
+
+ @Before
+ public void initializePipeline() throws IOException {
+ invoicingPipeline = new InvoicingPipeline();
+ invoicingPipeline.projectId = "test-project";
+ File beamTempFolder = tempFolder.newFolder();
+ invoicingPipeline.beamBucketUrl = beamTempFolder.getAbsolutePath();
+ invoicingPipeline.beamStagingUrl = beamTempFolder.getAbsolutePath() + "/staging";
+ invoicingPipeline.invoiceTemplateUrl =
+ beamTempFolder.getAbsolutePath() + "/templates/invoicing";
+ invoicingPipeline.billingBucketUrl = tempFolder.getRoot().getAbsolutePath();
+ }
+
+ private ImmutableList getInputEvents() {
+ return ImmutableList.of(
+ BillingEvent.create(
+ 1,
+ ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
+ ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
+ "theRegistrar",
+ "234",
+ "test",
+ "RENEW",
+ "mydomain.test",
+ "REPO-ID",
+ 3,
+ "USD",
+ 20.5,
+ ""),
+ BillingEvent.create(
+ 1,
+ ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
+ ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
+ "theRegistrar",
+ "234",
+ "test",
+ "RENEW",
+ "mydomain2.test",
+ "REPO-ID",
+ 3,
+ "USD",
+ 20.5,
+ ""),
+ BillingEvent.create(
+ 1,
+ ZonedDateTime.of(2017, 10, 2, 0, 0, 0, 0, ZoneId.of("UTC")),
+ ZonedDateTime.of(2017, 9, 29, 0, 0, 0, 0, ZoneId.of("UTC")),
+ "theRegistrar",
+ "234",
+ "hello",
+ "CREATE",
+ "mydomain3.hello",
+ "REPO-ID",
+ 5,
+ "JPY",
+ 70.75,
+ ""),
+ BillingEvent.create(
+ 1,
+ ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
+ ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
+ "googledomains",
+ "456",
+ "test",
+ "RENEW",
+ "mydomain4.test",
+ "REPO-ID",
+ 1,
+ "USD",
+ 20.5,
+ ""),
+ BillingEvent.create(
+ 1,
+ ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
+ ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
+ "anotherRegistrar",
+ "789",
+ "test",
+ "CREATE",
+ "mydomain5.test",
+ "REPO-ID",
+ 1,
+ "USD",
+ 0,
+ "SUNRISE ANCHOR_TENANT"));
+ }
+
+ /** Returns a map from filename to expected contents for detail reports. */
+ private ImmutableMap> getExpectedDetailReportMap() {
+ return ImmutableMap.of(
+ "invoice_details_2017-10_theRegistrar_test.csv",
+ ImmutableList.of(
+ "1,2017-10-04 00:00:00 UTC,2017-10-04 00:00:00 UTC,theRegistrar,234,"
+ + "test,RENEW,mydomain2.test,REPO-ID,3,USD,20.50,",
+ "1,2017-10-04 00:00:00 UTC,2017-10-04 00:00:00 UTC,theRegistrar,234,"
+ + "test,RENEW,mydomain.test,REPO-ID,3,USD,20.50,"),
+ "invoice_details_2017-10_theRegistrar_hello.csv",
+ ImmutableList.of(
+ "1,2017-10-02 00:00:00 UTC,2017-09-29 00:00:00 UTC,theRegistrar,234,"
+ + "hello,CREATE,mydomain3.hello,REPO-ID,5,JPY,70.75,"),
+ "invoice_details_2017-10_googledomains_test.csv",
+ ImmutableList.of(
+ "1,2017-10-04 00:00:00 UTC,2017-10-04 00:00:00 UTC,googledomains,456,"
+ + "test,RENEW,mydomain4.test,REPO-ID,1,USD,20.50,"),
+ "invoice_details_2017-10_anotherRegistrar_test.csv",
+ ImmutableList.of(
+ "1,2017-10-04 00:00:00 UTC,2017-10-04 00:00:00 UTC,anotherRegistrar,789,"
+ + "test,CREATE,mydomain5.test,REPO-ID,1,USD,0.00,SUNRISE ANCHOR_TENANT"));
+ }
+
+ private ImmutableList getExpectedInvoiceOutput() {
+ return ImmutableList.of(
+ "2017-10-01,2020-09-30,234,41.00,USD,10125,1,PURCHASE,theRegistrar - test,2,"
+ + "RENEW | TLD: test | TERM: 3-year,20.50,USD,",
+ "2017-10-01,2022-09-30,234,70.75,JPY,10125,1,PURCHASE,theRegistrar - hello,1,"
+ + "CREATE | TLD: hello | TERM: 5-year,70.75,JPY,",
+ "2017-10-01,2018-09-30,456,20.50,USD,10125,1,PURCHASE,googledomains - test,1,"
+ + "RENEW | TLD: test | TERM: 1-year,20.50,USD,",
+ "2017-10-01,2018-09-30,789,0.00,USD,10125,1,PURCHASE,anotherRegistrar - test,1,"
+ + "CREATE | TLD: test | TERM: 1-year,0.00,USD,");
+ }
+
+ @Test
+ public void testEndToEndPipeline_generatesExpectedFiles() throws Exception {
+ ImmutableList inputRows = getInputEvents();
+ PCollection input = p.apply(Create.of(inputRows));
+ invoicingPipeline.applyTerminalTransforms(input, StaticValueProvider.of("2017-10"));
+ p.run();
+
+ for (Entry> entry : getExpectedDetailReportMap().entrySet()) {
+ ImmutableList detailReport = resultFileContents(entry.getKey());
+ assertThat(detailReport.get(0))
+ .isEqualTo("id,billingTime,eventTime,registrarId,billingId,tld,action,"
+ + "domain,repositoryId,years,currency,amount,flags");
+ assertThat(detailReport.subList(1, detailReport.size()))
+ .containsExactlyElementsIn(entry.getValue());
+ }
+
+ ImmutableList overallInvoice = resultFileContents("CRR-INV-2017-10.csv");
+ assertThat(overallInvoice.get(0))
+ .isEqualTo(
+ "StartDate,EndDate,ProductAccountKey,Amount,AmountCurrency,BillingProductCode,"
+ + "SalesChannel,LineItemType,UsageGroupingKey,Quantity,Description,UnitPrice,"
+ + "UnitPriceCurrency,PONumber");
+ assertThat(overallInvoice.subList(1, overallInvoice.size()))
+ .containsExactlyElementsIn(getExpectedInvoiceOutput());
+ }
+
+ /** Returns the text contents of a file under the beamBucket/results directory. */
+ private ImmutableList resultFileContents(String filename) throws Exception {
+ File resultFile =
+ new File(
+ String.format(
+ "%s/invoices/2017-10/%s", tempFolder.getRoot().getAbsolutePath(), filename));
+ return ImmutableList.copyOf(
+ ResourceUtils.readResourceUtf8(resultFile.toURI().toURL()).split("\n"));
+ }
+}
diff --git a/javatests/google/registry/beam/invoicing/InvoicingUtilsTest.java b/javatests/google/registry/beam/invoicing/InvoicingUtilsTest.java
new file mode 100644
index 000000000..e041aa7a6
--- /dev/null
+++ b/javatests/google/registry/beam/invoicing/InvoicingUtilsTest.java
@@ -0,0 +1,76 @@
+// Copyright 2018 The Nomulus Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package google.registry.beam.invoicing;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import google.registry.testing.TestDataHelper;
+import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
+import org.apache.beam.sdk.io.FileBasedSink;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link InvoicingUtils}. */
+@RunWith(JUnit4.class)
+public class InvoicingUtilsTest {
+
+ @Test
+ public void testDestinationFunction_generatesProperFileParams() {
+ SerializableFunction destinationFunction =
+ InvoicingUtils.makeDestinationFunction("my/directory", StaticValueProvider.of("2017-10"));
+
+ BillingEvent billingEvent = mock(BillingEvent.class);
+ // We mock BillingEvent to make the test independent of the implementation of toFilename()
+ when(billingEvent.toFilename(any())).thenReturn("invoice_details_2017-10_registrar_tld");
+
+ assertThat(destinationFunction.apply(billingEvent))
+ .isEqualTo(
+ new Params()
+ .withShardTemplate("")
+ .withSuffix(".csv")
+ .withBaseFilename(
+ FileBasedSink.convertToFileResourceIfPossible(
+ "my/directory/2017-10/invoice_details_2017-10_registrar_tld")));
+ }
+
+ @Test
+ public void testEmptyDestinationParams() {
+ assertThat(InvoicingUtils.makeEmptyDestinationParams("my/directory"))
+ .isEqualTo(
+ new Params()
+ .withBaseFilename(
+ FileBasedSink.convertToFileResourceIfPossible("my/directory/FAILURES")));
+ }
+
+ /** Asserts that the instantiated sql template matches a golden expected file. */
+ @Test
+ public void testMakeQueryProvider() {
+ ValueProvider queryProvider =
+ InvoicingUtils.makeQueryProvider(StaticValueProvider.of("2017-10"), "my-project-id");
+ assertThat(queryProvider.get()).isEqualTo(loadFile("billing_events_test.sql"));
+ }
+
+ /** Returns a {@link String} from a file in the {@code billing/testdata/} directory. */
+ private static String loadFile(String filename) {
+ return TestDataHelper.loadFile(InvoicingUtilsTest.class, filename);
+ }
+}
diff --git a/javatests/google/registry/beam/invoicing/testdata/billing_events_test.sql b/javatests/google/registry/beam/invoicing/testdata/billing_events_test.sql
new file mode 100644
index 000000000..4a1b32783
--- /dev/null
+++ b/javatests/google/registry/beam/invoicing/testdata/billing_events_test.sql
@@ -0,0 +1,100 @@
+#standardSQL
+ -- Copyright 2017 The Nomulus Authors. All Rights Reserved.
+ --
+ -- Licensed under the Apache License, Version 2.0 (the "License");
+ -- you may not use this file except in compliance with the License.
+ -- You may obtain a copy of the License at
+ --
+ -- http://www.apache.org/licenses/LICENSE-2.0
+ --
+ -- Unless required by applicable law or agreed to in writing, software
+ -- distributed under the License is distributed on an "AS IS" BASIS,
+ -- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ -- See the License for the specific language governing permissions and
+ -- limitations under the License.
+
+ -- This query gathers all non-canceled billing events for a given
+ -- YEAR_MONTH in yyyy-MM format.
+
+SELECT
+ __key__.id AS id,
+ billingTime,
+ eventTime,
+ BillingEvent.clientId AS registrarId,
+ RegistrarData.accountId AS billingId,
+ tld,
+ reason as action,
+ targetId as domain,
+ BillingEvent.domainRepoId as repositoryId,
+ periodYears as years,
+ BillingEvent.currency AS currency,
+ BillingEvent.amount as amount,
+ -- We'll strip out non-useful flags downstream
+ ARRAY_TO_STRING(flags, " ") AS flags
+FROM (
+ SELECT
+ *,
+ -- We store cost as "CURRENCY AMOUNT" such as "JPY 800" or "USD 20.00"
+ SPLIT(cost, ' ')[OFFSET(0)] AS currency,
+ SPLIT(cost, ' ')[OFFSET(1)] AS amount,
+ -- Extract everything after the first dot in the domain as the TLD
+ REGEXP_EXTRACT(targetId, r'[.](.+)') AS tld,
+ -- __key__.path looks like '"DomainBase", "", ...'
+ REGEXP_REPLACE(SPLIT(__key__.path, ', ')[OFFSET(1)], '"', '')
+ AS domainRepoId,
+ COALESCE(cancellationMatchingBillingEvent.path,
+ __key__.path) AS cancellationMatchingPath
+ FROM
+ `my-project-id.latest_datastore_export.OneTime`
+ -- Only include real TLDs (filter prober data)
+ WHERE
+ REGEXP_EXTRACT(targetId, r'[.](.+)') IN (
+ SELECT
+ tldStr
+ FROM
+ `my-project-id.latest_datastore_export.Registry`
+ WHERE
+ -- TODO(b/18092292): Add a filter for tldState (not PDT/PREDELEGATION)
+ tldType = 'REAL') ) AS BillingEvent
+ -- Gather billing ID from registrar table
+ -- This is a 'JOIN' as opposed to 'LEFT JOIN' to filter out
+ -- non-billable registrars
+JOIN (
+ SELECT
+ __key__.name AS clientId,
+ billingIdentifier,
+ r.billingAccountMap.currency[SAFE_OFFSET(index)] AS currency,
+ r.billingAccountMap.accountId[SAFE_OFFSET(index)] AS accountId
+ FROM
+ `my-project-id.latest_datastore_export.Registrar` AS r,
+ UNNEST(GENERATE_ARRAY(0, ARRAY_LENGTH(r.billingAccountMap.currency) - 1))
+ AS index
+ WHERE billingAccountMap IS NOT NULL
+ AND type = 'REAL') AS RegistrarData
+ON
+ BillingEvent.clientId = RegistrarData.clientId
+ AND BillingEvent.currency = RegistrarData.currency
+ -- Gather cancellations
+LEFT JOIN (
+ SELECT __key__.id AS cancellationId,
+ COALESCE(refOneTime.path, refRecurring.path) AS cancelledEventPath,
+ eventTime as cancellationTime,
+ billingTime as cancellationBillingTime
+ FROM
+ (SELECT
+ *,
+ -- Count everything after first dot as TLD (to support multi-part TLDs).
+ REGEXP_EXTRACT(targetId, r'[.](.+)') AS tld
+ FROM
+ `my-project-id.latest_datastore_export.Cancellation`)
+) AS Cancellation
+ON BillingEvent.cancellationMatchingPath = Cancellation.cancelledEventPath
+AND BillingEvent.billingTime = Cancellation.cancellationBillingTime
+WHERE billingTime BETWEEN TIMESTAMP('2017-10-01 00:00:00.000000')
+ AND TIMESTAMP('2017-10-31 23:59:59.999999')
+-- Filter out canceled events
+AND Cancellation.cancellationId IS NULL
+ORDER BY
+ billingTime DESC,
+ id,
+ tld
diff --git a/javatests/google/registry/beam/spec11/BUILD b/javatests/google/registry/beam/spec11/BUILD
new file mode 100644
index 000000000..283cca647
--- /dev/null
+++ b/javatests/google/registry/beam/spec11/BUILD
@@ -0,0 +1,36 @@
+package(
+ default_testonly = 1,
+ default_visibility = ["//java/google/registry:registry_project"],
+)
+
+licenses(["notice"]) # Apache 2.0
+
+load("//java/com/google/testing/builddefs:GenTestRules.bzl", "GenTestRules")
+
+java_library(
+ name = "spec11",
+ srcs = glob(["*.java"]),
+ deps = [
+ "//java/google/registry/beam/spec11",
+ "//java/google/registry/util",
+ "//javatests/google/registry/testing",
+ "@com_google_dagger",
+ "@com_google_guava",
+ "@com_google_truth",
+ "@com_google_truth_extensions_truth_java8_extension",
+ "@junit",
+ "@org_apache_avro",
+ "@org_apache_beam_runners_direct_java",
+ "@org_apache_beam_runners_google_cloud_dataflow_java",
+ "@org_apache_beam_sdks_java_core",
+ "@org_apache_beam_sdks_java_io_google_cloud_platform",
+ "@org_mockito_all",
+ ],
+)
+
+GenTestRules(
+ name = "GeneratedTestRules",
+ default_test_size = "small",
+ test_files = glob(["*Test.java"]),
+ deps = [":spec11"],
+)
diff --git a/javatests/google/registry/beam/spec11/Spec11PipelineTest.java b/javatests/google/registry/beam/spec11/Spec11PipelineTest.java
new file mode 100644
index 000000000..080965f61
--- /dev/null
+++ b/javatests/google/registry/beam/spec11/Spec11PipelineTest.java
@@ -0,0 +1,94 @@
+// Copyright 2018 The Nomulus Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package google.registry.beam.spec11;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.common.collect.ImmutableList;
+import google.registry.util.ResourceUtils;
+import java.io.File;
+import java.io.IOException;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import org.apache.beam.runners.direct.DirectRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link Spec11Pipeline}. */
+@RunWith(JUnit4.class)
+public class Spec11PipelineTest {
+
+ private static PipelineOptions pipelineOptions;
+
+ @BeforeClass
+ public static void initializePipelineOptions() {
+ pipelineOptions = PipelineOptionsFactory.create();
+ pipelineOptions.setRunner(DirectRunner.class);
+ }
+
+ @Rule public final transient TestPipeline p = TestPipeline.fromOptions(pipelineOptions);
+ @Rule public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+ private Spec11Pipeline spec11Pipeline;
+
+ @Before
+ public void initializePipeline() throws IOException {
+ spec11Pipeline = new Spec11Pipeline();
+ spec11Pipeline.projectId = "test-project";
+ spec11Pipeline.spec11BucketUrl = tempFolder.getRoot().getAbsolutePath() + "/results";
+ File beamTempFolder = tempFolder.newFolder();
+ spec11Pipeline.beamStagingUrl = beamTempFolder.getAbsolutePath() + "/staging";
+ spec11Pipeline.spec11TemplateUrl = beamTempFolder.getAbsolutePath() + "/templates/invoicing";
+ }
+
+ private ImmutableList getInputDomains() {
+ return ImmutableList.of(
+ Subdomain.create(
+ "a.com", ZonedDateTime.of(2017, 9, 29, 0, 0, 0, 0, ZoneId.of("UTC")), "OK"),
+ Subdomain.create(
+ "b.com", ZonedDateTime.of(2017, 9, 29, 0, 0, 0, 0, ZoneId.of("UTC")), "OK"),
+ Subdomain.create(
+ "c.com", ZonedDateTime.of(2017, 9, 29, 0, 0, 0, 0, ZoneId.of("UTC")), "OK"));
+ }
+
+ @Test
+ public void testEndToEndPipeline_generatesExpectedFiles() throws Exception {
+ ImmutableList inputRows = getInputDomains();
+ PCollection input = p.apply(Create.of(inputRows));
+ spec11Pipeline.countDomainsAndOutputResults(input);
+ p.run();
+
+ ImmutableList generatedReport = resultFileContents();
+ assertThat(generatedReport.get(0)).isEqualTo("HELLO WORLD");
+ assertThat(generatedReport.get(1)).isEqualTo("3");
+ }
+
+ /** Returns the text contents of a file under the beamBucket/results directory. */
+ private ImmutableList resultFileContents() throws Exception {
+ File resultFile = new File(String.format("%s/results", tempFolder.getRoot().getAbsolutePath()));
+ return ImmutableList.copyOf(
+ ResourceUtils.readResourceUtf8(resultFile.toURI().toURL()).split("\n"));
+ }
+}