diff --git a/java/google/registry/beam/invoicing/BUILD b/java/google/registry/beam/invoicing/BUILD new file mode 100644 index 000000000..155b119ec --- /dev/null +++ b/java/google/registry/beam/invoicing/BUILD @@ -0,0 +1,29 @@ +package( + default_visibility = ["//visibility:public"], +) + +licenses(["notice"]) # Apache 2.0 + +java_library( + name = "invoicing", + srcs = glob(["*.java"]), + resources = glob(["sql/*"]), + deps = [ + "//java/google/registry/config", + "//java/google/registry/model", + "//java/google/registry/reporting/billing", + "//java/google/registry/util", + "@com_google_apis_google_api_services_bigquery", + "@com_google_auto_value", + "@com_google_dagger", + "@com_google_flogger", + "@com_google_flogger_system_backend", + "@com_google_guava", + "@javax_inject", + "@org_apache_avro", + "@org_apache_beam_runners_direct_java", + "@org_apache_beam_runners_google_cloud_dataflow_java", + "@org_apache_beam_sdks_java_core", + "@org_apache_beam_sdks_java_io_google_cloud_platform", + ], +) diff --git a/java/google/registry/beam/invoicing/BillingEvent.java b/java/google/registry/beam/invoicing/BillingEvent.java new file mode 100644 index 000000000..719179154 --- /dev/null +++ b/java/google/registry/beam/invoicing/BillingEvent.java @@ -0,0 +1,366 @@ +// Copyright 2018 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.beam.invoicing; + +import com.google.auto.value.AutoValue; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableList; +import com.google.common.flogger.FluentLogger; +import google.registry.model.billing.BillingEvent.Flag; +import google.registry.reporting.billing.BillingModule; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; +import java.text.DecimalFormat; +import java.time.Instant; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.stream.Collectors; +import org.apache.avro.generic.GenericRecord; +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord; + +/** + * A POJO representing a single billable event, parsed from a {@code SchemaAndRecord}. + * + *

