From feab3633a6aa02945963cee1bfc52f3b089a8224 Mon Sep 17 00:00:00 2001 From: Lai Jiang Date: Thu, 22 Apr 2021 10:26:15 -0400 Subject: [PATCH] Migrate the billing pipeline to flex template (#1100) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This is similar to the migration of the spec11 pipeline in #1073. Also removed a few Dagger providers that are no longer needed. TESTED=tested the dataflow job on alpha. --- This change is [Reviewable](https://reviewable.io/reviews/google/nomulus/1100) --- core/build.gradle | 4 + .../registry/beam/invoicing/BillingEvent.java | 5 + .../beam/invoicing/InvoicingPipeline.java | 225 ++++++------- .../invoicing/InvoicingPipelineOptions.java | 37 ++ .../beam/invoicing/InvoicingUtils.java | 106 ------ .../registry/config/RegistryConfig.java | 93 ------ .../config/RegistryConfigSettings.java | 1 - .../registry/config/files/default-config.yaml | 3 - .../billing/GenerateInvoicesAction.java | 85 +++-- .../billing/PublishInvoicesAction.java | 5 +- .../tools/DeployInvoicingPipelineCommand.java | 31 -- .../google/registry/tools/RegistryTool.java | 1 - .../registry/tools/RegistryToolComponent.java | 2 - .../beam/init_sql_pipeline_metadata.json | 2 +- .../beam/invoicing/sql/billing_events.sql | 0 .../beam/invoicing_pipeline_metadata.json | 66 ++++ .../beam/spec11_pipeline_metadata.json | 4 +- .../batch/WipeOutDatastoreActionTest.java | 36 +- .../registry/beam/BeamActionTestBase.java | 57 ++++ .../beam/invoicing/InvoicingPipelineTest.java | 315 +++++++++--------- .../beam/invoicing/InvoicingUtilsTest.java | 73 ---- .../billing/GenerateInvoicesActionTest.java | 123 +++---- .../billing/PublishInvoicesActionTest.java | 39 ++- .../GenerateSpec11ReportActionTest.java | 38 +-- release/cloudbuild-deploy.yaml | 8 - release/cloudbuild-nomulus.yaml | 2 + 26 files changed, 554 insertions(+), 807 deletions(-) create mode 100644 core/src/main/java/google/registry/beam/invoicing/InvoicingPipelineOptions.java delete mode 100644 core/src/main/java/google/registry/beam/invoicing/InvoicingUtils.java delete mode 100644 core/src/main/java/google/registry/tools/DeployInvoicingPipelineCommand.java rename core/src/main/{java => resources}/google/registry/beam/invoicing/sql/billing_events.sql (100%) create mode 100644 core/src/main/resources/google/registry/beam/invoicing_pipeline_metadata.json create mode 100644 core/src/test/java/google/registry/beam/BeamActionTestBase.java delete mode 100644 core/src/test/java/google/registry/beam/invoicing/InvoicingUtilsTest.java diff --git a/core/build.gradle b/core/build.gradle index 5d8503fb5..594221965 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -810,6 +810,10 @@ if (environment in ['alpha', 'crash']) { mainClass: 'google.registry.beam.spec11.Spec11Pipeline', metaData: 'google/registry/beam/spec11_pipeline_metadata.json' ], + [ + mainClass: 'google.registry.beam.invoicing.InvoicingPipeline', + metaData: 'google/registry/beam/invoicing_pipeline_metadata.json' + ], ] project.tasks.create("stage_beam_pipelines") { doLast { diff --git a/core/src/main/java/google/registry/beam/invoicing/BillingEvent.java b/core/src/main/java/google/registry/beam/invoicing/BillingEvent.java index 4de572c3f..6aa33dc18 100644 --- a/core/src/main/java/google/registry/beam/invoicing/BillingEvent.java +++ b/core/src/main/java/google/registry/beam/invoicing/BillingEvent.java @@ -260,6 +260,11 @@ public abstract class BillingEvent implements Serializable { poNumber()); } + /** Returns the grouping key for this {@code BillingEvent}, to generate the detailed report. */ + String getDetailedReportGroupingKey() { + return String.format("%s_%s", registrarId(), tld()); + } + /** Key for each {@code BillingEvent}, when aggregating for the overall invoice. */ @AutoValue abstract static class InvoiceGroupingKey implements Serializable { diff --git a/core/src/main/java/google/registry/beam/invoicing/InvoicingPipeline.java b/core/src/main/java/google/registry/beam/invoicing/InvoicingPipeline.java index 0dce0f6d5..f9ae7d294 100644 --- a/core/src/main/java/google/registry/beam/invoicing/InvoicingPipeline.java +++ b/core/src/main/java/google/registry/beam/invoicing/InvoicingPipeline.java @@ -14,28 +14,28 @@ package google.registry.beam.invoicing; -import com.google.auth.oauth2.GoogleCredentials; +import static google.registry.beam.BeamUtils.getQueryFromFile; +import static org.apache.beam.sdk.values.TypeDescriptors.strings; + +import com.google.common.annotations.VisibleForTesting; import google.registry.beam.invoicing.BillingEvent.InvoiceGroupingKey; import google.registry.beam.invoicing.BillingEvent.InvoiceGroupingKey.InvoiceGroupingKeyCoder; -import google.registry.config.CredentialModule.LocalCredential; -import google.registry.config.RegistryConfig.Config; import google.registry.reporting.billing.BillingModule; -import google.registry.reporting.billing.GenerateInvoicesAction; -import google.registry.util.GoogleCredentialsBundle; +import google.registry.util.SqlTemplate; import java.io.Serializable; -import javax.inject.Inject; -import org.apache.beam.runners.dataflow.DataflowRunner; -import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.YearMonth; +import java.time.format.DateTimeFormatter; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.coders.SerializableCoder; -import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params; -import org.apache.beam.sdk.io.FileBasedSink; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.FileIO; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; -import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.options.ValueProvider; -import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; +import org.apache.beam.sdk.transforms.Contextful; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Filter; import org.apache.beam.sdk.transforms.MapElements; @@ -43,107 +43,54 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TypeDescriptor; -import org.apache.beam.sdk.values.TypeDescriptors; /** - * Definition of a Dataflow pipeline template, which generates a given month's invoices. + * Definition of a Dataflow Flex pipeline template, which generates a given month's invoices. * - *

To stage this template on GCS, run the {@link - * google.registry.tools.DeployInvoicingPipelineCommand} Nomulus command. + *

To stage this template locally, run the {@code stage_beam_pipeline.sh} shell script. * *

Then, you can run the staged template via the API client library, gCloud or a raw REST call. - * For an example using the API client library, see {@link GenerateInvoicesAction}. * - * @see Dataflow Templates + * @see Using + * Flex Templates */ public class InvoicingPipeline implements Serializable { - private final String projectId; - private final String beamJobRegion; - private final String beamBucketUrl; - private final String invoiceTemplateUrl; - private final String beamStagingUrl; - private final String billingBucketUrl; - private final String invoiceFilePrefix; - private final GoogleCredentials googleCredentials; + private static final DateTimeFormatter TIMESTAMP_FORMATTER = + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"); - @Inject - public InvoicingPipeline( - @Config("projectId") String projectId, - @Config("defaultJobRegion") String beamJobRegion, - @Config("apacheBeamBucketUrl") String beamBucketUrl, - @Config("invoiceTemplateUrl") String invoiceTemplateUrl, - @Config("beamStagingUrl") String beamStagingUrl, - @Config("billingBucketUrl") String billingBucketUrl, - @Config("invoiceFilePrefix") String invoiceFilePrefix, - @LocalCredential GoogleCredentialsBundle googleCredentialsBundle) { - this.projectId = projectId; - this.beamJobRegion = beamJobRegion; - this.beamBucketUrl = beamBucketUrl; - this.invoiceTemplateUrl = invoiceTemplateUrl; - this.beamStagingUrl = beamStagingUrl; - this.billingBucketUrl = billingBucketUrl; - this.invoiceFilePrefix = invoiceFilePrefix; - this.googleCredentials = googleCredentialsBundle.getGoogleCredentials(); + private final InvoicingPipelineOptions options; + private final Pipeline pipeline; + + @VisibleForTesting + InvoicingPipeline(InvoicingPipelineOptions options, Pipeline pipeline) { + this.options = options; + this.pipeline = pipeline; } - /** Custom options for running the invoicing pipeline. */ - public interface InvoicingPipelineOptions extends DataflowPipelineOptions { - /** Returns the yearMonth we're generating invoices for, in yyyy-MM format. */ - @Description("The yearMonth we generate invoices for, in yyyy-MM format.") - ValueProvider getYearMonth(); - /** - * Sets the yearMonth we generate invoices for. - * - *

This is implicitly set when executing the Dataflow template, by specifying the 'yearMonth - * parameter. - */ - void setYearMonth(ValueProvider value); + InvoicingPipeline(InvoicingPipelineOptions options) { + this(options, Pipeline.create(options)); } - /** Deploys the invoicing pipeline as a template on GCS, for a given projectID and GCS bucket. */ - public void deploy() { - // We can't store options as a member variable due to serialization concerns. - InvoicingPipelineOptions options = PipelineOptionsFactory.as(InvoicingPipelineOptions.class); - options.setProject(projectId); - options.setRegion(beamJobRegion); - options.setRunner(DataflowRunner.class); - // This causes p.run() to stage the pipeline as a template on GCS, as opposed to running it. - options.setTemplateLocation(invoiceTemplateUrl); - options.setStagingLocation(beamStagingUrl); - // This credential is used when Dataflow deploys the template to GCS in target GCP project. - // So, make sure the credential has write permission to GCS in that project. - options.setGcpCredential(googleCredentials); - - Pipeline p = Pipeline.create(options); + PipelineResult run() { + setupPipeline(); + return pipeline.run(); + } + void setupPipeline() { PCollection billingEvents = - p.apply( + pipeline.apply( "Read BillingEvents from Bigquery", BigQueryIO.read(BillingEvent::parseFromRecord) - .fromQuery(InvoicingUtils.makeQueryProvider(options.getYearMonth(), projectId)) + .fromQuery(makeQuery(options.getYearMonth(), options.getProject())) .withCoder(SerializableCoder.of(BillingEvent.class)) .usingStandardSql() .withoutValidation() .withTemplateCompatibility()); - applyTerminalTransforms(billingEvents, options.getYearMonth()); - p.run(); - } - /** - * Applies output transforms to the {@code BillingEvent} source collection. - * - *

This is factored out purely to facilitate testing. - */ - void applyTerminalTransforms( - PCollection billingEvents, ValueProvider yearMonthProvider) { - billingEvents - .apply("Generate overall invoice rows", new GenerateInvoiceRows()) - .apply("Write overall invoice to CSV", writeInvoice(yearMonthProvider)); + saveInvoiceCsv(billingEvents, options); - billingEvents.apply( - "Write detail reports to separate CSVs keyed by registrarId_tld pair", - writeDetailReports(yearMonthProvider)); + saveDetailedCsv(billingEvents, options); } /** Transform that converts a {@code BillingEvent} into an invoice CSV row. */ @@ -156,49 +103,85 @@ public class InvoicingPipeline implements Serializable { "Map to invoicing key", MapElements.into(TypeDescriptor.of(InvoiceGroupingKey.class)) .via(BillingEvent::getInvoiceGroupingKey)) - .apply(Filter.by((InvoiceGroupingKey key) -> key.unitPrice() != 0)) + .apply( + "Filter out free events", Filter.by((InvoiceGroupingKey key) -> key.unitPrice() != 0)) .setCoder(new InvoiceGroupingKeyCoder()) .apply("Count occurrences", Count.perElement()) .apply( "Format as CSVs", - MapElements.into(TypeDescriptors.strings()) + MapElements.into(strings()) .via((KV kv) -> kv.getKey().toCsv(kv.getValue()))); } } - /** Returns an IO transform that writes the overall invoice to a single CSV file. */ - private TextIO.Write writeInvoice(ValueProvider yearMonthProvider) { - return TextIO.write() - .to( - NestedValueProvider.of( - yearMonthProvider, - yearMonth -> + /** Saves the billing events to a single overall invoice CSV file. */ + static void saveInvoiceCsv( + PCollection billingEvents, InvoicingPipelineOptions options) { + billingEvents + .apply("Generate overall invoice rows", new GenerateInvoiceRows()) + .apply( + "Write overall invoice to CSV", + TextIO.write() + .to( String.format( "%s/%s/%s/%s-%s", - billingBucketUrl, + options.getBillingBucketUrl(), BillingModule.INVOICES_DIRECTORY, - yearMonth, - invoiceFilePrefix, - yearMonth))) - .withHeader(InvoiceGroupingKey.invoiceHeader()) - .withoutSharding() - .withSuffix(".csv"); + options.getYearMonth(), + options.getInvoiceFilePrefix(), + options.getYearMonth())) + .withHeader(InvoiceGroupingKey.invoiceHeader()) + .withoutSharding() + .withSuffix(".csv")); } - /** Returns an IO transform that writes detail reports to registrar-tld keyed CSV files. */ - private TextIO.TypedWrite writeDetailReports( - ValueProvider yearMonthProvider) { - return TextIO.writeCustomType() - .to( - InvoicingUtils.makeDestinationFunction( - String.format("%s/%s", billingBucketUrl, BillingModule.INVOICES_DIRECTORY), - yearMonthProvider), - InvoicingUtils.makeEmptyDestinationParams(billingBucketUrl + "/errors")) - .withFormatFunction(BillingEvent::toCsv) - .withoutSharding() - .withTempDirectory( - FileBasedSink.convertToFileResourceIfPossible(beamBucketUrl + "/temporary")) - .withHeader(BillingEvent.getHeader()) - .withSuffix(".csv"); + /** Saves the billing events to detailed report CSV files keyed by registrar-tld pairs. */ + static void saveDetailedCsv( + PCollection billingEvents, InvoicingPipelineOptions options) { + String yearMonth = options.getYearMonth(); + billingEvents.apply( + "Write detailed report for each registrar-tld pair", + FileIO.writeDynamic() + .to( + String.format( + "%s/%s/%s", + options.getBillingBucketUrl(), BillingModule.INVOICES_DIRECTORY, yearMonth)) + .by(BillingEvent::getDetailedReportGroupingKey) + .withNumShards(1) + .withDestinationCoder(StringUtf8Coder.of()) + .withNaming( + key -> + (window, pane, numShards, shardIndex, compression) -> + String.format( + "%s_%s_%s.csv", BillingModule.DETAIL_REPORT_PREFIX, yearMonth, key)) + .via( + Contextful.fn(BillingEvent::toCsv), + TextIO.sink().withHeader(BillingEvent.getHeader()))); + } + + /** Create the Bigquery query for a given project and yearMonth at runtime. */ + static String makeQuery(String yearMonth, String projectId) { + // Get the timestamp endpoints capturing the entire month with microsecond precision + YearMonth reportingMonth = YearMonth.parse(yearMonth); + LocalDateTime firstMoment = reportingMonth.atDay(1).atTime(LocalTime.MIDNIGHT); + LocalDateTime lastMoment = reportingMonth.atEndOfMonth().atTime(LocalTime.MAX); + // Construct the month's query by filling in the billing_events.sql template + return SqlTemplate.create(getQueryFromFile(InvoicingPipeline.class, "billing_events.sql")) + .put("FIRST_TIMESTAMP_OF_MONTH", firstMoment.format(TIMESTAMP_FORMATTER)) + .put("LAST_TIMESTAMP_OF_MONTH", lastMoment.format(TIMESTAMP_FORMATTER)) + .put("PROJECT_ID", projectId) + .put("DATASTORE_EXPORT_DATA_SET", "latest_datastore_export") + .put("ONETIME_TABLE", "OneTime") + .put("REGISTRY_TABLE", "Registry") + .put("REGISTRAR_TABLE", "Registrar") + .put("CANCELLATION_TABLE", "Cancellation") + .build(); + } + + public static void main(String[] args) { + PipelineOptionsFactory.register(InvoicingPipelineOptions.class); + InvoicingPipelineOptions options = + PipelineOptionsFactory.fromArgs(args).withValidation().as(InvoicingPipelineOptions.class); + new InvoicingPipeline(options).run(); } } diff --git a/core/src/main/java/google/registry/beam/invoicing/InvoicingPipelineOptions.java b/core/src/main/java/google/registry/beam/invoicing/InvoicingPipelineOptions.java new file mode 100644 index 000000000..ff4d5a69d --- /dev/null +++ b/core/src/main/java/google/registry/beam/invoicing/InvoicingPipelineOptions.java @@ -0,0 +1,37 @@ +// Copyright 2021 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.beam.invoicing; + +import google.registry.beam.common.RegistryPipelineOptions; +import org.apache.beam.sdk.options.Description; + +/** Custom options for running the invoicing pipeline. */ +public interface InvoicingPipelineOptions extends RegistryPipelineOptions { + + @Description("The year and month we generate invoices for, in yyyy-MM format.") + String getYearMonth(); + + void setYearMonth(String value); + + @Description("Filename prefix for the invoice CSV file.") + String getInvoiceFilePrefix(); + + void setInvoiceFilePrefix(String value); + + @Description("The GCS bucket URL for invoices and detailed reports to be uploaded.") + String getBillingBucketUrl(); + + void setBillingBucketUrl(String value); +} diff --git a/core/src/main/java/google/registry/beam/invoicing/InvoicingUtils.java b/core/src/main/java/google/registry/beam/invoicing/InvoicingUtils.java deleted file mode 100644 index 07cabeb2d..000000000 --- a/core/src/main/java/google/registry/beam/invoicing/InvoicingUtils.java +++ /dev/null @@ -1,106 +0,0 @@ -// Copyright 2018 The Nomulus Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package google.registry.beam.invoicing; - -import static google.registry.beam.BeamUtils.getQueryFromFile; - -import google.registry.util.SqlTemplate; -import java.time.LocalDateTime; -import java.time.LocalTime; -import java.time.YearMonth; -import java.time.format.DateTimeFormatter; -import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params; -import org.apache.beam.sdk.io.FileBasedSink; -import org.apache.beam.sdk.options.ValueProvider; -import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; -import org.apache.beam.sdk.transforms.SerializableFunction; - -/** Pipeline helper functions used to generate invoices from instances of {@link BillingEvent}. */ -public class InvoicingUtils { - - private InvoicingUtils() {} - - private static final DateTimeFormatter TIMESTAMP_FORMATTER = - DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"); - - /** - * Returns a function mapping from {@code BillingEvent} to filename {@code Params}. - * - *

Beam uses this to determine which file a given {@code BillingEvent} should get placed into. - * - * @param outputBucket the GCS bucket we're outputting reports to - * @param yearMonthProvider a runtime provider for the yyyy-MM we're generating the invoice for - */ - static SerializableFunction makeDestinationFunction( - String outputBucket, ValueProvider yearMonthProvider) { - return billingEvent -> - new Params() - .withShardTemplate("") - .withSuffix(".csv") - .withBaseFilename( - NestedValueProvider.of( - yearMonthProvider, - yearMonth -> - FileBasedSink.convertToFileResourceIfPossible( - String.format( - "%s/%s/%s", - outputBucket, yearMonth, billingEvent.toFilename(yearMonth))))); - } - - /** - * Returns the default filename parameters for an unmappable {@code BillingEvent}. - * - *

The "failed" file should only be populated when an error occurs, which warrants further - * investigation. - */ - static Params makeEmptyDestinationParams(String outputBucket) { - return new Params() - .withBaseFilename( - FileBasedSink.convertToFileResourceIfPossible( - String.format("%s/%s", outputBucket, "FAILURES"))); - } - - /** - * Returns a provider that creates a Bigquery query for a given project and yearMonth at runtime. - * - *

We only know yearMonth at runtime, so this provider fills in the {@code - * sql/billing_events.sql} template at runtime. - * - * @param yearMonthProvider a runtime provider that returns which month we're invoicing for. - * @param projectId the projectId we're generating invoicing for. - */ - static ValueProvider makeQueryProvider( - ValueProvider yearMonthProvider, String projectId) { - return NestedValueProvider.of( - yearMonthProvider, - (yearMonth) -> { - // Get the timestamp endpoints capturing the entire month with microsecond precision - YearMonth reportingMonth = YearMonth.parse(yearMonth); - LocalDateTime firstMoment = reportingMonth.atDay(1).atTime(LocalTime.MIDNIGHT); - LocalDateTime lastMoment = reportingMonth.atEndOfMonth().atTime(LocalTime.MAX); - // Construct the month's query by filling in the billing_events.sql template - return SqlTemplate.create(getQueryFromFile(InvoicingPipeline.class, "billing_events.sql")) - .put("FIRST_TIMESTAMP_OF_MONTH", firstMoment.format(TIMESTAMP_FORMATTER)) - .put("LAST_TIMESTAMP_OF_MONTH", lastMoment.format(TIMESTAMP_FORMATTER)) - .put("PROJECT_ID", projectId) - .put("DATASTORE_EXPORT_DATA_SET", "latest_datastore_export") - .put("ONETIME_TABLE", "OneTime") - .put("REGISTRY_TABLE", "Registry") - .put("REGISTRAR_TABLE", "Registrar") - .put("CANCELLATION_TABLE", "Cancellation") - .build(); - }); - } -} diff --git a/core/src/main/java/google/registry/config/RegistryConfig.java b/core/src/main/java/google/registry/config/RegistryConfig.java index ee359adf7..46ad5b77f 100644 --- a/core/src/main/java/google/registry/config/RegistryConfig.java +++ b/core/src/main/java/google/registry/config/RegistryConfig.java @@ -384,19 +384,6 @@ public final class RegistryConfig { return Duration.standardHours(1); } - /** - * Number of sharded entity group roots used for performing strongly consistent scans. - * - *

Warning: This number may increase but never decrease. - * - * @see google.registry.model.index.EppResourceIndex - */ - @Provides - @Config("eppResourceIndexBucketCount") - public static int provideEppResourceIndexBucketCount(RegistryConfigSettings config) { - return config.datastore.eppResourceIndexBucketsNum; - } - @Provides @Config("cloudSqlJdbcUrl") public static String providesCloudSqlJdbcUrl(RegistryConfigSettings config) { @@ -564,53 +551,6 @@ public final class RegistryConfig { return config.gSuite.outgoingEmailDisplayName; } - /** - * Returns the name of the GCS bucket for storing Beam templates and results. - * - * @see google.registry.reporting.billing.GenerateInvoicesAction - */ - @Provides - @Config("apacheBeamBucket") - public static String provideApacheBeamBucket(@Config("projectId") String projectId) { - return projectId + "-beam"; - } - - /** - * Returns the URL of the GCS location for storing Apache Beam related objects. - * - * @see google.registry.reporting.billing.GenerateInvoicesAction - */ - @Provides - @Config("apacheBeamBucketUrl") - public static String provideApacheBeamBucketUrl(@Config("apacheBeamBucket") String beamBucket) { - return "gs://" + beamBucket; - } - - /** - * Returns the URL of the GCS location for storing the monthly invoicing Beam template. - * - * @see google.registry.reporting.billing.GenerateInvoicesAction - * @see google.registry.beam.invoicing.InvoicingPipeline - */ - @Provides - @Config("invoiceTemplateUrl") - public static String provideInvoiceTemplateUrl( - @Config("apacheBeamBucketUrl") String beamBucketUrl) { - return beamBucketUrl + "/templates/invoicing"; - } - - /** - * Returns the URL of the GCS location for storing the monthly spec11 Beam template. - * - * @see google.registry.beam.spec11.Spec11Pipeline - */ - @Provides - @Config("spec11TemplateUrl") - public static String provideSpec11TemplateUrl( - @Config("apacheBeamBucketUrl") String beamBucketUrl) { - return beamBucketUrl + "/templates/spec11"; - } - /** * Returns whether an SSL certificate hash is required to log in via EPP and run flows. * @@ -634,18 +574,6 @@ public final class RegistryConfig { return config.beam.defaultJobRegion; } - /** - * Returns the default job zone to run Apache Beam (Cloud Dataflow) jobs in. - * - * @see google.registry.reporting.billing.GenerateInvoicesAction - * @see google.registry.reporting.spec11.GenerateSpec11ReportAction - */ - @Provides - @Config("defaultJobZone") - public static String provideDefaultJobZone(RegistryConfigSettings config) { - return config.beam.defaultJobZone; - } - /** Returns the GCS bucket URL with all staged BEAM flex templates. */ @Provides @Config("beamStagingBucketUrl") @@ -653,19 +581,6 @@ public final class RegistryConfig { return config.beam.stagingBucketUrl; } - /** - * Returns the URL of the GCS location we store jar dependencies for beam pipelines. - * - * @see google.registry.beam.invoicing.InvoicingPipeline - * @see google.registry.beam.spec11.Spec11Pipeline - */ - @Provides - @Config("beamStagingUrl") - public static String provideInvoiceStagingUrl( - @Config("apacheBeamBucketUrl") String beamBucketUrl) { - return beamBucketUrl + "/staging"; - } - /** * Returns the Google Cloud Storage bucket for Spec11 and ICANN transaction and activity reports * to be uploaded. @@ -1227,14 +1142,6 @@ public final class RegistryConfig { return formatComments(config.registryPolicy.reservedTermsExportDisclaimer); } - /** Returns the clientId of the registrar used by the {@code CheckApiServlet}. */ - // TODO(b/80417678): remove this once CheckApiAction no longer uses this id. - @Provides - @Config("checkApiServletRegistrarClientId") - public static String provideCheckApiServletRegistrarClientId(RegistryConfigSettings config) { - return config.registryPolicy.checkApiServletClientId; - } - /** * Returns the clientId of the registrar that admins are automatically logged in as if they * aren't otherwise associated with one. diff --git a/core/src/main/java/google/registry/config/RegistryConfigSettings.java b/core/src/main/java/google/registry/config/RegistryConfigSettings.java index d33383aec..e5dffb563 100644 --- a/core/src/main/java/google/registry/config/RegistryConfigSettings.java +++ b/core/src/main/java/google/registry/config/RegistryConfigSettings.java @@ -133,7 +133,6 @@ public class RegistryConfigSettings { /** Configuration for Apache Beam (Cloud Dataflow). */ public static class Beam { public String defaultJobRegion; - public String defaultJobZone; public String stagingBucketUrl; } diff --git a/core/src/main/java/google/registry/config/files/default-config.yaml b/core/src/main/java/google/registry/config/files/default-config.yaml index 7bd04db53..44c07d690 100644 --- a/core/src/main/java/google/registry/config/files/default-config.yaml +++ b/core/src/main/java/google/registry/config/files/default-config.yaml @@ -420,9 +420,6 @@ misc: beam: # The default region to run Apache Beam (Cloud Dataflow) jobs in. defaultJobRegion: us-east1 - # The default zone to run Apache Beam (Cloud Dataflow) jobs in. - # TODO(weiminyu): consider dropping zone config. No obvious needs for this. - defaultJobZone: us-east1-c stagingBucketUrl: gcs-bucket-with-staged-templates keyring: diff --git a/core/src/main/java/google/registry/reporting/billing/GenerateInvoicesAction.java b/core/src/main/java/google/registry/reporting/billing/GenerateInvoicesAction.java index 41866c35d..a6770ca2e 100644 --- a/core/src/main/java/google/registry/reporting/billing/GenerateInvoicesAction.java +++ b/core/src/main/java/google/registry/reporting/billing/GenerateInvoicesAction.java @@ -14,6 +14,7 @@ package google.registry.reporting.billing; +import static google.registry.beam.BeamUtils.createJobName; import static google.registry.reporting.ReportingUtils.enqueueBeamReportingTask; import static google.registry.reporting.billing.BillingModule.PARAM_SHOULD_PUBLISH; import static google.registry.request.Action.Method.POST; @@ -21,9 +22,9 @@ import static javax.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR; import static javax.servlet.http.HttpServletResponse.SC_OK; import com.google.api.services.dataflow.Dataflow; -import com.google.api.services.dataflow.model.LaunchTemplateParameters; -import com.google.api.services.dataflow.model.LaunchTemplateResponse; -import com.google.api.services.dataflow.model.RuntimeEnvironment; +import com.google.api.services.dataflow.model.LaunchFlexTemplateParameter; +import com.google.api.services.dataflow.model.LaunchFlexTemplateRequest; +import com.google.api.services.dataflow.model.LaunchFlexTemplateResponse; import com.google.common.collect.ImmutableMap; import com.google.common.flogger.FluentLogger; import com.google.common.net.MediaType; @@ -33,6 +34,7 @@ import google.registry.request.Action; import google.registry.request.Parameter; import google.registry.request.Response; import google.registry.request.auth.Auth; +import google.registry.util.Clock; import java.io.IOException; import java.util.Map; import javax.inject.Inject; @@ -42,9 +44,8 @@ import org.joda.time.YearMonth; * Invokes the {@code InvoicingPipeline} beam template via the REST api, and enqueues the {@link * PublishInvoicesAction} to publish the subsequent output. * - *

This action runs the {@link google.registry.beam.invoicing.InvoicingPipeline} beam template, - * staged at gs://<projectId>-beam/templates/invoicing. The pipeline then generates invoices - * for the month and stores them on GCS. + *

This action runs the {@link google.registry.beam.invoicing.InvoicingPipeline} beam flex + * template. The pipeline then generates invoices for the month and stores them on GCS. */ @Action( service = Action.Service.BACKEND, @@ -56,57 +57,73 @@ public class GenerateInvoicesAction implements Runnable { private static final FluentLogger logger = FluentLogger.forEnclosingClass(); static final String PATH = "/_dr/task/generateInvoices"; + static final String PIPELINE_NAME = "invoicing_pipeline"; private final String projectId; - private final String beamBucketUrl; - private final String invoiceTemplateUrl; - private final String jobZone; + private final String jobRegion; + private final String stagingBucketUrl; + private final String billingBucketUrl; + private final String invoiceFilePrefix; private final boolean shouldPublish; private final YearMonth yearMonth; - private final Dataflow dataflow; - private final Response response; private final BillingEmailUtils emailUtils; + private final Clock clock; + private final Response response; + private final Dataflow dataflow; @Inject GenerateInvoicesAction( @Config("projectId") String projectId, - @Config("apacheBeamBucketUrl") String beamBucketUrl, - @Config("invoiceTemplateUrl") String invoiceTemplateUrl, - @Config("defaultJobZone") String jobZone, + @Config("defaultJobRegion") String jobRegion, + @Config("beamStagingBucketUrl") String stagingBucketUrl, + @Config("billingBucketUrl") String billingBucketUrl, + @Config("invoiceFilePrefix") String invoiceFilePrefix, @Parameter(PARAM_SHOULD_PUBLISH) boolean shouldPublish, YearMonth yearMonth, - Dataflow dataflow, + BillingEmailUtils emailUtils, + Clock clock, Response response, - BillingEmailUtils emailUtils) { + Dataflow dataflow) { this.projectId = projectId; - this.beamBucketUrl = beamBucketUrl; - this.invoiceTemplateUrl = invoiceTemplateUrl; - this.jobZone = jobZone; + this.jobRegion = jobRegion; + this.stagingBucketUrl = stagingBucketUrl; + this.billingBucketUrl = billingBucketUrl; + this.invoiceFilePrefix = invoiceFilePrefix; this.shouldPublish = shouldPublish; this.yearMonth = yearMonth; - this.dataflow = dataflow; - this.response = response; this.emailUtils = emailUtils; + this.clock = clock; + this.response = response; + this.dataflow = dataflow; } @Override public void run() { + response.setContentType(MediaType.PLAIN_TEXT_UTF_8); logger.atInfo().log("Launching invoicing pipeline for %s", yearMonth); try { - LaunchTemplateParameters params = - new LaunchTemplateParameters() - .setJobName(String.format("invoicing-%s", yearMonth)) - .setEnvironment( - new RuntimeEnvironment() - .setZone(jobZone) - .setTempLocation(beamBucketUrl + "/temporary")) - .setParameters(ImmutableMap.of("yearMonth", yearMonth.toString("yyyy-MM"))); - LaunchTemplateResponse launchResponse = + LaunchFlexTemplateParameter parameter = + new LaunchFlexTemplateParameter() + .setJobName(createJobName("invoicing", clock)) + .setContainerSpecGcsPath( + String.format("%s/%s_metadata.json", stagingBucketUrl, PIPELINE_NAME)) + .setParameters( + ImmutableMap.of( + "yearMonth", + yearMonth.toString("yyyy-MM"), + "invoiceFilePrefix", + invoiceFilePrefix, + "billingBucketUrl", + billingBucketUrl)); + LaunchFlexTemplateResponse launchResponse = dataflow .projects() - .templates() - .launch(projectId, params) - .setGcsPath(invoiceTemplateUrl) + .locations() + .flexTemplates() + .launch( + projectId, + jobRegion, + new LaunchFlexTemplateRequest().setLaunchParameter(parameter)) .execute(); logger.atInfo().log("Got response: %s", launchResponse.getJob().toPrettyString()); String jobId = launchResponse.getJob().getId(); @@ -123,12 +140,10 @@ public class GenerateInvoicesAction implements Runnable { logger.atWarning().withCause(e).log("Template Launch failed"); emailUtils.sendAlertEmail(String.format("Template Launch failed due to %s", e.getMessage())); response.setStatus(SC_INTERNAL_SERVER_ERROR); - response.setContentType(MediaType.PLAIN_TEXT_UTF_8); response.setPayload(String.format("Template launch failed: %s", e.getMessage())); return; } response.setStatus(SC_OK); - response.setContentType(MediaType.PLAIN_TEXT_UTF_8); response.setPayload("Launched dataflow template."); } } diff --git a/core/src/main/java/google/registry/reporting/billing/PublishInvoicesAction.java b/core/src/main/java/google/registry/reporting/billing/PublishInvoicesAction.java index 971d9e77c..b12e30bd9 100644 --- a/core/src/main/java/google/registry/reporting/billing/PublishInvoicesAction.java +++ b/core/src/main/java/google/registry/reporting/billing/PublishInvoicesAction.java @@ -59,6 +59,7 @@ public class PublishInvoicesAction implements Runnable { private static final String JOB_FAILED = "JOB_STATE_FAILED"; private final String projectId; + private final String jobRegion; private final String jobId; private final BillingEmailUtils emailUtils; private final Dataflow dataflow; @@ -68,12 +69,14 @@ public class PublishInvoicesAction implements Runnable { @Inject PublishInvoicesAction( @Config("projectId") String projectId, + @Config("defaultJobRegion") String jobRegion, @Parameter(ReportingModule.PARAM_JOB_ID) String jobId, BillingEmailUtils emailUtils, Dataflow dataflow, Response response, YearMonth yearMonth) { this.projectId = projectId; + this.jobRegion = jobRegion; this.jobId = jobId; this.emailUtils = emailUtils; this.dataflow = dataflow; @@ -87,7 +90,7 @@ public class PublishInvoicesAction implements Runnable { public void run() { try { logger.atInfo().log("Starting publish job."); - Job job = dataflow.projects().jobs().get(projectId, jobId).execute(); + Job job = dataflow.projects().locations().jobs().get(projectId, jobRegion, jobId).execute(); String state = job.getCurrentState(); switch (state) { case JOB_DONE: diff --git a/core/src/main/java/google/registry/tools/DeployInvoicingPipelineCommand.java b/core/src/main/java/google/registry/tools/DeployInvoicingPipelineCommand.java deleted file mode 100644 index d76829062..000000000 --- a/core/src/main/java/google/registry/tools/DeployInvoicingPipelineCommand.java +++ /dev/null @@ -1,31 +0,0 @@ -// Copyright 2017 The Nomulus Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package google.registry.tools; - -import com.beust.jcommander.Parameters; -import google.registry.beam.invoicing.InvoicingPipeline; -import javax.inject.Inject; - -/** Nomulus command that deploys the {@link InvoicingPipeline} template. */ -@Parameters(commandDescription = "Deploy the invoicing pipeline to GCS.") -public class DeployInvoicingPipelineCommand implements Command { - - @Inject InvoicingPipeline invoicingPipeline; - - @Override - public void run() { - invoicingPipeline.deploy(); - } -} diff --git a/core/src/main/java/google/registry/tools/RegistryTool.java b/core/src/main/java/google/registry/tools/RegistryTool.java index f25997a37..30b9bda34 100644 --- a/core/src/main/java/google/registry/tools/RegistryTool.java +++ b/core/src/main/java/google/registry/tools/RegistryTool.java @@ -62,7 +62,6 @@ public final class RegistryTool { .put("delete_premium_list", DeletePremiumListCommand.class) .put("delete_reserved_list", DeleteReservedListCommand.class) .put("delete_tld", DeleteTldCommand.class) - .put("deploy_invoicing_pipeline", DeployInvoicingPipelineCommand.class) .put("encrypt_escrow_deposit", EncryptEscrowDepositCommand.class) .put("execute_epp", ExecuteEppCommand.class) .put("generate_allocation_tokens", GenerateAllocationTokensCommand.class) diff --git a/core/src/main/java/google/registry/tools/RegistryToolComponent.java b/core/src/main/java/google/registry/tools/RegistryToolComponent.java index 4bbefb4d0..c655ff120 100644 --- a/core/src/main/java/google/registry/tools/RegistryToolComponent.java +++ b/core/src/main/java/google/registry/tools/RegistryToolComponent.java @@ -105,8 +105,6 @@ interface RegistryToolComponent { void inject(DeleteContactByRoidCommand command); - void inject(DeployInvoicingPipelineCommand command); - void inject(EncryptEscrowDepositCommand command); void inject(GenerateAllocationTokensCommand command); diff --git a/core/src/main/resources/google/registry/beam/init_sql_pipeline_metadata.json b/core/src/main/resources/google/registry/beam/init_sql_pipeline_metadata.json index 69d96fa63..d3c251b50 100644 --- a/core/src/main/resources/google/registry/beam/init_sql_pipeline_metadata.json +++ b/core/src/main/resources/google/registry/beam/init_sql_pipeline_metadata.json @@ -5,7 +5,7 @@ { "name": "registryEnvironment", "label": "The Registry environment.", - "helpText": "The Registry environment, required if environment-specific initialization is needed on worker VMs.", + "helpText": "The Registry environment, required if environment-specific initialization (such as JPA) is needed on worker VMs.", "is_optional": true, "regexes": [ "^[0-9A-Z_]+$" diff --git a/core/src/main/java/google/registry/beam/invoicing/sql/billing_events.sql b/core/src/main/resources/google/registry/beam/invoicing/sql/billing_events.sql similarity index 100% rename from core/src/main/java/google/registry/beam/invoicing/sql/billing_events.sql rename to core/src/main/resources/google/registry/beam/invoicing/sql/billing_events.sql diff --git a/core/src/main/resources/google/registry/beam/invoicing_pipeline_metadata.json b/core/src/main/resources/google/registry/beam/invoicing_pipeline_metadata.json new file mode 100644 index 000000000..2b709cbef --- /dev/null +++ b/core/src/main/resources/google/registry/beam/invoicing_pipeline_metadata.json @@ -0,0 +1,66 @@ +{ + "name": "Invoice and Detailed Reports Generation", + "description": "An Apache Beam batch pipeline that reads from a Datastore export and generate monthly invoice and detailed reports, saving them on GCS.", + "parameters": [ + { + "name": "registryEnvironment", + "label": "The Registry environment.", + "helpText": "The Registry environment, required if environment-specific initialization (such as JPA) is needed on worker VMs.", + "is_optional": true, + "regexes": [ + "^[0-9A-Z_]+$" + ] + }, + { + "name": "isolationOverride", + "label": "The desired SQL transaction isolation level.", + "helpText": "The desired SQL transaction isolation level.", + "is_optional": true, + "regexes": [ + "^[0-9A-Z_]+$" + ] + }, + { + "name": "sqlWriteBatchSize", + "label": "SQL write batch size.", + "helpText": "The number of entities to write to the SQL database in one operation.", + "is_optional": true, + "regexes": [ + "^[1-9][0-9]*$" + ] + }, + { + "name": "sqlWriteShards", + "label": "Number of output shards to create when writing to SQL.", + "helpText": "Number of shards to create out of the data before writing to the SQL database. Please refer to the Javadoc of RegistryJpaIO.Write.shards() for how to choose this value.", + "is_optional": true, + "regexes": [ + "^[1-9][0-9]*$" + ] + }, + { + "name": "yearMonth", + "label": "The year and month we generate invoice and detailed reports for.", + "helpText": "The year and month for which the invoice and detailed reports are generated, in yyyy-MM format.", + "regexes": [ + "^2[0-9]{3}-(0[1-9]|1[0-2])$" + ] + }, + { + "name": "invoiceFilePrefix", + "label": "Filename prefix for the invoice CSV file.", + "helpText": "The prefix that will be applied to the invoice file.", + "regexes": [ + "^[a-zA-Z0-9_\\-]+$" + ] + }, + { + "name": "billingBucketUrl", + "label": "invoice and detailed reports upload dir.", + "helpText": "The root directory of the reports to upload.", + "regexes": [ + "^gs:\\/\\/[^\\n\\r]+$" + ] + } + ] +} diff --git a/core/src/main/resources/google/registry/beam/spec11_pipeline_metadata.json b/core/src/main/resources/google/registry/beam/spec11_pipeline_metadata.json index f9e17b2c1..eaff2dfe8 100644 --- a/core/src/main/resources/google/registry/beam/spec11_pipeline_metadata.json +++ b/core/src/main/resources/google/registry/beam/spec11_pipeline_metadata.json @@ -5,7 +5,7 @@ { "name": "registryEnvironment", "label": "The Registry environment.", - "helpText": "The Registry environment, required if environment-specific initialization is needed on worker VMs.", + "helpText": "The Registry environment, required if environment-specific initialization (such as JPA) is needed on worker VMs.", "is_optional": true, "regexes": [ "^[0-9A-Z_]+$" @@ -41,7 +41,7 @@ { "name": "date", "label": "The date when the pipeline runs", - "helpText": "The date then the threat scan is performed, in yyyy-MM-dd format.", + "helpText": "The date when the threat scan is performed, in yyyy-MM-dd format.", "regexes": [ "^2[0-9]{3}-(0[1-9]|1[0-2])-(0[1-9]|[12][0-9]|3[01])$" ] diff --git a/core/src/test/java/google/registry/batch/WipeOutDatastoreActionTest.java b/core/src/test/java/google/registry/batch/WipeOutDatastoreActionTest.java index c3b4d9d42..ce3f780be 100644 --- a/core/src/test/java/google/registry/batch/WipeOutDatastoreActionTest.java +++ b/core/src/test/java/google/registry/batch/WipeOutDatastoreActionTest.java @@ -18,52 +18,20 @@ import static com.google.common.truth.Truth.assertThat; import static javax.servlet.http.HttpServletResponse.SC_FORBIDDEN; import static org.apache.http.HttpStatus.SC_INTERNAL_SERVER_ERROR; import static org.apache.http.HttpStatus.SC_OK; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -import com.google.api.services.dataflow.Dataflow; -import com.google.api.services.dataflow.model.Job; -import com.google.api.services.dataflow.model.LaunchFlexTemplateRequest; -import com.google.api.services.dataflow.model.LaunchFlexTemplateResponse; +import google.registry.beam.BeamActionTestBase; import google.registry.testing.FakeClock; -import google.registry.testing.FakeResponse; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; /** Unit tests for {@link WipeoutDatastoreAction}. */ -@ExtendWith(MockitoExtension.class) -class WipeOutDatastoreActionTest { - - @Mock private Dataflow dataflow; - @Mock private Dataflow.Projects projects; - @Mock private Dataflow.Projects.Locations locations; - @Mock private Dataflow.Projects.Locations.FlexTemplates flexTemplates; - @Mock private Dataflow.Projects.Locations.FlexTemplates.Launch launch; - private LaunchFlexTemplateResponse launchResponse = - new LaunchFlexTemplateResponse().setJob(new Job()); +class WipeOutDatastoreActionTest extends BeamActionTestBase { private final FakeClock clock = new FakeClock(); - private final FakeResponse response = new FakeResponse(); - - @BeforeEach - void beforeEach() throws Exception { - lenient().when(dataflow.projects()).thenReturn(projects); - lenient().when(projects.locations()).thenReturn(locations); - lenient().when(locations.flexTemplates()).thenReturn(flexTemplates); - lenient() - .when(flexTemplates.launch(anyString(), anyString(), any(LaunchFlexTemplateRequest.class))) - .thenReturn(launch); - lenient().when(launch.execute()).thenReturn(launchResponse); - } @Test void run_projectNotAllowed() { diff --git a/core/src/test/java/google/registry/beam/BeamActionTestBase.java b/core/src/test/java/google/registry/beam/BeamActionTestBase.java new file mode 100644 index 000000000..473b871bc --- /dev/null +++ b/core/src/test/java/google/registry/beam/BeamActionTestBase.java @@ -0,0 +1,57 @@ +// Copyright 2021 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.beam; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.google.api.services.dataflow.Dataflow; +import com.google.api.services.dataflow.Dataflow.Projects; +import com.google.api.services.dataflow.Dataflow.Projects.Locations; +import com.google.api.services.dataflow.Dataflow.Projects.Locations.FlexTemplates; +import com.google.api.services.dataflow.Dataflow.Projects.Locations.FlexTemplates.Launch; +import com.google.api.services.dataflow.model.Job; +import com.google.api.services.dataflow.model.LaunchFlexTemplateRequest; +import com.google.api.services.dataflow.model.LaunchFlexTemplateResponse; +import google.registry.testing.FakeResponse; +import org.junit.jupiter.api.BeforeEach; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; + +/** Base class for all actions that launches a Dataflow Flex template. */ +@MockitoSettings(strictness = Strictness.STRICT_STUBS) +public abstract class BeamActionTestBase { + + protected FakeResponse response = new FakeResponse(); + protected Dataflow dataflow = mock(Dataflow.class); + private Projects projects = mock(Projects.class); + private Locations locations = mock(Locations.class); + private FlexTemplates templates = mock(FlexTemplates.class); + protected Launch launch = mock(Launch.class); + private LaunchFlexTemplateResponse launchResponse = + new LaunchFlexTemplateResponse().setJob(new Job().setId("jobid")); + + @BeforeEach + void beforeEach() throws Exception { + when(dataflow.projects()).thenReturn(projects); + when(projects.locations()).thenReturn(locations); + when(locations.flexTemplates()).thenReturn(templates); + when(templates.launch(anyString(), anyString(), any(LaunchFlexTemplateRequest.class))) + .thenReturn(launch); + when(launch.execute()).thenReturn(launchResponse); + } +} diff --git a/core/src/test/java/google/registry/beam/invoicing/InvoicingPipelineTest.java b/core/src/test/java/google/registry/beam/invoicing/InvoicingPipelineTest.java index 456a2dee9..e6e62b905 100644 --- a/core/src/test/java/google/registry/beam/invoicing/InvoicingPipelineTest.java +++ b/core/src/test/java/google/registry/beam/invoicing/InvoicingPipelineTest.java @@ -16,26 +16,21 @@ package google.registry.beam.invoicing; import static com.google.common.truth.Truth.assertThat; -import com.google.auth.oauth2.GoogleCredentials; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import google.registry.beam.TestPipelineExtension; -import google.registry.util.GoogleCredentialsBundle; +import google.registry.testing.TestDataHelper; import google.registry.util.ResourceUtils; import java.io.File; -import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.time.ZoneId; import java.time.ZonedDateTime; import java.util.Map.Entry; -import org.apache.beam.runners.direct.DirectRunner; -import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.values.PCollection; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -44,160 +39,168 @@ import org.junit.jupiter.api.io.TempDir; /** Unit tests for {@link InvoicingPipeline}. */ class InvoicingPipelineTest { - private static PipelineOptions pipelineOptions; + private static final String BILLING_BUCKET_URL = "billing_bucket"; + private static final String YEAR_MONTH = "2017-10"; + private static final String INVOICE_FILE_PREFIX = "REG-INV"; - @BeforeAll - static void beforeAll() { - pipelineOptions = PipelineOptionsFactory.create(); - pipelineOptions.setRunner(DirectRunner.class); - } + private static final ImmutableList INPUT_EVENTS = + ImmutableList.of( + BillingEvent.create( + 1, + ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")), + ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")), + "theRegistrar", + "234", + "", + "test", + "RENEW", + "mydomain.test", + "REPO-ID", + 3, + "USD", + 20.5, + ""), + BillingEvent.create( + 1, + ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")), + ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")), + "theRegistrar", + "234", + "", + "test", + "RENEW", + "mydomain2.test", + "REPO-ID", + 3, + "USD", + 20.5, + ""), + BillingEvent.create( + 1, + ZonedDateTime.of(2017, 10, 2, 0, 0, 0, 0, ZoneId.of("UTC")), + ZonedDateTime.of(2017, 9, 29, 0, 0, 0, 0, ZoneId.of("UTC")), + "theRegistrar", + "234", + "", + "hello", + "CREATE", + "mydomain3.hello", + "REPO-ID", + 5, + "JPY", + 70.75, + ""), + BillingEvent.create( + 1, + ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")), + ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")), + "bestdomains", + "456", + "116688", + "test", + "RENEW", + "mydomain4.test", + "REPO-ID", + 1, + "USD", + 20.5, + ""), + BillingEvent.create( + 1, + ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")), + ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")), + "anotherRegistrar", + "789", + "", + "test", + "CREATE", + "mydomain5.test", + "REPO-ID", + 1, + "USD", + 0, + "SUNRISE ANCHOR_TENANT")); + + private static final ImmutableMap> EXPECTED_DETAILED_REPORT_MAP = + ImmutableMap.of( + "invoice_details_2017-10_theRegistrar_test.csv", + ImmutableList.of( + "1,2017-10-04 00:00:00 UTC,2017-10-04 00:00:00 UTC,theRegistrar,234,," + + "test,RENEW,mydomain2.test,REPO-ID,3,USD,20.50,", + "1,2017-10-04 00:00:00 UTC,2017-10-04 00:00:00 UTC,theRegistrar,234,," + + "test,RENEW,mydomain.test,REPO-ID,3,USD,20.50,"), + "invoice_details_2017-10_theRegistrar_hello.csv", + ImmutableList.of( + "1,2017-10-02 00:00:00 UTC,2017-09-29 00:00:00 UTC,theRegistrar,234,," + + "hello,CREATE,mydomain3.hello,REPO-ID,5,JPY,70.75,"), + "invoice_details_2017-10_bestdomains_test.csv", + ImmutableList.of( + "1,2017-10-04 00:00:00 UTC,2017-10-04 00:00:00 UTC,bestdomains,456,116688," + + "test,RENEW,mydomain4.test,REPO-ID,1,USD,20.50,"), + "invoice_details_2017-10_anotherRegistrar_test.csv", + ImmutableList.of( + "1,2017-10-04 00:00:00 UTC,2017-10-04 00:00:00 UTC,anotherRegistrar,789,," + + "test,CREATE,mydomain5.test,REPO-ID,1,USD,0.00,SUNRISE ANCHOR_TENANT")); + + private static final ImmutableList EXPECTED_INVOICE_OUTPUT = + ImmutableList.of( + "2017-10-01,2020-09-30,234,41.00,USD,10125,1,PURCHASE,theRegistrar - test,2," + + "RENEW | TLD: test | TERM: 3-year,20.50,USD,", + "2017-10-01,2022-09-30,234,70.75,JPY,10125,1,PURCHASE,theRegistrar - hello,1," + + "CREATE | TLD: hello | TERM: 5-year,70.75,JPY,", + "2017-10-01,2018-09-30,456,20.50,USD,10125,1,PURCHASE,bestdomains - test,1," + + "RENEW | TLD: test | TERM: 1-year,20.50,USD,116688"); @RegisterExtension - final transient TestPipelineExtension testPipeline = - TestPipelineExtension.fromOptions(pipelineOptions); + final TestPipelineExtension pipeline = + TestPipelineExtension.create().enableAbandonedNodeEnforcement(true); - @SuppressWarnings("WeakerAccess") - @TempDir - transient Path tmpDir; + @TempDir Path tmpDir; - private InvoicingPipeline invoicingPipeline; + private final InvoicingPipelineOptions options = + PipelineOptionsFactory.create().as(InvoicingPipelineOptions.class); + + private File billingBucketUrl; + private PCollection billingEvents; @BeforeEach - void beforeEach() throws IOException { - String beamTempFolder = - Files.createDirectory(tmpDir.resolve("beam_temp")).toAbsolutePath().toString(); - invoicingPipeline = - new InvoicingPipeline( - "test-project", - "region", - beamTempFolder, - beamTempFolder + "/templates/invoicing", - beamTempFolder + "/staging", - tmpDir.toAbsolutePath().toString(), - "REG-INV", - GoogleCredentialsBundle.create(GoogleCredentials.create(null))); - } - - private ImmutableList getInputEvents() { - return ImmutableList.of( - BillingEvent.create( - 1, - ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")), - ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")), - "theRegistrar", - "234", - "", - "test", - "RENEW", - "mydomain.test", - "REPO-ID", - 3, - "USD", - 20.5, - ""), - BillingEvent.create( - 1, - ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")), - ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")), - "theRegistrar", - "234", - "", - "test", - "RENEW", - "mydomain2.test", - "REPO-ID", - 3, - "USD", - 20.5, - ""), - BillingEvent.create( - 1, - ZonedDateTime.of(2017, 10, 2, 0, 0, 0, 0, ZoneId.of("UTC")), - ZonedDateTime.of(2017, 9, 29, 0, 0, 0, 0, ZoneId.of("UTC")), - "theRegistrar", - "234", - "", - "hello", - "CREATE", - "mydomain3.hello", - "REPO-ID", - 5, - "JPY", - 70.75, - ""), - BillingEvent.create( - 1, - ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")), - ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")), - "bestdomains", - "456", - "116688", - "test", - "RENEW", - "mydomain4.test", - "REPO-ID", - 1, - "USD", - 20.5, - ""), - BillingEvent.create( - 1, - ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")), - ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")), - "anotherRegistrar", - "789", - "", - "test", - "CREATE", - "mydomain5.test", - "REPO-ID", - 1, - "USD", - 0, - "SUNRISE ANCHOR_TENANT")); - } - - /** Returns a map from filename to expected contents for detail reports. */ - private ImmutableMap> getExpectedDetailReportMap() { - return ImmutableMap.of( - "invoice_details_2017-10_theRegistrar_test.csv", - ImmutableList.of( - "1,2017-10-04 00:00:00 UTC,2017-10-04 00:00:00 UTC,theRegistrar,234,," - + "test,RENEW,mydomain2.test,REPO-ID,3,USD,20.50,", - "1,2017-10-04 00:00:00 UTC,2017-10-04 00:00:00 UTC,theRegistrar,234,," - + "test,RENEW,mydomain.test,REPO-ID,3,USD,20.50,"), - "invoice_details_2017-10_theRegistrar_hello.csv", - ImmutableList.of( - "1,2017-10-02 00:00:00 UTC,2017-09-29 00:00:00 UTC,theRegistrar,234,," - + "hello,CREATE,mydomain3.hello,REPO-ID,5,JPY,70.75,"), - "invoice_details_2017-10_bestdomains_test.csv", - ImmutableList.of( - "1,2017-10-04 00:00:00 UTC,2017-10-04 00:00:00 UTC,bestdomains,456,116688," - + "test,RENEW,mydomain4.test,REPO-ID,1,USD,20.50,"), - "invoice_details_2017-10_anotherRegistrar_test.csv", - ImmutableList.of( - "1,2017-10-04 00:00:00 UTC,2017-10-04 00:00:00 UTC,anotherRegistrar,789,," - + "test,CREATE,mydomain5.test,REPO-ID,1,USD,0.00,SUNRISE ANCHOR_TENANT")); - } - - private ImmutableList getExpectedInvoiceOutput() { - return ImmutableList.of( - "2017-10-01,2020-09-30,234,41.00,USD,10125,1,PURCHASE,theRegistrar - test,2," - + "RENEW | TLD: test | TERM: 3-year,20.50,USD,", - "2017-10-01,2022-09-30,234,70.75,JPY,10125,1,PURCHASE,theRegistrar - hello,1," - + "CREATE | TLD: hello | TERM: 5-year,70.75,JPY,", - "2017-10-01,2018-09-30,456,20.50,USD,10125,1,PURCHASE,bestdomains - test,1," - + "RENEW | TLD: test | TERM: 1-year,20.50,USD,116688"); + void beforeEach() throws Exception { + billingBucketUrl = Files.createDirectory(tmpDir.resolve(BILLING_BUCKET_URL)).toFile(); + options.setBillingBucketUrl(billingBucketUrl.getAbsolutePath()); + options.setYearMonth(YEAR_MONTH); + options.setInvoiceFilePrefix(INVOICE_FILE_PREFIX); + billingEvents = + pipeline.apply(Create.of(INPUT_EVENTS).withCoder(SerializableCoder.of(BillingEvent.class))); } @Test - void testEndToEndPipeline_generatesExpectedFiles() throws Exception { - ImmutableList inputRows = getInputEvents(); - PCollection input = testPipeline.apply(Create.of(inputRows)); - invoicingPipeline.applyTerminalTransforms(input, StaticValueProvider.of("2017-10")); - testPipeline.run(); + void testSuccess_makeQuery() { + String query = InvoicingPipeline.makeQuery("2017-10", "my-project-id"); + assertThat(query) + .isEqualTo(TestDataHelper.loadFile(this.getClass(), "billing_events_test.sql")); + // This is necessary because the TestPipelineExtension verifies that the pipelien is run. + pipeline.run(); + } - for (Entry> entry : getExpectedDetailReportMap().entrySet()) { + @Test + void testSuccess_saveInvoiceCsv() throws Exception { + InvoicingPipeline.saveInvoiceCsv(billingEvents, options); + pipeline.run().waitUntilFinish(); + ImmutableList overallInvoice = resultFileContents("REG-INV-2017-10.csv"); + assertThat(overallInvoice.get(0)) + .isEqualTo( + "StartDate,EndDate,ProductAccountKey,Amount,AmountCurrency,BillingProductCode," + + "SalesChannel,LineItemType,UsageGroupingKey,Quantity,Description,UnitPrice," + + "UnitPriceCurrency,PONumber"); + assertThat(overallInvoice.subList(1, overallInvoice.size())) + .containsExactlyElementsIn(EXPECTED_INVOICE_OUTPUT); + } + + @Test + void testSuccess_saveDetailedCsv() throws Exception { + InvoicingPipeline.saveDetailedCsv(billingEvents, options); + pipeline.run().waitUntilFinish(); + for (Entry> entry : EXPECTED_DETAILED_REPORT_MAP.entrySet()) { ImmutableList detailReport = resultFileContents(entry.getKey()); assertThat(detailReport.get(0)) .isEqualTo( @@ -206,22 +209,14 @@ class InvoicingPipelineTest { assertThat(detailReport.subList(1, detailReport.size())) .containsExactlyElementsIn(entry.getValue()); } - - ImmutableList overallInvoice = resultFileContents("REG-INV-2017-10.csv"); - assertThat(overallInvoice.get(0)) - .isEqualTo( - "StartDate,EndDate,ProductAccountKey,Amount,AmountCurrency,BillingProductCode," - + "SalesChannel,LineItemType,UsageGroupingKey,Quantity,Description,UnitPrice," - + "UnitPriceCurrency,PONumber"); - assertThat(overallInvoice.subList(1, overallInvoice.size())) - .containsExactlyElementsIn(getExpectedInvoiceOutput()); } /** Returns the text contents of a file under the beamBucket/results directory. */ private ImmutableList resultFileContents(String filename) throws Exception { File resultFile = new File( - String.format("%s/invoices/2017-10/%s", tmpDir.toAbsolutePath().toString(), filename)); + String.format( + "%s/invoices/2017-10/%s", billingBucketUrl.getAbsolutePath().toString(), filename)); return ImmutableList.copyOf( ResourceUtils.readResourceUtf8(resultFile.toURI().toURL()).split("\n")); } diff --git a/core/src/test/java/google/registry/beam/invoicing/InvoicingUtilsTest.java b/core/src/test/java/google/registry/beam/invoicing/InvoicingUtilsTest.java deleted file mode 100644 index 5902d4cc1..000000000 --- a/core/src/test/java/google/registry/beam/invoicing/InvoicingUtilsTest.java +++ /dev/null @@ -1,73 +0,0 @@ -// Copyright 2018 The Nomulus Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package google.registry.beam.invoicing; - -import static com.google.common.truth.Truth.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import google.registry.testing.TestDataHelper; -import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params; -import org.apache.beam.sdk.io.FileBasedSink; -import org.apache.beam.sdk.options.ValueProvider; -import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.junit.jupiter.api.Test; - -/** Unit tests for {@link InvoicingUtils}. */ -class InvoicingUtilsTest { - - @Test - void testDestinationFunction_generatesProperFileParams() { - SerializableFunction destinationFunction = - InvoicingUtils.makeDestinationFunction("my/directory", StaticValueProvider.of("2017-10")); - - BillingEvent billingEvent = mock(BillingEvent.class); - // We mock BillingEvent to make the test independent of the implementation of toFilename() - when(billingEvent.toFilename(any())).thenReturn("invoice_details_2017-10_registrar_tld"); - - assertThat(destinationFunction.apply(billingEvent)) - .isEqualTo( - new Params() - .withShardTemplate("") - .withSuffix(".csv") - .withBaseFilename( - FileBasedSink.convertToFileResourceIfPossible( - "my/directory/2017-10/invoice_details_2017-10_registrar_tld"))); - } - - @Test - void testEmptyDestinationParams() { - assertThat(InvoicingUtils.makeEmptyDestinationParams("my/directory")) - .isEqualTo( - new Params() - .withBaseFilename( - FileBasedSink.convertToFileResourceIfPossible("my/directory/FAILURES"))); - } - - /** Asserts that the instantiated sql template matches a golden expected file. */ - @Test - void testMakeQueryProvider() { - ValueProvider queryProvider = - InvoicingUtils.makeQueryProvider(StaticValueProvider.of("2017-10"), "my-project-id"); - assertThat(queryProvider.get()).isEqualTo(loadFile("billing_events_test.sql")); - } - - /** Returns a {@link String} from a file in the {@code billing/testdata/} directory. */ - private static String loadFile(String filename) { - return TestDataHelper.loadFile(InvoicingUtilsTest.class, filename); - } -} diff --git a/core/src/test/java/google/registry/reporting/billing/GenerateInvoicesActionTest.java b/core/src/test/java/google/registry/reporting/billing/GenerateInvoicesActionTest.java index c5813f833..a9fcb6109 100644 --- a/core/src/test/java/google/registry/reporting/billing/GenerateInvoicesActionTest.java +++ b/core/src/test/java/google/registry/reporting/billing/GenerateInvoicesActionTest.java @@ -17,94 +17,57 @@ package google.registry.reporting.billing; import static com.google.common.truth.Truth.assertThat; import static google.registry.testing.TaskQueueHelper.assertNoTasksEnqueued; import static google.registry.testing.TaskQueueHelper.assertTasksEnqueued; -import static org.mockito.ArgumentMatchers.any; +import static javax.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR; +import static javax.servlet.http.HttpServletResponse.SC_OK; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import com.google.api.services.dataflow.Dataflow; -import com.google.api.services.dataflow.Dataflow.Projects; -import com.google.api.services.dataflow.Dataflow.Projects.Templates; -import com.google.api.services.dataflow.Dataflow.Projects.Templates.Launch; -import com.google.api.services.dataflow.model.Job; -import com.google.api.services.dataflow.model.LaunchTemplateParameters; -import com.google.api.services.dataflow.model.LaunchTemplateResponse; -import com.google.api.services.dataflow.model.RuntimeEnvironment; -import com.google.common.collect.ImmutableMap; +import com.google.common.net.MediaType; +import google.registry.beam.BeamActionTestBase; import google.registry.testing.AppEngineExtension; -import google.registry.testing.FakeResponse; +import google.registry.testing.FakeClock; import google.registry.testing.TaskQueueHelper.TaskMatcher; import java.io.IOException; import org.joda.time.YearMonth; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; /** Unit tests for {@link google.registry.reporting.billing.GenerateInvoicesAction}. */ -class GenerateInvoicesActionTest { +class GenerateInvoicesActionTest extends BeamActionTestBase { @RegisterExtension final AppEngineExtension appEngine = AppEngineExtension.builder().withTaskQueue().build(); - private Dataflow dataflow; - private Projects projects; - private Templates templates; - private Launch launch; - private FakeResponse response; - private BillingEmailUtils emailUtils; + private final BillingEmailUtils emailUtils = mock(BillingEmailUtils.class); + private FakeClock clock = new FakeClock(); private GenerateInvoicesAction action; - @BeforeEach - void beforeEach() throws IOException { - dataflow = mock(Dataflow.class); - projects = mock(Projects.class); - templates = mock(Templates.class); - launch = mock(Launch.class); - emailUtils = mock(BillingEmailUtils.class); - when(dataflow.projects()).thenReturn(projects); - when(projects.templates()).thenReturn(templates); - when(templates.launch(any(String.class), any(LaunchTemplateParameters.class))) - .thenReturn(launch); - when(launch.setGcsPath(any(String.class))).thenReturn(launch); - - response = new FakeResponse(); - Job job = new Job(); - job.setId("12345"); - when(launch.execute()).thenReturn(new LaunchTemplateResponse().setJob(job)); - } - @Test void testLaunchTemplateJob_withPublish() throws Exception { action = new GenerateInvoicesAction( "test-project", - "gs://test-project-beam", - "gs://test-project-beam/templates/invoicing", - "us-east1-c", + "test-region", + "staging_bucket", + "billing_bucket", + "REG-INV", true, new YearMonth(2017, 10), - dataflow, + emailUtils, + clock, response, - emailUtils); + dataflow); action.run(); - LaunchTemplateParameters expectedParams = - new LaunchTemplateParameters() - .setJobName("invoicing-2017-10") - .setEnvironment( - new RuntimeEnvironment() - .setZone("us-east1-c") - .setTempLocation("gs://test-project-beam/temporary")) - .setParameters(ImmutableMap.of("yearMonth", "2017-10")); - verify(templates).launch("test-project", expectedParams); - verify(launch).setGcsPath("gs://test-project-beam/templates/invoicing"); - assertThat(response.getStatus()).isEqualTo(200); + assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8); + assertThat(response.getStatus()).isEqualTo(SC_OK); assertThat(response.getPayload()).isEqualTo("Launched dataflow template."); TaskMatcher matcher = new TaskMatcher() .url("/_dr/task/publishInvoices") .method("POST") - .param("jobId", "12345") + .param("jobId", "jobid") .param("yearMonth", "2017-10"); assertTasksEnqueued("beam-reporting", matcher); } @@ -114,47 +77,43 @@ class GenerateInvoicesActionTest { action = new GenerateInvoicesAction( "test-project", - "gs://test-project-beam", - "gs://test-project-beam/templates/invoicing", - "us-east1-c", + "test-region", + "staging_bucket", + "billing_bucket", + "REG-INV", false, new YearMonth(2017, 10), - dataflow, + emailUtils, + clock, response, - emailUtils); + dataflow); action.run(); - LaunchTemplateParameters expectedParams = - new LaunchTemplateParameters() - .setJobName("invoicing-2017-10") - .setEnvironment( - new RuntimeEnvironment() - .setZone("us-east1-c") - .setTempLocation("gs://test-project-beam/temporary")) - .setParameters(ImmutableMap.of("yearMonth", "2017-10")); - verify(templates).launch("test-project", expectedParams); - verify(launch).setGcsPath("gs://test-project-beam/templates/invoicing"); - assertThat(response.getStatus()).isEqualTo(200); + assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8); + assertThat(response.getStatus()).isEqualTo(SC_OK); assertThat(response.getPayload()).isEqualTo("Launched dataflow template."); - assertNoTasksEnqueued(); + assertNoTasksEnqueued("beam-reporting"); } @Test void testCaughtIOException() throws IOException { - when(launch.execute()).thenThrow(new IOException("expected")); + when(launch.execute()).thenThrow(new IOException("Pipeline error")); action = new GenerateInvoicesAction( "test-project", - "gs://test-project-beam", - "gs://test-project-beam/templates/invoicing", - "us-east1-c", - true, + "test-region", + "staging_bucket", + "billing_bucket", + "REG-INV", + false, new YearMonth(2017, 10), - dataflow, + emailUtils, + clock, response, - emailUtils); + dataflow); action.run(); - assertThat(response.getStatus()).isEqualTo(500); - assertThat(response.getPayload()).isEqualTo("Template launch failed: expected"); - verify(emailUtils).sendAlertEmail("Template Launch failed due to expected"); + assertThat(response.getStatus()).isEqualTo(SC_INTERNAL_SERVER_ERROR); + assertThat(response.getPayload()).isEqualTo("Template launch failed: Pipeline error"); + verify(emailUtils).sendAlertEmail("Template Launch failed due to Pipeline error"); + assertNoTasksEnqueued("beam-reporting"); } } diff --git a/core/src/test/java/google/registry/reporting/billing/PublishInvoicesActionTest.java b/core/src/test/java/google/registry/reporting/billing/PublishInvoicesActionTest.java index 701a8fb69..b8c38e922 100644 --- a/core/src/test/java/google/registry/reporting/billing/PublishInvoicesActionTest.java +++ b/core/src/test/java/google/registry/reporting/billing/PublishInvoicesActionTest.java @@ -26,8 +26,9 @@ import static org.mockito.Mockito.when; import com.google.api.services.dataflow.Dataflow; import com.google.api.services.dataflow.Dataflow.Projects; -import com.google.api.services.dataflow.Dataflow.Projects.Jobs; -import com.google.api.services.dataflow.Dataflow.Projects.Jobs.Get; +import com.google.api.services.dataflow.Dataflow.Projects.Locations; +import com.google.api.services.dataflow.Dataflow.Projects.Locations.Jobs; +import com.google.api.services.dataflow.Dataflow.Projects.Locations.Jobs.Get; import com.google.api.services.dataflow.model.Job; import com.google.common.net.MediaType; import google.registry.testing.AppEngineExtension; @@ -42,11 +43,14 @@ import org.junit.jupiter.api.extension.RegisterExtension; /** Unit tests for {@link PublishInvoicesAction}. */ class PublishInvoicesActionTest { - private Get get; - private BillingEmailUtils emailUtils; - - private Job expectedJob; - private FakeResponse response; + private final Dataflow dataflow = mock(Dataflow.class); + private final Projects projects = mock(Projects.class); + private final Locations locations = mock(Locations.class); + private final Jobs jobs = mock(Jobs.class); + private final Get get = mock(Get.class); + private final Job expectedJob = new Job(); + private final BillingEmailUtils emailUtils = mock(BillingEmailUtils.class); + private final FakeResponse response = new FakeResponse(); private PublishInvoicesAction uploadAction; @RegisterExtension @@ -54,20 +58,21 @@ class PublishInvoicesActionTest { @BeforeEach void beforeEach() throws IOException { - Dataflow dataflow = mock(Dataflow.class); - Projects projects = mock(Projects.class); - Jobs jobs = mock(Jobs.class); - get = mock(Get.class); when(dataflow.projects()).thenReturn(projects); - when(projects.jobs()).thenReturn(jobs); - when(jobs.get("test-project", "12345")).thenReturn(get); - expectedJob = new Job(); + when(projects.locations()).thenReturn(locations); + when(locations.jobs()).thenReturn(jobs); + when(jobs.get("test-project", "test-region", "12345")).thenReturn(get); when(get.execute()).thenReturn(expectedJob); - emailUtils = mock(BillingEmailUtils.class); - response = new FakeResponse(); + uploadAction = new PublishInvoicesAction( - "test-project", "12345", emailUtils, dataflow, response, new YearMonth(2017, 10)); + "test-project", + "test-region", + "12345", + emailUtils, + dataflow, + response, + new YearMonth(2017, 10)); } @Test diff --git a/core/src/test/java/google/registry/reporting/spec11/GenerateSpec11ReportActionTest.java b/core/src/test/java/google/registry/reporting/spec11/GenerateSpec11ReportActionTest.java index 06760fbac..c4b6ec6d0 100644 --- a/core/src/test/java/google/registry/reporting/spec11/GenerateSpec11ReportActionTest.java +++ b/core/src/test/java/google/registry/reporting/spec11/GenerateSpec11ReportActionTest.java @@ -19,61 +19,27 @@ import static google.registry.testing.TaskQueueHelper.assertNoTasksEnqueued; import static google.registry.testing.TaskQueueHelper.assertTasksEnqueued; import static javax.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR; import static org.apache.http.HttpStatus.SC_OK; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import com.google.api.services.dataflow.Dataflow; -import com.google.api.services.dataflow.Dataflow.Projects; -import com.google.api.services.dataflow.Dataflow.Projects.Locations; -import com.google.api.services.dataflow.Dataflow.Projects.Locations.FlexTemplates; -import com.google.api.services.dataflow.Dataflow.Projects.Locations.FlexTemplates.Launch; -import com.google.api.services.dataflow.model.Job; -import com.google.api.services.dataflow.model.LaunchFlexTemplateRequest; -import com.google.api.services.dataflow.model.LaunchFlexTemplateResponse; import com.google.common.net.MediaType; +import google.registry.beam.BeamActionTestBase; import google.registry.testing.AppEngineExtension; import google.registry.testing.FakeClock; -import google.registry.testing.FakeResponse; import google.registry.testing.TaskQueueHelper.TaskMatcher; import java.io.IOException; import org.joda.time.DateTime; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -import org.mockito.junit.jupiter.MockitoSettings; -import org.mockito.quality.Strictness; /** Unit tests for {@link GenerateSpec11ReportAction}. */ -@MockitoSettings(strictness = Strictness.STRICT_STUBS) -class GenerateSpec11ReportActionTest { +class GenerateSpec11ReportActionTest extends BeamActionTestBase { @RegisterExtension final AppEngineExtension appEngine = AppEngineExtension.builder().withTaskQueue().build(); - private FakeResponse response = new FakeResponse(); - private Dataflow dataflow = mock(Dataflow.class); - private Projects projects = mock(Projects.class); - private Locations locations = mock(Locations.class); - private FlexTemplates templates = mock(FlexTemplates.class); - private Launch launch = mock(Launch.class); - private LaunchFlexTemplateResponse launchResponse = - new LaunchFlexTemplateResponse().setJob(new Job().setId("jobid")); - private final FakeClock clock = new FakeClock(DateTime.parse("2018-06-11T12:23:56Z")); private GenerateSpec11ReportAction action; - @BeforeEach - void beforeEach() throws IOException { - when(dataflow.projects()).thenReturn(projects); - when(projects.locations()).thenReturn(locations); - when(locations.flexTemplates()).thenReturn(templates); - when(templates.launch(anyString(), anyString(), any(LaunchFlexTemplateRequest.class))) - .thenReturn(launch); - when(launch.execute()).thenReturn(launchResponse); - } - @Test void testFailure_dataflowFailure() throws IOException { action = diff --git a/release/cloudbuild-deploy.yaml b/release/cloudbuild-deploy.yaml index e5cc57634..5a7c50ac5 100644 --- a/release/cloudbuild-deploy.yaml +++ b/release/cloudbuild-deploy.yaml @@ -37,14 +37,6 @@ steps: cat tool-credential.json.enc | base64 -d | gcloud kms decrypt \ --ciphertext-file=- --plaintext-file=tool-credential.json \ --location=global --keyring=nomulus-tool-keyring --key=nomulus-tool-key -# Deploy the invoicing pipeline to GCS. -- name: 'gcr.io/$PROJECT_ID/nomulus-tool:latest' - args: - - -e - - ${_ENV} - - --credential - - tool-credential.json - - deploy_invoicing_pipeline # Deploy the GAE config files. # First authorize the gcloud tool to use the credential json file, then # download and unzip the tarball that contains the relevant config files diff --git a/release/cloudbuild-nomulus.yaml b/release/cloudbuild-nomulus.yaml index 6ab672023..e37d6aee0 100644 --- a/release/cloudbuild-nomulus.yaml +++ b/release/cloudbuild-nomulus.yaml @@ -92,6 +92,8 @@ steps: google/registry/beam/bulk_delete_datastore_pipeline_metadata.json \ google.registry.beam.spec11.Spec11Pipeline \ google/registry/beam/spec11_pipeline_metadata.json + google.registry.beam.invoicing.InvoicingPipeline \ + google/registry/beam/invoicing_pipeline_metadata.json # Tentatively build and publish Cloud SQL schema jar here, before schema release # process is finalized. Also publish nomulus:core jars that are needed for # server/schema compatibility tests.