mirror of
https://github.com/google/nomulus.git
synced 2025-04-29 19:47:51 +02:00
Migrate the billing pipeline to flex template (#1100)
This is similar to the migration of the spec11 pipeline in #1073. Also removed a few Dagger providers that are no longer needed. TESTED=tested the dataflow job on alpha. <!-- Reviewable:start --> --- This change is [<img src="https://reviewable.io/review_button.svg" height="34" align="absmiddle" alt="Reviewable"/>](https://reviewable.io/reviews/google/nomulus/1100) <!-- Reviewable:end -->
This commit is contained in:
parent
b600faf08e
commit
feab3633a6
26 changed files with 554 additions and 807 deletions
|
@ -810,6 +810,10 @@ if (environment in ['alpha', 'crash']) {
|
|||
mainClass: 'google.registry.beam.spec11.Spec11Pipeline',
|
||||
metaData: 'google/registry/beam/spec11_pipeline_metadata.json'
|
||||
],
|
||||
[
|
||||
mainClass: 'google.registry.beam.invoicing.InvoicingPipeline',
|
||||
metaData: 'google/registry/beam/invoicing_pipeline_metadata.json'
|
||||
],
|
||||
]
|
||||
project.tasks.create("stage_beam_pipelines") {
|
||||
doLast {
|
||||
|
|
|
@ -260,6 +260,11 @@ public abstract class BillingEvent implements Serializable {
|
|||
poNumber());
|
||||
}
|
||||
|
||||
/** Returns the grouping key for this {@code BillingEvent}, to generate the detailed report. */
|
||||
String getDetailedReportGroupingKey() {
|
||||
return String.format("%s_%s", registrarId(), tld());
|
||||
}
|
||||
|
||||
/** Key for each {@code BillingEvent}, when aggregating for the overall invoice. */
|
||||
@AutoValue
|
||||
abstract static class InvoiceGroupingKey implements Serializable {
|
||||
|
|
|
@ -14,28 +14,28 @@
|
|||
|
||||
package google.registry.beam.invoicing;
|
||||
|
||||
import com.google.auth.oauth2.GoogleCredentials;
|
||||
import static google.registry.beam.BeamUtils.getQueryFromFile;
|
||||
import static org.apache.beam.sdk.values.TypeDescriptors.strings;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import google.registry.beam.invoicing.BillingEvent.InvoiceGroupingKey;
|
||||
import google.registry.beam.invoicing.BillingEvent.InvoiceGroupingKey.InvoiceGroupingKeyCoder;
|
||||
import google.registry.config.CredentialModule.LocalCredential;
|
||||
import google.registry.config.RegistryConfig.Config;
|
||||
import google.registry.reporting.billing.BillingModule;
|
||||
import google.registry.reporting.billing.GenerateInvoicesAction;
|
||||
import google.registry.util.GoogleCredentialsBundle;
|
||||
import google.registry.util.SqlTemplate;
|
||||
import java.io.Serializable;
|
||||
import javax.inject.Inject;
|
||||
import org.apache.beam.runners.dataflow.DataflowRunner;
|
||||
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.LocalTime;
|
||||
import java.time.YearMonth;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import org.apache.beam.sdk.Pipeline;
|
||||
import org.apache.beam.sdk.PipelineResult;
|
||||
import org.apache.beam.sdk.coders.SerializableCoder;
|
||||
import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
|
||||
import org.apache.beam.sdk.io.FileBasedSink;
|
||||
import org.apache.beam.sdk.coders.StringUtf8Coder;
|
||||
import org.apache.beam.sdk.io.FileIO;
|
||||
import org.apache.beam.sdk.io.TextIO;
|
||||
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
|
||||
import org.apache.beam.sdk.options.Description;
|
||||
import org.apache.beam.sdk.options.PipelineOptionsFactory;
|
||||
import org.apache.beam.sdk.options.ValueProvider;
|
||||
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
|
||||
import org.apache.beam.sdk.transforms.Contextful;
|
||||
import org.apache.beam.sdk.transforms.Count;
|
||||
import org.apache.beam.sdk.transforms.Filter;
|
||||
import org.apache.beam.sdk.transforms.MapElements;
|
||||
|
@ -43,107 +43,54 @@ import org.apache.beam.sdk.transforms.PTransform;
|
|||
import org.apache.beam.sdk.values.KV;
|
||||
import org.apache.beam.sdk.values.PCollection;
|
||||
import org.apache.beam.sdk.values.TypeDescriptor;
|
||||
import org.apache.beam.sdk.values.TypeDescriptors;
|
||||
|
||||
/**
|
||||
* Definition of a Dataflow pipeline template, which generates a given month's invoices.
|
||||
* Definition of a Dataflow Flex pipeline template, which generates a given month's invoices.
|
||||
*
|
||||
* <p>To stage this template on GCS, run the {@link
|
||||
* google.registry.tools.DeployInvoicingPipelineCommand} Nomulus command.
|
||||
* <p>To stage this template locally, run the {@code stage_beam_pipeline.sh} shell script.
|
||||
*
|
||||
* <p>Then, you can run the staged template via the API client library, gCloud or a raw REST call.
|
||||
* For an example using the API client library, see {@link GenerateInvoicesAction}.
|
||||
*
|
||||
* @see <a href="https://cloud.google.com/dataflow/docs/templates/overview">Dataflow Templates</a>
|
||||
* @see <a href="https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates">Using
|
||||
* Flex Templates</a>
|
||||
*/
|
||||
public class InvoicingPipeline implements Serializable {
|
||||
|
||||
private final String projectId;
|
||||
private final String beamJobRegion;
|
||||
private final String beamBucketUrl;
|
||||
private final String invoiceTemplateUrl;
|
||||
private final String beamStagingUrl;
|
||||
private final String billingBucketUrl;
|
||||
private final String invoiceFilePrefix;
|
||||
private final GoogleCredentials googleCredentials;
|
||||
private static final DateTimeFormatter TIMESTAMP_FORMATTER =
|
||||
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
|
||||
|
||||
@Inject
|
||||
public InvoicingPipeline(
|
||||
@Config("projectId") String projectId,
|
||||
@Config("defaultJobRegion") String beamJobRegion,
|
||||
@Config("apacheBeamBucketUrl") String beamBucketUrl,
|
||||
@Config("invoiceTemplateUrl") String invoiceTemplateUrl,
|
||||
@Config("beamStagingUrl") String beamStagingUrl,
|
||||
@Config("billingBucketUrl") String billingBucketUrl,
|
||||
@Config("invoiceFilePrefix") String invoiceFilePrefix,
|
||||
@LocalCredential GoogleCredentialsBundle googleCredentialsBundle) {
|
||||
this.projectId = projectId;
|
||||
this.beamJobRegion = beamJobRegion;
|
||||
this.beamBucketUrl = beamBucketUrl;
|
||||
this.invoiceTemplateUrl = invoiceTemplateUrl;
|
||||
this.beamStagingUrl = beamStagingUrl;
|
||||
this.billingBucketUrl = billingBucketUrl;
|
||||
this.invoiceFilePrefix = invoiceFilePrefix;
|
||||
this.googleCredentials = googleCredentialsBundle.getGoogleCredentials();
|
||||
private final InvoicingPipelineOptions options;
|
||||
private final Pipeline pipeline;
|
||||
|
||||
@VisibleForTesting
|
||||
InvoicingPipeline(InvoicingPipelineOptions options, Pipeline pipeline) {
|
||||
this.options = options;
|
||||
this.pipeline = pipeline;
|
||||
}
|
||||
|
||||
/** Custom options for running the invoicing pipeline. */
|
||||
public interface InvoicingPipelineOptions extends DataflowPipelineOptions {
|
||||
/** Returns the yearMonth we're generating invoices for, in yyyy-MM format. */
|
||||
@Description("The yearMonth we generate invoices for, in yyyy-MM format.")
|
||||
ValueProvider<String> getYearMonth();
|
||||
/**
|
||||
* Sets the yearMonth we generate invoices for.
|
||||
*
|
||||
* <p>This is implicitly set when executing the Dataflow template, by specifying the 'yearMonth
|
||||
* parameter.
|
||||
*/
|
||||
void setYearMonth(ValueProvider<String> value);
|
||||
InvoicingPipeline(InvoicingPipelineOptions options) {
|
||||
this(options, Pipeline.create(options));
|
||||
}
|
||||
|
||||
/** Deploys the invoicing pipeline as a template on GCS, for a given projectID and GCS bucket. */
|
||||
public void deploy() {
|
||||
// We can't store options as a member variable due to serialization concerns.
|
||||
InvoicingPipelineOptions options = PipelineOptionsFactory.as(InvoicingPipelineOptions.class);
|
||||
options.setProject(projectId);
|
||||
options.setRegion(beamJobRegion);
|
||||
options.setRunner(DataflowRunner.class);
|
||||
// This causes p.run() to stage the pipeline as a template on GCS, as opposed to running it.
|
||||
options.setTemplateLocation(invoiceTemplateUrl);
|
||||
options.setStagingLocation(beamStagingUrl);
|
||||
// This credential is used when Dataflow deploys the template to GCS in target GCP project.
|
||||
// So, make sure the credential has write permission to GCS in that project.
|
||||
options.setGcpCredential(googleCredentials);
|
||||
|
||||
Pipeline p = Pipeline.create(options);
|
||||
PipelineResult run() {
|
||||
setupPipeline();
|
||||
return pipeline.run();
|
||||
}
|
||||
|
||||
void setupPipeline() {
|
||||
PCollection<BillingEvent> billingEvents =
|
||||
p.apply(
|
||||
pipeline.apply(
|
||||
"Read BillingEvents from Bigquery",
|
||||
BigQueryIO.read(BillingEvent::parseFromRecord)
|
||||
.fromQuery(InvoicingUtils.makeQueryProvider(options.getYearMonth(), projectId))
|
||||
.fromQuery(makeQuery(options.getYearMonth(), options.getProject()))
|
||||
.withCoder(SerializableCoder.of(BillingEvent.class))
|
||||
.usingStandardSql()
|
||||
.withoutValidation()
|
||||
.withTemplateCompatibility());
|
||||
applyTerminalTransforms(billingEvents, options.getYearMonth());
|
||||
p.run();
|
||||
}
|
||||
|
||||
/**
|
||||
* Applies output transforms to the {@code BillingEvent} source collection.
|
||||
*
|
||||
* <p>This is factored out purely to facilitate testing.
|
||||
*/
|
||||
void applyTerminalTransforms(
|
||||
PCollection<BillingEvent> billingEvents, ValueProvider<String> yearMonthProvider) {
|
||||
billingEvents
|
||||
.apply("Generate overall invoice rows", new GenerateInvoiceRows())
|
||||
.apply("Write overall invoice to CSV", writeInvoice(yearMonthProvider));
|
||||
saveInvoiceCsv(billingEvents, options);
|
||||
|
||||
billingEvents.apply(
|
||||
"Write detail reports to separate CSVs keyed by registrarId_tld pair",
|
||||
writeDetailReports(yearMonthProvider));
|
||||
saveDetailedCsv(billingEvents, options);
|
||||
}
|
||||
|
||||
/** Transform that converts a {@code BillingEvent} into an invoice CSV row. */
|
||||
|
@ -156,49 +103,85 @@ public class InvoicingPipeline implements Serializable {
|
|||
"Map to invoicing key",
|
||||
MapElements.into(TypeDescriptor.of(InvoiceGroupingKey.class))
|
||||
.via(BillingEvent::getInvoiceGroupingKey))
|
||||
.apply(Filter.by((InvoiceGroupingKey key) -> key.unitPrice() != 0))
|
||||
.apply(
|
||||
"Filter out free events", Filter.by((InvoiceGroupingKey key) -> key.unitPrice() != 0))
|
||||
.setCoder(new InvoiceGroupingKeyCoder())
|
||||
.apply("Count occurrences", Count.perElement())
|
||||
.apply(
|
||||
"Format as CSVs",
|
||||
MapElements.into(TypeDescriptors.strings())
|
||||
MapElements.into(strings())
|
||||
.via((KV<InvoiceGroupingKey, Long> kv) -> kv.getKey().toCsv(kv.getValue())));
|
||||
}
|
||||
}
|
||||
|
||||
/** Returns an IO transform that writes the overall invoice to a single CSV file. */
|
||||
private TextIO.Write writeInvoice(ValueProvider<String> yearMonthProvider) {
|
||||
return TextIO.write()
|
||||
.to(
|
||||
NestedValueProvider.of(
|
||||
yearMonthProvider,
|
||||
yearMonth ->
|
||||
/** Saves the billing events to a single overall invoice CSV file. */
|
||||
static void saveInvoiceCsv(
|
||||
PCollection<BillingEvent> billingEvents, InvoicingPipelineOptions options) {
|
||||
billingEvents
|
||||
.apply("Generate overall invoice rows", new GenerateInvoiceRows())
|
||||
.apply(
|
||||
"Write overall invoice to CSV",
|
||||
TextIO.write()
|
||||
.to(
|
||||
String.format(
|
||||
"%s/%s/%s/%s-%s",
|
||||
billingBucketUrl,
|
||||
options.getBillingBucketUrl(),
|
||||
BillingModule.INVOICES_DIRECTORY,
|
||||
yearMonth,
|
||||
invoiceFilePrefix,
|
||||
yearMonth)))
|
||||
.withHeader(InvoiceGroupingKey.invoiceHeader())
|
||||
.withoutSharding()
|
||||
.withSuffix(".csv");
|
||||
options.getYearMonth(),
|
||||
options.getInvoiceFilePrefix(),
|
||||
options.getYearMonth()))
|
||||
.withHeader(InvoiceGroupingKey.invoiceHeader())
|
||||
.withoutSharding()
|
||||
.withSuffix(".csv"));
|
||||
}
|
||||
|
||||
/** Returns an IO transform that writes detail reports to registrar-tld keyed CSV files. */
|
||||
private TextIO.TypedWrite<BillingEvent, Params> writeDetailReports(
|
||||
ValueProvider<String> yearMonthProvider) {
|
||||
return TextIO.<BillingEvent>writeCustomType()
|
||||
.to(
|
||||
InvoicingUtils.makeDestinationFunction(
|
||||
String.format("%s/%s", billingBucketUrl, BillingModule.INVOICES_DIRECTORY),
|
||||
yearMonthProvider),
|
||||
InvoicingUtils.makeEmptyDestinationParams(billingBucketUrl + "/errors"))
|
||||
.withFormatFunction(BillingEvent::toCsv)
|
||||
.withoutSharding()
|
||||
.withTempDirectory(
|
||||
FileBasedSink.convertToFileResourceIfPossible(beamBucketUrl + "/temporary"))
|
||||
.withHeader(BillingEvent.getHeader())
|
||||
.withSuffix(".csv");
|
||||
/** Saves the billing events to detailed report CSV files keyed by registrar-tld pairs. */
|
||||
static void saveDetailedCsv(
|
||||
PCollection<BillingEvent> billingEvents, InvoicingPipelineOptions options) {
|
||||
String yearMonth = options.getYearMonth();
|
||||
billingEvents.apply(
|
||||
"Write detailed report for each registrar-tld pair",
|
||||
FileIO.<String, BillingEvent>writeDynamic()
|
||||
.to(
|
||||
String.format(
|
||||
"%s/%s/%s",
|
||||
options.getBillingBucketUrl(), BillingModule.INVOICES_DIRECTORY, yearMonth))
|
||||
.by(BillingEvent::getDetailedReportGroupingKey)
|
||||
.withNumShards(1)
|
||||
.withDestinationCoder(StringUtf8Coder.of())
|
||||
.withNaming(
|
||||
key ->
|
||||
(window, pane, numShards, shardIndex, compression) ->
|
||||
String.format(
|
||||
"%s_%s_%s.csv", BillingModule.DETAIL_REPORT_PREFIX, yearMonth, key))
|
||||
.via(
|
||||
Contextful.fn(BillingEvent::toCsv),
|
||||
TextIO.sink().withHeader(BillingEvent.getHeader())));
|
||||
}
|
||||
|
||||
/** Create the Bigquery query for a given project and yearMonth at runtime. */
|
||||
static String makeQuery(String yearMonth, String projectId) {
|
||||
// Get the timestamp endpoints capturing the entire month with microsecond precision
|
||||
YearMonth reportingMonth = YearMonth.parse(yearMonth);
|
||||
LocalDateTime firstMoment = reportingMonth.atDay(1).atTime(LocalTime.MIDNIGHT);
|
||||
LocalDateTime lastMoment = reportingMonth.atEndOfMonth().atTime(LocalTime.MAX);
|
||||
// Construct the month's query by filling in the billing_events.sql template
|
||||
return SqlTemplate.create(getQueryFromFile(InvoicingPipeline.class, "billing_events.sql"))
|
||||
.put("FIRST_TIMESTAMP_OF_MONTH", firstMoment.format(TIMESTAMP_FORMATTER))
|
||||
.put("LAST_TIMESTAMP_OF_MONTH", lastMoment.format(TIMESTAMP_FORMATTER))
|
||||
.put("PROJECT_ID", projectId)
|
||||
.put("DATASTORE_EXPORT_DATA_SET", "latest_datastore_export")
|
||||
.put("ONETIME_TABLE", "OneTime")
|
||||
.put("REGISTRY_TABLE", "Registry")
|
||||
.put("REGISTRAR_TABLE", "Registrar")
|
||||
.put("CANCELLATION_TABLE", "Cancellation")
|
||||
.build();
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
PipelineOptionsFactory.register(InvoicingPipelineOptions.class);
|
||||
InvoicingPipelineOptions options =
|
||||
PipelineOptionsFactory.fromArgs(args).withValidation().as(InvoicingPipelineOptions.class);
|
||||
new InvoicingPipeline(options).run();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,37 @@
|
|||
// Copyright 2021 The Nomulus Authors. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package google.registry.beam.invoicing;
|
||||
|
||||
import google.registry.beam.common.RegistryPipelineOptions;
|
||||
import org.apache.beam.sdk.options.Description;
|
||||
|
||||
/** Custom options for running the invoicing pipeline. */
|
||||
public interface InvoicingPipelineOptions extends RegistryPipelineOptions {
|
||||
|
||||
@Description("The year and month we generate invoices for, in yyyy-MM format.")
|
||||
String getYearMonth();
|
||||
|
||||
void setYearMonth(String value);
|
||||
|
||||
@Description("Filename prefix for the invoice CSV file.")
|
||||
String getInvoiceFilePrefix();
|
||||
|
||||
void setInvoiceFilePrefix(String value);
|
||||
|
||||
@Description("The GCS bucket URL for invoices and detailed reports to be uploaded.")
|
||||
String getBillingBucketUrl();
|
||||
|
||||
void setBillingBucketUrl(String value);
|
||||
}
|
|
@ -1,106 +0,0 @@
|
|||
// Copyright 2018 The Nomulus Authors. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package google.registry.beam.invoicing;
|
||||
|
||||
import static google.registry.beam.BeamUtils.getQueryFromFile;
|
||||
|
||||
import google.registry.util.SqlTemplate;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.LocalTime;
|
||||
import java.time.YearMonth;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
|
||||
import org.apache.beam.sdk.io.FileBasedSink;
|
||||
import org.apache.beam.sdk.options.ValueProvider;
|
||||
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
|
||||
import org.apache.beam.sdk.transforms.SerializableFunction;
|
||||
|
||||
/** Pipeline helper functions used to generate invoices from instances of {@link BillingEvent}. */
|
||||
public class InvoicingUtils {
|
||||
|
||||
private InvoicingUtils() {}
|
||||
|
||||
private static final DateTimeFormatter TIMESTAMP_FORMATTER =
|
||||
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
|
||||
|
||||
/**
|
||||
* Returns a function mapping from {@code BillingEvent} to filename {@code Params}.
|
||||
*
|
||||
* <p>Beam uses this to determine which file a given {@code BillingEvent} should get placed into.
|
||||
*
|
||||
* @param outputBucket the GCS bucket we're outputting reports to
|
||||
* @param yearMonthProvider a runtime provider for the yyyy-MM we're generating the invoice for
|
||||
*/
|
||||
static SerializableFunction<BillingEvent, Params> makeDestinationFunction(
|
||||
String outputBucket, ValueProvider<String> yearMonthProvider) {
|
||||
return billingEvent ->
|
||||
new Params()
|
||||
.withShardTemplate("")
|
||||
.withSuffix(".csv")
|
||||
.withBaseFilename(
|
||||
NestedValueProvider.of(
|
||||
yearMonthProvider,
|
||||
yearMonth ->
|
||||
FileBasedSink.convertToFileResourceIfPossible(
|
||||
String.format(
|
||||
"%s/%s/%s",
|
||||
outputBucket, yearMonth, billingEvent.toFilename(yearMonth)))));
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the default filename parameters for an unmappable {@code BillingEvent}.
|
||||
*
|
||||
* <p>The "failed" file should only be populated when an error occurs, which warrants further
|
||||
* investigation.
|
||||
*/
|
||||
static Params makeEmptyDestinationParams(String outputBucket) {
|
||||
return new Params()
|
||||
.withBaseFilename(
|
||||
FileBasedSink.convertToFileResourceIfPossible(
|
||||
String.format("%s/%s", outputBucket, "FAILURES")));
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a provider that creates a Bigquery query for a given project and yearMonth at runtime.
|
||||
*
|
||||
* <p>We only know yearMonth at runtime, so this provider fills in the {@code
|
||||
* sql/billing_events.sql} template at runtime.
|
||||
*
|
||||
* @param yearMonthProvider a runtime provider that returns which month we're invoicing for.
|
||||
* @param projectId the projectId we're generating invoicing for.
|
||||
*/
|
||||
static ValueProvider<String> makeQueryProvider(
|
||||
ValueProvider<String> yearMonthProvider, String projectId) {
|
||||
return NestedValueProvider.of(
|
||||
yearMonthProvider,
|
||||
(yearMonth) -> {
|
||||
// Get the timestamp endpoints capturing the entire month with microsecond precision
|
||||
YearMonth reportingMonth = YearMonth.parse(yearMonth);
|
||||
LocalDateTime firstMoment = reportingMonth.atDay(1).atTime(LocalTime.MIDNIGHT);
|
||||
LocalDateTime lastMoment = reportingMonth.atEndOfMonth().atTime(LocalTime.MAX);
|
||||
// Construct the month's query by filling in the billing_events.sql template
|
||||
return SqlTemplate.create(getQueryFromFile(InvoicingPipeline.class, "billing_events.sql"))
|
||||
.put("FIRST_TIMESTAMP_OF_MONTH", firstMoment.format(TIMESTAMP_FORMATTER))
|
||||
.put("LAST_TIMESTAMP_OF_MONTH", lastMoment.format(TIMESTAMP_FORMATTER))
|
||||
.put("PROJECT_ID", projectId)
|
||||
.put("DATASTORE_EXPORT_DATA_SET", "latest_datastore_export")
|
||||
.put("ONETIME_TABLE", "OneTime")
|
||||
.put("REGISTRY_TABLE", "Registry")
|
||||
.put("REGISTRAR_TABLE", "Registrar")
|
||||
.put("CANCELLATION_TABLE", "Cancellation")
|
||||
.build();
|
||||
});
|
||||
}
|
||||
}
|
|
@ -384,19 +384,6 @@ public final class RegistryConfig {
|
|||
return Duration.standardHours(1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Number of sharded entity group roots used for performing strongly consistent scans.
|
||||
*
|
||||
* <p><b>Warning:</b> This number may increase but never decrease.
|
||||
*
|
||||
* @see google.registry.model.index.EppResourceIndex
|
||||
*/
|
||||
@Provides
|
||||
@Config("eppResourceIndexBucketCount")
|
||||
public static int provideEppResourceIndexBucketCount(RegistryConfigSettings config) {
|
||||
return config.datastore.eppResourceIndexBucketsNum;
|
||||
}
|
||||
|
||||
@Provides
|
||||
@Config("cloudSqlJdbcUrl")
|
||||
public static String providesCloudSqlJdbcUrl(RegistryConfigSettings config) {
|
||||
|
@ -564,53 +551,6 @@ public final class RegistryConfig {
|
|||
return config.gSuite.outgoingEmailDisplayName;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the name of the GCS bucket for storing Beam templates and results.
|
||||
*
|
||||
* @see google.registry.reporting.billing.GenerateInvoicesAction
|
||||
*/
|
||||
@Provides
|
||||
@Config("apacheBeamBucket")
|
||||
public static String provideApacheBeamBucket(@Config("projectId") String projectId) {
|
||||
return projectId + "-beam";
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the URL of the GCS location for storing Apache Beam related objects.
|
||||
*
|
||||
* @see google.registry.reporting.billing.GenerateInvoicesAction
|
||||
*/
|
||||
@Provides
|
||||
@Config("apacheBeamBucketUrl")
|
||||
public static String provideApacheBeamBucketUrl(@Config("apacheBeamBucket") String beamBucket) {
|
||||
return "gs://" + beamBucket;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the URL of the GCS location for storing the monthly invoicing Beam template.
|
||||
*
|
||||
* @see google.registry.reporting.billing.GenerateInvoicesAction
|
||||
* @see google.registry.beam.invoicing.InvoicingPipeline
|
||||
*/
|
||||
@Provides
|
||||
@Config("invoiceTemplateUrl")
|
||||
public static String provideInvoiceTemplateUrl(
|
||||
@Config("apacheBeamBucketUrl") String beamBucketUrl) {
|
||||
return beamBucketUrl + "/templates/invoicing";
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the URL of the GCS location for storing the monthly spec11 Beam template.
|
||||
*
|
||||
* @see google.registry.beam.spec11.Spec11Pipeline
|
||||
*/
|
||||
@Provides
|
||||
@Config("spec11TemplateUrl")
|
||||
public static String provideSpec11TemplateUrl(
|
||||
@Config("apacheBeamBucketUrl") String beamBucketUrl) {
|
||||
return beamBucketUrl + "/templates/spec11";
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns whether an SSL certificate hash is required to log in via EPP and run flows.
|
||||
*
|
||||
|
@ -634,18 +574,6 @@ public final class RegistryConfig {
|
|||
return config.beam.defaultJobRegion;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the default job zone to run Apache Beam (Cloud Dataflow) jobs in.
|
||||
*
|
||||
* @see google.registry.reporting.billing.GenerateInvoicesAction
|
||||
* @see google.registry.reporting.spec11.GenerateSpec11ReportAction
|
||||
*/
|
||||
@Provides
|
||||
@Config("defaultJobZone")
|
||||
public static String provideDefaultJobZone(RegistryConfigSettings config) {
|
||||
return config.beam.defaultJobZone;
|
||||
}
|
||||
|
||||
/** Returns the GCS bucket URL with all staged BEAM flex templates. */
|
||||
@Provides
|
||||
@Config("beamStagingBucketUrl")
|
||||
|
@ -653,19 +581,6 @@ public final class RegistryConfig {
|
|||
return config.beam.stagingBucketUrl;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the URL of the GCS location we store jar dependencies for beam pipelines.
|
||||
*
|
||||
* @see google.registry.beam.invoicing.InvoicingPipeline
|
||||
* @see google.registry.beam.spec11.Spec11Pipeline
|
||||
*/
|
||||
@Provides
|
||||
@Config("beamStagingUrl")
|
||||
public static String provideInvoiceStagingUrl(
|
||||
@Config("apacheBeamBucketUrl") String beamBucketUrl) {
|
||||
return beamBucketUrl + "/staging";
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the Google Cloud Storage bucket for Spec11 and ICANN transaction and activity reports
|
||||
* to be uploaded.
|
||||
|
@ -1227,14 +1142,6 @@ public final class RegistryConfig {
|
|||
return formatComments(config.registryPolicy.reservedTermsExportDisclaimer);
|
||||
}
|
||||
|
||||
/** Returns the clientId of the registrar used by the {@code CheckApiServlet}. */
|
||||
// TODO(b/80417678): remove this once CheckApiAction no longer uses this id.
|
||||
@Provides
|
||||
@Config("checkApiServletRegistrarClientId")
|
||||
public static String provideCheckApiServletRegistrarClientId(RegistryConfigSettings config) {
|
||||
return config.registryPolicy.checkApiServletClientId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the clientId of the registrar that admins are automatically logged in as if they
|
||||
* aren't otherwise associated with one.
|
||||
|
|
|
@ -133,7 +133,6 @@ public class RegistryConfigSettings {
|
|||
/** Configuration for Apache Beam (Cloud Dataflow). */
|
||||
public static class Beam {
|
||||
public String defaultJobRegion;
|
||||
public String defaultJobZone;
|
||||
public String stagingBucketUrl;
|
||||
}
|
||||
|
||||
|
|
|
@ -420,9 +420,6 @@ misc:
|
|||
beam:
|
||||
# The default region to run Apache Beam (Cloud Dataflow) jobs in.
|
||||
defaultJobRegion: us-east1
|
||||
# The default zone to run Apache Beam (Cloud Dataflow) jobs in.
|
||||
# TODO(weiminyu): consider dropping zone config. No obvious needs for this.
|
||||
defaultJobZone: us-east1-c
|
||||
stagingBucketUrl: gcs-bucket-with-staged-templates
|
||||
|
||||
keyring:
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
|
||||
package google.registry.reporting.billing;
|
||||
|
||||
import static google.registry.beam.BeamUtils.createJobName;
|
||||
import static google.registry.reporting.ReportingUtils.enqueueBeamReportingTask;
|
||||
import static google.registry.reporting.billing.BillingModule.PARAM_SHOULD_PUBLISH;
|
||||
import static google.registry.request.Action.Method.POST;
|
||||
|
@ -21,9 +22,9 @@ import static javax.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
|
|||
import static javax.servlet.http.HttpServletResponse.SC_OK;
|
||||
|
||||
import com.google.api.services.dataflow.Dataflow;
|
||||
import com.google.api.services.dataflow.model.LaunchTemplateParameters;
|
||||
import com.google.api.services.dataflow.model.LaunchTemplateResponse;
|
||||
import com.google.api.services.dataflow.model.RuntimeEnvironment;
|
||||
import com.google.api.services.dataflow.model.LaunchFlexTemplateParameter;
|
||||
import com.google.api.services.dataflow.model.LaunchFlexTemplateRequest;
|
||||
import com.google.api.services.dataflow.model.LaunchFlexTemplateResponse;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.flogger.FluentLogger;
|
||||
import com.google.common.net.MediaType;
|
||||
|
@ -33,6 +34,7 @@ import google.registry.request.Action;
|
|||
import google.registry.request.Parameter;
|
||||
import google.registry.request.Response;
|
||||
import google.registry.request.auth.Auth;
|
||||
import google.registry.util.Clock;
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import javax.inject.Inject;
|
||||
|
@ -42,9 +44,8 @@ import org.joda.time.YearMonth;
|
|||
* Invokes the {@code InvoicingPipeline} beam template via the REST api, and enqueues the {@link
|
||||
* PublishInvoicesAction} to publish the subsequent output.
|
||||
*
|
||||
* <p>This action runs the {@link google.registry.beam.invoicing.InvoicingPipeline} beam template,
|
||||
* staged at gs://<projectId>-beam/templates/invoicing. The pipeline then generates invoices
|
||||
* for the month and stores them on GCS.
|
||||
* <p>This action runs the {@link google.registry.beam.invoicing.InvoicingPipeline} beam flex
|
||||
* template. The pipeline then generates invoices for the month and stores them on GCS.
|
||||
*/
|
||||
@Action(
|
||||
service = Action.Service.BACKEND,
|
||||
|
@ -56,57 +57,73 @@ public class GenerateInvoicesAction implements Runnable {
|
|||
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
|
||||
|
||||
static final String PATH = "/_dr/task/generateInvoices";
|
||||
static final String PIPELINE_NAME = "invoicing_pipeline";
|
||||
|
||||
private final String projectId;
|
||||
private final String beamBucketUrl;
|
||||
private final String invoiceTemplateUrl;
|
||||
private final String jobZone;
|
||||
private final String jobRegion;
|
||||
private final String stagingBucketUrl;
|
||||
private final String billingBucketUrl;
|
||||
private final String invoiceFilePrefix;
|
||||
private final boolean shouldPublish;
|
||||
private final YearMonth yearMonth;
|
||||
private final Dataflow dataflow;
|
||||
private final Response response;
|
||||
private final BillingEmailUtils emailUtils;
|
||||
private final Clock clock;
|
||||
private final Response response;
|
||||
private final Dataflow dataflow;
|
||||
|
||||
@Inject
|
||||
GenerateInvoicesAction(
|
||||
@Config("projectId") String projectId,
|
||||
@Config("apacheBeamBucketUrl") String beamBucketUrl,
|
||||
@Config("invoiceTemplateUrl") String invoiceTemplateUrl,
|
||||
@Config("defaultJobZone") String jobZone,
|
||||
@Config("defaultJobRegion") String jobRegion,
|
||||
@Config("beamStagingBucketUrl") String stagingBucketUrl,
|
||||
@Config("billingBucketUrl") String billingBucketUrl,
|
||||
@Config("invoiceFilePrefix") String invoiceFilePrefix,
|
||||
@Parameter(PARAM_SHOULD_PUBLISH) boolean shouldPublish,
|
||||
YearMonth yearMonth,
|
||||
Dataflow dataflow,
|
||||
BillingEmailUtils emailUtils,
|
||||
Clock clock,
|
||||
Response response,
|
||||
BillingEmailUtils emailUtils) {
|
||||
Dataflow dataflow) {
|
||||
this.projectId = projectId;
|
||||
this.beamBucketUrl = beamBucketUrl;
|
||||
this.invoiceTemplateUrl = invoiceTemplateUrl;
|
||||
this.jobZone = jobZone;
|
||||
this.jobRegion = jobRegion;
|
||||
this.stagingBucketUrl = stagingBucketUrl;
|
||||
this.billingBucketUrl = billingBucketUrl;
|
||||
this.invoiceFilePrefix = invoiceFilePrefix;
|
||||
this.shouldPublish = shouldPublish;
|
||||
this.yearMonth = yearMonth;
|
||||
this.dataflow = dataflow;
|
||||
this.response = response;
|
||||
this.emailUtils = emailUtils;
|
||||
this.clock = clock;
|
||||
this.response = response;
|
||||
this.dataflow = dataflow;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
response.setContentType(MediaType.PLAIN_TEXT_UTF_8);
|
||||
logger.atInfo().log("Launching invoicing pipeline for %s", yearMonth);
|
||||
try {
|
||||
LaunchTemplateParameters params =
|
||||
new LaunchTemplateParameters()
|
||||
.setJobName(String.format("invoicing-%s", yearMonth))
|
||||
.setEnvironment(
|
||||
new RuntimeEnvironment()
|
||||
.setZone(jobZone)
|
||||
.setTempLocation(beamBucketUrl + "/temporary"))
|
||||
.setParameters(ImmutableMap.of("yearMonth", yearMonth.toString("yyyy-MM")));
|
||||
LaunchTemplateResponse launchResponse =
|
||||
LaunchFlexTemplateParameter parameter =
|
||||
new LaunchFlexTemplateParameter()
|
||||
.setJobName(createJobName("invoicing", clock))
|
||||
.setContainerSpecGcsPath(
|
||||
String.format("%s/%s_metadata.json", stagingBucketUrl, PIPELINE_NAME))
|
||||
.setParameters(
|
||||
ImmutableMap.of(
|
||||
"yearMonth",
|
||||
yearMonth.toString("yyyy-MM"),
|
||||
"invoiceFilePrefix",
|
||||
invoiceFilePrefix,
|
||||
"billingBucketUrl",
|
||||
billingBucketUrl));
|
||||
LaunchFlexTemplateResponse launchResponse =
|
||||
dataflow
|
||||
.projects()
|
||||
.templates()
|
||||
.launch(projectId, params)
|
||||
.setGcsPath(invoiceTemplateUrl)
|
||||
.locations()
|
||||
.flexTemplates()
|
||||
.launch(
|
||||
projectId,
|
||||
jobRegion,
|
||||
new LaunchFlexTemplateRequest().setLaunchParameter(parameter))
|
||||
.execute();
|
||||
logger.atInfo().log("Got response: %s", launchResponse.getJob().toPrettyString());
|
||||
String jobId = launchResponse.getJob().getId();
|
||||
|
@ -123,12 +140,10 @@ public class GenerateInvoicesAction implements Runnable {
|
|||
logger.atWarning().withCause(e).log("Template Launch failed");
|
||||
emailUtils.sendAlertEmail(String.format("Template Launch failed due to %s", e.getMessage()));
|
||||
response.setStatus(SC_INTERNAL_SERVER_ERROR);
|
||||
response.setContentType(MediaType.PLAIN_TEXT_UTF_8);
|
||||
response.setPayload(String.format("Template launch failed: %s", e.getMessage()));
|
||||
return;
|
||||
}
|
||||
response.setStatus(SC_OK);
|
||||
response.setContentType(MediaType.PLAIN_TEXT_UTF_8);
|
||||
response.setPayload("Launched dataflow template.");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -59,6 +59,7 @@ public class PublishInvoicesAction implements Runnable {
|
|||
private static final String JOB_FAILED = "JOB_STATE_FAILED";
|
||||
|
||||
private final String projectId;
|
||||
private final String jobRegion;
|
||||
private final String jobId;
|
||||
private final BillingEmailUtils emailUtils;
|
||||
private final Dataflow dataflow;
|
||||
|
@ -68,12 +69,14 @@ public class PublishInvoicesAction implements Runnable {
|
|||
@Inject
|
||||
PublishInvoicesAction(
|
||||
@Config("projectId") String projectId,
|
||||
@Config("defaultJobRegion") String jobRegion,
|
||||
@Parameter(ReportingModule.PARAM_JOB_ID) String jobId,
|
||||
BillingEmailUtils emailUtils,
|
||||
Dataflow dataflow,
|
||||
Response response,
|
||||
YearMonth yearMonth) {
|
||||
this.projectId = projectId;
|
||||
this.jobRegion = jobRegion;
|
||||
this.jobId = jobId;
|
||||
this.emailUtils = emailUtils;
|
||||
this.dataflow = dataflow;
|
||||
|
@ -87,7 +90,7 @@ public class PublishInvoicesAction implements Runnable {
|
|||
public void run() {
|
||||
try {
|
||||
logger.atInfo().log("Starting publish job.");
|
||||
Job job = dataflow.projects().jobs().get(projectId, jobId).execute();
|
||||
Job job = dataflow.projects().locations().jobs().get(projectId, jobRegion, jobId).execute();
|
||||
String state = job.getCurrentState();
|
||||
switch (state) {
|
||||
case JOB_DONE:
|
||||
|
|
|
@ -1,31 +0,0 @@
|
|||
// Copyright 2017 The Nomulus Authors. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package google.registry.tools;
|
||||
|
||||
import com.beust.jcommander.Parameters;
|
||||
import google.registry.beam.invoicing.InvoicingPipeline;
|
||||
import javax.inject.Inject;
|
||||
|
||||
/** Nomulus command that deploys the {@link InvoicingPipeline} template. */
|
||||
@Parameters(commandDescription = "Deploy the invoicing pipeline to GCS.")
|
||||
public class DeployInvoicingPipelineCommand implements Command {
|
||||
|
||||
@Inject InvoicingPipeline invoicingPipeline;
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
invoicingPipeline.deploy();
|
||||
}
|
||||
}
|
|
@ -62,7 +62,6 @@ public final class RegistryTool {
|
|||
.put("delete_premium_list", DeletePremiumListCommand.class)
|
||||
.put("delete_reserved_list", DeleteReservedListCommand.class)
|
||||
.put("delete_tld", DeleteTldCommand.class)
|
||||
.put("deploy_invoicing_pipeline", DeployInvoicingPipelineCommand.class)
|
||||
.put("encrypt_escrow_deposit", EncryptEscrowDepositCommand.class)
|
||||
.put("execute_epp", ExecuteEppCommand.class)
|
||||
.put("generate_allocation_tokens", GenerateAllocationTokensCommand.class)
|
||||
|
|
|
@ -105,8 +105,6 @@ interface RegistryToolComponent {
|
|||
|
||||
void inject(DeleteContactByRoidCommand command);
|
||||
|
||||
void inject(DeployInvoicingPipelineCommand command);
|
||||
|
||||
void inject(EncryptEscrowDepositCommand command);
|
||||
|
||||
void inject(GenerateAllocationTokensCommand command);
|
||||
|
|
|
@ -5,7 +5,7 @@
|
|||
{
|
||||
"name": "registryEnvironment",
|
||||
"label": "The Registry environment.",
|
||||
"helpText": "The Registry environment, required if environment-specific initialization is needed on worker VMs.",
|
||||
"helpText": "The Registry environment, required if environment-specific initialization (such as JPA) is needed on worker VMs.",
|
||||
"is_optional": true,
|
||||
"regexes": [
|
||||
"^[0-9A-Z_]+$"
|
||||
|
|
|
@ -0,0 +1,66 @@
|
|||
{
|
||||
"name": "Invoice and Detailed Reports Generation",
|
||||
"description": "An Apache Beam batch pipeline that reads from a Datastore export and generate monthly invoice and detailed reports, saving them on GCS.",
|
||||
"parameters": [
|
||||
{
|
||||
"name": "registryEnvironment",
|
||||
"label": "The Registry environment.",
|
||||
"helpText": "The Registry environment, required if environment-specific initialization (such as JPA) is needed on worker VMs.",
|
||||
"is_optional": true,
|
||||
"regexes": [
|
||||
"^[0-9A-Z_]+$"
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "isolationOverride",
|
||||
"label": "The desired SQL transaction isolation level.",
|
||||
"helpText": "The desired SQL transaction isolation level.",
|
||||
"is_optional": true,
|
||||
"regexes": [
|
||||
"^[0-9A-Z_]+$"
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "sqlWriteBatchSize",
|
||||
"label": "SQL write batch size.",
|
||||
"helpText": "The number of entities to write to the SQL database in one operation.",
|
||||
"is_optional": true,
|
||||
"regexes": [
|
||||
"^[1-9][0-9]*$"
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "sqlWriteShards",
|
||||
"label": "Number of output shards to create when writing to SQL.",
|
||||
"helpText": "Number of shards to create out of the data before writing to the SQL database. Please refer to the Javadoc of RegistryJpaIO.Write.shards() for how to choose this value.",
|
||||
"is_optional": true,
|
||||
"regexes": [
|
||||
"^[1-9][0-9]*$"
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "yearMonth",
|
||||
"label": "The year and month we generate invoice and detailed reports for.",
|
||||
"helpText": "The year and month for which the invoice and detailed reports are generated, in yyyy-MM format.",
|
||||
"regexes": [
|
||||
"^2[0-9]{3}-(0[1-9]|1[0-2])$"
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "invoiceFilePrefix",
|
||||
"label": "Filename prefix for the invoice CSV file.",
|
||||
"helpText": "The prefix that will be applied to the invoice file.",
|
||||
"regexes": [
|
||||
"^[a-zA-Z0-9_\\-]+$"
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "billingBucketUrl",
|
||||
"label": "invoice and detailed reports upload dir.",
|
||||
"helpText": "The root directory of the reports to upload.",
|
||||
"regexes": [
|
||||
"^gs:\\/\\/[^\\n\\r]+$"
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
|
@ -5,7 +5,7 @@
|
|||
{
|
||||
"name": "registryEnvironment",
|
||||
"label": "The Registry environment.",
|
||||
"helpText": "The Registry environment, required if environment-specific initialization is needed on worker VMs.",
|
||||
"helpText": "The Registry environment, required if environment-specific initialization (such as JPA) is needed on worker VMs.",
|
||||
"is_optional": true,
|
||||
"regexes": [
|
||||
"^[0-9A-Z_]+$"
|
||||
|
@ -41,7 +41,7 @@
|
|||
{
|
||||
"name": "date",
|
||||
"label": "The date when the pipeline runs",
|
||||
"helpText": "The date then the threat scan is performed, in yyyy-MM-dd format.",
|
||||
"helpText": "The date when the threat scan is performed, in yyyy-MM-dd format.",
|
||||
"regexes": [
|
||||
"^2[0-9]{3}-(0[1-9]|1[0-2])-(0[1-9]|[12][0-9]|3[01])$"
|
||||
]
|
||||
|
|
|
@ -18,52 +18,20 @@ import static com.google.common.truth.Truth.assertThat;
|
|||
import static javax.servlet.http.HttpServletResponse.SC_FORBIDDEN;
|
||||
import static org.apache.http.HttpStatus.SC_INTERNAL_SERVER_ERROR;
|
||||
import static org.apache.http.HttpStatus.SC_OK;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyString;
|
||||
import static org.mockito.Mockito.lenient;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.verifyNoInteractions;
|
||||
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import com.google.api.services.dataflow.Dataflow;
|
||||
import com.google.api.services.dataflow.model.Job;
|
||||
import com.google.api.services.dataflow.model.LaunchFlexTemplateRequest;
|
||||
import com.google.api.services.dataflow.model.LaunchFlexTemplateResponse;
|
||||
import google.registry.beam.BeamActionTestBase;
|
||||
import google.registry.testing.FakeClock;
|
||||
import google.registry.testing.FakeResponse;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
|
||||
/** Unit tests for {@link WipeoutDatastoreAction}. */
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
class WipeOutDatastoreActionTest {
|
||||
|
||||
@Mock private Dataflow dataflow;
|
||||
@Mock private Dataflow.Projects projects;
|
||||
@Mock private Dataflow.Projects.Locations locations;
|
||||
@Mock private Dataflow.Projects.Locations.FlexTemplates flexTemplates;
|
||||
@Mock private Dataflow.Projects.Locations.FlexTemplates.Launch launch;
|
||||
private LaunchFlexTemplateResponse launchResponse =
|
||||
new LaunchFlexTemplateResponse().setJob(new Job());
|
||||
class WipeOutDatastoreActionTest extends BeamActionTestBase {
|
||||
|
||||
private final FakeClock clock = new FakeClock();
|
||||
private final FakeResponse response = new FakeResponse();
|
||||
|
||||
@BeforeEach
|
||||
void beforeEach() throws Exception {
|
||||
lenient().when(dataflow.projects()).thenReturn(projects);
|
||||
lenient().when(projects.locations()).thenReturn(locations);
|
||||
lenient().when(locations.flexTemplates()).thenReturn(flexTemplates);
|
||||
lenient()
|
||||
.when(flexTemplates.launch(anyString(), anyString(), any(LaunchFlexTemplateRequest.class)))
|
||||
.thenReturn(launch);
|
||||
lenient().when(launch.execute()).thenReturn(launchResponse);
|
||||
}
|
||||
|
||||
@Test
|
||||
void run_projectNotAllowed() {
|
||||
|
|
|
@ -0,0 +1,57 @@
|
|||
// Copyright 2021 The Nomulus Authors. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package google.registry.beam;
|
||||
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyString;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import com.google.api.services.dataflow.Dataflow;
|
||||
import com.google.api.services.dataflow.Dataflow.Projects;
|
||||
import com.google.api.services.dataflow.Dataflow.Projects.Locations;
|
||||
import com.google.api.services.dataflow.Dataflow.Projects.Locations.FlexTemplates;
|
||||
import com.google.api.services.dataflow.Dataflow.Projects.Locations.FlexTemplates.Launch;
|
||||
import com.google.api.services.dataflow.model.Job;
|
||||
import com.google.api.services.dataflow.model.LaunchFlexTemplateRequest;
|
||||
import com.google.api.services.dataflow.model.LaunchFlexTemplateResponse;
|
||||
import google.registry.testing.FakeResponse;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.mockito.junit.jupiter.MockitoSettings;
|
||||
import org.mockito.quality.Strictness;
|
||||
|
||||
/** Base class for all actions that launches a Dataflow Flex template. */
|
||||
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
|
||||
public abstract class BeamActionTestBase {
|
||||
|
||||
protected FakeResponse response = new FakeResponse();
|
||||
protected Dataflow dataflow = mock(Dataflow.class);
|
||||
private Projects projects = mock(Projects.class);
|
||||
private Locations locations = mock(Locations.class);
|
||||
private FlexTemplates templates = mock(FlexTemplates.class);
|
||||
protected Launch launch = mock(Launch.class);
|
||||
private LaunchFlexTemplateResponse launchResponse =
|
||||
new LaunchFlexTemplateResponse().setJob(new Job().setId("jobid"));
|
||||
|
||||
@BeforeEach
|
||||
void beforeEach() throws Exception {
|
||||
when(dataflow.projects()).thenReturn(projects);
|
||||
when(projects.locations()).thenReturn(locations);
|
||||
when(locations.flexTemplates()).thenReturn(templates);
|
||||
when(templates.launch(anyString(), anyString(), any(LaunchFlexTemplateRequest.class)))
|
||||
.thenReturn(launch);
|
||||
when(launch.execute()).thenReturn(launchResponse);
|
||||
}
|
||||
}
|
|
@ -16,26 +16,21 @@ package google.registry.beam.invoicing;
|
|||
|
||||
import static com.google.common.truth.Truth.assertThat;
|
||||
|
||||
import com.google.auth.oauth2.GoogleCredentials;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import google.registry.beam.TestPipelineExtension;
|
||||
import google.registry.util.GoogleCredentialsBundle;
|
||||
import google.registry.testing.TestDataHelper;
|
||||
import google.registry.util.ResourceUtils;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.time.ZoneId;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.Map.Entry;
|
||||
import org.apache.beam.runners.direct.DirectRunner;
|
||||
import org.apache.beam.sdk.options.PipelineOptions;
|
||||
import org.apache.beam.sdk.coders.SerializableCoder;
|
||||
import org.apache.beam.sdk.options.PipelineOptionsFactory;
|
||||
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
|
||||
import org.apache.beam.sdk.transforms.Create;
|
||||
import org.apache.beam.sdk.values.PCollection;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||
|
@ -44,160 +39,168 @@ import org.junit.jupiter.api.io.TempDir;
|
|||
/** Unit tests for {@link InvoicingPipeline}. */
|
||||
class InvoicingPipelineTest {
|
||||
|
||||
private static PipelineOptions pipelineOptions;
|
||||
private static final String BILLING_BUCKET_URL = "billing_bucket";
|
||||
private static final String YEAR_MONTH = "2017-10";
|
||||
private static final String INVOICE_FILE_PREFIX = "REG-INV";
|
||||
|
||||
@BeforeAll
|
||||
static void beforeAll() {
|
||||
pipelineOptions = PipelineOptionsFactory.create();
|
||||
pipelineOptions.setRunner(DirectRunner.class);
|
||||
}
|
||||
private static final ImmutableList<BillingEvent> INPUT_EVENTS =
|
||||
ImmutableList.of(
|
||||
BillingEvent.create(
|
||||
1,
|
||||
ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
|
||||
ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
|
||||
"theRegistrar",
|
||||
"234",
|
||||
"",
|
||||
"test",
|
||||
"RENEW",
|
||||
"mydomain.test",
|
||||
"REPO-ID",
|
||||
3,
|
||||
"USD",
|
||||
20.5,
|
||||
""),
|
||||
BillingEvent.create(
|
||||
1,
|
||||
ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
|
||||
ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
|
||||
"theRegistrar",
|
||||
"234",
|
||||
"",
|
||||
"test",
|
||||
"RENEW",
|
||||
"mydomain2.test",
|
||||
"REPO-ID",
|
||||
3,
|
||||
"USD",
|
||||
20.5,
|
||||
""),
|
||||
BillingEvent.create(
|
||||
1,
|
||||
ZonedDateTime.of(2017, 10, 2, 0, 0, 0, 0, ZoneId.of("UTC")),
|
||||
ZonedDateTime.of(2017, 9, 29, 0, 0, 0, 0, ZoneId.of("UTC")),
|
||||
"theRegistrar",
|
||||
"234",
|
||||
"",
|
||||
"hello",
|
||||
"CREATE",
|
||||
"mydomain3.hello",
|
||||
"REPO-ID",
|
||||
5,
|
||||
"JPY",
|
||||
70.75,
|
||||
""),
|
||||
BillingEvent.create(
|
||||
1,
|
||||
ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
|
||||
ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
|
||||
"bestdomains",
|
||||
"456",
|
||||
"116688",
|
||||
"test",
|
||||
"RENEW",
|
||||
"mydomain4.test",
|
||||
"REPO-ID",
|
||||
1,
|
||||
"USD",
|
||||
20.5,
|
||||
""),
|
||||
BillingEvent.create(
|
||||
1,
|
||||
ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
|
||||
ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
|
||||
"anotherRegistrar",
|
||||
"789",
|
||||
"",
|
||||
"test",
|
||||
"CREATE",
|
||||
"mydomain5.test",
|
||||
"REPO-ID",
|
||||
1,
|
||||
"USD",
|
||||
0,
|
||||
"SUNRISE ANCHOR_TENANT"));
|
||||
|
||||
private static final ImmutableMap<String, ImmutableList<String>> EXPECTED_DETAILED_REPORT_MAP =
|
||||
ImmutableMap.of(
|
||||
"invoice_details_2017-10_theRegistrar_test.csv",
|
||||
ImmutableList.of(
|
||||
"1,2017-10-04 00:00:00 UTC,2017-10-04 00:00:00 UTC,theRegistrar,234,,"
|
||||
+ "test,RENEW,mydomain2.test,REPO-ID,3,USD,20.50,",
|
||||
"1,2017-10-04 00:00:00 UTC,2017-10-04 00:00:00 UTC,theRegistrar,234,,"
|
||||
+ "test,RENEW,mydomain.test,REPO-ID,3,USD,20.50,"),
|
||||
"invoice_details_2017-10_theRegistrar_hello.csv",
|
||||
ImmutableList.of(
|
||||
"1,2017-10-02 00:00:00 UTC,2017-09-29 00:00:00 UTC,theRegistrar,234,,"
|
||||
+ "hello,CREATE,mydomain3.hello,REPO-ID,5,JPY,70.75,"),
|
||||
"invoice_details_2017-10_bestdomains_test.csv",
|
||||
ImmutableList.of(
|
||||
"1,2017-10-04 00:00:00 UTC,2017-10-04 00:00:00 UTC,bestdomains,456,116688,"
|
||||
+ "test,RENEW,mydomain4.test,REPO-ID,1,USD,20.50,"),
|
||||
"invoice_details_2017-10_anotherRegistrar_test.csv",
|
||||
ImmutableList.of(
|
||||
"1,2017-10-04 00:00:00 UTC,2017-10-04 00:00:00 UTC,anotherRegistrar,789,,"
|
||||
+ "test,CREATE,mydomain5.test,REPO-ID,1,USD,0.00,SUNRISE ANCHOR_TENANT"));
|
||||
|
||||
private static final ImmutableList<String> EXPECTED_INVOICE_OUTPUT =
|
||||
ImmutableList.of(
|
||||
"2017-10-01,2020-09-30,234,41.00,USD,10125,1,PURCHASE,theRegistrar - test,2,"
|
||||
+ "RENEW | TLD: test | TERM: 3-year,20.50,USD,",
|
||||
"2017-10-01,2022-09-30,234,70.75,JPY,10125,1,PURCHASE,theRegistrar - hello,1,"
|
||||
+ "CREATE | TLD: hello | TERM: 5-year,70.75,JPY,",
|
||||
"2017-10-01,2018-09-30,456,20.50,USD,10125,1,PURCHASE,bestdomains - test,1,"
|
||||
+ "RENEW | TLD: test | TERM: 1-year,20.50,USD,116688");
|
||||
|
||||
@RegisterExtension
|
||||
final transient TestPipelineExtension testPipeline =
|
||||
TestPipelineExtension.fromOptions(pipelineOptions);
|
||||
final TestPipelineExtension pipeline =
|
||||
TestPipelineExtension.create().enableAbandonedNodeEnforcement(true);
|
||||
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
@TempDir
|
||||
transient Path tmpDir;
|
||||
@TempDir Path tmpDir;
|
||||
|
||||
private InvoicingPipeline invoicingPipeline;
|
||||
private final InvoicingPipelineOptions options =
|
||||
PipelineOptionsFactory.create().as(InvoicingPipelineOptions.class);
|
||||
|
||||
private File billingBucketUrl;
|
||||
private PCollection<BillingEvent> billingEvents;
|
||||
|
||||
@BeforeEach
|
||||
void beforeEach() throws IOException {
|
||||
String beamTempFolder =
|
||||
Files.createDirectory(tmpDir.resolve("beam_temp")).toAbsolutePath().toString();
|
||||
invoicingPipeline =
|
||||
new InvoicingPipeline(
|
||||
"test-project",
|
||||
"region",
|
||||
beamTempFolder,
|
||||
beamTempFolder + "/templates/invoicing",
|
||||
beamTempFolder + "/staging",
|
||||
tmpDir.toAbsolutePath().toString(),
|
||||
"REG-INV",
|
||||
GoogleCredentialsBundle.create(GoogleCredentials.create(null)));
|
||||
}
|
||||
|
||||
private ImmutableList<BillingEvent> getInputEvents() {
|
||||
return ImmutableList.of(
|
||||
BillingEvent.create(
|
||||
1,
|
||||
ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
|
||||
ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
|
||||
"theRegistrar",
|
||||
"234",
|
||||
"",
|
||||
"test",
|
||||
"RENEW",
|
||||
"mydomain.test",
|
||||
"REPO-ID",
|
||||
3,
|
||||
"USD",
|
||||
20.5,
|
||||
""),
|
||||
BillingEvent.create(
|
||||
1,
|
||||
ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
|
||||
ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
|
||||
"theRegistrar",
|
||||
"234",
|
||||
"",
|
||||
"test",
|
||||
"RENEW",
|
||||
"mydomain2.test",
|
||||
"REPO-ID",
|
||||
3,
|
||||
"USD",
|
||||
20.5,
|
||||
""),
|
||||
BillingEvent.create(
|
||||
1,
|
||||
ZonedDateTime.of(2017, 10, 2, 0, 0, 0, 0, ZoneId.of("UTC")),
|
||||
ZonedDateTime.of(2017, 9, 29, 0, 0, 0, 0, ZoneId.of("UTC")),
|
||||
"theRegistrar",
|
||||
"234",
|
||||
"",
|
||||
"hello",
|
||||
"CREATE",
|
||||
"mydomain3.hello",
|
||||
"REPO-ID",
|
||||
5,
|
||||
"JPY",
|
||||
70.75,
|
||||
""),
|
||||
BillingEvent.create(
|
||||
1,
|
||||
ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
|
||||
ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
|
||||
"bestdomains",
|
||||
"456",
|
||||
"116688",
|
||||
"test",
|
||||
"RENEW",
|
||||
"mydomain4.test",
|
||||
"REPO-ID",
|
||||
1,
|
||||
"USD",
|
||||
20.5,
|
||||
""),
|
||||
BillingEvent.create(
|
||||
1,
|
||||
ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
|
||||
ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
|
||||
"anotherRegistrar",
|
||||
"789",
|
||||
"",
|
||||
"test",
|
||||
"CREATE",
|
||||
"mydomain5.test",
|
||||
"REPO-ID",
|
||||
1,
|
||||
"USD",
|
||||
0,
|
||||
"SUNRISE ANCHOR_TENANT"));
|
||||
}
|
||||
|
||||
/** Returns a map from filename to expected contents for detail reports. */
|
||||
private ImmutableMap<String, ImmutableList<String>> getExpectedDetailReportMap() {
|
||||
return ImmutableMap.of(
|
||||
"invoice_details_2017-10_theRegistrar_test.csv",
|
||||
ImmutableList.of(
|
||||
"1,2017-10-04 00:00:00 UTC,2017-10-04 00:00:00 UTC,theRegistrar,234,,"
|
||||
+ "test,RENEW,mydomain2.test,REPO-ID,3,USD,20.50,",
|
||||
"1,2017-10-04 00:00:00 UTC,2017-10-04 00:00:00 UTC,theRegistrar,234,,"
|
||||
+ "test,RENEW,mydomain.test,REPO-ID,3,USD,20.50,"),
|
||||
"invoice_details_2017-10_theRegistrar_hello.csv",
|
||||
ImmutableList.of(
|
||||
"1,2017-10-02 00:00:00 UTC,2017-09-29 00:00:00 UTC,theRegistrar,234,,"
|
||||
+ "hello,CREATE,mydomain3.hello,REPO-ID,5,JPY,70.75,"),
|
||||
"invoice_details_2017-10_bestdomains_test.csv",
|
||||
ImmutableList.of(
|
||||
"1,2017-10-04 00:00:00 UTC,2017-10-04 00:00:00 UTC,bestdomains,456,116688,"
|
||||
+ "test,RENEW,mydomain4.test,REPO-ID,1,USD,20.50,"),
|
||||
"invoice_details_2017-10_anotherRegistrar_test.csv",
|
||||
ImmutableList.of(
|
||||
"1,2017-10-04 00:00:00 UTC,2017-10-04 00:00:00 UTC,anotherRegistrar,789,,"
|
||||
+ "test,CREATE,mydomain5.test,REPO-ID,1,USD,0.00,SUNRISE ANCHOR_TENANT"));
|
||||
}
|
||||
|
||||
private ImmutableList<String> getExpectedInvoiceOutput() {
|
||||
return ImmutableList.of(
|
||||
"2017-10-01,2020-09-30,234,41.00,USD,10125,1,PURCHASE,theRegistrar - test,2,"
|
||||
+ "RENEW | TLD: test | TERM: 3-year,20.50,USD,",
|
||||
"2017-10-01,2022-09-30,234,70.75,JPY,10125,1,PURCHASE,theRegistrar - hello,1,"
|
||||
+ "CREATE | TLD: hello | TERM: 5-year,70.75,JPY,",
|
||||
"2017-10-01,2018-09-30,456,20.50,USD,10125,1,PURCHASE,bestdomains - test,1,"
|
||||
+ "RENEW | TLD: test | TERM: 1-year,20.50,USD,116688");
|
||||
void beforeEach() throws Exception {
|
||||
billingBucketUrl = Files.createDirectory(tmpDir.resolve(BILLING_BUCKET_URL)).toFile();
|
||||
options.setBillingBucketUrl(billingBucketUrl.getAbsolutePath());
|
||||
options.setYearMonth(YEAR_MONTH);
|
||||
options.setInvoiceFilePrefix(INVOICE_FILE_PREFIX);
|
||||
billingEvents =
|
||||
pipeline.apply(Create.of(INPUT_EVENTS).withCoder(SerializableCoder.of(BillingEvent.class)));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testEndToEndPipeline_generatesExpectedFiles() throws Exception {
|
||||
ImmutableList<BillingEvent> inputRows = getInputEvents();
|
||||
PCollection<BillingEvent> input = testPipeline.apply(Create.of(inputRows));
|
||||
invoicingPipeline.applyTerminalTransforms(input, StaticValueProvider.of("2017-10"));
|
||||
testPipeline.run();
|
||||
void testSuccess_makeQuery() {
|
||||
String query = InvoicingPipeline.makeQuery("2017-10", "my-project-id");
|
||||
assertThat(query)
|
||||
.isEqualTo(TestDataHelper.loadFile(this.getClass(), "billing_events_test.sql"));
|
||||
// This is necessary because the TestPipelineExtension verifies that the pipelien is run.
|
||||
pipeline.run();
|
||||
}
|
||||
|
||||
for (Entry<String, ImmutableList<String>> entry : getExpectedDetailReportMap().entrySet()) {
|
||||
@Test
|
||||
void testSuccess_saveInvoiceCsv() throws Exception {
|
||||
InvoicingPipeline.saveInvoiceCsv(billingEvents, options);
|
||||
pipeline.run().waitUntilFinish();
|
||||
ImmutableList<String> overallInvoice = resultFileContents("REG-INV-2017-10.csv");
|
||||
assertThat(overallInvoice.get(0))
|
||||
.isEqualTo(
|
||||
"StartDate,EndDate,ProductAccountKey,Amount,AmountCurrency,BillingProductCode,"
|
||||
+ "SalesChannel,LineItemType,UsageGroupingKey,Quantity,Description,UnitPrice,"
|
||||
+ "UnitPriceCurrency,PONumber");
|
||||
assertThat(overallInvoice.subList(1, overallInvoice.size()))
|
||||
.containsExactlyElementsIn(EXPECTED_INVOICE_OUTPUT);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testSuccess_saveDetailedCsv() throws Exception {
|
||||
InvoicingPipeline.saveDetailedCsv(billingEvents, options);
|
||||
pipeline.run().waitUntilFinish();
|
||||
for (Entry<String, ImmutableList<String>> entry : EXPECTED_DETAILED_REPORT_MAP.entrySet()) {
|
||||
ImmutableList<String> detailReport = resultFileContents(entry.getKey());
|
||||
assertThat(detailReport.get(0))
|
||||
.isEqualTo(
|
||||
|
@ -206,22 +209,14 @@ class InvoicingPipelineTest {
|
|||
assertThat(detailReport.subList(1, detailReport.size()))
|
||||
.containsExactlyElementsIn(entry.getValue());
|
||||
}
|
||||
|
||||
ImmutableList<String> overallInvoice = resultFileContents("REG-INV-2017-10.csv");
|
||||
assertThat(overallInvoice.get(0))
|
||||
.isEqualTo(
|
||||
"StartDate,EndDate,ProductAccountKey,Amount,AmountCurrency,BillingProductCode,"
|
||||
+ "SalesChannel,LineItemType,UsageGroupingKey,Quantity,Description,UnitPrice,"
|
||||
+ "UnitPriceCurrency,PONumber");
|
||||
assertThat(overallInvoice.subList(1, overallInvoice.size()))
|
||||
.containsExactlyElementsIn(getExpectedInvoiceOutput());
|
||||
}
|
||||
|
||||
/** Returns the text contents of a file under the beamBucket/results directory. */
|
||||
private ImmutableList<String> resultFileContents(String filename) throws Exception {
|
||||
File resultFile =
|
||||
new File(
|
||||
String.format("%s/invoices/2017-10/%s", tmpDir.toAbsolutePath().toString(), filename));
|
||||
String.format(
|
||||
"%s/invoices/2017-10/%s", billingBucketUrl.getAbsolutePath().toString(), filename));
|
||||
return ImmutableList.copyOf(
|
||||
ResourceUtils.readResourceUtf8(resultFile.toURI().toURL()).split("\n"));
|
||||
}
|
||||
|
|
|
@ -1,73 +0,0 @@
|
|||
// Copyright 2018 The Nomulus Authors. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package google.registry.beam.invoicing;
|
||||
|
||||
import static com.google.common.truth.Truth.assertThat;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import google.registry.testing.TestDataHelper;
|
||||
import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
|
||||
import org.apache.beam.sdk.io.FileBasedSink;
|
||||
import org.apache.beam.sdk.options.ValueProvider;
|
||||
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
|
||||
import org.apache.beam.sdk.transforms.SerializableFunction;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
/** Unit tests for {@link InvoicingUtils}. */
|
||||
class InvoicingUtilsTest {
|
||||
|
||||
@Test
|
||||
void testDestinationFunction_generatesProperFileParams() {
|
||||
SerializableFunction<BillingEvent, Params> destinationFunction =
|
||||
InvoicingUtils.makeDestinationFunction("my/directory", StaticValueProvider.of("2017-10"));
|
||||
|
||||
BillingEvent billingEvent = mock(BillingEvent.class);
|
||||
// We mock BillingEvent to make the test independent of the implementation of toFilename()
|
||||
when(billingEvent.toFilename(any())).thenReturn("invoice_details_2017-10_registrar_tld");
|
||||
|
||||
assertThat(destinationFunction.apply(billingEvent))
|
||||
.isEqualTo(
|
||||
new Params()
|
||||
.withShardTemplate("")
|
||||
.withSuffix(".csv")
|
||||
.withBaseFilename(
|
||||
FileBasedSink.convertToFileResourceIfPossible(
|
||||
"my/directory/2017-10/invoice_details_2017-10_registrar_tld")));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testEmptyDestinationParams() {
|
||||
assertThat(InvoicingUtils.makeEmptyDestinationParams("my/directory"))
|
||||
.isEqualTo(
|
||||
new Params()
|
||||
.withBaseFilename(
|
||||
FileBasedSink.convertToFileResourceIfPossible("my/directory/FAILURES")));
|
||||
}
|
||||
|
||||
/** Asserts that the instantiated sql template matches a golden expected file. */
|
||||
@Test
|
||||
void testMakeQueryProvider() {
|
||||
ValueProvider<String> queryProvider =
|
||||
InvoicingUtils.makeQueryProvider(StaticValueProvider.of("2017-10"), "my-project-id");
|
||||
assertThat(queryProvider.get()).isEqualTo(loadFile("billing_events_test.sql"));
|
||||
}
|
||||
|
||||
/** Returns a {@link String} from a file in the {@code billing/testdata/} directory. */
|
||||
private static String loadFile(String filename) {
|
||||
return TestDataHelper.loadFile(InvoicingUtilsTest.class, filename);
|
||||
}
|
||||
}
|
|
@ -17,94 +17,57 @@ package google.registry.reporting.billing;
|
|||
import static com.google.common.truth.Truth.assertThat;
|
||||
import static google.registry.testing.TaskQueueHelper.assertNoTasksEnqueued;
|
||||
import static google.registry.testing.TaskQueueHelper.assertTasksEnqueued;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static javax.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
|
||||
import static javax.servlet.http.HttpServletResponse.SC_OK;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import com.google.api.services.dataflow.Dataflow;
|
||||
import com.google.api.services.dataflow.Dataflow.Projects;
|
||||
import com.google.api.services.dataflow.Dataflow.Projects.Templates;
|
||||
import com.google.api.services.dataflow.Dataflow.Projects.Templates.Launch;
|
||||
import com.google.api.services.dataflow.model.Job;
|
||||
import com.google.api.services.dataflow.model.LaunchTemplateParameters;
|
||||
import com.google.api.services.dataflow.model.LaunchTemplateResponse;
|
||||
import com.google.api.services.dataflow.model.RuntimeEnvironment;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.net.MediaType;
|
||||
import google.registry.beam.BeamActionTestBase;
|
||||
import google.registry.testing.AppEngineExtension;
|
||||
import google.registry.testing.FakeResponse;
|
||||
import google.registry.testing.FakeClock;
|
||||
import google.registry.testing.TaskQueueHelper.TaskMatcher;
|
||||
import java.io.IOException;
|
||||
import org.joda.time.YearMonth;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||
|
||||
/** Unit tests for {@link google.registry.reporting.billing.GenerateInvoicesAction}. */
|
||||
class GenerateInvoicesActionTest {
|
||||
class GenerateInvoicesActionTest extends BeamActionTestBase {
|
||||
|
||||
@RegisterExtension
|
||||
final AppEngineExtension appEngine = AppEngineExtension.builder().withTaskQueue().build();
|
||||
|
||||
private Dataflow dataflow;
|
||||
private Projects projects;
|
||||
private Templates templates;
|
||||
private Launch launch;
|
||||
private FakeResponse response;
|
||||
private BillingEmailUtils emailUtils;
|
||||
private final BillingEmailUtils emailUtils = mock(BillingEmailUtils.class);
|
||||
private FakeClock clock = new FakeClock();
|
||||
private GenerateInvoicesAction action;
|
||||
|
||||
@BeforeEach
|
||||
void beforeEach() throws IOException {
|
||||
dataflow = mock(Dataflow.class);
|
||||
projects = mock(Projects.class);
|
||||
templates = mock(Templates.class);
|
||||
launch = mock(Launch.class);
|
||||
emailUtils = mock(BillingEmailUtils.class);
|
||||
when(dataflow.projects()).thenReturn(projects);
|
||||
when(projects.templates()).thenReturn(templates);
|
||||
when(templates.launch(any(String.class), any(LaunchTemplateParameters.class)))
|
||||
.thenReturn(launch);
|
||||
when(launch.setGcsPath(any(String.class))).thenReturn(launch);
|
||||
|
||||
response = new FakeResponse();
|
||||
Job job = new Job();
|
||||
job.setId("12345");
|
||||
when(launch.execute()).thenReturn(new LaunchTemplateResponse().setJob(job));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testLaunchTemplateJob_withPublish() throws Exception {
|
||||
action =
|
||||
new GenerateInvoicesAction(
|
||||
"test-project",
|
||||
"gs://test-project-beam",
|
||||
"gs://test-project-beam/templates/invoicing",
|
||||
"us-east1-c",
|
||||
"test-region",
|
||||
"staging_bucket",
|
||||
"billing_bucket",
|
||||
"REG-INV",
|
||||
true,
|
||||
new YearMonth(2017, 10),
|
||||
dataflow,
|
||||
emailUtils,
|
||||
clock,
|
||||
response,
|
||||
emailUtils);
|
||||
dataflow);
|
||||
action.run();
|
||||
LaunchTemplateParameters expectedParams =
|
||||
new LaunchTemplateParameters()
|
||||
.setJobName("invoicing-2017-10")
|
||||
.setEnvironment(
|
||||
new RuntimeEnvironment()
|
||||
.setZone("us-east1-c")
|
||||
.setTempLocation("gs://test-project-beam/temporary"))
|
||||
.setParameters(ImmutableMap.of("yearMonth", "2017-10"));
|
||||
verify(templates).launch("test-project", expectedParams);
|
||||
verify(launch).setGcsPath("gs://test-project-beam/templates/invoicing");
|
||||
assertThat(response.getStatus()).isEqualTo(200);
|
||||
assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8);
|
||||
assertThat(response.getStatus()).isEqualTo(SC_OK);
|
||||
assertThat(response.getPayload()).isEqualTo("Launched dataflow template.");
|
||||
|
||||
TaskMatcher matcher =
|
||||
new TaskMatcher()
|
||||
.url("/_dr/task/publishInvoices")
|
||||
.method("POST")
|
||||
.param("jobId", "12345")
|
||||
.param("jobId", "jobid")
|
||||
.param("yearMonth", "2017-10");
|
||||
assertTasksEnqueued("beam-reporting", matcher);
|
||||
}
|
||||
|
@ -114,47 +77,43 @@ class GenerateInvoicesActionTest {
|
|||
action =
|
||||
new GenerateInvoicesAction(
|
||||
"test-project",
|
||||
"gs://test-project-beam",
|
||||
"gs://test-project-beam/templates/invoicing",
|
||||
"us-east1-c",
|
||||
"test-region",
|
||||
"staging_bucket",
|
||||
"billing_bucket",
|
||||
"REG-INV",
|
||||
false,
|
||||
new YearMonth(2017, 10),
|
||||
dataflow,
|
||||
emailUtils,
|
||||
clock,
|
||||
response,
|
||||
emailUtils);
|
||||
dataflow);
|
||||
action.run();
|
||||
LaunchTemplateParameters expectedParams =
|
||||
new LaunchTemplateParameters()
|
||||
.setJobName("invoicing-2017-10")
|
||||
.setEnvironment(
|
||||
new RuntimeEnvironment()
|
||||
.setZone("us-east1-c")
|
||||
.setTempLocation("gs://test-project-beam/temporary"))
|
||||
.setParameters(ImmutableMap.of("yearMonth", "2017-10"));
|
||||
verify(templates).launch("test-project", expectedParams);
|
||||
verify(launch).setGcsPath("gs://test-project-beam/templates/invoicing");
|
||||
assertThat(response.getStatus()).isEqualTo(200);
|
||||
assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8);
|
||||
assertThat(response.getStatus()).isEqualTo(SC_OK);
|
||||
assertThat(response.getPayload()).isEqualTo("Launched dataflow template.");
|
||||
assertNoTasksEnqueued();
|
||||
assertNoTasksEnqueued("beam-reporting");
|
||||
}
|
||||
|
||||
@Test
|
||||
void testCaughtIOException() throws IOException {
|
||||
when(launch.execute()).thenThrow(new IOException("expected"));
|
||||
when(launch.execute()).thenThrow(new IOException("Pipeline error"));
|
||||
action =
|
||||
new GenerateInvoicesAction(
|
||||
"test-project",
|
||||
"gs://test-project-beam",
|
||||
"gs://test-project-beam/templates/invoicing",
|
||||
"us-east1-c",
|
||||
true,
|
||||
"test-region",
|
||||
"staging_bucket",
|
||||
"billing_bucket",
|
||||
"REG-INV",
|
||||
false,
|
||||
new YearMonth(2017, 10),
|
||||
dataflow,
|
||||
emailUtils,
|
||||
clock,
|
||||
response,
|
||||
emailUtils);
|
||||
dataflow);
|
||||
action.run();
|
||||
assertThat(response.getStatus()).isEqualTo(500);
|
||||
assertThat(response.getPayload()).isEqualTo("Template launch failed: expected");
|
||||
verify(emailUtils).sendAlertEmail("Template Launch failed due to expected");
|
||||
assertThat(response.getStatus()).isEqualTo(SC_INTERNAL_SERVER_ERROR);
|
||||
assertThat(response.getPayload()).isEqualTo("Template launch failed: Pipeline error");
|
||||
verify(emailUtils).sendAlertEmail("Template Launch failed due to Pipeline error");
|
||||
assertNoTasksEnqueued("beam-reporting");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,8 +26,9 @@ import static org.mockito.Mockito.when;
|
|||
|
||||
import com.google.api.services.dataflow.Dataflow;
|
||||
import com.google.api.services.dataflow.Dataflow.Projects;
|
||||
import com.google.api.services.dataflow.Dataflow.Projects.Jobs;
|
||||
import com.google.api.services.dataflow.Dataflow.Projects.Jobs.Get;
|
||||
import com.google.api.services.dataflow.Dataflow.Projects.Locations;
|
||||
import com.google.api.services.dataflow.Dataflow.Projects.Locations.Jobs;
|
||||
import com.google.api.services.dataflow.Dataflow.Projects.Locations.Jobs.Get;
|
||||
import com.google.api.services.dataflow.model.Job;
|
||||
import com.google.common.net.MediaType;
|
||||
import google.registry.testing.AppEngineExtension;
|
||||
|
@ -42,11 +43,14 @@ import org.junit.jupiter.api.extension.RegisterExtension;
|
|||
/** Unit tests for {@link PublishInvoicesAction}. */
|
||||
class PublishInvoicesActionTest {
|
||||
|
||||
private Get get;
|
||||
private BillingEmailUtils emailUtils;
|
||||
|
||||
private Job expectedJob;
|
||||
private FakeResponse response;
|
||||
private final Dataflow dataflow = mock(Dataflow.class);
|
||||
private final Projects projects = mock(Projects.class);
|
||||
private final Locations locations = mock(Locations.class);
|
||||
private final Jobs jobs = mock(Jobs.class);
|
||||
private final Get get = mock(Get.class);
|
||||
private final Job expectedJob = new Job();
|
||||
private final BillingEmailUtils emailUtils = mock(BillingEmailUtils.class);
|
||||
private final FakeResponse response = new FakeResponse();
|
||||
private PublishInvoicesAction uploadAction;
|
||||
|
||||
@RegisterExtension
|
||||
|
@ -54,20 +58,21 @@ class PublishInvoicesActionTest {
|
|||
|
||||
@BeforeEach
|
||||
void beforeEach() throws IOException {
|
||||
Dataflow dataflow = mock(Dataflow.class);
|
||||
Projects projects = mock(Projects.class);
|
||||
Jobs jobs = mock(Jobs.class);
|
||||
get = mock(Get.class);
|
||||
when(dataflow.projects()).thenReturn(projects);
|
||||
when(projects.jobs()).thenReturn(jobs);
|
||||
when(jobs.get("test-project", "12345")).thenReturn(get);
|
||||
expectedJob = new Job();
|
||||
when(projects.locations()).thenReturn(locations);
|
||||
when(locations.jobs()).thenReturn(jobs);
|
||||
when(jobs.get("test-project", "test-region", "12345")).thenReturn(get);
|
||||
when(get.execute()).thenReturn(expectedJob);
|
||||
emailUtils = mock(BillingEmailUtils.class);
|
||||
response = new FakeResponse();
|
||||
|
||||
uploadAction =
|
||||
new PublishInvoicesAction(
|
||||
"test-project", "12345", emailUtils, dataflow, response, new YearMonth(2017, 10));
|
||||
"test-project",
|
||||
"test-region",
|
||||
"12345",
|
||||
emailUtils,
|
||||
dataflow,
|
||||
response,
|
||||
new YearMonth(2017, 10));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -19,61 +19,27 @@ import static google.registry.testing.TaskQueueHelper.assertNoTasksEnqueued;
|
|||
import static google.registry.testing.TaskQueueHelper.assertTasksEnqueued;
|
||||
import static javax.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
|
||||
import static org.apache.http.HttpStatus.SC_OK;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyString;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import com.google.api.services.dataflow.Dataflow;
|
||||
import com.google.api.services.dataflow.Dataflow.Projects;
|
||||
import com.google.api.services.dataflow.Dataflow.Projects.Locations;
|
||||
import com.google.api.services.dataflow.Dataflow.Projects.Locations.FlexTemplates;
|
||||
import com.google.api.services.dataflow.Dataflow.Projects.Locations.FlexTemplates.Launch;
|
||||
import com.google.api.services.dataflow.model.Job;
|
||||
import com.google.api.services.dataflow.model.LaunchFlexTemplateRequest;
|
||||
import com.google.api.services.dataflow.model.LaunchFlexTemplateResponse;
|
||||
import com.google.common.net.MediaType;
|
||||
import google.registry.beam.BeamActionTestBase;
|
||||
import google.registry.testing.AppEngineExtension;
|
||||
import google.registry.testing.FakeClock;
|
||||
import google.registry.testing.FakeResponse;
|
||||
import google.registry.testing.TaskQueueHelper.TaskMatcher;
|
||||
import java.io.IOException;
|
||||
import org.joda.time.DateTime;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||
import org.mockito.junit.jupiter.MockitoSettings;
|
||||
import org.mockito.quality.Strictness;
|
||||
|
||||
/** Unit tests for {@link GenerateSpec11ReportAction}. */
|
||||
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
|
||||
class GenerateSpec11ReportActionTest {
|
||||
class GenerateSpec11ReportActionTest extends BeamActionTestBase {
|
||||
|
||||
@RegisterExtension
|
||||
final AppEngineExtension appEngine = AppEngineExtension.builder().withTaskQueue().build();
|
||||
|
||||
private FakeResponse response = new FakeResponse();
|
||||
private Dataflow dataflow = mock(Dataflow.class);
|
||||
private Projects projects = mock(Projects.class);
|
||||
private Locations locations = mock(Locations.class);
|
||||
private FlexTemplates templates = mock(FlexTemplates.class);
|
||||
private Launch launch = mock(Launch.class);
|
||||
private LaunchFlexTemplateResponse launchResponse =
|
||||
new LaunchFlexTemplateResponse().setJob(new Job().setId("jobid"));
|
||||
|
||||
private final FakeClock clock = new FakeClock(DateTime.parse("2018-06-11T12:23:56Z"));
|
||||
private GenerateSpec11ReportAction action;
|
||||
|
||||
@BeforeEach
|
||||
void beforeEach() throws IOException {
|
||||
when(dataflow.projects()).thenReturn(projects);
|
||||
when(projects.locations()).thenReturn(locations);
|
||||
when(locations.flexTemplates()).thenReturn(templates);
|
||||
when(templates.launch(anyString(), anyString(), any(LaunchFlexTemplateRequest.class)))
|
||||
.thenReturn(launch);
|
||||
when(launch.execute()).thenReturn(launchResponse);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testFailure_dataflowFailure() throws IOException {
|
||||
action =
|
||||
|
|
|
@ -37,14 +37,6 @@ steps:
|
|||
cat tool-credential.json.enc | base64 -d | gcloud kms decrypt \
|
||||
--ciphertext-file=- --plaintext-file=tool-credential.json \
|
||||
--location=global --keyring=nomulus-tool-keyring --key=nomulus-tool-key
|
||||
# Deploy the invoicing pipeline to GCS.
|
||||
- name: 'gcr.io/$PROJECT_ID/nomulus-tool:latest'
|
||||
args:
|
||||
- -e
|
||||
- ${_ENV}
|
||||
- --credential
|
||||
- tool-credential.json
|
||||
- deploy_invoicing_pipeline
|
||||
# Deploy the GAE config files.
|
||||
# First authorize the gcloud tool to use the credential json file, then
|
||||
# download and unzip the tarball that contains the relevant config files
|
||||
|
|
|
@ -92,6 +92,8 @@ steps:
|
|||
google/registry/beam/bulk_delete_datastore_pipeline_metadata.json \
|
||||
google.registry.beam.spec11.Spec11Pipeline \
|
||||
google/registry/beam/spec11_pipeline_metadata.json
|
||||
google.registry.beam.invoicing.InvoicingPipeline \
|
||||
google/registry/beam/invoicing_pipeline_metadata.json
|
||||
# Tentatively build and publish Cloud SQL schema jar here, before schema release
|
||||
# process is finalized. Also publish nomulus:core jars that are needed for
|
||||
# server/schema compatibility tests.
|
||||
|
|
Loading…
Add table
Reference in a new issue