This is a trivially serializable class that allows Beam to transform the results of a Bigquery + * query into a standard Java representation, giving us the type guarantees and ease of manipulation + * Bigquery lacks, while localizing any Bigquery-side failures to the {@link #parseFromRecord} + * function. + */ +@AutoValue +public abstract class BillingEvent implements Serializable { + + private static final FluentLogger logger = FluentLogger.forEnclosingClass(); + + private static final DateTimeFormatter DATE_TIME_FORMATTER = + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss zzz"); + + + /** The amount we multiply the price for sunrise creates. This is currently a 15% discount. */ + private static final double SUNRISE_DISCOUNT_PRICE_MODIFIER = 0.85; + + private static final ImmutableList FIELD_NAMES = + ImmutableList.of( + "id", + "billingTime", + "eventTime", + "registrarId", + "billingId", + "tld", + "action", + "domain", + "repositoryId", + "years", + "currency", + "amount", + "flags"); + + /** Returns the unique Objectify ID for the {@code OneTime} associated with this event. */ + abstract long id(); + /** Returns the UTC DateTime this event becomes billable. */ + abstract ZonedDateTime billingTime(); + /** Returns the UTC DateTime this event was generated. */ + abstract ZonedDateTime eventTime(); + /** Returns the billed registrar's name. */ + abstract String registrarId(); + /** Returns the billed registrar's billing account key. */ + abstract String billingId(); + /** Returns the tld this event was generated for. */ + abstract String tld(); + /** Returns the billable action this event was generated for (i.e. RENEW, CREATE, TRANSFER...) */ + abstract String action(); + /** Returns the fully qualified domain name this event was generated for. */ + abstract String domain(); + /** Returns the unique RepoID associated with the billed domain. */ + abstract String repositoryId(); + /** Returns the number of years this billing event is made out for. */ + abstract int years(); + /** Returns the 3-letter currency code for the billing event (i.e. USD or JPY.) */ + abstract String currency(); + /** Returns the cost associated with this billing event. */ + abstract double amount(); + /** Returns a list of space-delimited flags associated with the event. */ + abstract String flags(); + + /** + * Constructs a {@code BillingEvent} from a {@code SchemaAndRecord}. + * + * @see + * Apache AVRO GenericRecord + */ + static BillingEvent parseFromRecord(SchemaAndRecord schemaAndRecord) { + checkFieldsNotNull(schemaAndRecord); + GenericRecord record = schemaAndRecord.getRecord(); + String flags = extractField(record, "flags"); + double amount = getDiscountedAmount(Double.parseDouble(extractField(record, "amount")), flags); + return create( + // We need to chain parsers off extractField because GenericRecord only returns + // Objects, which contain a string representation of their underlying types. + Long.parseLong(extractField(record, "id")), + // Bigquery provides UNIX timestamps with microsecond precision. + Instant.ofEpochMilli(Long.parseLong(extractField(record, "billingTime")) / 1000) + .atZone(ZoneId.of("UTC")), + Instant.ofEpochMilli(Long.parseLong(extractField(record, "eventTime")) / 1000) + .atZone(ZoneId.of("UTC")), + extractField(record, "registrarId"), + extractField(record, "billingId"), + extractField(record, "tld"), + extractField(record, "action"), + extractField(record, "domain"), + extractField(record, "repositoryId"), + Integer.parseInt(extractField(record, "years")), + extractField(record, "currency"), + amount, + flags); + } + + /** + * Applies a discount to sunrise creates and anchor tenant creates if applicable. + * + * Currently sunrise creates are discounted 15% and anchor tenant creates are free for 2 years. + * All anchor tenant creates are enforced to be 2 years in + * {@link google.registry.flows.domain.DomainCreateFlow#verifyAnchorTenantValidPeriod}. + */ + private static double getDiscountedAmount(double amount, String flags) { + // Apply a configurable discount to sunrise creates. + if (flags.contains(Flag.SUNRISE.name())) { + amount = + Double.parseDouble( + new DecimalFormat("#.##").format(amount * SUNRISE_DISCOUNT_PRICE_MODIFIER)); + } + // Anchor tenant creates are free for the initial create. This is enforced to be a 2 year period + // upon domain create. + if (flags.contains(Flag.ANCHOR_TENANT.name())) { + amount = 0; + } + return amount; + } + + /** + * Creates a concrete {@code BillingEvent}. + * + *

This should only be used outside this class for testing- instances of {@code BillingEvent} + * should otherwise come from {@link #parseFromRecord}. + */ + @VisibleForTesting + static BillingEvent create( + long id, + ZonedDateTime billingTime, + ZonedDateTime eventTime, + String registrarId, + String billingId, + String tld, + String action, + String domain, + String repositoryId, + int years, + String currency, + double amount, + String flags) { + return new AutoValue_BillingEvent( + id, + billingTime, + eventTime, + registrarId, + billingId, + tld, + action, + domain, + repositoryId, + years, + currency, + amount, + flags); + } + + static String getHeader() { + return FIELD_NAMES.stream().collect(Collectors.joining(",")); + } + + /** + * Generates the filename associated with this {@code BillingEvent}. + * + *

When modifying this function, take care to ensure that there's no way to generate an illegal + * filepath with the arguments, such as "../sensitive_info". + */ + String toFilename(String yearMonth) { + return String.format( + "%s_%s_%s_%s", BillingModule.DETAIL_REPORT_PREFIX, yearMonth, registrarId(), tld()); + } + + /** Generates a CSV representation of this {@code BillingEvent}. */ + String toCsv() { + return Joiner.on(",") + .join( + ImmutableList.of( + id(), + DATE_TIME_FORMATTER.format(billingTime()), + DATE_TIME_FORMATTER.format(eventTime()), + registrarId(), + billingId(), + tld(), + action(), + domain(), + repositoryId(), + years(), + currency(), + String.format("%.2f", amount()), + // Strip out the 'synthetic' flag, which is internal only. + flags().replace("SYNTHETIC", "").trim())); + } + + /** Returns the grouping key for this {@code BillingEvent}, to generate the overall invoice. */ + InvoiceGroupingKey getInvoiceGroupingKey() { + return new AutoValue_BillingEvent_InvoiceGroupingKey( + billingTime().toLocalDate().withDayOfMonth(1).toString(), + billingTime().toLocalDate().withDayOfMonth(1).plusYears(years()).minusDays(1).toString(), + billingId(), + String.format("%s - %s", registrarId(), tld()), + String.format("%s | TLD: %s | TERM: %d-year", action(), tld(), years()), + amount(), + currency(), + ""); + } + + /** Key for each {@code BillingEvent}, when aggregating for the overall invoice. */ + @AutoValue + abstract static class InvoiceGroupingKey implements Serializable { + + private static final ImmutableList INVOICE_HEADERS = + ImmutableList.of( + "StartDate", + "EndDate", + "ProductAccountKey", + "Amount", + "AmountCurrency", + "BillingProductCode", + "SalesChannel", + "LineItemType", + "UsageGroupingKey", + "Quantity", + "Description", + "UnitPrice", + "UnitPriceCurrency", + "PONumber"); + + /** Returns the first day this invoice is valid, in yyyy-MM-dd format. */ + abstract String startDate(); + /** Returns the last day this invoice is valid, in yyyy-MM-dd format. */ + abstract String endDate(); + /** Returns the billing account id, which is the {@code BillingEvent.billingId}. */ + abstract String productAccountKey(); + /** Returns the invoice grouping key, which is in the format "registrarId - tld". */ + abstract String usageGroupingKey(); + /** Returns a description of the item, formatted as "action | TLD: tld | TERM: n-year." */ + abstract String description(); + /** Returns the cost per invoice item. */ + abstract Double unitPrice(); + /** Returns the 3-digit currency code the unit price uses. */ + abstract String unitPriceCurrency(); + /** Returns the purchase order number for the item, blank for most registrars. */ + abstract String poNumber(); + + /** Generates the CSV header for the overall invoice. */ + static String invoiceHeader() { + return Joiner.on(",").join(INVOICE_HEADERS); + } + + /** Generates a CSV representation of n aggregate billing events. */ + String toCsv(Long quantity) { + double totalPrice = unitPrice() * quantity; + return Joiner.on(",") + .join( + ImmutableList.of( + startDate(), + endDate(), + productAccountKey(), + String.format("%.2f", totalPrice), + unitPriceCurrency(), + "10125", + "1", + "PURCHASE", + usageGroupingKey(), + String.format("%d", quantity), + description(), + String.format("%.2f", unitPrice()), + unitPriceCurrency(), + poNumber())); + } + + /** Coder that provides deterministic (de)serialization for {@code InvoiceGroupingKey}. */ + static class InvoiceGroupingKeyCoder extends AtomicCoder { + + @Override + public void encode(InvoiceGroupingKey value, OutputStream outStream) throws IOException { + Coder stringCoder = StringUtf8Coder.of(); + stringCoder.encode(value.startDate(), outStream); + stringCoder.encode(value.endDate(), outStream); + stringCoder.encode(value.productAccountKey(), outStream); + stringCoder.encode(value.usageGroupingKey(), outStream); + stringCoder.encode(value.description(), outStream); + stringCoder.encode(String.valueOf(value.unitPrice()), outStream); + stringCoder.encode(value.unitPriceCurrency(), outStream); + stringCoder.encode(value.poNumber(), outStream); + } + + @Override + public InvoiceGroupingKey decode(InputStream inStream) throws IOException { + Coder stringCoder = StringUtf8Coder.of(); + return new AutoValue_BillingEvent_InvoiceGroupingKey( + stringCoder.decode(inStream), + stringCoder.decode(inStream), + stringCoder.decode(inStream), + stringCoder.decode(inStream), + stringCoder.decode(inStream), + Double.parseDouble(stringCoder.decode(inStream)), + stringCoder.decode(inStream), + stringCoder.decode(inStream)); + } + } + } + + /** Extracts a string representation of a field in a {@code GenericRecord}. */ + private static String extractField(GenericRecord record, String fieldName) { + return String.valueOf(record.get(fieldName)); + } + + /** + * Checks that no expected fields in the record are missing. + * + *

Note that this simply makes sure the field is not null; it may still generate a parse error + * in {@code parseFromRecord}. + */ + private static void checkFieldsNotNull(SchemaAndRecord schemaAndRecord) { + GenericRecord record = schemaAndRecord.getRecord(); + ImmutableList nullFields = + FIELD_NAMES + .stream() + .filter(fieldName -> record.get(fieldName) == null) + .collect(ImmutableList.toImmutableList()); + if (!nullFields.isEmpty()) { + logger.atSevere().log( + "Found unexpected null value(s) in field(s) %s for record %s", + Joiner.on(", ").join(nullFields), record); + throw new IllegalStateException("Read null value from Bigquery query"); + } + } +} diff --git a/java/google/registry/beam/invoicing/InvoicingPipeline.java b/java/google/registry/beam/invoicing/InvoicingPipeline.java new file mode 100644 index 000000000..88c81a1cb --- /dev/null +++ b/java/google/registry/beam/invoicing/InvoicingPipeline.java @@ -0,0 +1,188 @@ +// Copyright 2018 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.beam.invoicing; + +import google.registry.beam.invoicing.BillingEvent.InvoiceGroupingKey; +import google.registry.beam.invoicing.BillingEvent.InvoiceGroupingKey.InvoiceGroupingKeyCoder; +import google.registry.config.RegistryConfig.Config; +import google.registry.reporting.billing.BillingModule; +import google.registry.reporting.billing.GenerateInvoicesAction; +import java.io.Serializable; +import javax.inject.Inject; +import org.apache.beam.runners.dataflow.DataflowRunner; +import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params; +import org.apache.beam.sdk.io.FileBasedSink; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.TypeDescriptors; + +/** + * Definition of a Dataflow pipeline template, which generates a given month's invoices. + * + *

To stage this template on GCS, run the {@link + * google.registry.tools.DeployInvoicingPipelineCommand} Nomulus command. + * + *

Then, you can run the staged template via the API client library, gCloud or a raw REST call. + * For an example using the API client library, see {@link GenerateInvoicesAction}. + * + * @see Dataflow Templates + */ +public class InvoicingPipeline implements Serializable { + + @Inject + @Config("projectId") + String projectId; + + @Inject + @Config("apacheBeamBucketUrl") + String beamBucketUrl; + + @Inject + @Config("invoiceTemplateUrl") + String invoiceTemplateUrl; + + @Inject + @Config("invoiceStagingUrl") + String invoiceStagingUrl; + + @Inject + @Config("billingBucketUrl") + String billingBucketUrl; + + @Inject + InvoicingPipeline() {} + + /** Custom options for running the invoicing pipeline. */ + interface InvoicingPipelineOptions extends DataflowPipelineOptions { + /** Returns the yearMonth we're generating invoices for, in yyyy-MM format. */ + @Description("The yearMonth we generate invoices for, in yyyy-MM format.") + ValueProvider getYearMonth(); + /** + * Sets the yearMonth we generate invoices for. + * + *

This is implicitly set when executing the Dataflow template, by specifying the 'yearMonth + * parameter. + */ + void setYearMonth(ValueProvider value); + } + + /** Deploys the invoicing pipeline as a template on GCS, for a given projectID and GCS bucket. */ + public void deploy() { + // We can't store options as a member variable due to serialization concerns. + InvoicingPipelineOptions options = PipelineOptionsFactory.as(InvoicingPipelineOptions.class); + options.setProject(projectId); + options.setRunner(DataflowRunner.class); + // This causes p.run() to stage the pipeline as a template on GCS, as opposed to running it. + options.setTemplateLocation(invoiceTemplateUrl); + options.setStagingLocation(invoiceStagingUrl); + Pipeline p = Pipeline.create(options); + + PCollection billingEvents = + p.apply( + "Read BillingEvents from Bigquery", + BigQueryIO.read(BillingEvent::parseFromRecord) + .fromQuery(InvoicingUtils.makeQueryProvider(options.getYearMonth(), projectId)) + .withCoder(SerializableCoder.of(BillingEvent.class)) + .usingStandardSql() + .withoutValidation() + .withTemplateCompatibility()); + applyTerminalTransforms(billingEvents, options.getYearMonth()); + p.run(); + } + + /** + * Applies output transforms to the {@code BillingEvent} source collection. + * + *

This is factored out purely to facilitate testing. + */ + void applyTerminalTransforms( + PCollection billingEvents, ValueProvider yearMonthProvider) { + billingEvents + .apply("Generate overall invoice rows", new GenerateInvoiceRows()) + .apply("Write overall invoice to CSV", writeInvoice(yearMonthProvider)); + + billingEvents.apply( + "Write detail reports to separate CSVs keyed by registrarId_tld pair", + writeDetailReports(yearMonthProvider)); + } + + /** Transform that converts a {@code BillingEvent} into an invoice CSV row. */ + private static class GenerateInvoiceRows + extends PTransform, PCollection> { + @Override + public PCollection expand(PCollection input) { + return input + .apply( + "Map to invoicing key", + MapElements.into(TypeDescriptor.of(InvoiceGroupingKey.class)) + .via(BillingEvent::getInvoiceGroupingKey)) + .setCoder(new InvoiceGroupingKeyCoder()) + .apply("Count occurrences", Count.perElement()) + .apply( + "Format as CSVs", + MapElements.into(TypeDescriptors.strings()) + .via((KV kv) -> kv.getKey().toCsv(kv.getValue()))); + } + } + + /** Returns an IO transform that writes the overall invoice to a single CSV file. */ + private TextIO.Write writeInvoice(ValueProvider yearMonthProvider) { + return TextIO.write() + .to( + NestedValueProvider.of( + yearMonthProvider, + yearMonth -> + String.format( + "%s/%s/%s/%s-%s", + billingBucketUrl, + BillingModule.INVOICES_DIRECTORY, + yearMonth, + BillingModule.OVERALL_INVOICE_PREFIX, + yearMonth))) + .withHeader(InvoiceGroupingKey.invoiceHeader()) + .withoutSharding() + .withSuffix(".csv"); + } + + /** Returns an IO transform that writes detail reports to registrar-tld keyed CSV files. */ + private TextIO.TypedWrite writeDetailReports( + ValueProvider yearMonthProvider) { + return TextIO.writeCustomType() + .to( + InvoicingUtils.makeDestinationFunction( + String.format("%s/%s", billingBucketUrl, BillingModule.INVOICES_DIRECTORY), + yearMonthProvider), + InvoicingUtils.makeEmptyDestinationParams(billingBucketUrl + "/errors")) + .withFormatFunction(BillingEvent::toCsv) + .withoutSharding() + .withTempDirectory( + FileBasedSink.convertToFileResourceIfPossible(beamBucketUrl + "/temporary")) + .withHeader(BillingEvent.getHeader()) + .withSuffix(".csv"); + } +} diff --git a/java/google/registry/beam/invoicing/InvoicingUtils.java b/java/google/registry/beam/invoicing/InvoicingUtils.java new file mode 100644 index 000000000..77c2a3c9c --- /dev/null +++ b/java/google/registry/beam/invoicing/InvoicingUtils.java @@ -0,0 +1,112 @@ +// Copyright 2018 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.beam.invoicing; + +import com.google.common.io.Resources; +import google.registry.util.ResourceUtils; +import google.registry.util.SqlTemplate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.YearMonth; +import java.time.format.DateTimeFormatter; +import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params; +import org.apache.beam.sdk.io.FileBasedSink; +import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; +import org.apache.beam.sdk.transforms.SerializableFunction; + +/** Pipeline helper functions used to generate invoices from instances of {@link BillingEvent}. */ +public class InvoicingUtils { + + private InvoicingUtils() {} + + private static final DateTimeFormatter TIMESTAMP_FORMATTER = + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"); + + /** + * Returns a function mapping from {@code BillingEvent} to filename {@code Params}. + * + *

Beam uses this to determine which file a given {@code BillingEvent} should get placed into. + * + * @param outputBucket the GCS bucket we're outputting reports to + * @param yearMonthProvider a runtime provider for the yyyy-MM we're generating the invoice for + */ + static SerializableFunction makeDestinationFunction( + String outputBucket, ValueProvider yearMonthProvider) { + return billingEvent -> + new Params() + .withShardTemplate("") + .withSuffix(".csv") + .withBaseFilename( + NestedValueProvider.of( + yearMonthProvider, + yearMonth -> + FileBasedSink.convertToFileResourceIfPossible( + String.format( + "%s/%s/%s", + outputBucket, yearMonth, billingEvent.toFilename(yearMonth))))); + } + + /** + * Returns the default filename parameters for an unmappable {@code BillingEvent}. + * + *

The "failed" file should only be populated when an error occurs, which warrants further + * investigation. + */ + static Params makeEmptyDestinationParams(String outputBucket) { + return new Params() + .withBaseFilename( + FileBasedSink.convertToFileResourceIfPossible( + String.format("%s/%s", outputBucket, "FAILURES"))); + } + + /** + * Returns a provider that creates a Bigquery query for a given project and yearMonth at runtime. + * + *

We only know yearMonth at runtime, so this provider fills in the {@code + * sql/billing_events.sql} template at runtime. + * + * @param yearMonthProvider a runtime provider that returns which month we're invoicing for. + * @param projectId the projectId we're generating invoicing for. + */ + static ValueProvider makeQueryProvider( + ValueProvider yearMonthProvider, String projectId) { + return NestedValueProvider.of( + yearMonthProvider, + (yearMonth) -> { + // Get the timestamp endpoints capturing the entire month with microsecond precision + YearMonth reportingMonth = YearMonth.parse(yearMonth); + LocalDateTime firstMoment = reportingMonth.atDay(1).atTime(LocalTime.MIDNIGHT); + LocalDateTime lastMoment = reportingMonth.atEndOfMonth().atTime(LocalTime.MAX); + // Construct the month's query by filling in the billing_events.sql template + return SqlTemplate.create(getQueryFromFile("billing_events.sql")) + .put("FIRST_TIMESTAMP_OF_MONTH", firstMoment.format(TIMESTAMP_FORMATTER)) + .put("LAST_TIMESTAMP_OF_MONTH", lastMoment.format(TIMESTAMP_FORMATTER)) + .put("PROJECT_ID", projectId) + .put("DATASTORE_EXPORT_DATA_SET", "latest_datastore_export") + .put("ONETIME_TABLE", "OneTime") + .put("REGISTRY_TABLE", "Registry") + .put("REGISTRAR_TABLE", "Registrar") + .put("CANCELLATION_TABLE", "Cancellation") + .build(); + }); + } + + /** Returns the {@link String} contents for a file in the {@code beam/sql/} directory. */ + private static String getQueryFromFile(String filename) { + return ResourceUtils.readResourceUtf8( + Resources.getResource(InvoicingUtils.class, "sql/" + filename)); + } +} diff --git a/java/google/registry/beam/invoicing/sql/billing_events.sql b/java/google/registry/beam/invoicing/sql/billing_events.sql new file mode 100644 index 000000000..2757fbe5f --- /dev/null +++ b/java/google/registry/beam/invoicing/sql/billing_events.sql @@ -0,0 +1,100 @@ +#standardSQL + -- Copyright 2017 The Nomulus Authors. All Rights Reserved. + -- + -- Licensed under the Apache License, Version 2.0 (the "License"); + -- you may not use this file except in compliance with the License. + -- You may obtain a copy of the License at + -- + -- http://www.apache.org/licenses/LICENSE-2.0 + -- + -- Unless required by applicable law or agreed to in writing, software + -- distributed under the License is distributed on an "AS IS" BASIS, + -- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + -- See the License for the specific language governing permissions and + -- limitations under the License. + + -- This query gathers all non-canceled billing events for a given + -- YEAR_MONTH in yyyy-MM format. + +SELECT + __key__.id AS id, + billingTime, + eventTime, + BillingEvent.clientId AS registrarId, + RegistrarData.accountId AS billingId, + tld, + reason as action, + targetId as domain, + BillingEvent.domainRepoId as repositoryId, + periodYears as years, + BillingEvent.currency AS currency, + BillingEvent.amount as amount, + -- We'll strip out non-useful flags downstream + ARRAY_TO_STRING(flags, " ") AS flags +FROM ( + SELECT + *, + -- We store cost as "CURRENCY AMOUNT" such as "JPY 800" or "USD 20.00" + SPLIT(cost, ' ')[OFFSET(0)] AS currency, + SPLIT(cost, ' ')[OFFSET(1)] AS amount, + -- Extract everything after the first dot in the domain as the TLD + REGEXP_EXTRACT(targetId, r'[.](.+)') AS tld, + -- __key__.path looks like '"DomainBase", "", ...' + REGEXP_REPLACE(SPLIT(__key__.path, ', ')[OFFSET(1)], '"', '') + AS domainRepoId, + COALESCE(cancellationMatchingBillingEvent.path, + __key__.path) AS cancellationMatchingPath + FROM + `%PROJECT_ID%.%DATASTORE_EXPORT_DATA_SET%.%ONETIME_TABLE%` + -- Only include real TLDs (filter prober data) + WHERE + REGEXP_EXTRACT(targetId, r'[.](.+)') IN ( + SELECT + tldStr + FROM + `%PROJECT_ID%.%DATASTORE_EXPORT_DATA_SET%.%REGISTRY_TABLE%` + WHERE + -- TODO(b/18092292): Add a filter for tldState (not PDT/PREDELEGATION) + tldType = 'REAL') ) AS BillingEvent + -- Gather billing ID from registrar table + -- This is a 'JOIN' as opposed to 'LEFT JOIN' to filter out + -- non-billable registrars +JOIN ( + SELECT + __key__.name AS clientId, + billingIdentifier, + r.billingAccountMap.currency[SAFE_OFFSET(index)] AS currency, + r.billingAccountMap.accountId[SAFE_OFFSET(index)] AS accountId + FROM + `%PROJECT_ID%.%DATASTORE_EXPORT_DATA_SET%.%REGISTRAR_TABLE%` AS r, + UNNEST(GENERATE_ARRAY(0, ARRAY_LENGTH(r.billingAccountMap.currency) - 1)) + AS index + WHERE billingAccountMap IS NOT NULL + AND type = 'REAL') AS RegistrarData +ON + BillingEvent.clientId = RegistrarData.clientId + AND BillingEvent.currency = RegistrarData.currency + -- Gather cancellations +LEFT JOIN ( + SELECT __key__.id AS cancellationId, + COALESCE(refOneTime.path, refRecurring.path) AS cancelledEventPath, + eventTime as cancellationTime, + billingTime as cancellationBillingTime + FROM + (SELECT + *, + -- Count everything after first dot as TLD (to support multi-part TLDs). + REGEXP_EXTRACT(targetId, r'[.](.+)') AS tld + FROM + `%PROJECT_ID%.%DATASTORE_EXPORT_DATA_SET%.%CANCELLATION_TABLE%`) +) AS Cancellation +ON BillingEvent.cancellationMatchingPath = Cancellation.cancelledEventPath +AND BillingEvent.billingTime = Cancellation.cancellationBillingTime +WHERE billingTime BETWEEN TIMESTAMP('%FIRST_TIMESTAMP_OF_MONTH%') + AND TIMESTAMP('%LAST_TIMESTAMP_OF_MONTH%') +-- Filter out canceled events +AND Cancellation.cancellationId IS NULL +ORDER BY + billingTime DESC, + id, + tld