mirror of
https://github.com/google/nomulus.git
synced 2025-05-15 08:57:12 +02:00
Refactor beam invoicing pipeline into its own package
This prepares for the spec11 beam pipeline to live parallel to the invoicing beam pipeline, for better organization. ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=204980582
This commit is contained in:
parent
abfc60ab39
commit
9e53e67128
17 changed files with 10 additions and 1583 deletions
|
@ -6,24 +6,4 @@ licenses(["notice"]) # Apache 2.0
|
||||||
|
|
||||||
java_library(
|
java_library(
|
||||||
name = "beam",
|
name = "beam",
|
||||||
srcs = glob(["*.java"]),
|
|
||||||
resources = glob(["sql/*"]),
|
|
||||||
deps = [
|
|
||||||
"//java/google/registry/config",
|
|
||||||
"//java/google/registry/model",
|
|
||||||
"//java/google/registry/reporting/billing",
|
|
||||||
"//java/google/registry/util",
|
|
||||||
"@com_google_apis_google_api_services_bigquery",
|
|
||||||
"@com_google_auto_value",
|
|
||||||
"@com_google_dagger",
|
|
||||||
"@com_google_flogger",
|
|
||||||
"@com_google_flogger_system_backend",
|
|
||||||
"@com_google_guava",
|
|
||||||
"@javax_inject",
|
|
||||||
"@org_apache_avro",
|
|
||||||
"@org_apache_beam_runners_direct_java",
|
|
||||||
"@org_apache_beam_runners_google_cloud_dataflow_java",
|
|
||||||
"@org_apache_beam_sdks_java_core",
|
|
||||||
"@org_apache_beam_sdks_java_io_google_cloud_platform",
|
|
||||||
],
|
|
||||||
)
|
)
|
||||||
|
|
|
@ -1,97 +0,0 @@
|
||||||
// Copyright 2017 The Nomulus Authors. All Rights Reserved.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package google.registry.beam;
|
|
||||||
|
|
||||||
import com.google.api.services.bigquery.model.TableRow;
|
|
||||||
import org.apache.beam.sdk.Pipeline;
|
|
||||||
import org.apache.beam.sdk.io.TextIO;
|
|
||||||
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
|
|
||||||
import org.apache.beam.sdk.options.Default;
|
|
||||||
import org.apache.beam.sdk.options.Description;
|
|
||||||
import org.apache.beam.sdk.options.PipelineOptions;
|
|
||||||
import org.apache.beam.sdk.options.PipelineOptionsFactory;
|
|
||||||
import org.apache.beam.sdk.transforms.Count;
|
|
||||||
import org.apache.beam.sdk.transforms.DoFn;
|
|
||||||
import org.apache.beam.sdk.transforms.MapElements;
|
|
||||||
import org.apache.beam.sdk.transforms.PTransform;
|
|
||||||
import org.apache.beam.sdk.transforms.ParDo;
|
|
||||||
import org.apache.beam.sdk.values.PCollection;
|
|
||||||
import org.apache.beam.sdk.values.TypeDescriptors;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Main class to stage a templated Dataflow pipeline which reads from Bigquery on GCS.
|
|
||||||
*
|
|
||||||
* <p>To stage this pipeline on GCS, run it with the following command line flags:
|
|
||||||
* <ul>
|
|
||||||
* <li>--runner=DataflowRunner
|
|
||||||
* <li>--project=[YOUR PROJECT ID]
|
|
||||||
* <li>--stagingLocation=gs://[WHERE PIPELINE JAR FILES SHOULD BE STAGED]
|
|
||||||
* <li>--templateLocation=gs://[WHERE TEMPLATE.txt FILE SHOULD BE STAGED]
|
|
||||||
* </ul>
|
|
||||||
*
|
|
||||||
* <p>Then, you can run the staged template via the API client library, gCloud or a raw REST call.
|
|
||||||
*
|
|
||||||
* @see <a href="https://cloud.google.com/dataflow/docs/templates/overview">Dataflow Templates</a>
|
|
||||||
*/
|
|
||||||
public class BigqueryTemplatePipeline {
|
|
||||||
|
|
||||||
/** Custom command-line pipeline options for {@code BigqueryTemplatePipeline}. */
|
|
||||||
public interface BigqueryTemplateOptions extends PipelineOptions {
|
|
||||||
@Description("Bigquery query used to get the initial data for the pipeline.")
|
|
||||||
@Default.String("SELECT * FROM `[YOUR_PROJECT].[DATASET].[TABLE]`")
|
|
||||||
String getBigqueryQuery();
|
|
||||||
void setBigqueryQuery(String value);
|
|
||||||
|
|
||||||
@Description("The GCS bucket we output the result text file to.")
|
|
||||||
@Default.String("[YOUR BUCKET HERE]")
|
|
||||||
String getOutputBucket();
|
|
||||||
void setOutputBucket(String value);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static void main(String[] args) {
|
|
||||||
// Parse standard arguments, as well as custom options
|
|
||||||
BigqueryTemplateOptions options =
|
|
||||||
PipelineOptionsFactory.fromArgs(args).withValidation().as(BigqueryTemplateOptions.class);
|
|
||||||
|
|
||||||
// Create pipeline
|
|
||||||
Pipeline p = Pipeline.create(options);
|
|
||||||
p.apply(BigQueryIO.readTableRows().fromQuery(options.getBigqueryQuery()).usingStandardSql())
|
|
||||||
.apply("Count request paths", new CountRequestPaths())
|
|
||||||
.apply(TextIO.write().to(options.getOutputBucket()).withoutSharding().withHeader("HEADER"));
|
|
||||||
p.run();
|
|
||||||
}
|
|
||||||
|
|
||||||
/** A composite {@code PTransform} that counts the number of times each request path appears. */
|
|
||||||
static class CountRequestPaths extends PTransform<PCollection<TableRow>, PCollection<String>> {
|
|
||||||
@Override
|
|
||||||
public PCollection<String> expand(PCollection<TableRow> input) {
|
|
||||||
return input
|
|
||||||
.apply("Extract paths", ParDo.of(new ExtractRequestPathFn()))
|
|
||||||
.apply("Count paths", Count.perElement())
|
|
||||||
.apply(
|
|
||||||
"Format results",
|
|
||||||
MapElements.into(TypeDescriptors.strings())
|
|
||||||
.via(kv -> kv.getKey() + ": " + kv.getValue()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/** A {@code DoFn} that extracts the request path from a Bigquery {@code TableRow}. */
|
|
||||||
static class ExtractRequestPathFn extends DoFn<TableRow, String> {
|
|
||||||
@ProcessElement
|
|
||||||
public void processElement(ProcessContext c) {
|
|
||||||
c.output(String.valueOf(c.element().get("requestPath")));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,366 +0,0 @@
|
||||||
// Copyright 2017 The Nomulus Authors. All Rights Reserved.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package google.registry.beam;
|
|
||||||
|
|
||||||
import com.google.auto.value.AutoValue;
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
import com.google.common.base.Joiner;
|
|
||||||
import com.google.common.collect.ImmutableList;
|
|
||||||
import com.google.common.flogger.FluentLogger;
|
|
||||||
import google.registry.model.billing.BillingEvent.Flag;
|
|
||||||
import google.registry.reporting.billing.BillingModule;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.InputStream;
|
|
||||||
import java.io.OutputStream;
|
|
||||||
import java.io.Serializable;
|
|
||||||
import java.text.DecimalFormat;
|
|
||||||
import java.time.Instant;
|
|
||||||
import java.time.ZoneId;
|
|
||||||
import java.time.ZonedDateTime;
|
|
||||||
import java.time.format.DateTimeFormatter;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
import org.apache.avro.generic.GenericRecord;
|
|
||||||
import org.apache.beam.sdk.coders.AtomicCoder;
|
|
||||||
import org.apache.beam.sdk.coders.Coder;
|
|
||||||
import org.apache.beam.sdk.coders.StringUtf8Coder;
|
|
||||||
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A POJO representing a single billable event, parsed from a {@code SchemaAndRecord}.
|
|
||||||
*
|
|
||||||
* <p>This is a trivially serializable class that allows Beam to transform the results of a Bigquery
|
|
||||||
* query into a standard Java representation, giving us the type guarantees and ease of manipulation
|
|
||||||
* Bigquery lacks, while localizing any Bigquery-side failures to the {@link #parseFromRecord}
|
|
||||||
* function.
|
|
||||||
*/
|
|
||||||
@AutoValue
|
|
||||||
public abstract class BillingEvent implements Serializable {
|
|
||||||
|
|
||||||
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
|
|
||||||
|
|
||||||
private static final DateTimeFormatter DATE_TIME_FORMATTER =
|
|
||||||
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss zzz");
|
|
||||||
|
|
||||||
|
|
||||||
/** The amount we multiply the price for sunrise creates. This is currently a 15% discount. */
|
|
||||||
private static final double SUNRISE_DISCOUNT_PRICE_MODIFIER = 0.85;
|
|
||||||
|
|
||||||
private static final ImmutableList<String> FIELD_NAMES =
|
|
||||||
ImmutableList.of(
|
|
||||||
"id",
|
|
||||||
"billingTime",
|
|
||||||
"eventTime",
|
|
||||||
"registrarId",
|
|
||||||
"billingId",
|
|
||||||
"tld",
|
|
||||||
"action",
|
|
||||||
"domain",
|
|
||||||
"repositoryId",
|
|
||||||
"years",
|
|
||||||
"currency",
|
|
||||||
"amount",
|
|
||||||
"flags");
|
|
||||||
|
|
||||||
/** Returns the unique Objectify ID for the {@code OneTime} associated with this event. */
|
|
||||||
abstract long id();
|
|
||||||
/** Returns the UTC DateTime this event becomes billable. */
|
|
||||||
abstract ZonedDateTime billingTime();
|
|
||||||
/** Returns the UTC DateTime this event was generated. */
|
|
||||||
abstract ZonedDateTime eventTime();
|
|
||||||
/** Returns the billed registrar's name. */
|
|
||||||
abstract String registrarId();
|
|
||||||
/** Returns the billed registrar's billing account key. */
|
|
||||||
abstract String billingId();
|
|
||||||
/** Returns the tld this event was generated for. */
|
|
||||||
abstract String tld();
|
|
||||||
/** Returns the billable action this event was generated for (i.e. RENEW, CREATE, TRANSFER...) */
|
|
||||||
abstract String action();
|
|
||||||
/** Returns the fully qualified domain name this event was generated for. */
|
|
||||||
abstract String domain();
|
|
||||||
/** Returns the unique RepoID associated with the billed domain. */
|
|
||||||
abstract String repositoryId();
|
|
||||||
/** Returns the number of years this billing event is made out for. */
|
|
||||||
abstract int years();
|
|
||||||
/** Returns the 3-letter currency code for the billing event (i.e. USD or JPY.) */
|
|
||||||
abstract String currency();
|
|
||||||
/** Returns the cost associated with this billing event. */
|
|
||||||
abstract double amount();
|
|
||||||
/** Returns a list of space-delimited flags associated with the event. */
|
|
||||||
abstract String flags();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Constructs a {@code BillingEvent} from a {@code SchemaAndRecord}.
|
|
||||||
*
|
|
||||||
* @see <a
|
|
||||||
* href=http://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/generic/GenericData.Record.html>
|
|
||||||
* Apache AVRO GenericRecord</a>
|
|
||||||
*/
|
|
||||||
static BillingEvent parseFromRecord(SchemaAndRecord schemaAndRecord) {
|
|
||||||
checkFieldsNotNull(schemaAndRecord);
|
|
||||||
GenericRecord record = schemaAndRecord.getRecord();
|
|
||||||
String flags = extractField(record, "flags");
|
|
||||||
double amount = getDiscountedAmount(Double.parseDouble(extractField(record, "amount")), flags);
|
|
||||||
return create(
|
|
||||||
// We need to chain parsers off extractField because GenericRecord only returns
|
|
||||||
// Objects, which contain a string representation of their underlying types.
|
|
||||||
Long.parseLong(extractField(record, "id")),
|
|
||||||
// Bigquery provides UNIX timestamps with microsecond precision.
|
|
||||||
Instant.ofEpochMilli(Long.parseLong(extractField(record, "billingTime")) / 1000)
|
|
||||||
.atZone(ZoneId.of("UTC")),
|
|
||||||
Instant.ofEpochMilli(Long.parseLong(extractField(record, "eventTime")) / 1000)
|
|
||||||
.atZone(ZoneId.of("UTC")),
|
|
||||||
extractField(record, "registrarId"),
|
|
||||||
extractField(record, "billingId"),
|
|
||||||
extractField(record, "tld"),
|
|
||||||
extractField(record, "action"),
|
|
||||||
extractField(record, "domain"),
|
|
||||||
extractField(record, "repositoryId"),
|
|
||||||
Integer.parseInt(extractField(record, "years")),
|
|
||||||
extractField(record, "currency"),
|
|
||||||
amount,
|
|
||||||
flags);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Applies a discount to sunrise creates and anchor tenant creates if applicable.
|
|
||||||
*
|
|
||||||
* Currently sunrise creates are discounted 15% and anchor tenant creates are free for 2 years.
|
|
||||||
* All anchor tenant creates are enforced to be 2 years in
|
|
||||||
* {@link google.registry.flows.domain.DomainCreateFlow#verifyAnchorTenantValidPeriod}.
|
|
||||||
*/
|
|
||||||
private static double getDiscountedAmount(double amount, String flags) {
|
|
||||||
// Apply a configurable discount to sunrise creates.
|
|
||||||
if (flags.contains(Flag.SUNRISE.name())) {
|
|
||||||
amount =
|
|
||||||
Double.parseDouble(
|
|
||||||
new DecimalFormat("#.##").format(amount * SUNRISE_DISCOUNT_PRICE_MODIFIER));
|
|
||||||
}
|
|
||||||
// Anchor tenant creates are free for the initial create. This is enforced to be a 2 year period
|
|
||||||
// upon domain create.
|
|
||||||
if (flags.contains(Flag.ANCHOR_TENANT.name())) {
|
|
||||||
amount = 0;
|
|
||||||
}
|
|
||||||
return amount;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates a concrete {@code BillingEvent}.
|
|
||||||
*
|
|
||||||
* <p>This should only be used outside this class for testing- instances of {@code BillingEvent}
|
|
||||||
* should otherwise come from {@link #parseFromRecord}.
|
|
||||||
*/
|
|
||||||
@VisibleForTesting
|
|
||||||
static BillingEvent create(
|
|
||||||
long id,
|
|
||||||
ZonedDateTime billingTime,
|
|
||||||
ZonedDateTime eventTime,
|
|
||||||
String registrarId,
|
|
||||||
String billingId,
|
|
||||||
String tld,
|
|
||||||
String action,
|
|
||||||
String domain,
|
|
||||||
String repositoryId,
|
|
||||||
int years,
|
|
||||||
String currency,
|
|
||||||
double amount,
|
|
||||||
String flags) {
|
|
||||||
return new AutoValue_BillingEvent(
|
|
||||||
id,
|
|
||||||
billingTime,
|
|
||||||
eventTime,
|
|
||||||
registrarId,
|
|
||||||
billingId,
|
|
||||||
tld,
|
|
||||||
action,
|
|
||||||
domain,
|
|
||||||
repositoryId,
|
|
||||||
years,
|
|
||||||
currency,
|
|
||||||
amount,
|
|
||||||
flags);
|
|
||||||
}
|
|
||||||
|
|
||||||
static String getHeader() {
|
|
||||||
return FIELD_NAMES.stream().collect(Collectors.joining(","));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Generates the filename associated with this {@code BillingEvent}.
|
|
||||||
*
|
|
||||||
* <p>When modifying this function, take care to ensure that there's no way to generate an illegal
|
|
||||||
* filepath with the arguments, such as "../sensitive_info".
|
|
||||||
*/
|
|
||||||
String toFilename(String yearMonth) {
|
|
||||||
return String.format(
|
|
||||||
"%s_%s_%s_%s", BillingModule.DETAIL_REPORT_PREFIX, yearMonth, registrarId(), tld());
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Generates a CSV representation of this {@code BillingEvent}. */
|
|
||||||
String toCsv() {
|
|
||||||
return Joiner.on(",")
|
|
||||||
.join(
|
|
||||||
ImmutableList.of(
|
|
||||||
id(),
|
|
||||||
DATE_TIME_FORMATTER.format(billingTime()),
|
|
||||||
DATE_TIME_FORMATTER.format(eventTime()),
|
|
||||||
registrarId(),
|
|
||||||
billingId(),
|
|
||||||
tld(),
|
|
||||||
action(),
|
|
||||||
domain(),
|
|
||||||
repositoryId(),
|
|
||||||
years(),
|
|
||||||
currency(),
|
|
||||||
String.format("%.2f", amount()),
|
|
||||||
// Strip out the 'synthetic' flag, which is internal only.
|
|
||||||
flags().replace("SYNTHETIC", "").trim()));
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Returns the grouping key for this {@code BillingEvent}, to generate the overall invoice. */
|
|
||||||
InvoiceGroupingKey getInvoiceGroupingKey() {
|
|
||||||
return new AutoValue_BillingEvent_InvoiceGroupingKey(
|
|
||||||
billingTime().toLocalDate().withDayOfMonth(1).toString(),
|
|
||||||
billingTime().toLocalDate().withDayOfMonth(1).plusYears(years()).minusDays(1).toString(),
|
|
||||||
billingId(),
|
|
||||||
String.format("%s - %s", registrarId(), tld()),
|
|
||||||
String.format("%s | TLD: %s | TERM: %d-year", action(), tld(), years()),
|
|
||||||
amount(),
|
|
||||||
currency(),
|
|
||||||
"");
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Key for each {@code BillingEvent}, when aggregating for the overall invoice. */
|
|
||||||
@AutoValue
|
|
||||||
abstract static class InvoiceGroupingKey implements Serializable {
|
|
||||||
|
|
||||||
private static final ImmutableList<String> INVOICE_HEADERS =
|
|
||||||
ImmutableList.of(
|
|
||||||
"StartDate",
|
|
||||||
"EndDate",
|
|
||||||
"ProductAccountKey",
|
|
||||||
"Amount",
|
|
||||||
"AmountCurrency",
|
|
||||||
"BillingProductCode",
|
|
||||||
"SalesChannel",
|
|
||||||
"LineItemType",
|
|
||||||
"UsageGroupingKey",
|
|
||||||
"Quantity",
|
|
||||||
"Description",
|
|
||||||
"UnitPrice",
|
|
||||||
"UnitPriceCurrency",
|
|
||||||
"PONumber");
|
|
||||||
|
|
||||||
/** Returns the first day this invoice is valid, in yyyy-MM-dd format. */
|
|
||||||
abstract String startDate();
|
|
||||||
/** Returns the last day this invoice is valid, in yyyy-MM-dd format. */
|
|
||||||
abstract String endDate();
|
|
||||||
/** Returns the billing account id, which is the {@code BillingEvent.billingId}. */
|
|
||||||
abstract String productAccountKey();
|
|
||||||
/** Returns the invoice grouping key, which is in the format "registrarId - tld". */
|
|
||||||
abstract String usageGroupingKey();
|
|
||||||
/** Returns a description of the item, formatted as "action | TLD: tld | TERM: n-year." */
|
|
||||||
abstract String description();
|
|
||||||
/** Returns the cost per invoice item. */
|
|
||||||
abstract Double unitPrice();
|
|
||||||
/** Returns the 3-digit currency code the unit price uses. */
|
|
||||||
abstract String unitPriceCurrency();
|
|
||||||
/** Returns the purchase order number for the item, blank for most registrars. */
|
|
||||||
abstract String poNumber();
|
|
||||||
|
|
||||||
/** Generates the CSV header for the overall invoice. */
|
|
||||||
static String invoiceHeader() {
|
|
||||||
return Joiner.on(",").join(INVOICE_HEADERS);
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Generates a CSV representation of n aggregate billing events. */
|
|
||||||
String toCsv(Long quantity) {
|
|
||||||
double totalPrice = unitPrice() * quantity;
|
|
||||||
return Joiner.on(",")
|
|
||||||
.join(
|
|
||||||
ImmutableList.of(
|
|
||||||
startDate(),
|
|
||||||
endDate(),
|
|
||||||
productAccountKey(),
|
|
||||||
String.format("%.2f", totalPrice),
|
|
||||||
unitPriceCurrency(),
|
|
||||||
"10125",
|
|
||||||
"1",
|
|
||||||
"PURCHASE",
|
|
||||||
usageGroupingKey(),
|
|
||||||
String.format("%d", quantity),
|
|
||||||
description(),
|
|
||||||
String.format("%.2f", unitPrice()),
|
|
||||||
unitPriceCurrency(),
|
|
||||||
poNumber()));
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Coder that provides deterministic (de)serialization for {@code InvoiceGroupingKey}. */
|
|
||||||
static class InvoiceGroupingKeyCoder extends AtomicCoder<InvoiceGroupingKey> {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void encode(InvoiceGroupingKey value, OutputStream outStream) throws IOException {
|
|
||||||
Coder<String> stringCoder = StringUtf8Coder.of();
|
|
||||||
stringCoder.encode(value.startDate(), outStream);
|
|
||||||
stringCoder.encode(value.endDate(), outStream);
|
|
||||||
stringCoder.encode(value.productAccountKey(), outStream);
|
|
||||||
stringCoder.encode(value.usageGroupingKey(), outStream);
|
|
||||||
stringCoder.encode(value.description(), outStream);
|
|
||||||
stringCoder.encode(String.valueOf(value.unitPrice()), outStream);
|
|
||||||
stringCoder.encode(value.unitPriceCurrency(), outStream);
|
|
||||||
stringCoder.encode(value.poNumber(), outStream);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public InvoiceGroupingKey decode(InputStream inStream) throws IOException {
|
|
||||||
Coder<String> stringCoder = StringUtf8Coder.of();
|
|
||||||
return new AutoValue_BillingEvent_InvoiceGroupingKey(
|
|
||||||
stringCoder.decode(inStream),
|
|
||||||
stringCoder.decode(inStream),
|
|
||||||
stringCoder.decode(inStream),
|
|
||||||
stringCoder.decode(inStream),
|
|
||||||
stringCoder.decode(inStream),
|
|
||||||
Double.parseDouble(stringCoder.decode(inStream)),
|
|
||||||
stringCoder.decode(inStream),
|
|
||||||
stringCoder.decode(inStream));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Extracts a string representation of a field in a {@code GenericRecord}. */
|
|
||||||
private static String extractField(GenericRecord record, String fieldName) {
|
|
||||||
return String.valueOf(record.get(fieldName));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Checks that no expected fields in the record are missing.
|
|
||||||
*
|
|
||||||
* <p>Note that this simply makes sure the field is not null; it may still generate a parse error
|
|
||||||
* in {@code parseFromRecord}.
|
|
||||||
*/
|
|
||||||
private static void checkFieldsNotNull(SchemaAndRecord schemaAndRecord) {
|
|
||||||
GenericRecord record = schemaAndRecord.getRecord();
|
|
||||||
ImmutableList<String> nullFields =
|
|
||||||
FIELD_NAMES
|
|
||||||
.stream()
|
|
||||||
.filter(fieldName -> record.get(fieldName) == null)
|
|
||||||
.collect(ImmutableList.toImmutableList());
|
|
||||||
if (!nullFields.isEmpty()) {
|
|
||||||
logger.atSevere().log(
|
|
||||||
"Found unexpected null value(s) in field(s) %s for record %s",
|
|
||||||
Joiner.on(", ").join(nullFields), record);
|
|
||||||
throw new IllegalStateException("Read null value from Bigquery query");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,188 +0,0 @@
|
||||||
// Copyright 2017 The Nomulus Authors. All Rights Reserved.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package google.registry.beam;
|
|
||||||
|
|
||||||
import google.registry.beam.BillingEvent.InvoiceGroupingKey;
|
|
||||||
import google.registry.beam.BillingEvent.InvoiceGroupingKey.InvoiceGroupingKeyCoder;
|
|
||||||
import google.registry.config.RegistryConfig.Config;
|
|
||||||
import google.registry.reporting.billing.BillingModule;
|
|
||||||
import google.registry.reporting.billing.GenerateInvoicesAction;
|
|
||||||
import java.io.Serializable;
|
|
||||||
import javax.inject.Inject;
|
|
||||||
import org.apache.beam.runners.dataflow.DataflowRunner;
|
|
||||||
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
|
|
||||||
import org.apache.beam.sdk.Pipeline;
|
|
||||||
import org.apache.beam.sdk.coders.SerializableCoder;
|
|
||||||
import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
|
|
||||||
import org.apache.beam.sdk.io.FileBasedSink;
|
|
||||||
import org.apache.beam.sdk.io.TextIO;
|
|
||||||
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
|
|
||||||
import org.apache.beam.sdk.options.Description;
|
|
||||||
import org.apache.beam.sdk.options.PipelineOptionsFactory;
|
|
||||||
import org.apache.beam.sdk.options.ValueProvider;
|
|
||||||
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
|
|
||||||
import org.apache.beam.sdk.transforms.Count;
|
|
||||||
import org.apache.beam.sdk.transforms.MapElements;
|
|
||||||
import org.apache.beam.sdk.transforms.PTransform;
|
|
||||||
import org.apache.beam.sdk.values.KV;
|
|
||||||
import org.apache.beam.sdk.values.PCollection;
|
|
||||||
import org.apache.beam.sdk.values.TypeDescriptor;
|
|
||||||
import org.apache.beam.sdk.values.TypeDescriptors;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Definition of a Dataflow pipeline template, which generates a given month's invoices.
|
|
||||||
*
|
|
||||||
* <p>To stage this template on GCS, run the {@link
|
|
||||||
* google.registry.tools.DeployInvoicingPipelineCommand} Nomulus command.
|
|
||||||
*
|
|
||||||
* <p>Then, you can run the staged template via the API client library, gCloud or a raw REST call.
|
|
||||||
* For an example using the API client library, see {@link GenerateInvoicesAction}.
|
|
||||||
*
|
|
||||||
* @see <a href="https://cloud.google.com/dataflow/docs/templates/overview">Dataflow Templates</a>
|
|
||||||
*/
|
|
||||||
public class InvoicingPipeline implements Serializable {
|
|
||||||
|
|
||||||
@Inject
|
|
||||||
@Config("projectId")
|
|
||||||
String projectId;
|
|
||||||
|
|
||||||
@Inject
|
|
||||||
@Config("apacheBeamBucketUrl")
|
|
||||||
String beamBucketUrl;
|
|
||||||
|
|
||||||
@Inject
|
|
||||||
@Config("invoiceTemplateUrl")
|
|
||||||
String invoiceTemplateUrl;
|
|
||||||
|
|
||||||
@Inject
|
|
||||||
@Config("invoiceStagingUrl")
|
|
||||||
String invoiceStagingUrl;
|
|
||||||
|
|
||||||
@Inject
|
|
||||||
@Config("billingBucketUrl")
|
|
||||||
String billingBucketUrl;
|
|
||||||
|
|
||||||
@Inject
|
|
||||||
InvoicingPipeline() {}
|
|
||||||
|
|
||||||
/** Custom options for running the invoicing pipeline. */
|
|
||||||
interface InvoicingPipelineOptions extends DataflowPipelineOptions {
|
|
||||||
/** Returns the yearMonth we're generating invoices for, in yyyy-MM format. */
|
|
||||||
@Description("The yearMonth we generate invoices for, in yyyy-MM format.")
|
|
||||||
ValueProvider<String> getYearMonth();
|
|
||||||
/**
|
|
||||||
* Sets the yearMonth we generate invoices for.
|
|
||||||
*
|
|
||||||
* <p>This is implicitly set when executing the Dataflow template, by specifying the 'yearMonth
|
|
||||||
* parameter.
|
|
||||||
*/
|
|
||||||
void setYearMonth(ValueProvider<String> value);
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Deploys the invoicing pipeline as a template on GCS, for a given projectID and GCS bucket. */
|
|
||||||
public void deploy() {
|
|
||||||
// We can't store options as a member variable due to serialization concerns.
|
|
||||||
InvoicingPipelineOptions options = PipelineOptionsFactory.as(InvoicingPipelineOptions.class);
|
|
||||||
options.setProject(projectId);
|
|
||||||
options.setRunner(DataflowRunner.class);
|
|
||||||
// This causes p.run() to stage the pipeline as a template on GCS, as opposed to running it.
|
|
||||||
options.setTemplateLocation(invoiceTemplateUrl);
|
|
||||||
options.setStagingLocation(invoiceStagingUrl);
|
|
||||||
Pipeline p = Pipeline.create(options);
|
|
||||||
|
|
||||||
PCollection<BillingEvent> billingEvents =
|
|
||||||
p.apply(
|
|
||||||
"Read BillingEvents from Bigquery",
|
|
||||||
BigQueryIO.read(BillingEvent::parseFromRecord)
|
|
||||||
.fromQuery(InvoicingUtils.makeQueryProvider(options.getYearMonth(), projectId))
|
|
||||||
.withCoder(SerializableCoder.of(BillingEvent.class))
|
|
||||||
.usingStandardSql()
|
|
||||||
.withoutValidation()
|
|
||||||
.withTemplateCompatibility());
|
|
||||||
applyTerminalTransforms(billingEvents, options.getYearMonth());
|
|
||||||
p.run();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Applies output transforms to the {@code BillingEvent} source collection.
|
|
||||||
*
|
|
||||||
* <p>This is factored out purely to facilitate testing.
|
|
||||||
*/
|
|
||||||
void applyTerminalTransforms(
|
|
||||||
PCollection<BillingEvent> billingEvents, ValueProvider<String> yearMonthProvider) {
|
|
||||||
billingEvents
|
|
||||||
.apply("Generate overall invoice rows", new GenerateInvoiceRows())
|
|
||||||
.apply("Write overall invoice to CSV", writeInvoice(yearMonthProvider));
|
|
||||||
|
|
||||||
billingEvents.apply(
|
|
||||||
"Write detail reports to separate CSVs keyed by registrarId_tld pair",
|
|
||||||
writeDetailReports(yearMonthProvider));
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Transform that converts a {@code BillingEvent} into an invoice CSV row. */
|
|
||||||
private static class GenerateInvoiceRows
|
|
||||||
extends PTransform<PCollection<BillingEvent>, PCollection<String>> {
|
|
||||||
@Override
|
|
||||||
public PCollection<String> expand(PCollection<BillingEvent> input) {
|
|
||||||
return input
|
|
||||||
.apply(
|
|
||||||
"Map to invoicing key",
|
|
||||||
MapElements.into(TypeDescriptor.of(InvoiceGroupingKey.class))
|
|
||||||
.via(BillingEvent::getInvoiceGroupingKey))
|
|
||||||
.setCoder(new InvoiceGroupingKeyCoder())
|
|
||||||
.apply("Count occurrences", Count.perElement())
|
|
||||||
.apply(
|
|
||||||
"Format as CSVs",
|
|
||||||
MapElements.into(TypeDescriptors.strings())
|
|
||||||
.via((KV<InvoiceGroupingKey, Long> kv) -> kv.getKey().toCsv(kv.getValue())));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Returns an IO transform that writes the overall invoice to a single CSV file. */
|
|
||||||
private TextIO.Write writeInvoice(ValueProvider<String> yearMonthProvider) {
|
|
||||||
return TextIO.write()
|
|
||||||
.to(
|
|
||||||
NestedValueProvider.of(
|
|
||||||
yearMonthProvider,
|
|
||||||
yearMonth ->
|
|
||||||
String.format(
|
|
||||||
"%s/%s/%s/%s-%s",
|
|
||||||
billingBucketUrl,
|
|
||||||
BillingModule.INVOICES_DIRECTORY,
|
|
||||||
yearMonth,
|
|
||||||
BillingModule.OVERALL_INVOICE_PREFIX,
|
|
||||||
yearMonth)))
|
|
||||||
.withHeader(InvoiceGroupingKey.invoiceHeader())
|
|
||||||
.withoutSharding()
|
|
||||||
.withSuffix(".csv");
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Returns an IO transform that writes detail reports to registrar-tld keyed CSV files. */
|
|
||||||
private TextIO.TypedWrite<BillingEvent, Params> writeDetailReports(
|
|
||||||
ValueProvider<String> yearMonthProvider) {
|
|
||||||
return TextIO.<BillingEvent>writeCustomType()
|
|
||||||
.to(
|
|
||||||
InvoicingUtils.makeDestinationFunction(
|
|
||||||
String.format("%s/%s", billingBucketUrl, BillingModule.INVOICES_DIRECTORY),
|
|
||||||
yearMonthProvider),
|
|
||||||
InvoicingUtils.makeEmptyDestinationParams(billingBucketUrl + "/errors"))
|
|
||||||
.withFormatFunction(BillingEvent::toCsv)
|
|
||||||
.withoutSharding()
|
|
||||||
.withTempDirectory(
|
|
||||||
FileBasedSink.convertToFileResourceIfPossible(beamBucketUrl + "/temporary"))
|
|
||||||
.withHeader(BillingEvent.getHeader())
|
|
||||||
.withSuffix(".csv");
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,112 +0,0 @@
|
||||||
// Copyright 2017 The Nomulus Authors. All Rights Reserved.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package google.registry.beam;
|
|
||||||
|
|
||||||
import com.google.common.io.Resources;
|
|
||||||
import google.registry.util.ResourceUtils;
|
|
||||||
import google.registry.util.SqlTemplate;
|
|
||||||
import java.time.LocalDateTime;
|
|
||||||
import java.time.LocalTime;
|
|
||||||
import java.time.YearMonth;
|
|
||||||
import java.time.format.DateTimeFormatter;
|
|
||||||
import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
|
|
||||||
import org.apache.beam.sdk.io.FileBasedSink;
|
|
||||||
import org.apache.beam.sdk.options.ValueProvider;
|
|
||||||
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
|
|
||||||
import org.apache.beam.sdk.transforms.SerializableFunction;
|
|
||||||
|
|
||||||
/** Pipeline helper functions used to generate invoices from instances of {@link BillingEvent}. */
|
|
||||||
public class InvoicingUtils {
|
|
||||||
|
|
||||||
private InvoicingUtils() {}
|
|
||||||
|
|
||||||
private static final DateTimeFormatter TIMESTAMP_FORMATTER =
|
|
||||||
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns a function mapping from {@code BillingEvent} to filename {@code Params}.
|
|
||||||
*
|
|
||||||
* <p>Beam uses this to determine which file a given {@code BillingEvent} should get placed into.
|
|
||||||
*
|
|
||||||
* @param outputBucket the GCS bucket we're outputting reports to
|
|
||||||
* @param yearMonthProvider a runtime provider for the yyyy-MM we're generating the invoice for
|
|
||||||
*/
|
|
||||||
static SerializableFunction<BillingEvent, Params> makeDestinationFunction(
|
|
||||||
String outputBucket, ValueProvider<String> yearMonthProvider) {
|
|
||||||
return billingEvent ->
|
|
||||||
new Params()
|
|
||||||
.withShardTemplate("")
|
|
||||||
.withSuffix(".csv")
|
|
||||||
.withBaseFilename(
|
|
||||||
NestedValueProvider.of(
|
|
||||||
yearMonthProvider,
|
|
||||||
yearMonth ->
|
|
||||||
FileBasedSink.convertToFileResourceIfPossible(
|
|
||||||
String.format(
|
|
||||||
"%s/%s/%s",
|
|
||||||
outputBucket, yearMonth, billingEvent.toFilename(yearMonth)))));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns the default filename parameters for an unmappable {@code BillingEvent}.
|
|
||||||
*
|
|
||||||
* <p>The "failed" file should only be populated when an error occurs, which warrants further
|
|
||||||
* investigation.
|
|
||||||
*/
|
|
||||||
static Params makeEmptyDestinationParams(String outputBucket) {
|
|
||||||
return new Params()
|
|
||||||
.withBaseFilename(
|
|
||||||
FileBasedSink.convertToFileResourceIfPossible(
|
|
||||||
String.format("%s/%s", outputBucket, "FAILURES")));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns a provider that creates a Bigquery query for a given project and yearMonth at runtime.
|
|
||||||
*
|
|
||||||
* <p>We only know yearMonth at runtime, so this provider fills in the {@code
|
|
||||||
* sql/billing_events.sql} template at runtime.
|
|
||||||
*
|
|
||||||
* @param yearMonthProvider a runtime provider that returns which month we're invoicing for.
|
|
||||||
* @param projectId the projectId we're generating invoicing for.
|
|
||||||
*/
|
|
||||||
static ValueProvider<String> makeQueryProvider(
|
|
||||||
ValueProvider<String> yearMonthProvider, String projectId) {
|
|
||||||
return NestedValueProvider.of(
|
|
||||||
yearMonthProvider,
|
|
||||||
(yearMonth) -> {
|
|
||||||
// Get the timestamp endpoints capturing the entire month with microsecond precision
|
|
||||||
YearMonth reportingMonth = YearMonth.parse(yearMonth);
|
|
||||||
LocalDateTime firstMoment = reportingMonth.atDay(1).atTime(LocalTime.MIDNIGHT);
|
|
||||||
LocalDateTime lastMoment = reportingMonth.atEndOfMonth().atTime(LocalTime.MAX);
|
|
||||||
// Construct the month's query by filling in the billing_events.sql template
|
|
||||||
return SqlTemplate.create(getQueryFromFile("billing_events.sql"))
|
|
||||||
.put("FIRST_TIMESTAMP_OF_MONTH", firstMoment.format(TIMESTAMP_FORMATTER))
|
|
||||||
.put("LAST_TIMESTAMP_OF_MONTH", lastMoment.format(TIMESTAMP_FORMATTER))
|
|
||||||
.put("PROJECT_ID", projectId)
|
|
||||||
.put("DATASTORE_EXPORT_DATA_SET", "latest_datastore_export")
|
|
||||||
.put("ONETIME_TABLE", "OneTime")
|
|
||||||
.put("REGISTRY_TABLE", "Registry")
|
|
||||||
.put("REGISTRAR_TABLE", "Registrar")
|
|
||||||
.put("CANCELLATION_TABLE", "Cancellation")
|
|
||||||
.build();
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Returns the {@link String} contents for a file in the {@code beam/sql/} directory. */
|
|
||||||
private static String getQueryFromFile(String filename) {
|
|
||||||
return ResourceUtils.readResourceUtf8(
|
|
||||||
Resources.getResource(InvoicingUtils.class, "sql/" + filename));
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,100 +0,0 @@
|
||||||
#standardSQL
|
|
||||||
-- Copyright 2017 The Nomulus Authors. All Rights Reserved.
|
|
||||||
--
|
|
||||||
-- Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
-- you may not use this file except in compliance with the License.
|
|
||||||
-- You may obtain a copy of the License at
|
|
||||||
--
|
|
||||||
-- http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
--
|
|
||||||
-- Unless required by applicable law or agreed to in writing, software
|
|
||||||
-- distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
-- See the License for the specific language governing permissions and
|
|
||||||
-- limitations under the License.
|
|
||||||
|
|
||||||
-- This query gathers all non-canceled billing events for a given
|
|
||||||
-- YEAR_MONTH in yyyy-MM format.
|
|
||||||
|
|
||||||
SELECT
|
|
||||||
__key__.id AS id,
|
|
||||||
billingTime,
|
|
||||||
eventTime,
|
|
||||||
BillingEvent.clientId AS registrarId,
|
|
||||||
RegistrarData.accountId AS billingId,
|
|
||||||
tld,
|
|
||||||
reason as action,
|
|
||||||
targetId as domain,
|
|
||||||
BillingEvent.domainRepoId as repositoryId,
|
|
||||||
periodYears as years,
|
|
||||||
BillingEvent.currency AS currency,
|
|
||||||
BillingEvent.amount as amount,
|
|
||||||
-- We'll strip out non-useful flags downstream
|
|
||||||
ARRAY_TO_STRING(flags, " ") AS flags
|
|
||||||
FROM (
|
|
||||||
SELECT
|
|
||||||
*,
|
|
||||||
-- We store cost as "CURRENCY AMOUNT" such as "JPY 800" or "USD 20.00"
|
|
||||||
SPLIT(cost, ' ')[OFFSET(0)] AS currency,
|
|
||||||
SPLIT(cost, ' ')[OFFSET(1)] AS amount,
|
|
||||||
-- Extract everything after the first dot in the domain as the TLD
|
|
||||||
REGEXP_EXTRACT(targetId, r'[.](.+)') AS tld,
|
|
||||||
-- __key__.path looks like '"DomainBase", "<repoId>", ...'
|
|
||||||
REGEXP_REPLACE(SPLIT(__key__.path, ', ')[OFFSET(1)], '"', '')
|
|
||||||
AS domainRepoId,
|
|
||||||
COALESCE(cancellationMatchingBillingEvent.path,
|
|
||||||
__key__.path) AS cancellationMatchingPath
|
|
||||||
FROM
|
|
||||||
`%PROJECT_ID%.%DATASTORE_EXPORT_DATA_SET%.%ONETIME_TABLE%`
|
|
||||||
-- Only include real TLDs (filter prober data)
|
|
||||||
WHERE
|
|
||||||
REGEXP_EXTRACT(targetId, r'[.](.+)') IN (
|
|
||||||
SELECT
|
|
||||||
tldStr
|
|
||||||
FROM
|
|
||||||
`%PROJECT_ID%.%DATASTORE_EXPORT_DATA_SET%.%REGISTRY_TABLE%`
|
|
||||||
WHERE
|
|
||||||
-- TODO(b/18092292): Add a filter for tldState (not PDT/PREDELEGATION)
|
|
||||||
tldType = 'REAL') ) AS BillingEvent
|
|
||||||
-- Gather billing ID from registrar table
|
|
||||||
-- This is a 'JOIN' as opposed to 'LEFT JOIN' to filter out
|
|
||||||
-- non-billable registrars
|
|
||||||
JOIN (
|
|
||||||
SELECT
|
|
||||||
__key__.name AS clientId,
|
|
||||||
billingIdentifier,
|
|
||||||
r.billingAccountMap.currency[SAFE_OFFSET(index)] AS currency,
|
|
||||||
r.billingAccountMap.accountId[SAFE_OFFSET(index)] AS accountId
|
|
||||||
FROM
|
|
||||||
`%PROJECT_ID%.%DATASTORE_EXPORT_DATA_SET%.%REGISTRAR_TABLE%` AS r,
|
|
||||||
UNNEST(GENERATE_ARRAY(0, ARRAY_LENGTH(r.billingAccountMap.currency) - 1))
|
|
||||||
AS index
|
|
||||||
WHERE billingAccountMap IS NOT NULL
|
|
||||||
AND type = 'REAL') AS RegistrarData
|
|
||||||
ON
|
|
||||||
BillingEvent.clientId = RegistrarData.clientId
|
|
||||||
AND BillingEvent.currency = RegistrarData.currency
|
|
||||||
-- Gather cancellations
|
|
||||||
LEFT JOIN (
|
|
||||||
SELECT __key__.id AS cancellationId,
|
|
||||||
COALESCE(refOneTime.path, refRecurring.path) AS cancelledEventPath,
|
|
||||||
eventTime as cancellationTime,
|
|
||||||
billingTime as cancellationBillingTime
|
|
||||||
FROM
|
|
||||||
(SELECT
|
|
||||||
*,
|
|
||||||
-- Count everything after first dot as TLD (to support multi-part TLDs).
|
|
||||||
REGEXP_EXTRACT(targetId, r'[.](.+)') AS tld
|
|
||||||
FROM
|
|
||||||
`%PROJECT_ID%.%DATASTORE_EXPORT_DATA_SET%.%CANCELLATION_TABLE%`)
|
|
||||||
) AS Cancellation
|
|
||||||
ON BillingEvent.cancellationMatchingPath = Cancellation.cancelledEventPath
|
|
||||||
AND BillingEvent.billingTime = Cancellation.cancellationBillingTime
|
|
||||||
WHERE billingTime BETWEEN TIMESTAMP('%FIRST_TIMESTAMP_OF_MONTH%')
|
|
||||||
AND TIMESTAMP('%LAST_TIMESTAMP_OF_MONTH%')
|
|
||||||
-- Filter out canceled events
|
|
||||||
AND Cancellation.cancellationId IS NULL
|
|
||||||
ORDER BY
|
|
||||||
billingTime DESC,
|
|
||||||
id,
|
|
||||||
tld
|
|
|
@ -510,7 +510,7 @@ public final class RegistryConfig {
|
||||||
* Returns the URL of the GCS location for storing the monthly invoicing Beam template.
|
* Returns the URL of the GCS location for storing the monthly invoicing Beam template.
|
||||||
*
|
*
|
||||||
* @see google.registry.reporting.billing.GenerateInvoicesAction
|
* @see google.registry.reporting.billing.GenerateInvoicesAction
|
||||||
* @see google.registry.beam.InvoicingPipeline
|
* @see google.registry.beam.invoicing.InvoicingPipeline
|
||||||
*/
|
*/
|
||||||
@Provides
|
@Provides
|
||||||
@Config("invoiceTemplateUrl")
|
@Config("invoiceTemplateUrl")
|
||||||
|
@ -522,7 +522,7 @@ public final class RegistryConfig {
|
||||||
/**
|
/**
|
||||||
* Returns the URL of the GCS location we store jar dependencies for the invoicing pipeline.
|
* Returns the URL of the GCS location we store jar dependencies for the invoicing pipeline.
|
||||||
*
|
*
|
||||||
* @see google.registry.beam.InvoicingPipeline
|
* @see google.registry.beam.invoicing.InvoicingPipeline
|
||||||
*/
|
*/
|
||||||
@Provides
|
@Provides
|
||||||
@Config("invoiceStagingUrl")
|
@Config("invoiceStagingUrl")
|
||||||
|
@ -580,7 +580,7 @@ public final class RegistryConfig {
|
||||||
/**
|
/**
|
||||||
* Returns the URL of the GCS bucket we store invoices and detail reports in.
|
* Returns the URL of the GCS bucket we store invoices and detail reports in.
|
||||||
*
|
*
|
||||||
* @see google.registry.beam.InvoicingPipeline
|
* @see google.registry.beam.invoicing.InvoicingPipeline
|
||||||
*/
|
*/
|
||||||
@Provides
|
@Provides
|
||||||
@Config("billingBucketUrl")
|
@Config("billingBucketUrl")
|
||||||
|
|
|
@ -43,9 +43,9 @@ import org.joda.time.YearMonth;
|
||||||
* Invokes the {@code InvoicingPipeline} beam template via the REST api, and enqueues the {@link
|
* Invokes the {@code InvoicingPipeline} beam template via the REST api, and enqueues the {@link
|
||||||
* PublishInvoicesAction} to publish the subsequent output.
|
* PublishInvoicesAction} to publish the subsequent output.
|
||||||
*
|
*
|
||||||
* <p>This action runs the {@link google.registry.beam.InvoicingPipeline} beam template, staged at
|
* <p>This action runs the {@link google.registry.beam.invoicing.InvoicingPipeline} beam template,
|
||||||
* gs://<projectId>-beam/templates/invoicing. The pipeline then generates invoices for the month and
|
* staged at gs://<projectId>-beam/templates/invoicing. The pipeline then generates invoices for the
|
||||||
* stores them on GCS.
|
* month and stores them on GCS.
|
||||||
*/
|
*/
|
||||||
@Action(path = GenerateInvoicesAction.PATH, method = POST, auth = Auth.AUTH_INTERNAL_ONLY)
|
@Action(path = GenerateInvoicesAction.PATH, method = POST, auth = Auth.AUTH_INTERNAL_ONLY)
|
||||||
public class GenerateInvoicesAction implements Runnable {
|
public class GenerateInvoicesAction implements Runnable {
|
||||||
|
|
|
@ -37,7 +37,7 @@ import javax.inject.Inject;
|
||||||
import org.joda.time.YearMonth;
|
import org.joda.time.YearMonth;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Uploads the results of the {@link google.registry.beam.InvoicingPipeline}.
|
* Uploads the results of the {@link google.registry.beam.invoicing.InvoicingPipeline}.
|
||||||
*
|
*
|
||||||
* <p>This relies on the retry semantics in {@code queue.xml} to ensure proper upload, in spite of
|
* <p>This relies on the retry semantics in {@code queue.xml} to ensure proper upload, in spite of
|
||||||
* fluctuations in generation timing.
|
* fluctuations in generation timing.
|
||||||
|
|
|
@ -35,7 +35,7 @@ java_library(
|
||||||
visibility = [":allowed-tools"],
|
visibility = [":allowed-tools"],
|
||||||
deps = [
|
deps = [
|
||||||
"//java/google/registry/backup",
|
"//java/google/registry/backup",
|
||||||
"//java/google/registry/beam",
|
"//java/google/registry/beam/invoicing",
|
||||||
"//java/google/registry/bigquery",
|
"//java/google/registry/bigquery",
|
||||||
"//java/google/registry/config",
|
"//java/google/registry/config",
|
||||||
"//java/google/registry/dns",
|
"//java/google/registry/dns",
|
||||||
|
|
|
@ -15,10 +15,10 @@
|
||||||
package google.registry.tools;
|
package google.registry.tools;
|
||||||
|
|
||||||
import com.beust.jcommander.Parameters;
|
import com.beust.jcommander.Parameters;
|
||||||
import google.registry.beam.InvoicingPipeline;
|
import google.registry.beam.invoicing.InvoicingPipeline;
|
||||||
import javax.inject.Inject;
|
import javax.inject.Inject;
|
||||||
|
|
||||||
/** Nomulus command that deploys the {@link google.registry.beam.InvoicingPipeline} template. */
|
/** Nomulus command that deploys the {@link InvoicingPipeline} template. */
|
||||||
@Parameters(commandDescription = "Deploy the invoicing pipeline to GCS.")
|
@Parameters(commandDescription = "Deploy the invoicing pipeline to GCS.")
|
||||||
public class DeployInvoicingPipelineCommand implements Command {
|
public class DeployInvoicingPipelineCommand implements Command {
|
||||||
|
|
||||||
|
|
|
@ -9,31 +9,4 @@ load("//java/com/google/testing/builddefs:GenTestRules.bzl", "GenTestRules")
|
||||||
|
|
||||||
java_library(
|
java_library(
|
||||||
name = "beam",
|
name = "beam",
|
||||||
srcs = glob(["*.java"]),
|
|
||||||
resources = glob(["testdata/*"]),
|
|
||||||
deps = [
|
|
||||||
"//java/google/registry/beam",
|
|
||||||
"//java/google/registry/util",
|
|
||||||
"//javatests/google/registry/testing",
|
|
||||||
"@com_google_apis_google_api_services_bigquery",
|
|
||||||
"@com_google_dagger",
|
|
||||||
"@com_google_guava",
|
|
||||||
"@com_google_truth",
|
|
||||||
"@com_google_truth_extensions_truth_java8_extension",
|
|
||||||
"@junit",
|
|
||||||
"@org_apache_avro",
|
|
||||||
"@org_apache_beam_runners_direct_java",
|
|
||||||
"@org_apache_beam_runners_google_cloud_dataflow_java",
|
|
||||||
"@org_apache_beam_sdks_java_core",
|
|
||||||
"@org_apache_beam_sdks_java_io_google_cloud_platform",
|
|
||||||
"@org_mockito_all",
|
|
||||||
],
|
|
||||||
)
|
|
||||||
|
|
||||||
GenTestRules(
|
|
||||||
name = "GeneratedTestRules",
|
|
||||||
default_test_size = "small",
|
|
||||||
medium_tests = ["InvoicingPipelineTest.java"],
|
|
||||||
test_files = glob(["*Test.java"]),
|
|
||||||
deps = [":beam"],
|
|
||||||
)
|
)
|
||||||
|
|
|
@ -1,84 +0,0 @@
|
||||||
// Copyright 2017 The Nomulus Authors. All Rights Reserved.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package google.registry.beam;
|
|
||||||
|
|
||||||
import static com.google.common.truth.Truth.assertThat;
|
|
||||||
|
|
||||||
import com.google.api.services.bigquery.model.TableRow;
|
|
||||||
import com.google.common.collect.ImmutableList;
|
|
||||||
import google.registry.beam.BigqueryTemplatePipeline.CountRequestPaths;
|
|
||||||
import google.registry.beam.BigqueryTemplatePipeline.ExtractRequestPathFn;
|
|
||||||
import java.util.List;
|
|
||||||
import org.apache.beam.runners.direct.DirectRunner;
|
|
||||||
import org.apache.beam.sdk.options.PipelineOptions;
|
|
||||||
import org.apache.beam.sdk.options.PipelineOptionsFactory;
|
|
||||||
import org.apache.beam.sdk.testing.PAssert;
|
|
||||||
import org.apache.beam.sdk.testing.TestPipeline;
|
|
||||||
import org.apache.beam.sdk.transforms.Create;
|
|
||||||
import org.apache.beam.sdk.transforms.DoFnTester;
|
|
||||||
import org.apache.beam.sdk.values.PCollection;
|
|
||||||
import org.junit.BeforeClass;
|
|
||||||
import org.junit.Rule;
|
|
||||||
import org.junit.Test;
|
|
||||||
import org.junit.runner.RunWith;
|
|
||||||
import org.junit.runners.JUnit4;
|
|
||||||
|
|
||||||
/** Unit tests for {@link BigqueryTemplatePipeline}*/
|
|
||||||
@RunWith(JUnit4.class)
|
|
||||||
public class BigqueryTemplatePipelineTest {
|
|
||||||
|
|
||||||
private static PipelineOptions pipelineOptions;
|
|
||||||
|
|
||||||
@BeforeClass
|
|
||||||
public static void initializePipelineOptions() {
|
|
||||||
pipelineOptions = PipelineOptionsFactory.create();
|
|
||||||
pipelineOptions.setRunner(DirectRunner.class);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Rule public final transient TestPipeline p = TestPipeline.fromOptions(pipelineOptions);
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testExtractRequestPathFn() throws Exception {
|
|
||||||
ExtractRequestPathFn extractRequestPathFn = new ExtractRequestPathFn();
|
|
||||||
DoFnTester<TableRow, String> fnTester = DoFnTester.of(extractRequestPathFn);
|
|
||||||
TableRow emptyRow = new TableRow();
|
|
||||||
TableRow hasRequestPathRow = new TableRow().set("requestPath", "a/path");
|
|
||||||
TableRow hasOtherValueRow = new TableRow().set("anotherValue", "b/lah");
|
|
||||||
List<String> outputs = fnTester.processBundle(emptyRow, hasRequestPathRow, hasOtherValueRow);
|
|
||||||
assertThat(outputs).containsExactly("null", "a/path", "null");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testEndToEndPipeline() {
|
|
||||||
ImmutableList<TableRow> inputRows =
|
|
||||||
ImmutableList.of(
|
|
||||||
new TableRow(),
|
|
||||||
new TableRow().set("requestPath", "a/path"),
|
|
||||||
new TableRow().set("requestPath", "b/path"),
|
|
||||||
new TableRow().set("requestPath", "b/path"),
|
|
||||||
new TableRow().set("anotherValue", "b/path"));
|
|
||||||
|
|
||||||
PCollection<TableRow> input = p.apply(Create.of(inputRows));
|
|
||||||
PCollection<String> output = input.apply(new CountRequestPaths());
|
|
||||||
|
|
||||||
ImmutableList<String> outputStrings = new ImmutableList.Builder<String>()
|
|
||||||
.add("a/path: 1")
|
|
||||||
.add("b/path: 2")
|
|
||||||
.add("null: 2")
|
|
||||||
.build();
|
|
||||||
PAssert.that(output).containsInAnyOrder(outputStrings);
|
|
||||||
p.run();
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,188 +0,0 @@
|
||||||
// Copyright 2017 The Nomulus Authors. All Rights Reserved.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package google.registry.beam;
|
|
||||||
|
|
||||||
import static com.google.common.truth.Truth.assertThat;
|
|
||||||
import static google.registry.testing.JUnitBackports.assertThrows;
|
|
||||||
|
|
||||||
import google.registry.beam.BillingEvent.InvoiceGroupingKey;
|
|
||||||
import google.registry.beam.BillingEvent.InvoiceGroupingKey.InvoiceGroupingKeyCoder;
|
|
||||||
import java.io.ByteArrayInputStream;
|
|
||||||
import java.io.ByteArrayOutputStream;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.InputStream;
|
|
||||||
import java.time.ZoneId;
|
|
||||||
import java.time.ZonedDateTime;
|
|
||||||
import org.apache.avro.Schema;
|
|
||||||
import org.apache.avro.generic.GenericData;
|
|
||||||
import org.apache.avro.generic.GenericRecord;
|
|
||||||
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
|
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.Test;
|
|
||||||
import org.junit.runner.RunWith;
|
|
||||||
import org.junit.runners.JUnit4;
|
|
||||||
|
|
||||||
/** Unit tests for {@link BillingEvent} */
|
|
||||||
@RunWith(JUnit4.class)
|
|
||||||
public class BillingEventTest {
|
|
||||||
|
|
||||||
private static final String BILLING_EVENT_SCHEMA =
|
|
||||||
"{\"name\": \"BillingEvent\", "
|
|
||||||
+ "\"type\": \"record\", "
|
|
||||||
+ "\"fields\": ["
|
|
||||||
+ "{\"name\": \"id\", \"type\": \"long\"},"
|
|
||||||
+ "{\"name\": \"billingTime\", \"type\": \"string\"},"
|
|
||||||
+ "{\"name\": \"eventTime\", \"type\": \"string\"},"
|
|
||||||
+ "{\"name\": \"registrarId\", \"type\": \"string\"},"
|
|
||||||
+ "{\"name\": \"billingId\", \"type\": \"long\"},"
|
|
||||||
+ "{\"name\": \"tld\", \"type\": \"string\"},"
|
|
||||||
+ "{\"name\": \"action\", \"type\": \"string\"},"
|
|
||||||
+ "{\"name\": \"domain\", \"type\": \"string\"},"
|
|
||||||
+ "{\"name\": \"repositoryId\", \"type\": \"string\"},"
|
|
||||||
+ "{\"name\": \"years\", \"type\": \"int\"},"
|
|
||||||
+ "{\"name\": \"currency\", \"type\": \"string\"},"
|
|
||||||
+ "{\"name\": \"amount\", \"type\": \"float\"},"
|
|
||||||
+ "{\"name\": \"flags\", \"type\": \"string\"}"
|
|
||||||
+ "]}";
|
|
||||||
|
|
||||||
private SchemaAndRecord schemaAndRecord;
|
|
||||||
|
|
||||||
@Before
|
|
||||||
public void initializeRecord() {
|
|
||||||
// Create a record with a given JSON schema.
|
|
||||||
GenericRecord record = new GenericData.Record(new Schema.Parser().parse(BILLING_EVENT_SCHEMA));
|
|
||||||
record.put("id", "1");
|
|
||||||
record.put("billingTime", 1508835963000000L);
|
|
||||||
record.put("eventTime", 1484870383000000L);
|
|
||||||
record.put("registrarId", "myRegistrar");
|
|
||||||
record.put("billingId", "12345-CRRHELLO");
|
|
||||||
record.put("tld", "test");
|
|
||||||
record.put("action", "RENEW");
|
|
||||||
record.put("domain", "example.test");
|
|
||||||
record.put("repositoryId", "123456");
|
|
||||||
record.put("years", 5);
|
|
||||||
record.put("currency", "USD");
|
|
||||||
record.put("amount", 20.5);
|
|
||||||
record.put("flags", "AUTO_RENEW SYNTHETIC");
|
|
||||||
schemaAndRecord = new SchemaAndRecord(record, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testParseBillingEventFromRecord_success() {
|
|
||||||
BillingEvent event = BillingEvent.parseFromRecord(schemaAndRecord);
|
|
||||||
assertThat(event.id()).isEqualTo(1);
|
|
||||||
assertThat(event.billingTime())
|
|
||||||
.isEqualTo(ZonedDateTime.of(2017, 10, 24, 9, 6, 3, 0, ZoneId.of("UTC")));
|
|
||||||
assertThat(event.eventTime())
|
|
||||||
.isEqualTo(ZonedDateTime.of(2017, 1, 19, 23, 59, 43, 0, ZoneId.of("UTC")));
|
|
||||||
assertThat(event.registrarId()).isEqualTo("myRegistrar");
|
|
||||||
assertThat(event.billingId()).isEqualTo("12345-CRRHELLO");
|
|
||||||
assertThat(event.tld()).isEqualTo("test");
|
|
||||||
assertThat(event.action()).isEqualTo("RENEW");
|
|
||||||
assertThat(event.domain()).isEqualTo("example.test");
|
|
||||||
assertThat(event.repositoryId()).isEqualTo("123456");
|
|
||||||
assertThat(event.years()).isEqualTo(5);
|
|
||||||
assertThat(event.currency()).isEqualTo("USD");
|
|
||||||
assertThat(event.amount()).isEqualTo(20.5);
|
|
||||||
assertThat(event.flags()).isEqualTo("AUTO_RENEW SYNTHETIC");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testParseBillingEventFromRecord_sunriseCreate_reducedPrice_success() {
|
|
||||||
schemaAndRecord.getRecord().put("flags", "SUNRISE");
|
|
||||||
BillingEvent event = BillingEvent.parseFromRecord(schemaAndRecord);
|
|
||||||
assertThat(event.amount()).isEqualTo(17.43);
|
|
||||||
assertThat(event.flags()).isEqualTo("SUNRISE");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testParseBillingEventFromRecord_anchorTenant_zeroPrice_success() {
|
|
||||||
schemaAndRecord.getRecord().put("flags", "SUNRISE ANCHOR_TENANT");
|
|
||||||
BillingEvent event = BillingEvent.parseFromRecord(schemaAndRecord);
|
|
||||||
assertThat(event.amount()).isZero();
|
|
||||||
assertThat(event.flags()).isEqualTo("SUNRISE ANCHOR_TENANT");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testParseBillingEventFromRecord_nullValue_throwsException() {
|
|
||||||
schemaAndRecord.getRecord().put("tld", null);
|
|
||||||
assertThrows(IllegalStateException.class, () -> BillingEvent.parseFromRecord(schemaAndRecord));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testConvertBillingEvent_toCsv() {
|
|
||||||
BillingEvent event = BillingEvent.parseFromRecord(schemaAndRecord);
|
|
||||||
assertThat(event.toCsv())
|
|
||||||
.isEqualTo("1,2017-10-24 09:06:03 UTC,2017-01-19 23:59:43 UTC,myRegistrar,"
|
|
||||||
+ "12345-CRRHELLO,test,RENEW,example.test,123456,5,USD,20.50,AUTO_RENEW");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testGenerateBillingEventFilename() {
|
|
||||||
BillingEvent event = BillingEvent.parseFromRecord(schemaAndRecord);
|
|
||||||
assertThat(event.toFilename("2017-10")).isEqualTo("invoice_details_2017-10_myRegistrar_test");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testGetInvoiceGroupingKey_fromBillingEvent() {
|
|
||||||
BillingEvent event = BillingEvent.parseFromRecord(schemaAndRecord);
|
|
||||||
InvoiceGroupingKey invoiceKey = event.getInvoiceGroupingKey();
|
|
||||||
assertThat(invoiceKey.startDate()).isEqualTo("2017-10-01");
|
|
||||||
assertThat(invoiceKey.endDate()).isEqualTo("2022-09-30");
|
|
||||||
assertThat(invoiceKey.productAccountKey()).isEqualTo("12345-CRRHELLO");
|
|
||||||
assertThat(invoiceKey.usageGroupingKey()).isEqualTo("myRegistrar - test");
|
|
||||||
assertThat(invoiceKey.description()).isEqualTo("RENEW | TLD: test | TERM: 5-year");
|
|
||||||
assertThat(invoiceKey.unitPrice()).isEqualTo(20.5);
|
|
||||||
assertThat(invoiceKey.unitPriceCurrency()).isEqualTo("USD");
|
|
||||||
assertThat(invoiceKey.poNumber()).isEmpty();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testConvertInvoiceGroupingKey_toCsv() {
|
|
||||||
BillingEvent event = BillingEvent.parseFromRecord(schemaAndRecord);
|
|
||||||
InvoiceGroupingKey invoiceKey = event.getInvoiceGroupingKey();
|
|
||||||
assertThat(invoiceKey.toCsv(3L))
|
|
||||||
.isEqualTo(
|
|
||||||
"2017-10-01,2022-09-30,12345-CRRHELLO,61.50,USD,10125,1,PURCHASE,"
|
|
||||||
+ "myRegistrar - test,3,RENEW | TLD: test | TERM: 5-year,20.50,USD,");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testInvoiceGroupingKeyCoder_deterministicSerialization() throws IOException {
|
|
||||||
InvoiceGroupingKey invoiceKey =
|
|
||||||
BillingEvent.parseFromRecord(schemaAndRecord).getInvoiceGroupingKey();
|
|
||||||
InvoiceGroupingKeyCoder coder = new InvoiceGroupingKeyCoder();
|
|
||||||
ByteArrayOutputStream outStream = new ByteArrayOutputStream();
|
|
||||||
coder.encode(invoiceKey, outStream);
|
|
||||||
InputStream inStream = new ByteArrayInputStream(outStream.toByteArray());
|
|
||||||
assertThat(coder.decode(inStream)).isEqualTo(invoiceKey);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testGetDetailReportHeader() {
|
|
||||||
assertThat(BillingEvent.getHeader())
|
|
||||||
.isEqualTo(
|
|
||||||
"id,billingTime,eventTime,registrarId,billingId,tld,action,"
|
|
||||||
+ "domain,repositoryId,years,currency,amount,flags");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testGetOverallInvoiceHeader() {
|
|
||||||
assertThat(InvoiceGroupingKey.invoiceHeader())
|
|
||||||
.isEqualTo("StartDate,EndDate,ProductAccountKey,Amount,AmountCurrency,BillingProductCode,"
|
|
||||||
+ "SalesChannel,LineItemType,UsageGroupingKey,Quantity,Description,UnitPrice,"
|
|
||||||
+ "UnitPriceCurrency,PONumber");
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,215 +0,0 @@
|
||||||
// Copyright 2017 The Nomulus Authors. All Rights Reserved.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package google.registry.beam;
|
|
||||||
|
|
||||||
import static com.google.common.truth.Truth.assertThat;
|
|
||||||
|
|
||||||
import com.google.common.collect.ImmutableList;
|
|
||||||
import com.google.common.collect.ImmutableMap;
|
|
||||||
import google.registry.util.ResourceUtils;
|
|
||||||
import java.io.File;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.time.ZoneId;
|
|
||||||
import java.time.ZonedDateTime;
|
|
||||||
import java.util.Map.Entry;
|
|
||||||
import org.apache.beam.runners.direct.DirectRunner;
|
|
||||||
import org.apache.beam.sdk.options.PipelineOptions;
|
|
||||||
import org.apache.beam.sdk.options.PipelineOptionsFactory;
|
|
||||||
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
|
|
||||||
import org.apache.beam.sdk.testing.TestPipeline;
|
|
||||||
import org.apache.beam.sdk.transforms.Create;
|
|
||||||
import org.apache.beam.sdk.values.PCollection;
|
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.BeforeClass;
|
|
||||||
import org.junit.Rule;
|
|
||||||
import org.junit.Test;
|
|
||||||
import org.junit.rules.TemporaryFolder;
|
|
||||||
import org.junit.runner.RunWith;
|
|
||||||
import org.junit.runners.JUnit4;
|
|
||||||
|
|
||||||
/** Unit tests for {@link InvoicingPipeline}. */
|
|
||||||
@RunWith(JUnit4.class)
|
|
||||||
public class InvoicingPipelineTest {
|
|
||||||
|
|
||||||
private static PipelineOptions pipelineOptions;
|
|
||||||
|
|
||||||
@BeforeClass
|
|
||||||
public static void initializePipelineOptions() {
|
|
||||||
pipelineOptions = PipelineOptionsFactory.create();
|
|
||||||
pipelineOptions.setRunner(DirectRunner.class);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Rule public final transient TestPipeline p = TestPipeline.fromOptions(pipelineOptions);
|
|
||||||
@Rule public final TemporaryFolder tempFolder = new TemporaryFolder();
|
|
||||||
|
|
||||||
private InvoicingPipeline invoicingPipeline;
|
|
||||||
|
|
||||||
@Before
|
|
||||||
public void initializePipeline() throws IOException {
|
|
||||||
invoicingPipeline = new InvoicingPipeline();
|
|
||||||
invoicingPipeline.projectId = "test-project";
|
|
||||||
File beamTempFolder = tempFolder.newFolder();
|
|
||||||
invoicingPipeline.beamBucketUrl = beamTempFolder.getAbsolutePath();
|
|
||||||
invoicingPipeline.invoiceStagingUrl = beamTempFolder.getAbsolutePath() + "/staging";
|
|
||||||
invoicingPipeline.invoiceTemplateUrl =
|
|
||||||
beamTempFolder.getAbsolutePath() + "/templates/invoicing";
|
|
||||||
invoicingPipeline.billingBucketUrl = tempFolder.getRoot().getAbsolutePath();
|
|
||||||
}
|
|
||||||
|
|
||||||
private ImmutableList<BillingEvent> getInputEvents() {
|
|
||||||
return ImmutableList.of(
|
|
||||||
BillingEvent.create(
|
|
||||||
1,
|
|
||||||
ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
|
|
||||||
ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
|
|
||||||
"theRegistrar",
|
|
||||||
"234",
|
|
||||||
"test",
|
|
||||||
"RENEW",
|
|
||||||
"mydomain.test",
|
|
||||||
"REPO-ID",
|
|
||||||
3,
|
|
||||||
"USD",
|
|
||||||
20.5,
|
|
||||||
""),
|
|
||||||
BillingEvent.create(
|
|
||||||
1,
|
|
||||||
ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
|
|
||||||
ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
|
|
||||||
"theRegistrar",
|
|
||||||
"234",
|
|
||||||
"test",
|
|
||||||
"RENEW",
|
|
||||||
"mydomain2.test",
|
|
||||||
"REPO-ID",
|
|
||||||
3,
|
|
||||||
"USD",
|
|
||||||
20.5,
|
|
||||||
""),
|
|
||||||
BillingEvent.create(
|
|
||||||
1,
|
|
||||||
ZonedDateTime.of(2017, 10, 2, 0, 0, 0, 0, ZoneId.of("UTC")),
|
|
||||||
ZonedDateTime.of(2017, 9, 29, 0, 0, 0, 0, ZoneId.of("UTC")),
|
|
||||||
"theRegistrar",
|
|
||||||
"234",
|
|
||||||
"hello",
|
|
||||||
"CREATE",
|
|
||||||
"mydomain3.hello",
|
|
||||||
"REPO-ID",
|
|
||||||
5,
|
|
||||||
"JPY",
|
|
||||||
70.75,
|
|
||||||
""),
|
|
||||||
BillingEvent.create(
|
|
||||||
1,
|
|
||||||
ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
|
|
||||||
ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
|
|
||||||
"googledomains",
|
|
||||||
"456",
|
|
||||||
"test",
|
|
||||||
"RENEW",
|
|
||||||
"mydomain4.test",
|
|
||||||
"REPO-ID",
|
|
||||||
1,
|
|
||||||
"USD",
|
|
||||||
20.5,
|
|
||||||
""),
|
|
||||||
BillingEvent.create(
|
|
||||||
1,
|
|
||||||
ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
|
|
||||||
ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
|
|
||||||
"anotherRegistrar",
|
|
||||||
"789",
|
|
||||||
"test",
|
|
||||||
"CREATE",
|
|
||||||
"mydomain5.test",
|
|
||||||
"REPO-ID",
|
|
||||||
1,
|
|
||||||
"USD",
|
|
||||||
0,
|
|
||||||
"SUNRISE ANCHOR_TENANT"));
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Returns a map from filename to expected contents for detail reports. */
|
|
||||||
private ImmutableMap<String, ImmutableList<String>> getExpectedDetailReportMap() {
|
|
||||||
return ImmutableMap.of(
|
|
||||||
"invoice_details_2017-10_theRegistrar_test.csv",
|
|
||||||
ImmutableList.of(
|
|
||||||
"1,2017-10-04 00:00:00 UTC,2017-10-04 00:00:00 UTC,theRegistrar,234,"
|
|
||||||
+ "test,RENEW,mydomain2.test,REPO-ID,3,USD,20.50,",
|
|
||||||
"1,2017-10-04 00:00:00 UTC,2017-10-04 00:00:00 UTC,theRegistrar,234,"
|
|
||||||
+ "test,RENEW,mydomain.test,REPO-ID,3,USD,20.50,"),
|
|
||||||
"invoice_details_2017-10_theRegistrar_hello.csv",
|
|
||||||
ImmutableList.of(
|
|
||||||
"1,2017-10-02 00:00:00 UTC,2017-09-29 00:00:00 UTC,theRegistrar,234,"
|
|
||||||
+ "hello,CREATE,mydomain3.hello,REPO-ID,5,JPY,70.75,"),
|
|
||||||
"invoice_details_2017-10_googledomains_test.csv",
|
|
||||||
ImmutableList.of(
|
|
||||||
"1,2017-10-04 00:00:00 UTC,2017-10-04 00:00:00 UTC,googledomains,456,"
|
|
||||||
+ "test,RENEW,mydomain4.test,REPO-ID,1,USD,20.50,"),
|
|
||||||
"invoice_details_2017-10_anotherRegistrar_test.csv",
|
|
||||||
ImmutableList.of(
|
|
||||||
"1,2017-10-04 00:00:00 UTC,2017-10-04 00:00:00 UTC,anotherRegistrar,789,"
|
|
||||||
+ "test,CREATE,mydomain5.test,REPO-ID,1,USD,0.00,SUNRISE ANCHOR_TENANT"));
|
|
||||||
}
|
|
||||||
|
|
||||||
private ImmutableList<String> getExpectedInvoiceOutput() {
|
|
||||||
return ImmutableList.of(
|
|
||||||
"2017-10-01,2020-09-30,234,41.00,USD,10125,1,PURCHASE,theRegistrar - test,2,"
|
|
||||||
+ "RENEW | TLD: test | TERM: 3-year,20.50,USD,",
|
|
||||||
"2017-10-01,2022-09-30,234,70.75,JPY,10125,1,PURCHASE,theRegistrar - hello,1,"
|
|
||||||
+ "CREATE | TLD: hello | TERM: 5-year,70.75,JPY,",
|
|
||||||
"2017-10-01,2018-09-30,456,20.50,USD,10125,1,PURCHASE,googledomains - test,1,"
|
|
||||||
+ "RENEW | TLD: test | TERM: 1-year,20.50,USD,",
|
|
||||||
"2017-10-01,2018-09-30,789,0.00,USD,10125,1,PURCHASE,anotherRegistrar - test,1,"
|
|
||||||
+ "CREATE | TLD: test | TERM: 1-year,0.00,USD,");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testEndToEndPipeline_generatesExpectedFiles() throws Exception {
|
|
||||||
ImmutableList<BillingEvent> inputRows = getInputEvents();
|
|
||||||
PCollection<BillingEvent> input = p.apply(Create.of(inputRows));
|
|
||||||
invoicingPipeline.applyTerminalTransforms(input, StaticValueProvider.of("2017-10"));
|
|
||||||
p.run();
|
|
||||||
|
|
||||||
for (Entry<String, ImmutableList<String>> entry : getExpectedDetailReportMap().entrySet()) {
|
|
||||||
ImmutableList<String> detailReport = resultFileContents(entry.getKey());
|
|
||||||
assertThat(detailReport.get(0))
|
|
||||||
.isEqualTo("id,billingTime,eventTime,registrarId,billingId,tld,action,"
|
|
||||||
+ "domain,repositoryId,years,currency,amount,flags");
|
|
||||||
assertThat(detailReport.subList(1, detailReport.size()))
|
|
||||||
.containsExactlyElementsIn(entry.getValue());
|
|
||||||
}
|
|
||||||
|
|
||||||
ImmutableList<String> overallInvoice = resultFileContents("CRR-INV-2017-10.csv");
|
|
||||||
assertThat(overallInvoice.get(0))
|
|
||||||
.isEqualTo(
|
|
||||||
"StartDate,EndDate,ProductAccountKey,Amount,AmountCurrency,BillingProductCode,"
|
|
||||||
+ "SalesChannel,LineItemType,UsageGroupingKey,Quantity,Description,UnitPrice,"
|
|
||||||
+ "UnitPriceCurrency,PONumber");
|
|
||||||
assertThat(overallInvoice.subList(1, overallInvoice.size()))
|
|
||||||
.containsExactlyElementsIn(getExpectedInvoiceOutput());
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Returns the text contents of a file under the beamBucket/results directory. */
|
|
||||||
private ImmutableList<String> resultFileContents(String filename) throws Exception {
|
|
||||||
File resultFile =
|
|
||||||
new File(
|
|
||||||
String.format(
|
|
||||||
"%s/invoices/2017-10/%s", tempFolder.getRoot().getAbsolutePath(), filename));
|
|
||||||
return ImmutableList.copyOf(
|
|
||||||
ResourceUtils.readResourceUtf8(resultFile.toURI().toURL()).split("\n"));
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,76 +0,0 @@
|
||||||
// Copyright 2017 The Nomulus Authors. All Rights Reserved.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package google.registry.beam;
|
|
||||||
|
|
||||||
import static com.google.common.truth.Truth.assertThat;
|
|
||||||
import static org.mockito.Matchers.any;
|
|
||||||
import static org.mockito.Mockito.mock;
|
|
||||||
import static org.mockito.Mockito.when;
|
|
||||||
|
|
||||||
import google.registry.testing.TestDataHelper;
|
|
||||||
import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
|
|
||||||
import org.apache.beam.sdk.io.FileBasedSink;
|
|
||||||
import org.apache.beam.sdk.options.ValueProvider;
|
|
||||||
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
|
|
||||||
import org.apache.beam.sdk.transforms.SerializableFunction;
|
|
||||||
import org.junit.Test;
|
|
||||||
import org.junit.runner.RunWith;
|
|
||||||
import org.junit.runners.JUnit4;
|
|
||||||
|
|
||||||
/** Unit tests for {@link InvoicingUtils}. */
|
|
||||||
@RunWith(JUnit4.class)
|
|
||||||
public class InvoicingUtilsTest {
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testDestinationFunction_generatesProperFileParams() {
|
|
||||||
SerializableFunction<BillingEvent, Params> destinationFunction =
|
|
||||||
InvoicingUtils.makeDestinationFunction("my/directory", StaticValueProvider.of("2017-10"));
|
|
||||||
|
|
||||||
BillingEvent billingEvent = mock(BillingEvent.class);
|
|
||||||
// We mock BillingEvent to make the test independent of the implementation of toFilename()
|
|
||||||
when(billingEvent.toFilename(any())).thenReturn("invoice_details_2017-10_registrar_tld");
|
|
||||||
|
|
||||||
assertThat(destinationFunction.apply(billingEvent))
|
|
||||||
.isEqualTo(
|
|
||||||
new Params()
|
|
||||||
.withShardTemplate("")
|
|
||||||
.withSuffix(".csv")
|
|
||||||
.withBaseFilename(
|
|
||||||
FileBasedSink.convertToFileResourceIfPossible(
|
|
||||||
"my/directory/2017-10/invoice_details_2017-10_registrar_tld")));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testEmptyDestinationParams() {
|
|
||||||
assertThat(InvoicingUtils.makeEmptyDestinationParams("my/directory"))
|
|
||||||
.isEqualTo(
|
|
||||||
new Params()
|
|
||||||
.withBaseFilename(
|
|
||||||
FileBasedSink.convertToFileResourceIfPossible("my/directory/FAILURES")));
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Asserts that the instantiated sql template matches a golden expected file. */
|
|
||||||
@Test
|
|
||||||
public void testMakeQueryProvider() {
|
|
||||||
ValueProvider<String> queryProvider =
|
|
||||||
InvoicingUtils.makeQueryProvider(StaticValueProvider.of("2017-10"), "my-project-id");
|
|
||||||
assertThat(queryProvider.get()).isEqualTo(loadFile("billing_events_test.sql"));
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Returns a {@link String} from a file in the {@code billing/testdata/} directory. */
|
|
||||||
private static String loadFile(String filename) {
|
|
||||||
return TestDataHelper.loadFile(InvoicingUtilsTest.class, filename);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,100 +0,0 @@
|
||||||
#standardSQL
|
|
||||||
-- Copyright 2017 The Nomulus Authors. All Rights Reserved.
|
|
||||||
--
|
|
||||||
-- Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
-- you may not use this file except in compliance with the License.
|
|
||||||
-- You may obtain a copy of the License at
|
|
||||||
--
|
|
||||||
-- http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
--
|
|
||||||
-- Unless required by applicable law or agreed to in writing, software
|
|
||||||
-- distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
-- See the License for the specific language governing permissions and
|
|
||||||
-- limitations under the License.
|
|
||||||
|
|
||||||
-- This query gathers all non-canceled billing events for a given
|
|
||||||
-- YEAR_MONTH in yyyy-MM format.
|
|
||||||
|
|
||||||
SELECT
|
|
||||||
__key__.id AS id,
|
|
||||||
billingTime,
|
|
||||||
eventTime,
|
|
||||||
BillingEvent.clientId AS registrarId,
|
|
||||||
RegistrarData.accountId AS billingId,
|
|
||||||
tld,
|
|
||||||
reason as action,
|
|
||||||
targetId as domain,
|
|
||||||
BillingEvent.domainRepoId as repositoryId,
|
|
||||||
periodYears as years,
|
|
||||||
BillingEvent.currency AS currency,
|
|
||||||
BillingEvent.amount as amount,
|
|
||||||
-- We'll strip out non-useful flags downstream
|
|
||||||
ARRAY_TO_STRING(flags, " ") AS flags
|
|
||||||
FROM (
|
|
||||||
SELECT
|
|
||||||
*,
|
|
||||||
-- We store cost as "CURRENCY AMOUNT" such as "JPY 800" or "USD 20.00"
|
|
||||||
SPLIT(cost, ' ')[OFFSET(0)] AS currency,
|
|
||||||
SPLIT(cost, ' ')[OFFSET(1)] AS amount,
|
|
||||||
-- Extract everything after the first dot in the domain as the TLD
|
|
||||||
REGEXP_EXTRACT(targetId, r'[.](.+)') AS tld,
|
|
||||||
-- __key__.path looks like '"DomainBase", "<repoId>", ...'
|
|
||||||
REGEXP_REPLACE(SPLIT(__key__.path, ', ')[OFFSET(1)], '"', '')
|
|
||||||
AS domainRepoId,
|
|
||||||
COALESCE(cancellationMatchingBillingEvent.path,
|
|
||||||
__key__.path) AS cancellationMatchingPath
|
|
||||||
FROM
|
|
||||||
`my-project-id.latest_datastore_export.OneTime`
|
|
||||||
-- Only include real TLDs (filter prober data)
|
|
||||||
WHERE
|
|
||||||
REGEXP_EXTRACT(targetId, r'[.](.+)') IN (
|
|
||||||
SELECT
|
|
||||||
tldStr
|
|
||||||
FROM
|
|
||||||
`my-project-id.latest_datastore_export.Registry`
|
|
||||||
WHERE
|
|
||||||
-- TODO(b/18092292): Add a filter for tldState (not PDT/PREDELEGATION)
|
|
||||||
tldType = 'REAL') ) AS BillingEvent
|
|
||||||
-- Gather billing ID from registrar table
|
|
||||||
-- This is a 'JOIN' as opposed to 'LEFT JOIN' to filter out
|
|
||||||
-- non-billable registrars
|
|
||||||
JOIN (
|
|
||||||
SELECT
|
|
||||||
__key__.name AS clientId,
|
|
||||||
billingIdentifier,
|
|
||||||
r.billingAccountMap.currency[SAFE_OFFSET(index)] AS currency,
|
|
||||||
r.billingAccountMap.accountId[SAFE_OFFSET(index)] AS accountId
|
|
||||||
FROM
|
|
||||||
`my-project-id.latest_datastore_export.Registrar` AS r,
|
|
||||||
UNNEST(GENERATE_ARRAY(0, ARRAY_LENGTH(r.billingAccountMap.currency) - 1))
|
|
||||||
AS index
|
|
||||||
WHERE billingAccountMap IS NOT NULL
|
|
||||||
AND type = 'REAL') AS RegistrarData
|
|
||||||
ON
|
|
||||||
BillingEvent.clientId = RegistrarData.clientId
|
|
||||||
AND BillingEvent.currency = RegistrarData.currency
|
|
||||||
-- Gather cancellations
|
|
||||||
LEFT JOIN (
|
|
||||||
SELECT __key__.id AS cancellationId,
|
|
||||||
COALESCE(refOneTime.path, refRecurring.path) AS cancelledEventPath,
|
|
||||||
eventTime as cancellationTime,
|
|
||||||
billingTime as cancellationBillingTime
|
|
||||||
FROM
|
|
||||||
(SELECT
|
|
||||||
*,
|
|
||||||
-- Count everything after first dot as TLD (to support multi-part TLDs).
|
|
||||||
REGEXP_EXTRACT(targetId, r'[.](.+)') AS tld
|
|
||||||
FROM
|
|
||||||
`my-project-id.latest_datastore_export.Cancellation`)
|
|
||||||
) AS Cancellation
|
|
||||||
ON BillingEvent.cancellationMatchingPath = Cancellation.cancelledEventPath
|
|
||||||
AND BillingEvent.billingTime = Cancellation.cancellationBillingTime
|
|
||||||
WHERE billingTime BETWEEN TIMESTAMP('2017-10-01 00:00:00.000000')
|
|
||||||
AND TIMESTAMP('2017-10-31 23:59:59.999999')
|
|
||||||
-- Filter out canceled events
|
|
||||||
AND Cancellation.cancellationId IS NULL
|
|
||||||
ORDER BY
|
|
||||||
billingTime DESC,
|
|
||||||
id,
|
|
||||||
tld
|
|
Loading…
Add table
Add a link
Reference in a new issue