diff --git a/core/build.gradle b/core/build.gradle index 5d8503fb5..594221965 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -810,6 +810,10 @@ if (environment in ['alpha', 'crash']) { mainClass: 'google.registry.beam.spec11.Spec11Pipeline', metaData: 'google/registry/beam/spec11_pipeline_metadata.json' ], + [ + mainClass: 'google.registry.beam.invoicing.InvoicingPipeline', + metaData: 'google/registry/beam/invoicing_pipeline_metadata.json' + ], ] project.tasks.create("stage_beam_pipelines") { doLast { diff --git a/core/src/main/java/google/registry/beam/invoicing/BillingEvent.java b/core/src/main/java/google/registry/beam/invoicing/BillingEvent.java index 4de572c3f..6aa33dc18 100644 --- a/core/src/main/java/google/registry/beam/invoicing/BillingEvent.java +++ b/core/src/main/java/google/registry/beam/invoicing/BillingEvent.java @@ -260,6 +260,11 @@ public abstract class BillingEvent implements Serializable { poNumber()); } + /** Returns the grouping key for this {@code BillingEvent}, to generate the detailed report. */ + String getDetailedReportGroupingKey() { + return String.format("%s_%s", registrarId(), tld()); + } + /** Key for each {@code BillingEvent}, when aggregating for the overall invoice. */ @AutoValue abstract static class InvoiceGroupingKey implements Serializable { diff --git a/core/src/main/java/google/registry/beam/invoicing/InvoicingPipeline.java b/core/src/main/java/google/registry/beam/invoicing/InvoicingPipeline.java index 0dce0f6d5..f9ae7d294 100644 --- a/core/src/main/java/google/registry/beam/invoicing/InvoicingPipeline.java +++ b/core/src/main/java/google/registry/beam/invoicing/InvoicingPipeline.java @@ -14,28 +14,28 @@ package google.registry.beam.invoicing; -import com.google.auth.oauth2.GoogleCredentials; +import static google.registry.beam.BeamUtils.getQueryFromFile; +import static org.apache.beam.sdk.values.TypeDescriptors.strings; + +import com.google.common.annotations.VisibleForTesting; import google.registry.beam.invoicing.BillingEvent.InvoiceGroupingKey; import google.registry.beam.invoicing.BillingEvent.InvoiceGroupingKey.InvoiceGroupingKeyCoder; -import google.registry.config.CredentialModule.LocalCredential; -import google.registry.config.RegistryConfig.Config; import google.registry.reporting.billing.BillingModule; -import google.registry.reporting.billing.GenerateInvoicesAction; -import google.registry.util.GoogleCredentialsBundle; +import google.registry.util.SqlTemplate; import java.io.Serializable; -import javax.inject.Inject; -import org.apache.beam.runners.dataflow.DataflowRunner; -import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.YearMonth; +import java.time.format.DateTimeFormatter; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.coders.SerializableCoder; -import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params; -import org.apache.beam.sdk.io.FileBasedSink; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.FileIO; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; -import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.options.ValueProvider; -import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; +import org.apache.beam.sdk.transforms.Contextful; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Filter; import org.apache.beam.sdk.transforms.MapElements; @@ -43,107 +43,54 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TypeDescriptor; -import org.apache.beam.sdk.values.TypeDescriptors; /** - * Definition of a Dataflow pipeline template, which generates a given month's invoices. + * Definition of a Dataflow Flex pipeline template, which generates a given month's invoices. * - *
To stage this template on GCS, run the {@link - * google.registry.tools.DeployInvoicingPipelineCommand} Nomulus command. + *
To stage this template locally, run the {@code stage_beam_pipeline.sh} shell script. * *
Then, you can run the staged template via the API client library, gCloud or a raw REST call.
- * For an example using the API client library, see {@link GenerateInvoicesAction}.
*
- * @see Dataflow Templates
+ * @see Using
+ * Flex Templates
*/
public class InvoicingPipeline implements Serializable {
- private final String projectId;
- private final String beamJobRegion;
- private final String beamBucketUrl;
- private final String invoiceTemplateUrl;
- private final String beamStagingUrl;
- private final String billingBucketUrl;
- private final String invoiceFilePrefix;
- private final GoogleCredentials googleCredentials;
+ private static final DateTimeFormatter TIMESTAMP_FORMATTER =
+ DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
- @Inject
- public InvoicingPipeline(
- @Config("projectId") String projectId,
- @Config("defaultJobRegion") String beamJobRegion,
- @Config("apacheBeamBucketUrl") String beamBucketUrl,
- @Config("invoiceTemplateUrl") String invoiceTemplateUrl,
- @Config("beamStagingUrl") String beamStagingUrl,
- @Config("billingBucketUrl") String billingBucketUrl,
- @Config("invoiceFilePrefix") String invoiceFilePrefix,
- @LocalCredential GoogleCredentialsBundle googleCredentialsBundle) {
- this.projectId = projectId;
- this.beamJobRegion = beamJobRegion;
- this.beamBucketUrl = beamBucketUrl;
- this.invoiceTemplateUrl = invoiceTemplateUrl;
- this.beamStagingUrl = beamStagingUrl;
- this.billingBucketUrl = billingBucketUrl;
- this.invoiceFilePrefix = invoiceFilePrefix;
- this.googleCredentials = googleCredentialsBundle.getGoogleCredentials();
+ private final InvoicingPipelineOptions options;
+ private final Pipeline pipeline;
+
+ @VisibleForTesting
+ InvoicingPipeline(InvoicingPipelineOptions options, Pipeline pipeline) {
+ this.options = options;
+ this.pipeline = pipeline;
}
- /** Custom options for running the invoicing pipeline. */
- public interface InvoicingPipelineOptions extends DataflowPipelineOptions {
- /** Returns the yearMonth we're generating invoices for, in yyyy-MM format. */
- @Description("The yearMonth we generate invoices for, in yyyy-MM format.")
- ValueProvider This is implicitly set when executing the Dataflow template, by specifying the 'yearMonth
- * parameter.
- */
- void setYearMonth(ValueProvider This is factored out purely to facilitate testing.
- */
- void applyTerminalTransforms(
- PCollection Beam uses this to determine which file a given {@code BillingEvent} should get placed into.
- *
- * @param outputBucket the GCS bucket we're outputting reports to
- * @param yearMonthProvider a runtime provider for the yyyy-MM we're generating the invoice for
- */
- static SerializableFunction The "failed" file should only be populated when an error occurs, which warrants further
- * investigation.
- */
- static Params makeEmptyDestinationParams(String outputBucket) {
- return new Params()
- .withBaseFilename(
- FileBasedSink.convertToFileResourceIfPossible(
- String.format("%s/%s", outputBucket, "FAILURES")));
- }
-
- /**
- * Returns a provider that creates a Bigquery query for a given project and yearMonth at runtime.
- *
- * We only know yearMonth at runtime, so this provider fills in the {@code
- * sql/billing_events.sql} template at runtime.
- *
- * @param yearMonthProvider a runtime provider that returns which month we're invoicing for.
- * @param projectId the projectId we're generating invoicing for.
- */
- static ValueProvider Warning: This number may increase but never decrease.
- *
- * @see google.registry.model.index.EppResourceIndex
- */
- @Provides
- @Config("eppResourceIndexBucketCount")
- public static int provideEppResourceIndexBucketCount(RegistryConfigSettings config) {
- return config.datastore.eppResourceIndexBucketsNum;
- }
-
@Provides
@Config("cloudSqlJdbcUrl")
public static String providesCloudSqlJdbcUrl(RegistryConfigSettings config) {
@@ -564,53 +551,6 @@ public final class RegistryConfig {
return config.gSuite.outgoingEmailDisplayName;
}
- /**
- * Returns the name of the GCS bucket for storing Beam templates and results.
- *
- * @see google.registry.reporting.billing.GenerateInvoicesAction
- */
- @Provides
- @Config("apacheBeamBucket")
- public static String provideApacheBeamBucket(@Config("projectId") String projectId) {
- return projectId + "-beam";
- }
-
- /**
- * Returns the URL of the GCS location for storing Apache Beam related objects.
- *
- * @see google.registry.reporting.billing.GenerateInvoicesAction
- */
- @Provides
- @Config("apacheBeamBucketUrl")
- public static String provideApacheBeamBucketUrl(@Config("apacheBeamBucket") String beamBucket) {
- return "gs://" + beamBucket;
- }
-
- /**
- * Returns the URL of the GCS location for storing the monthly invoicing Beam template.
- *
- * @see google.registry.reporting.billing.GenerateInvoicesAction
- * @see google.registry.beam.invoicing.InvoicingPipeline
- */
- @Provides
- @Config("invoiceTemplateUrl")
- public static String provideInvoiceTemplateUrl(
- @Config("apacheBeamBucketUrl") String beamBucketUrl) {
- return beamBucketUrl + "/templates/invoicing";
- }
-
- /**
- * Returns the URL of the GCS location for storing the monthly spec11 Beam template.
- *
- * @see google.registry.beam.spec11.Spec11Pipeline
- */
- @Provides
- @Config("spec11TemplateUrl")
- public static String provideSpec11TemplateUrl(
- @Config("apacheBeamBucketUrl") String beamBucketUrl) {
- return beamBucketUrl + "/templates/spec11";
- }
-
/**
* Returns whether an SSL certificate hash is required to log in via EPP and run flows.
*
@@ -634,18 +574,6 @@ public final class RegistryConfig {
return config.beam.defaultJobRegion;
}
- /**
- * Returns the default job zone to run Apache Beam (Cloud Dataflow) jobs in.
- *
- * @see google.registry.reporting.billing.GenerateInvoicesAction
- * @see google.registry.reporting.spec11.GenerateSpec11ReportAction
- */
- @Provides
- @Config("defaultJobZone")
- public static String provideDefaultJobZone(RegistryConfigSettings config) {
- return config.beam.defaultJobZone;
- }
-
/** Returns the GCS bucket URL with all staged BEAM flex templates. */
@Provides
@Config("beamStagingBucketUrl")
@@ -653,19 +581,6 @@ public final class RegistryConfig {
return config.beam.stagingBucketUrl;
}
- /**
- * Returns the URL of the GCS location we store jar dependencies for beam pipelines.
- *
- * @see google.registry.beam.invoicing.InvoicingPipeline
- * @see google.registry.beam.spec11.Spec11Pipeline
- */
- @Provides
- @Config("beamStagingUrl")
- public static String provideInvoiceStagingUrl(
- @Config("apacheBeamBucketUrl") String beamBucketUrl) {
- return beamBucketUrl + "/staging";
- }
-
/**
* Returns the Google Cloud Storage bucket for Spec11 and ICANN transaction and activity reports
* to be uploaded.
@@ -1227,14 +1142,6 @@ public final class RegistryConfig {
return formatComments(config.registryPolicy.reservedTermsExportDisclaimer);
}
- /** Returns the clientId of the registrar used by the {@code CheckApiServlet}. */
- // TODO(b/80417678): remove this once CheckApiAction no longer uses this id.
- @Provides
- @Config("checkApiServletRegistrarClientId")
- public static String provideCheckApiServletRegistrarClientId(RegistryConfigSettings config) {
- return config.registryPolicy.checkApiServletClientId;
- }
-
/**
* Returns the clientId of the registrar that admins are automatically logged in as if they
* aren't otherwise associated with one.
diff --git a/core/src/main/java/google/registry/config/RegistryConfigSettings.java b/core/src/main/java/google/registry/config/RegistryConfigSettings.java
index d33383aec..e5dffb563 100644
--- a/core/src/main/java/google/registry/config/RegistryConfigSettings.java
+++ b/core/src/main/java/google/registry/config/RegistryConfigSettings.java
@@ -133,7 +133,6 @@ public class RegistryConfigSettings {
/** Configuration for Apache Beam (Cloud Dataflow). */
public static class Beam {
public String defaultJobRegion;
- public String defaultJobZone;
public String stagingBucketUrl;
}
diff --git a/core/src/main/java/google/registry/config/files/default-config.yaml b/core/src/main/java/google/registry/config/files/default-config.yaml
index 7bd04db53..44c07d690 100644
--- a/core/src/main/java/google/registry/config/files/default-config.yaml
+++ b/core/src/main/java/google/registry/config/files/default-config.yaml
@@ -420,9 +420,6 @@ misc:
beam:
# The default region to run Apache Beam (Cloud Dataflow) jobs in.
defaultJobRegion: us-east1
- # The default zone to run Apache Beam (Cloud Dataflow) jobs in.
- # TODO(weiminyu): consider dropping zone config. No obvious needs for this.
- defaultJobZone: us-east1-c
stagingBucketUrl: gcs-bucket-with-staged-templates
keyring:
diff --git a/core/src/main/java/google/registry/reporting/billing/GenerateInvoicesAction.java b/core/src/main/java/google/registry/reporting/billing/GenerateInvoicesAction.java
index 41866c35d..a6770ca2e 100644
--- a/core/src/main/java/google/registry/reporting/billing/GenerateInvoicesAction.java
+++ b/core/src/main/java/google/registry/reporting/billing/GenerateInvoicesAction.java
@@ -14,6 +14,7 @@
package google.registry.reporting.billing;
+import static google.registry.beam.BeamUtils.createJobName;
import static google.registry.reporting.ReportingUtils.enqueueBeamReportingTask;
import static google.registry.reporting.billing.BillingModule.PARAM_SHOULD_PUBLISH;
import static google.registry.request.Action.Method.POST;
@@ -21,9 +22,9 @@ import static javax.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
import static javax.servlet.http.HttpServletResponse.SC_OK;
import com.google.api.services.dataflow.Dataflow;
-import com.google.api.services.dataflow.model.LaunchTemplateParameters;
-import com.google.api.services.dataflow.model.LaunchTemplateResponse;
-import com.google.api.services.dataflow.model.RuntimeEnvironment;
+import com.google.api.services.dataflow.model.LaunchFlexTemplateParameter;
+import com.google.api.services.dataflow.model.LaunchFlexTemplateRequest;
+import com.google.api.services.dataflow.model.LaunchFlexTemplateResponse;
import com.google.common.collect.ImmutableMap;
import com.google.common.flogger.FluentLogger;
import com.google.common.net.MediaType;
@@ -33,6 +34,7 @@ import google.registry.request.Action;
import google.registry.request.Parameter;
import google.registry.request.Response;
import google.registry.request.auth.Auth;
+import google.registry.util.Clock;
import java.io.IOException;
import java.util.Map;
import javax.inject.Inject;
@@ -42,9 +44,8 @@ import org.joda.time.YearMonth;
* Invokes the {@code InvoicingPipeline} beam template via the REST api, and enqueues the {@link
* PublishInvoicesAction} to publish the subsequent output.
*
- * This action runs the {@link google.registry.beam.invoicing.InvoicingPipeline} beam template,
- * staged at gs://<projectId>-beam/templates/invoicing. The pipeline then generates invoices
- * for the month and stores them on GCS.
+ * This action runs the {@link google.registry.beam.invoicing.InvoicingPipeline} beam flex
+ * template. The pipeline then generates invoices for the month and stores them on GCS.
*/
@Action(
service = Action.Service.BACKEND,
@@ -56,57 +57,73 @@ public class GenerateInvoicesAction implements Runnable {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
static final String PATH = "/_dr/task/generateInvoices";
+ static final String PIPELINE_NAME = "invoicing_pipeline";
private final String projectId;
- private final String beamBucketUrl;
- private final String invoiceTemplateUrl;
- private final String jobZone;
+ private final String jobRegion;
+ private final String stagingBucketUrl;
+ private final String billingBucketUrl;
+ private final String invoiceFilePrefix;
private final boolean shouldPublish;
private final YearMonth yearMonth;
- private final Dataflow dataflow;
- private final Response response;
private final BillingEmailUtils emailUtils;
+ private final Clock clock;
+ private final Response response;
+ private final Dataflow dataflow;
@Inject
GenerateInvoicesAction(
@Config("projectId") String projectId,
- @Config("apacheBeamBucketUrl") String beamBucketUrl,
- @Config("invoiceTemplateUrl") String invoiceTemplateUrl,
- @Config("defaultJobZone") String jobZone,
+ @Config("defaultJobRegion") String jobRegion,
+ @Config("beamStagingBucketUrl") String stagingBucketUrl,
+ @Config("billingBucketUrl") String billingBucketUrl,
+ @Config("invoiceFilePrefix") String invoiceFilePrefix,
@Parameter(PARAM_SHOULD_PUBLISH) boolean shouldPublish,
YearMonth yearMonth,
- Dataflow dataflow,
+ BillingEmailUtils emailUtils,
+ Clock clock,
Response response,
- BillingEmailUtils emailUtils) {
+ Dataflow dataflow) {
this.projectId = projectId;
- this.beamBucketUrl = beamBucketUrl;
- this.invoiceTemplateUrl = invoiceTemplateUrl;
- this.jobZone = jobZone;
+ this.jobRegion = jobRegion;
+ this.stagingBucketUrl = stagingBucketUrl;
+ this.billingBucketUrl = billingBucketUrl;
+ this.invoiceFilePrefix = invoiceFilePrefix;
this.shouldPublish = shouldPublish;
this.yearMonth = yearMonth;
- this.dataflow = dataflow;
- this.response = response;
this.emailUtils = emailUtils;
+ this.clock = clock;
+ this.response = response;
+ this.dataflow = dataflow;
}
@Override
public void run() {
+ response.setContentType(MediaType.PLAIN_TEXT_UTF_8);
logger.atInfo().log("Launching invoicing pipeline for %s", yearMonth);
try {
- LaunchTemplateParameters params =
- new LaunchTemplateParameters()
- .setJobName(String.format("invoicing-%s", yearMonth))
- .setEnvironment(
- new RuntimeEnvironment()
- .setZone(jobZone)
- .setTempLocation(beamBucketUrl + "/temporary"))
- .setParameters(ImmutableMap.of("yearMonth", yearMonth.toString("yyyy-MM")));
- LaunchTemplateResponse launchResponse =
+ LaunchFlexTemplateParameter parameter =
+ new LaunchFlexTemplateParameter()
+ .setJobName(createJobName("invoicing", clock))
+ .setContainerSpecGcsPath(
+ String.format("%s/%s_metadata.json", stagingBucketUrl, PIPELINE_NAME))
+ .setParameters(
+ ImmutableMap.of(
+ "yearMonth",
+ yearMonth.toString("yyyy-MM"),
+ "invoiceFilePrefix",
+ invoiceFilePrefix,
+ "billingBucketUrl",
+ billingBucketUrl));
+ LaunchFlexTemplateResponse launchResponse =
dataflow
.projects()
- .templates()
- .launch(projectId, params)
- .setGcsPath(invoiceTemplateUrl)
+ .locations()
+ .flexTemplates()
+ .launch(
+ projectId,
+ jobRegion,
+ new LaunchFlexTemplateRequest().setLaunchParameter(parameter))
.execute();
logger.atInfo().log("Got response: %s", launchResponse.getJob().toPrettyString());
String jobId = launchResponse.getJob().getId();
@@ -123,12 +140,10 @@ public class GenerateInvoicesAction implements Runnable {
logger.atWarning().withCause(e).log("Template Launch failed");
emailUtils.sendAlertEmail(String.format("Template Launch failed due to %s", e.getMessage()));
response.setStatus(SC_INTERNAL_SERVER_ERROR);
- response.setContentType(MediaType.PLAIN_TEXT_UTF_8);
response.setPayload(String.format("Template launch failed: %s", e.getMessage()));
return;
}
response.setStatus(SC_OK);
- response.setContentType(MediaType.PLAIN_TEXT_UTF_8);
response.setPayload("Launched dataflow template.");
}
}
diff --git a/core/src/main/java/google/registry/reporting/billing/PublishInvoicesAction.java b/core/src/main/java/google/registry/reporting/billing/PublishInvoicesAction.java
index 971d9e77c..b12e30bd9 100644
--- a/core/src/main/java/google/registry/reporting/billing/PublishInvoicesAction.java
+++ b/core/src/main/java/google/registry/reporting/billing/PublishInvoicesAction.java
@@ -59,6 +59,7 @@ public class PublishInvoicesAction implements Runnable {
private static final String JOB_FAILED = "JOB_STATE_FAILED";
private final String projectId;
+ private final String jobRegion;
private final String jobId;
private final BillingEmailUtils emailUtils;
private final Dataflow dataflow;
@@ -68,12 +69,14 @@ public class PublishInvoicesAction implements Runnable {
@Inject
PublishInvoicesAction(
@Config("projectId") String projectId,
+ @Config("defaultJobRegion") String jobRegion,
@Parameter(ReportingModule.PARAM_JOB_ID) String jobId,
BillingEmailUtils emailUtils,
Dataflow dataflow,
Response response,
YearMonth yearMonth) {
this.projectId = projectId;
+ this.jobRegion = jobRegion;
this.jobId = jobId;
this.emailUtils = emailUtils;
this.dataflow = dataflow;
@@ -87,7 +90,7 @@ public class PublishInvoicesAction implements Runnable {
public void run() {
try {
logger.atInfo().log("Starting publish job.");
- Job job = dataflow.projects().jobs().get(projectId, jobId).execute();
+ Job job = dataflow.projects().locations().jobs().get(projectId, jobRegion, jobId).execute();
String state = job.getCurrentState();
switch (state) {
case JOB_DONE:
diff --git a/core/src/main/java/google/registry/tools/DeployInvoicingPipelineCommand.java b/core/src/main/java/google/registry/tools/DeployInvoicingPipelineCommand.java
deleted file mode 100644
index d76829062..000000000
--- a/core/src/main/java/google/registry/tools/DeployInvoicingPipelineCommand.java
+++ /dev/null
@@ -1,31 +0,0 @@
-// Copyright 2017 The Nomulus Authors. All Rights Reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package google.registry.tools;
-
-import com.beust.jcommander.Parameters;
-import google.registry.beam.invoicing.InvoicingPipeline;
-import javax.inject.Inject;
-
-/** Nomulus command that deploys the {@link InvoicingPipeline} template. */
-@Parameters(commandDescription = "Deploy the invoicing pipeline to GCS.")
-public class DeployInvoicingPipelineCommand implements Command {
-
- @Inject InvoicingPipeline invoicingPipeline;
-
- @Override
- public void run() {
- invoicingPipeline.deploy();
- }
-}
diff --git a/core/src/main/java/google/registry/tools/RegistryTool.java b/core/src/main/java/google/registry/tools/RegistryTool.java
index f25997a37..30b9bda34 100644
--- a/core/src/main/java/google/registry/tools/RegistryTool.java
+++ b/core/src/main/java/google/registry/tools/RegistryTool.java
@@ -62,7 +62,6 @@ public final class RegistryTool {
.put("delete_premium_list", DeletePremiumListCommand.class)
.put("delete_reserved_list", DeleteReservedListCommand.class)
.put("delete_tld", DeleteTldCommand.class)
- .put("deploy_invoicing_pipeline", DeployInvoicingPipelineCommand.class)
.put("encrypt_escrow_deposit", EncryptEscrowDepositCommand.class)
.put("execute_epp", ExecuteEppCommand.class)
.put("generate_allocation_tokens", GenerateAllocationTokensCommand.class)
diff --git a/core/src/main/java/google/registry/tools/RegistryToolComponent.java b/core/src/main/java/google/registry/tools/RegistryToolComponent.java
index 4bbefb4d0..c655ff120 100644
--- a/core/src/main/java/google/registry/tools/RegistryToolComponent.java
+++ b/core/src/main/java/google/registry/tools/RegistryToolComponent.java
@@ -105,8 +105,6 @@ interface RegistryToolComponent {
void inject(DeleteContactByRoidCommand command);
- void inject(DeployInvoicingPipelineCommand command);
-
void inject(EncryptEscrowDepositCommand command);
void inject(GenerateAllocationTokensCommand command);
diff --git a/core/src/main/resources/google/registry/beam/init_sql_pipeline_metadata.json b/core/src/main/resources/google/registry/beam/init_sql_pipeline_metadata.json
index 69d96fa63..d3c251b50 100644
--- a/core/src/main/resources/google/registry/beam/init_sql_pipeline_metadata.json
+++ b/core/src/main/resources/google/registry/beam/init_sql_pipeline_metadata.json
@@ -5,7 +5,7 @@
{
"name": "registryEnvironment",
"label": "The Registry environment.",
- "helpText": "The Registry environment, required if environment-specific initialization is needed on worker VMs.",
+ "helpText": "The Registry environment, required if environment-specific initialization (such as JPA) is needed on worker VMs.",
"is_optional": true,
"regexes": [
"^[0-9A-Z_]+$"
diff --git a/core/src/main/java/google/registry/beam/invoicing/sql/billing_events.sql b/core/src/main/resources/google/registry/beam/invoicing/sql/billing_events.sql
similarity index 100%
rename from core/src/main/java/google/registry/beam/invoicing/sql/billing_events.sql
rename to core/src/main/resources/google/registry/beam/invoicing/sql/billing_events.sql
diff --git a/core/src/main/resources/google/registry/beam/invoicing_pipeline_metadata.json b/core/src/main/resources/google/registry/beam/invoicing_pipeline_metadata.json
new file mode 100644
index 000000000..2b709cbef
--- /dev/null
+++ b/core/src/main/resources/google/registry/beam/invoicing_pipeline_metadata.json
@@ -0,0 +1,66 @@
+{
+ "name": "Invoice and Detailed Reports Generation",
+ "description": "An Apache Beam batch pipeline that reads from a Datastore export and generate monthly invoice and detailed reports, saving them on GCS.",
+ "parameters": [
+ {
+ "name": "registryEnvironment",
+ "label": "The Registry environment.",
+ "helpText": "The Registry environment, required if environment-specific initialization (such as JPA) is needed on worker VMs.",
+ "is_optional": true,
+ "regexes": [
+ "^[0-9A-Z_]+$"
+ ]
+ },
+ {
+ "name": "isolationOverride",
+ "label": "The desired SQL transaction isolation level.",
+ "helpText": "The desired SQL transaction isolation level.",
+ "is_optional": true,
+ "regexes": [
+ "^[0-9A-Z_]+$"
+ ]
+ },
+ {
+ "name": "sqlWriteBatchSize",
+ "label": "SQL write batch size.",
+ "helpText": "The number of entities to write to the SQL database in one operation.",
+ "is_optional": true,
+ "regexes": [
+ "^[1-9][0-9]*$"
+ ]
+ },
+ {
+ "name": "sqlWriteShards",
+ "label": "Number of output shards to create when writing to SQL.",
+ "helpText": "Number of shards to create out of the data before writing to the SQL database. Please refer to the Javadoc of RegistryJpaIO.Write.shards() for how to choose this value.",
+ "is_optional": true,
+ "regexes": [
+ "^[1-9][0-9]*$"
+ ]
+ },
+ {
+ "name": "yearMonth",
+ "label": "The year and month we generate invoice and detailed reports for.",
+ "helpText": "The year and month for which the invoice and detailed reports are generated, in yyyy-MM format.",
+ "regexes": [
+ "^2[0-9]{3}-(0[1-9]|1[0-2])$"
+ ]
+ },
+ {
+ "name": "invoiceFilePrefix",
+ "label": "Filename prefix for the invoice CSV file.",
+ "helpText": "The prefix that will be applied to the invoice file.",
+ "regexes": [
+ "^[a-zA-Z0-9_\\-]+$"
+ ]
+ },
+ {
+ "name": "billingBucketUrl",
+ "label": "invoice and detailed reports upload dir.",
+ "helpText": "The root directory of the reports to upload.",
+ "regexes": [
+ "^gs:\\/\\/[^\\n\\r]+$"
+ ]
+ }
+ ]
+}
diff --git a/core/src/main/resources/google/registry/beam/spec11_pipeline_metadata.json b/core/src/main/resources/google/registry/beam/spec11_pipeline_metadata.json
index f9e17b2c1..eaff2dfe8 100644
--- a/core/src/main/resources/google/registry/beam/spec11_pipeline_metadata.json
+++ b/core/src/main/resources/google/registry/beam/spec11_pipeline_metadata.json
@@ -5,7 +5,7 @@
{
"name": "registryEnvironment",
"label": "The Registry environment.",
- "helpText": "The Registry environment, required if environment-specific initialization is needed on worker VMs.",
+ "helpText": "The Registry environment, required if environment-specific initialization (such as JPA) is needed on worker VMs.",
"is_optional": true,
"regexes": [
"^[0-9A-Z_]+$"
@@ -41,7 +41,7 @@
{
"name": "date",
"label": "The date when the pipeline runs",
- "helpText": "The date then the threat scan is performed, in yyyy-MM-dd format.",
+ "helpText": "The date when the threat scan is performed, in yyyy-MM-dd format.",
"regexes": [
"^2[0-9]{3}-(0[1-9]|1[0-2])-(0[1-9]|[12][0-9]|3[01])$"
]
diff --git a/core/src/test/java/google/registry/batch/WipeOutDatastoreActionTest.java b/core/src/test/java/google/registry/batch/WipeOutDatastoreActionTest.java
index c3b4d9d42..ce3f780be 100644
--- a/core/src/test/java/google/registry/batch/WipeOutDatastoreActionTest.java
+++ b/core/src/test/java/google/registry/batch/WipeOutDatastoreActionTest.java
@@ -18,52 +18,20 @@ import static com.google.common.truth.Truth.assertThat;
import static javax.servlet.http.HttpServletResponse.SC_FORBIDDEN;
import static org.apache.http.HttpStatus.SC_INTERNAL_SERVER_ERROR;
import static org.apache.http.HttpStatus.SC_OK;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
-import com.google.api.services.dataflow.Dataflow;
-import com.google.api.services.dataflow.model.Job;
-import com.google.api.services.dataflow.model.LaunchFlexTemplateRequest;
-import com.google.api.services.dataflow.model.LaunchFlexTemplateResponse;
+import google.registry.beam.BeamActionTestBase;
import google.registry.testing.FakeClock;
-import google.registry.testing.FakeResponse;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
/** Unit tests for {@link WipeoutDatastoreAction}. */
-@ExtendWith(MockitoExtension.class)
-class WipeOutDatastoreActionTest {
-
- @Mock private Dataflow dataflow;
- @Mock private Dataflow.Projects projects;
- @Mock private Dataflow.Projects.Locations locations;
- @Mock private Dataflow.Projects.Locations.FlexTemplates flexTemplates;
- @Mock private Dataflow.Projects.Locations.FlexTemplates.Launch launch;
- private LaunchFlexTemplateResponse launchResponse =
- new LaunchFlexTemplateResponse().setJob(new Job());
+class WipeOutDatastoreActionTest extends BeamActionTestBase {
private final FakeClock clock = new FakeClock();
- private final FakeResponse response = new FakeResponse();
-
- @BeforeEach
- void beforeEach() throws Exception {
- lenient().when(dataflow.projects()).thenReturn(projects);
- lenient().when(projects.locations()).thenReturn(locations);
- lenient().when(locations.flexTemplates()).thenReturn(flexTemplates);
- lenient()
- .when(flexTemplates.launch(anyString(), anyString(), any(LaunchFlexTemplateRequest.class)))
- .thenReturn(launch);
- lenient().when(launch.execute()).thenReturn(launchResponse);
- }
@Test
void run_projectNotAllowed() {
diff --git a/core/src/test/java/google/registry/beam/BeamActionTestBase.java b/core/src/test/java/google/registry/beam/BeamActionTestBase.java
new file mode 100644
index 000000000..473b871bc
--- /dev/null
+++ b/core/src/test/java/google/registry/beam/BeamActionTestBase.java
@@ -0,0 +1,57 @@
+// Copyright 2021 The Nomulus Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package google.registry.beam;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.api.services.dataflow.Dataflow;
+import com.google.api.services.dataflow.Dataflow.Projects;
+import com.google.api.services.dataflow.Dataflow.Projects.Locations;
+import com.google.api.services.dataflow.Dataflow.Projects.Locations.FlexTemplates;
+import com.google.api.services.dataflow.Dataflow.Projects.Locations.FlexTemplates.Launch;
+import com.google.api.services.dataflow.model.Job;
+import com.google.api.services.dataflow.model.LaunchFlexTemplateRequest;
+import com.google.api.services.dataflow.model.LaunchFlexTemplateResponse;
+import google.registry.testing.FakeResponse;
+import org.junit.jupiter.api.BeforeEach;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+/** Base class for all actions that launches a Dataflow Flex template. */
+@MockitoSettings(strictness = Strictness.STRICT_STUBS)
+public abstract class BeamActionTestBase {
+
+ protected FakeResponse response = new FakeResponse();
+ protected Dataflow dataflow = mock(Dataflow.class);
+ private Projects projects = mock(Projects.class);
+ private Locations locations = mock(Locations.class);
+ private FlexTemplates templates = mock(FlexTemplates.class);
+ protected Launch launch = mock(Launch.class);
+ private LaunchFlexTemplateResponse launchResponse =
+ new LaunchFlexTemplateResponse().setJob(new Job().setId("jobid"));
+
+ @BeforeEach
+ void beforeEach() throws Exception {
+ when(dataflow.projects()).thenReturn(projects);
+ when(projects.locations()).thenReturn(locations);
+ when(locations.flexTemplates()).thenReturn(templates);
+ when(templates.launch(anyString(), anyString(), any(LaunchFlexTemplateRequest.class)))
+ .thenReturn(launch);
+ when(launch.execute()).thenReturn(launchResponse);
+ }
+}
diff --git a/core/src/test/java/google/registry/beam/invoicing/InvoicingPipelineTest.java b/core/src/test/java/google/registry/beam/invoicing/InvoicingPipelineTest.java
index 456a2dee9..e6e62b905 100644
--- a/core/src/test/java/google/registry/beam/invoicing/InvoicingPipelineTest.java
+++ b/core/src/test/java/google/registry/beam/invoicing/InvoicingPipelineTest.java
@@ -16,26 +16,21 @@ package google.registry.beam.invoicing;
import static com.google.common.truth.Truth.assertThat;
-import com.google.auth.oauth2.GoogleCredentials;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import google.registry.beam.TestPipelineExtension;
-import google.registry.util.GoogleCredentialsBundle;
+import google.registry.testing.TestDataHelper;
import google.registry.util.ResourceUtils;
import java.io.File;
-import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Map.Entry;
-import org.apache.beam.runners.direct.DirectRunner;
-import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
-import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -44,160 +39,168 @@ import org.junit.jupiter.api.io.TempDir;
/** Unit tests for {@link InvoicingPipeline}. */
class InvoicingPipelineTest {
- private static PipelineOptions pipelineOptions;
+ private static final String BILLING_BUCKET_URL = "billing_bucket";
+ private static final String YEAR_MONTH = "2017-10";
+ private static final String INVOICE_FILE_PREFIX = "REG-INV";
- @BeforeAll
- static void beforeAll() {
- pipelineOptions = PipelineOptionsFactory.create();
- pipelineOptions.setRunner(DirectRunner.class);
- }
+ private static final ImmutableList