diff --git a/java/google/registry/beam/InvoicingPipeline.java b/java/google/registry/beam/InvoicingPipeline.java index 9fb3d7df2..bff0e5f4d 100644 --- a/java/google/registry/beam/InvoicingPipeline.java +++ b/java/google/registry/beam/InvoicingPipeline.java @@ -54,16 +54,40 @@ import org.apache.beam.sdk.values.TypeDescriptors; */ public class InvoicingPipeline implements Serializable { - @Inject @Config("projectId") String projectId; - @Inject @Config("apacheBeamBucketUrl") String beamBucket; - @Inject InvoicingPipeline() {} + @Inject + @Config("projectId") + String projectId; + + @Inject + @Config("apacheBeamBucketUrl") + String beamBucketUrl; + + @Inject + @Config("invoiceTemplateUrl") + String invoiceTemplateUrl; + + @Inject + @Config("invoiceStagingUrl") + String invoiceStagingUrl; + + @Inject + @Config("billingBucketUrl") + String billingBucketUrl; + + @Inject + InvoicingPipeline() {} /** Custom options for running the invoicing pipeline. */ interface InvoicingPipelineOptions extends DataflowPipelineOptions { /** Returns the yearMonth we're generating invoices for, in yyyy-MM format. */ @Description("The yearMonth we generate invoices for, in yyyy-MM format.") ValueProvider getYearMonth(); - /** Sets the yearMonth we generate invoices for. */ + /** + * Sets the yearMonth we generate invoices for. + * + *

This is implicitly set when executing the Dataflow template, by specifying the 'yearMonth + * parameter. + */ void setYearMonth(ValueProvider value); } @@ -73,8 +97,9 @@ public class InvoicingPipeline implements Serializable { InvoicingPipelineOptions options = PipelineOptionsFactory.as(InvoicingPipelineOptions.class); options.setProject(projectId); options.setRunner(DataflowRunner.class); - options.setStagingLocation(beamBucket + "/staging"); - options.setTemplateLocation(beamBucket + "/templates/invoicing"); + // This causes p.run() to stage the pipeline as a template on GCS, as opposed to running it. + options.setTemplateLocation(invoiceTemplateUrl); + options.setStagingLocation(invoiceStagingUrl); Pipeline p = Pipeline.create(options); PCollection billingEvents = @@ -97,13 +122,13 @@ public class InvoicingPipeline implements Serializable { */ void applyTerminalTransforms( PCollection billingEvents, ValueProvider yearMonthProvider) { - billingEvents.apply( - "Write events to separate CSVs keyed by registrarId_tld pair", - writeDetailReports(yearMonthProvider)); - billingEvents .apply("Generate overall invoice rows", new GenerateInvoiceRows()) .apply("Write overall invoice to CSV", writeInvoice(yearMonthProvider)); + + billingEvents.apply( + "Write detail reports to separate CSVs keyed by registrarId_tld pair", + writeDetailReports(yearMonthProvider)); } /** Transform that converts a {@code BillingEvent} into an invoice CSV row. */ @@ -131,11 +156,14 @@ public class InvoicingPipeline implements Serializable { .to( NestedValueProvider.of( yearMonthProvider, - // TODO(larryruili): Replace with billing bucket after verifying 2017-12 output. yearMonth -> String.format( - "%s/results/%s-%s", - beamBucket, BillingModule.OVERALL_INVOICE_PREFIX, yearMonth))) + "%s/%s/%s/%s-%s", + billingBucketUrl, + BillingModule.INVOICES_DIRECTORY, + yearMonth, + BillingModule.OVERALL_INVOICE_PREFIX, + yearMonth))) .withHeader(InvoiceGroupingKey.invoiceHeader()) .withoutSharding() .withSuffix(".csv"); @@ -145,13 +173,15 @@ public class InvoicingPipeline implements Serializable { private TextIO.TypedWrite writeDetailReports( ValueProvider yearMonthProvider) { return TextIO.writeCustomType() - // TODO(larryruili): Replace with billing bucket/yyyy-MM after verifying 2017-12 output. .to( - InvoicingUtils.makeDestinationFunction(beamBucket + "/results", yearMonthProvider), - InvoicingUtils.makeEmptyDestinationParams(beamBucket + "/results")) + InvoicingUtils.makeDestinationFunction( + String.format("%s/%s", billingBucketUrl, BillingModule.INVOICES_DIRECTORY), + yearMonthProvider), + InvoicingUtils.makeEmptyDestinationParams(billingBucketUrl + "/errors")) .withFormatFunction(BillingEvent::toCsv) .withoutSharding() - .withTempDirectory(FileBasedSink.convertToFileResourceIfPossible(beamBucket + "/temporary")) + .withTempDirectory( + FileBasedSink.convertToFileResourceIfPossible(beamBucketUrl + "/temporary")) .withHeader(BillingEvent.getHeader()) .withSuffix(".csv"); } diff --git a/java/google/registry/beam/InvoicingUtils.java b/java/google/registry/beam/InvoicingUtils.java index daa864ed5..1e52c0f26 100644 --- a/java/google/registry/beam/InvoicingUtils.java +++ b/java/google/registry/beam/InvoicingUtils.java @@ -55,7 +55,8 @@ public class InvoicingUtils { yearMonth -> FileBasedSink.convertToFileResourceIfPossible( String.format( - "%s/%s", outputBucket, billingEvent.toFilename(yearMonth))))); + "%s/%s/%s", + outputBucket, yearMonth, billingEvent.toFilename(yearMonth))))); } /** diff --git a/java/google/registry/billing/BillingEmailUtils.java b/java/google/registry/billing/BillingEmailUtils.java index 347520fcb..24a34a9d4 100644 --- a/java/google/registry/billing/BillingEmailUtils.java +++ b/java/google/registry/billing/BillingEmailUtils.java @@ -14,15 +14,29 @@ package google.registry.billing; +import static java.nio.charset.StandardCharsets.UTF_8; + +import com.google.appengine.tools.cloudstorage.GcsFilename; import com.google.common.collect.ImmutableList; +import com.google.common.io.CharStreams; +import google.registry.billing.BillingModule.InvoiceDirectoryPrefix; import google.registry.config.RegistryConfig.Config; +import google.registry.gcs.GcsUtils; import google.registry.util.FormattingLogger; +import google.registry.util.Retrier; import google.registry.util.SendEmailService; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; import javax.inject.Inject; +import javax.mail.BodyPart; import javax.mail.Message; import javax.mail.Message.RecipientType; import javax.mail.MessagingException; +import javax.mail.Multipart; import javax.mail.internet.InternetAddress; +import javax.mail.internet.MimeBodyPart; +import javax.mail.internet.MimeMultipart; import org.joda.time.YearMonth; /** Utility functions for sending emails involving monthly invoices. */ @@ -31,55 +45,104 @@ class BillingEmailUtils { private final SendEmailService emailService; private final YearMonth yearMonth; private final String alertSenderAddress; + private final String alertRecipientAddress; private final ImmutableList invoiceEmailRecipients; - // TODO(larryruili): Replace this bucket after verifying 2017-12 output. - private final String beamBucketUrl; + private final String billingBucket; + private final String invoiceDirectoryPrefix; + private final GcsUtils gcsUtils; + private final Retrier retrier; @Inject BillingEmailUtils( SendEmailService emailService, YearMonth yearMonth, @Config("alertSenderEmailAddress") String alertSenderAddress, + @Config("alertRecipientEmailAddress") String alertRecipientAddress, @Config("invoiceEmailRecipients") ImmutableList invoiceEmailRecipients, - @Config("apacheBeamBucketUrl") String beamBucketUrl) { + @Config("billingBucket") String billingBucket, + @InvoiceDirectoryPrefix String invoiceDirectoryPrefix, + GcsUtils gcsUtils, + Retrier retrier) { this.emailService = emailService; this.yearMonth = yearMonth; this.alertSenderAddress = alertSenderAddress; + this.alertRecipientAddress = alertRecipientAddress; this.invoiceEmailRecipients = invoiceEmailRecipients; - this.beamBucketUrl = beamBucketUrl; + this.billingBucket = billingBucket; + this.invoiceDirectoryPrefix = invoiceDirectoryPrefix; + this.gcsUtils = gcsUtils; + this.retrier = retrier; } private static final FormattingLogger logger = FormattingLogger.getLoggerForCallerClass(); - /** - * Sends a link to the generated overall invoice in GCS. - * - *

Note the users receiving the e-mail should have access to the object or bucket, via an - * authorization mechanism such as IAM. - */ - void emailInvoiceLink() { - // TODO(larryruili): Add read permissions for appropriate buckets. - try { - String beamBucket = beamBucketUrl.replaceFirst("gs://", ""); - Message msg = emailService.createMessage(); - msg.setFrom(new InternetAddress(alertSenderAddress)); - for (String recipient : invoiceEmailRecipients) { - msg.addRecipient(RecipientType.TO, new InternetAddress(recipient)); - } - msg.setSubject(String.format("Domain Registry invoice data %s", yearMonth.toString())); - msg.setText( - String.format( - "Link to invoice on GCS:\nhttps://storage.cloud.google.com/%s/%s", - beamBucket, + /** Sends an e-mail to all expected recipients with an attached overall invoice from GCS. */ + void emailOverallInvoice() { + retrier.callWithRetry( + () -> { + String invoiceFile = String.format( - "%s%s-%s.csv", - BillingModule.RESULTS_DIRECTORY_PREFIX, - BillingModule.OVERALL_INVOICE_PREFIX, - yearMonth.toString()))); - emailService.sendMessage(msg); - } catch (MessagingException e) { - // TODO(larryruili): Replace with retrier with final failure email settings. - logger.warning(e, "E-mail service failed due to %s"); - } + "%s-%s.csv", BillingModule.OVERALL_INVOICE_PREFIX, yearMonth.toString()); + GcsFilename invoiceFilename = + new GcsFilename(billingBucket, invoiceDirectoryPrefix + invoiceFile); + try (InputStream in = gcsUtils.openInputStream(invoiceFilename)) { + Message msg = emailService.createMessage(); + msg.setFrom(new InternetAddress(alertSenderAddress)); + for (String recipient : invoiceEmailRecipients) { + msg.addRecipient(RecipientType.TO, new InternetAddress(recipient)); + } + msg.setSubject(String.format("Domain Registry invoice data %s", yearMonth.toString())); + Multipart multipart = new MimeMultipart(); + BodyPart textPart = new MimeBodyPart(); + textPart.setText( + String.format( + "Attached is the %s invoice for the domain registry.", yearMonth.toString())); + multipart.addBodyPart(textPart); + BodyPart invoicePart = new MimeBodyPart(); + String invoiceData = CharStreams.toString(new InputStreamReader(in, UTF_8)); + invoicePart.setContent(invoiceData, "text/csv; charset=utf-8"); + invoicePart.setFileName(invoiceFile); + multipart.addBodyPart(invoicePart); + msg.setContent(multipart); + msg.saveChanges(); + emailService.sendMessage(msg); + } + }, + new Retrier.FailureReporter() { + @Override + public void beforeRetry(Throwable thrown, int failures, int maxAttempts) {} + + @Override + public void afterFinalFailure(Throwable thrown, int failures) { + sendAlertEmail( + String.format("Emailing invoice failed due to %s", thrown.getMessage())); + } + }, + IOException.class, + MessagingException.class); + } + + /** Sends an e-mail to the provided alert e-mail address indicating a billing failure. */ + void sendAlertEmail(String body) { + retrier.callWithRetry( + () -> { + Message msg = emailService.createMessage(); + msg.setFrom(new InternetAddress(alertSenderAddress)); + msg.addRecipient(RecipientType.TO, new InternetAddress(alertRecipientAddress)); + msg.setSubject(String.format("Billing Pipeline Alert: %s", yearMonth.toString())); + msg.setText(body); + emailService.sendMessage(msg); + return null; + }, + new Retrier.FailureReporter() { + @Override + public void beforeRetry(Throwable thrown, int failures, int maxAttempts) {} + + @Override + public void afterFinalFailure(Throwable thrown, int failures) { + logger.severe(thrown, "The alert e-mail system failed."); + } + }, + MessagingException.class); } } diff --git a/java/google/registry/billing/BillingModule.java b/java/google/registry/billing/BillingModule.java index c15fd9c55..e02701183 100644 --- a/java/google/registry/billing/BillingModule.java +++ b/java/google/registry/billing/BillingModule.java @@ -15,6 +15,7 @@ package google.registry.billing; import static google.registry.request.RequestParameters.extractRequiredParameter; +import static java.lang.annotation.RetentionPolicy.RUNTIME; import com.google.api.client.googleapis.extensions.appengine.auth.oauth2.AppIdentityCredential; import com.google.api.client.http.HttpTransport; @@ -25,9 +26,13 @@ import dagger.Module; import dagger.Provides; import google.registry.config.RegistryConfig.Config; import google.registry.request.Parameter; +import java.lang.annotation.Documented; +import java.lang.annotation.Retention; import java.util.Set; import java.util.function.Function; +import javax.inject.Qualifier; import javax.servlet.http.HttpServletRequest; +import org.joda.time.YearMonth; /** Module for dependencies required by monthly billing actions. */ @Module @@ -35,13 +40,12 @@ public final class BillingModule { public static final String DETAIL_REPORT_PREFIX = "invoice_details"; public static final String OVERALL_INVOICE_PREFIX = "CRR-INV"; + public static final String INVOICES_DIRECTORY = "invoices"; static final String PARAM_JOB_ID = "jobId"; - static final String PARAM_DIRECTORY_PREFIX = "directoryPrefix"; + static final String PARAM_YEAR_MONTH = "yearMonth"; static final String BILLING_QUEUE = "billing"; static final String CRON_QUEUE = "retryable-cron-tasks"; - // TODO(larryruili): Replace with invoices/yyyy-MM after verifying 2017-12 invoice. - static final String RESULTS_DIRECTORY_PREFIX = "results/"; private static final String CLOUD_PLATFORM_SCOPE = "https://www.googleapis.com/auth/cloud-platform"; @@ -53,11 +57,10 @@ public final class BillingModule { return extractRequiredParameter(req, PARAM_JOB_ID); } - /** Provides the subdirectory under a GCS bucket that we copy detail reports from. */ @Provides - @Parameter(PARAM_DIRECTORY_PREFIX) - static String provideDirectoryPrefix(HttpServletRequest req) { - return extractRequiredParameter(req, PARAM_DIRECTORY_PREFIX); + @InvoiceDirectoryPrefix + static String provideDirectoryPrefix(YearMonth yearMonth) { + return String.format("%s/%s/", INVOICES_DIRECTORY, yearMonth.toString()); } /** Constructs a {@link Dataflow} API client with default settings. */ @@ -75,4 +78,10 @@ public final class BillingModule { .setApplicationName(String.format("%s billing", projectId)) .build(); } + + /** Dagger qualifier for the subdirectory we stage to/upload from. */ + @Qualifier + @Documented + @Retention(RUNTIME) + @interface InvoiceDirectoryPrefix{} } diff --git a/java/google/registry/billing/CopyDetailReportsAction.java b/java/google/registry/billing/CopyDetailReportsAction.java index 33a170779..231629258 100644 --- a/java/google/registry/billing/CopyDetailReportsAction.java +++ b/java/google/registry/billing/CopyDetailReportsAction.java @@ -22,11 +22,11 @@ import com.google.appengine.tools.cloudstorage.GcsFilename; import com.google.common.collect.ImmutableList; import com.google.common.io.ByteStreams; import com.google.common.net.MediaType; +import google.registry.billing.BillingModule.InvoiceDirectoryPrefix; import google.registry.config.RegistryConfig.Config; import google.registry.gcs.GcsUtils; import google.registry.model.registrar.Registrar; import google.registry.request.Action; -import google.registry.request.Parameter; import google.registry.request.Response; import google.registry.request.auth.Auth; import google.registry.storage.drive.DriveConnection; @@ -45,39 +45,39 @@ public final class CopyDetailReportsAction implements Runnable { private static final FormattingLogger logger = FormattingLogger.getLoggerForCallerClass(); - // TODO(larryruili): Replace this bucket with the billing bucket after verifying 2017-12 output. - private final String beamBucketUrl; - private final String folderPrefix; + private final String billingBucket; + private final String invoiceDirectoryPrefix; private final DriveConnection driveConnection; private final GcsUtils gcsUtils; private final Retrier retrier; private final Response response; + private final BillingEmailUtils emailUtils; @Inject CopyDetailReportsAction( - @Config("apacheBeamBucketUrl") String beamBucketUrl, - @Parameter(BillingModule.PARAM_DIRECTORY_PREFIX) String folderPrefix, + @Config("billingBucket") String billingBucket, + @InvoiceDirectoryPrefix String invoiceDirectoryPrefix, DriveConnection driveConnection, GcsUtils gcsUtils, Retrier retrier, - Response response) { - this.beamBucketUrl = beamBucketUrl; - this.folderPrefix = folderPrefix; + Response response, + BillingEmailUtils emailUtils) { + this.billingBucket = billingBucket; + this.invoiceDirectoryPrefix = invoiceDirectoryPrefix; this.driveConnection = driveConnection; this.gcsUtils = gcsUtils; this.retrier = retrier; this.response = response; + this.emailUtils = emailUtils; } @Override public void run() { - // Strip the URL prefix from the beam bucket - String beamBucket = beamBucketUrl.replace("gs://", ""); ImmutableList detailReportObjectNames; try { detailReportObjectNames = gcsUtils - .listFolderObjects(beamBucket, folderPrefix) + .listFolderObjects(billingBucket, invoiceDirectoryPrefix) .stream() .filter(objectName -> objectName.startsWith(BillingModule.DETAIL_REPORT_PREFIX)) .collect(ImmutableList.toImmutableList()); @@ -93,7 +93,6 @@ public final class CopyDetailReportsAction implements Runnable { // TODO(larryruili): Determine a safer way of enforcing this. String registrarId = detailReportName.split("_")[3]; Optional registrar = Registrar.loadByClientId(registrarId); - // TODO(larryruili): Send an email alert if any report fails to be copied for any reason. if (!registrar.isPresent()) { logger.warningfmt( "Registrar %s not found in database for file %s", registrar, detailReportName); @@ -109,7 +108,7 @@ public final class CopyDetailReportsAction implements Runnable { () -> { try (InputStream input = gcsUtils.openInputStream( - new GcsFilename(beamBucket, folderPrefix + detailReportName))) { + new GcsFilename(billingBucket, invoiceDirectoryPrefix + detailReportName))) { driveConnection.createFile( detailReportName, MediaType.CSV_UTF_8, @@ -117,7 +116,19 @@ public final class CopyDetailReportsAction implements Runnable { ByteStreams.toByteArray(input)); logger.infofmt( "Published detail report for %s to folder %s using GCS file gs://%s/%s.", - registrarId, driveFolderId, beamBucket, detailReportName); + registrarId, driveFolderId, billingBucket, detailReportName); + } + }, + new Retrier.FailureReporter() { + @Override + public void beforeRetry(Throwable thrown, int failures, int maxAttempts) {} + + @Override + public void afterFinalFailure(Throwable thrown, int failures) { + emailUtils.sendAlertEmail( + String.format( + "Warning: CopyDetailReportsAction failed.\nEncountered: %s on file: %s", + thrown.getMessage(), detailReportName)); } }, IOException.class); diff --git a/java/google/registry/billing/GenerateInvoicesAction.java b/java/google/registry/billing/GenerateInvoicesAction.java index 0bd06ee4d..fa6bb4be2 100644 --- a/java/google/registry/billing/GenerateInvoicesAction.java +++ b/java/google/registry/billing/GenerateInvoicesAction.java @@ -53,22 +53,28 @@ public class GenerateInvoicesAction implements Runnable { private final String projectId; private final String beamBucketUrl; + private final String invoiceTemplateUrl; private final YearMonth yearMonth; private final Dataflow dataflow; private final Response response; + private final BillingEmailUtils emailUtils; @Inject GenerateInvoicesAction( @Config("projectId") String projectId, @Config("apacheBeamBucketUrl") String beamBucketUrl, + @Config("invoiceTemplateUrl") String invoiceTemplateUrl, YearMonth yearMonth, Dataflow dataflow, - Response response) { + Response response, + BillingEmailUtils emailUtils) { this.projectId = projectId; this.beamBucketUrl = beamBucketUrl; + this.invoiceTemplateUrl = invoiceTemplateUrl; this.yearMonth = yearMonth; this.dataflow = dataflow; this.response = response; + this.emailUtils = emailUtils; } @Override @@ -88,13 +94,14 @@ public class GenerateInvoicesAction implements Runnable { .projects() .templates() .launch(projectId, params) - .setGcsPath(beamBucketUrl + "/templates/invoicing") + .setGcsPath(invoiceTemplateUrl) .execute(); logger.infofmt("Got response: %s", launchResponse.getJob().toPrettyString()); String jobId = launchResponse.getJob().getId(); enqueuePublishTask(jobId); } catch (IOException e) { logger.warningfmt("Template Launch failed due to: %s", e.getMessage()); + emailUtils.sendAlertEmail(String.format("Template Launch failed due to %s", e.getMessage())); response.setStatus(SC_INTERNAL_SERVER_ERROR); response.setContentType(MediaType.PLAIN_TEXT_UTF_8); response.setPayload(String.format("Template launch failed: %s", e.getMessage())); @@ -111,7 +118,9 @@ public class GenerateInvoicesAction implements Runnable { .method(TaskOptions.Method.POST) // Dataflow jobs tend to take about 10 minutes to complete. .countdownMillis(Duration.standardMinutes(10).getMillis()) - .param(BillingModule.PARAM_JOB_ID, jobId); + .param(BillingModule.PARAM_JOB_ID, jobId) + // Need to pass this through to ensure transitive yearMonth dependencies are satisfied. + .param(BillingModule.PARAM_YEAR_MONTH, yearMonth.toString()); QueueFactory.getQueue(BillingModule.BILLING_QUEUE).add(publishTask); } } diff --git a/java/google/registry/billing/PublishInvoicesAction.java b/java/google/registry/billing/PublishInvoicesAction.java index b3343912e..03616d04e 100644 --- a/java/google/registry/billing/PublishInvoicesAction.java +++ b/java/google/registry/billing/PublishInvoicesAction.java @@ -33,6 +33,7 @@ import google.registry.request.auth.Auth; import google.registry.util.FormattingLogger; import java.io.IOException; import javax.inject.Inject; +import org.joda.time.YearMonth; /** * Uploads the results of the {@link google.registry.beam.InvoicingPipeline}. @@ -55,6 +56,7 @@ public class PublishInvoicesAction implements Runnable { private final BillingEmailUtils emailUtils; private final Dataflow dataflow; private final Response response; + private final YearMonth yearMonth; @Inject PublishInvoicesAction( @@ -62,12 +64,14 @@ public class PublishInvoicesAction implements Runnable { @Parameter(BillingModule.PARAM_JOB_ID) String jobId, BillingEmailUtils emailUtils, Dataflow dataflow, - Response response) { + Response response, + YearMonth yearMonth) { this.projectId = projectId; this.jobId = jobId; this.emailUtils = emailUtils; this.dataflow = dataflow; this.response = response; + this.yearMonth = yearMonth; } static final String PATH = "/_dr/task/publishInvoices"; @@ -83,12 +87,13 @@ public class PublishInvoicesAction implements Runnable { logger.infofmt("Dataflow job %s finished successfully, publishing results.", jobId); response.setStatus(SC_OK); enqueueCopyDetailReportsTask(); - emailUtils.emailInvoiceLink(); + emailUtils.emailOverallInvoice(); break; case JOB_FAILED: logger.severefmt("Dataflow job %s finished unsuccessfully.", jobId); response.setStatus(SC_NO_CONTENT); - // TODO(larryruili): Email failure message + emailUtils.sendAlertEmail( + String.format("Dataflow job %s ended in status failure.", jobId)); break; default: logger.infofmt("Job in non-terminal state %s, retrying:", state); @@ -96,18 +101,18 @@ public class PublishInvoicesAction implements Runnable { break; } } catch (IOException e) { + emailUtils.sendAlertEmail(String.format("Publish action failed due to %s", e.getMessage())); response.setStatus(SC_INTERNAL_SERVER_ERROR); response.setContentType(MediaType.PLAIN_TEXT_UTF_8); response.setPayload(String.format("Template launch failed: %s", e.getMessage())); } } - private static void enqueueCopyDetailReportsTask() { + private void enqueueCopyDetailReportsTask() { TaskOptions copyDetailTask = TaskOptions.Builder.withUrl(CopyDetailReportsAction.PATH) .method(TaskOptions.Method.POST) - .param( - BillingModule.PARAM_DIRECTORY_PREFIX, BillingModule.RESULTS_DIRECTORY_PREFIX); + .param(BillingModule.PARAM_YEAR_MONTH, yearMonth.toString()); QueueFactory.getQueue(BillingModule.CRON_QUEUE).add(copyDetailTask); } } diff --git a/java/google/registry/config/RegistryConfig.java b/java/google/registry/config/RegistryConfig.java index 7bce00e41..cffd0d911 100644 --- a/java/google/registry/config/RegistryConfig.java +++ b/java/google/registry/config/RegistryConfig.java @@ -464,14 +464,50 @@ public final class RegistryConfig { } /** - * Returns the Google Cloud Storage bucket for storing Beam templates and results. + * Returns the name of the GCS bucket for storing Beam templates and results. + * + * @see google.registry.billing.GenerateInvoicesAction + */ + @Provides + @Config("apacheBeamBucket") + public static String provideApacheBeamBucket(@Config("projectId") String projectId) { + return projectId + "-beam"; + } + + /** + * Returns the URL of the GCS location for storing Apache Beam related objects. * * @see google.registry.billing.GenerateInvoicesAction */ @Provides @Config("apacheBeamBucketUrl") - public static String provideApacheBeamBucketUrl(@Config("projectId") String projectId) { - return String.format("gs://%s-beam", projectId); + public static String provideApacheBeamBucketUrl(@Config("apacheBeamBucket") String beamBucket) { + return "gs://" + beamBucket; + } + + /** + * Returns the URL of the GCS location for storing the monthly invoicing Beam template. + * + * @see google.registry.billing.GenerateInvoicesAction + * @see google.registry.beam.InvoicingPipeline + */ + @Provides + @Config("invoiceTemplateUrl") + public static String provideInvoiceTemplateUrl( + @Config("apacheBeamBucketUrl") String beamBucketUrl) { + return beamBucketUrl + "/templates/invoicing"; + } + + /** + * Returns the URL of the GCS location we store jar dependencies for the invoicing pipeline. + * + * @see google.registry.beam.InvoicingPipeline + */ + @Provides + @Config("invoiceStagingUrl") + public static String provideInvoiceStagingUrl( + @Config("apacheBeamBucketUrl") String beamBucketUrl) { + return beamBucketUrl + "/staging"; } /** @@ -508,6 +544,28 @@ public final class RegistryConfig { return config.icannReporting.icannActivityReportingUploadUrl; } + /** + * Returns name of the GCS bucket we store invoices and detail reports in. + * + * @see google.registry.billing + */ + @Provides + @Config("billingBucket") + public static String provideBillingBucket(@Config("projectId") String projectId) { + return projectId + "-billing"; + } + + /** + * Returns the URL of the GCS bucket we store invoices and detail reports in. + * + * @see google.registry.billing + */ + @Provides + @Config("billingBucketUrl") + public static String provideBillingBucketUrl(@Config("billingBucket") String billingBucket) { + return "gs://" + billingBucket; + } + /** * Returns the list of addresses that receive monthly invoicing emails. * diff --git a/java/google/registry/module/backend/BackendModule.java b/java/google/registry/module/backend/BackendModule.java index d4d564871..bcd2aa9b0 100644 --- a/java/google/registry/module/backend/BackendModule.java +++ b/java/google/registry/module/backend/BackendModule.java @@ -42,7 +42,7 @@ import org.joda.time.format.DateTimeFormatter; @Module public class BackendModule { - public static final String PARAM_YEAR_MONTH = "yearMonth"; + private static final String PARAM_YEAR_MONTH = "yearMonth"; @Provides @Parameter(RequestParameters.PARAM_TLD) diff --git a/java/google/registry/util/Retrier.java b/java/google/registry/util/Retrier.java index 608210a0b..2f45ba4af 100644 --- a/java/google/registry/util/Retrier.java +++ b/java/google/registry/util/Retrier.java @@ -108,16 +108,6 @@ public class Retrier implements Serializable { } } - private static final FailureReporter LOGGING_FAILURE_REPORTER = new FailureReporter() { - @Override - public void beforeRetry(Throwable thrown, int failures, int maxAttempts) { - logger.infofmt(thrown, "Retrying transient error, attempt %d", failures); - } - - @Override - public void afterFinalFailure(Throwable thrown, int failures) {} - }; - /** * Retries a unit of work in the face of transient errors and returns the result. * @@ -136,23 +126,20 @@ public class Retrier implements Serializable { Callable callable, Class retryableError, Class... moreRetryableErrors) { - return callWithRetry( - callable, - LOGGING_FAILURE_REPORTER, - retryableError, - moreRetryableErrors); + return callWithRetry(callable, LOGGING_FAILURE_REPORTER, retryableError, moreRetryableErrors); } - /** Retries a unit of work in the face of transient errors. */ + /** + * Retries a unit of work in the face of transient errors, without returning a value. + * + * @see #callWithRetry(Callable, Class, Class[]) + */ @SafeVarargs public final void callWithRetry( VoidCallable callable, Class retryableError, Class... moreRetryableErrors) { - callWithRetry( - callable.asCallable(), - retryableError, - moreRetryableErrors); + callWithRetry(callable.asCallable(), retryableError, moreRetryableErrors); } /** @@ -177,4 +164,29 @@ public class Retrier implements Serializable { return callWithRetry( callable, failureReporter, e -> retryables.stream().anyMatch(supertypeOf(e.getClass()))); } + + /** + * Retries a unit of work in the face of transient errors, without returning a value. + * + * @see #callWithRetry(Callable, FailureReporter, Class, Class[]) + */ + @SafeVarargs + public final void callWithRetry( + VoidCallable callable, + FailureReporter failureReporter, + Class retryableError, + Class... moreRetryableErrors) { + callWithRetry(callable.asCallable(), failureReporter, retryableError, moreRetryableErrors); + } + + private static final FailureReporter LOGGING_FAILURE_REPORTER = + new FailureReporter() { + @Override + public void beforeRetry(Throwable thrown, int failures, int maxAttempts) { + logger.infofmt(thrown, "Retrying transient error, attempt %d", failures); + } + + @Override + public void afterFinalFailure(Throwable thrown, int failures) {} + }; } diff --git a/javatests/google/registry/beam/InvoicingPipelineTest.java b/javatests/google/registry/beam/InvoicingPipelineTest.java index cb6b47a28..42db589ae 100644 --- a/javatests/google/registry/beam/InvoicingPipelineTest.java +++ b/javatests/google/registry/beam/InvoicingPipelineTest.java @@ -20,6 +20,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import google.registry.util.ResourceUtils; import java.io.File; +import java.io.IOException; import java.time.ZoneId; import java.time.ZonedDateTime; import java.util.Map.Entry; @@ -56,10 +57,15 @@ public class InvoicingPipelineTest { private InvoicingPipeline invoicingPipeline; @Before - public void initializePipeline() { + public void initializePipeline() throws IOException { invoicingPipeline = new InvoicingPipeline(); invoicingPipeline.projectId = "test-project"; - invoicingPipeline.beamBucket = tempFolder.getRoot().getAbsolutePath(); + File beamTempFolder = tempFolder.newFolder(); + invoicingPipeline.beamBucketUrl = beamTempFolder.getAbsolutePath(); + invoicingPipeline.invoiceStagingUrl = beamTempFolder.getAbsolutePath() + "/staging"; + invoicingPipeline.invoiceTemplateUrl = + beamTempFolder.getAbsolutePath() + "/templates/invoicing"; + invoicingPipeline.billingBucketUrl = tempFolder.getRoot().getAbsolutePath(); } private ImmutableList getInputEvents() { @@ -180,7 +186,9 @@ public class InvoicingPipelineTest { /** Returns the text contents of a file under the beamBucket/results directory. */ private ImmutableList resultFileContents(String filename) throws Exception { File resultFile = - new File(String.format("%s/results/%s", tempFolder.getRoot().getAbsolutePath(), filename)); + new File( + String.format( + "%s/invoices/2017-10/%s", tempFolder.getRoot().getAbsolutePath(), filename)); return ImmutableList.copyOf( ResourceUtils.readResourceUtf8(resultFile.toURI().toURL()).split("\n")); } diff --git a/javatests/google/registry/beam/InvoicingUtilsTest.java b/javatests/google/registry/beam/InvoicingUtilsTest.java index caca3d864..4b19c173d 100644 --- a/javatests/google/registry/beam/InvoicingUtilsTest.java +++ b/javatests/google/registry/beam/InvoicingUtilsTest.java @@ -49,7 +49,7 @@ public class InvoicingUtilsTest { .withSuffix(".csv") .withBaseFilename( FileBasedSink.convertToFileResourceIfPossible( - "my/directory/invoice_details_2017-10_registrar_tld"))); + "my/directory/2017-10/invoice_details_2017-10_registrar_tld"))); } @Test diff --git a/javatests/google/registry/billing/BillingEmailUtilsTest.java b/javatests/google/registry/billing/BillingEmailUtilsTest.java index 223791de4..b959c34cc 100644 --- a/javatests/google/registry/billing/BillingEmailUtilsTest.java +++ b/javatests/google/registry/billing/BillingEmailUtilsTest.java @@ -15,15 +15,29 @@ package google.registry.billing; import static com.google.common.truth.Truth.assertThat; +import static com.google.common.truth.Truth.assertWithMessage; +import static google.registry.testing.JUnitBackports.expectThrows; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.google.appengine.tools.cloudstorage.GcsFilename; import com.google.common.collect.ImmutableList; +import google.registry.gcs.GcsUtils; +import google.registry.testing.FakeClock; +import google.registry.testing.FakeSleeper; +import google.registry.util.Retrier; import google.registry.util.SendEmailService; +import java.io.ByteArrayInputStream; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.Properties; +import javax.mail.BodyPart; import javax.mail.Message; import javax.mail.MessagingException; +import javax.mail.Multipart; import javax.mail.Session; import javax.mail.internet.InternetAddress; import javax.mail.internet.MimeMessage; @@ -32,45 +46,125 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; /** Unit tests for {@link BillingEmailUtils}. */ @RunWith(JUnit4.class) public class BillingEmailUtilsTest { + private static final int RETRY_COUNT = 2; + private SendEmailService emailService; - private Message msg; private BillingEmailUtils emailUtils; + private GcsUtils gcsUtils; + private ArgumentCaptor msgCaptor; @Before public void setUp() { - msg = new MimeMessage(Session.getDefaultInstance(new Properties(), null)); emailService = mock(SendEmailService.class); - when(emailService.createMessage()).thenReturn(msg); + when(emailService.createMessage()) + .thenReturn(new MimeMessage(Session.getDefaultInstance(new Properties(), null))); + gcsUtils = mock(GcsUtils.class); + when(gcsUtils.openInputStream(new GcsFilename("test-bucket", "results/CRR-INV-2017-10.csv"))) + .thenReturn( + new ByteArrayInputStream("test,data\nhello,world".getBytes(StandardCharsets.UTF_8))); + msgCaptor = ArgumentCaptor.forClass(Message.class); emailUtils = new BillingEmailUtils( emailService, new YearMonth(2017, 10), "my-sender@test.com", + "my-receiver@test.com", ImmutableList.of("hello@world.com", "hola@mundo.com"), - "gs://test-bucket"); + "test-bucket", + "results/", + gcsUtils, + new Retrier(new FakeSleeper(new FakeClock()), RETRY_COUNT)); } @Test - public void testSuccess_sendsEmail() throws MessagingException, IOException { - emailUtils.emailInvoiceLink(); - assertThat(msg.getFrom()).hasLength(1); - assertThat(msg.getFrom()[0]) - .isEqualTo(new InternetAddress("my-sender@test.com")); - assertThat(msg.getAllRecipients()) + public void testSuccess_emailOverallInvoice() throws MessagingException, IOException { + emailUtils.emailOverallInvoice(); + // We inspect individual parameters because Message doesn't implement equals(). + verify(emailService).sendMessage(msgCaptor.capture()); + Message expectedMsg = msgCaptor.getValue(); + assertThat(expectedMsg.getFrom()) + .asList() + .containsExactly(new InternetAddress("my-sender@test.com")); + assertThat(expectedMsg.getAllRecipients()) .asList() .containsExactly( new InternetAddress("hello@world.com"), new InternetAddress("hola@mundo.com")); - assertThat(msg.getSubject()).isEqualTo("Domain Registry invoice data 2017-10"); + assertThat(expectedMsg.getSubject()).isEqualTo("Domain Registry invoice data 2017-10"); + assertThat(expectedMsg.getContent()).isInstanceOf(Multipart.class); + Multipart contents = (Multipart) expectedMsg.getContent(); + assertThat(contents.getCount()).isEqualTo(2); + assertThat(contents.getBodyPart(0)).isInstanceOf(BodyPart.class); + BodyPart textPart = contents.getBodyPart(0); + assertThat(textPart.getContentType()).isEqualTo("text/plain; charset=us-ascii"); + assertThat(textPart.getContent().toString()) + .isEqualTo("Attached is the 2017-10 invoice for the domain registry."); + assertThat(contents.getBodyPart(1)).isInstanceOf(BodyPart.class); + BodyPart attachmentPart = contents.getBodyPart(1); + assertThat(attachmentPart.getContentType()) + .isEqualTo("text/csv; charset=utf-8; name=CRR-INV-2017-10.csv"); + assertThat(attachmentPart.getContent().toString()).isEqualTo("test,data\nhello,world"); + } + + @Test + public void testFailure_tooManyRetries_emailsAlert() throws MessagingException, IOException { + // This message throws whenever it tries to set content, to force the overall invoice to fail. + Message throwingMessage = mock(Message.class); + doThrow(new MessagingException("expected")) + .when(throwingMessage) + .setContent(any(Multipart.class)); + when(emailService.createMessage()).thenAnswer( + new Answer() { + private int count = 0; + + @Override + public Message answer(InvocationOnMock invocation) throws Throwable { + // Once we've failed the retry limit for the original invoice, return a normal message + // so we can properly check its contents. + if (count < RETRY_COUNT) { + count++; + return throwingMessage; + } else if (count == RETRY_COUNT) { + return new MimeMessage(Session.getDefaultInstance(new Properties(), null)); + } else { + assertWithMessage("Attempted to generate too many messages!").fail(); + return null; + } + } + } + ); + RuntimeException thrown = + expectThrows(RuntimeException.class, () -> emailUtils.emailOverallInvoice()); + assertThat(thrown).hasMessageThat().isEqualTo("javax.mail.MessagingException: expected"); + // Verify we sent an e-mail alert + verify(emailService).sendMessage(msgCaptor.capture()); + validateAlertMessage(msgCaptor.getValue(), "Emailing invoice failed due to expected"); + } + + @Test + public void testSuccess_sendAlertEmail() throws MessagingException, IOException { + emailUtils.sendAlertEmail("Alert!"); + verify(emailService).sendMessage(msgCaptor.capture()); + validateAlertMessage(msgCaptor.getValue(), "Alert!"); + } + + private void validateAlertMessage(Message msg, String body) + throws MessagingException, IOException { + assertThat(msg.getFrom()).hasLength(1); + assertThat(msg.getFrom()[0]).isEqualTo(new InternetAddress("my-sender@test.com")); + assertThat(msg.getAllRecipients()) + .asList() + .containsExactly(new InternetAddress("my-receiver@test.com")); + assertThat(msg.getSubject()).isEqualTo("Billing Pipeline Alert: 2017-10"); assertThat(msg.getContentType()).isEqualTo("text/plain"); - assertThat(msg.getContent().toString()) - .isEqualTo( - "Link to invoice on GCS:\n" - + "https://storage.cloud.google.com/test-bucket/results/CRR-INV-2017-10.csv"); + assertThat(msg.getContent().toString()).isEqualTo(body); } } diff --git a/javatests/google/registry/billing/CopyDetailReportsActionTest.java b/javatests/google/registry/billing/CopyDetailReportsActionTest.java index 304836ed2..50320478a 100644 --- a/javatests/google/registry/billing/CopyDetailReportsActionTest.java +++ b/javatests/google/registry/billing/CopyDetailReportsActionTest.java @@ -15,6 +15,7 @@ package google.registry.billing; import static com.google.common.truth.Truth.assertThat; +import static com.google.common.truth.Truth.assertWithMessage; import static google.registry.testing.DatastoreHelper.loadRegistrar; import static google.registry.testing.DatastoreHelper.persistResource; import static google.registry.testing.GcsTestingUtils.writeGcsFile; @@ -61,6 +62,7 @@ public class CopyDetailReportsActionTest { private FakeResponse response; private DriveConnection driveConnection; + private BillingEmailUtils emailUtils; private CopyDetailReportsAction action; @Before @@ -68,14 +70,16 @@ public class CopyDetailReportsActionTest { persistResource(loadRegistrar("TheRegistrar").asBuilder().setDriveFolderId("0B-12345").build()); response = new FakeResponse(); driveConnection = mock(DriveConnection.class); + emailUtils = mock(BillingEmailUtils.class); action = new CopyDetailReportsAction( - "gs://test-bucket", + "test-bucket", "results/", driveConnection, gcsUtils, new Retrier(new FakeSleeper(new FakeClock()), 3), - response); + response, + emailUtils); } @Test @@ -156,6 +160,31 @@ public class CopyDetailReportsActionTest { assertThat(response.getPayload()).isEqualTo("Copied detail reports."); } + @Test + public void testFail_tooManyFailures_sendsAlertEmail() throws IOException { + writeGcsFile( + gcsService, + new GcsFilename("test-bucket", "results/invoice_details_2017-10_TheRegistrar_hello.csv"), + "hola,mundo\n3,4".getBytes(UTF_8)); + when(driveConnection.createFile(any(), any(), any(), any())) + .thenThrow(new IOException("expected")); + + try { + action.run(); + assertWithMessage("Expected a runtime exception to be thrown!").fail(); + } catch (RuntimeException e) { + assertThat(e).hasMessageThat().isEqualTo("java.io.IOException: expected"); + } + verify(driveConnection, times(3)) + .createFile( + "invoice_details_2017-10_TheRegistrar_hello.csv", + MediaType.CSV_UTF_8, + "0B-12345", + "hola,mundo\n3,4".getBytes(UTF_8)); + verify(emailUtils).sendAlertEmail("Warning: CopyDetailReportsAction failed.\nEncountered: " + + "expected on file: invoice_details_2017-10_TheRegistrar_hello.csv"); + } + @Test public void testFail_registrarDoesntExist_doesntCopy() throws IOException { writeGcsFile( diff --git a/javatests/google/registry/billing/GenerateInvoicesActionTest.java b/javatests/google/registry/billing/GenerateInvoicesActionTest.java index b8b0e8a2f..d3da38182 100644 --- a/javatests/google/registry/billing/GenerateInvoicesActionTest.java +++ b/javatests/google/registry/billing/GenerateInvoicesActionTest.java @@ -53,6 +53,7 @@ public class GenerateInvoicesActionTest { private Templates templates; private Launch launch; private FakeResponse response; + private BillingEmailUtils emailUtils; private GenerateInvoicesAction action; @Before @@ -61,6 +62,7 @@ public class GenerateInvoicesActionTest { projects = mock(Projects.class); templates = mock(Templates.class); launch = mock(Launch.class); + emailUtils = mock(BillingEmailUtils.class); when(dataflow.projects()).thenReturn(projects); when(projects.templates()).thenReturn(templates); when(templates.launch(any(String.class), any(LaunchTemplateParameters.class))) @@ -72,8 +74,15 @@ public class GenerateInvoicesActionTest { job.setId("12345"); when(launch.execute()).thenReturn(new LaunchTemplateResponse().setJob(job)); - action = new GenerateInvoicesAction( - "test-project", "gs://test-project-beam", new YearMonth(2017, 10), dataflow, response); + action = + new GenerateInvoicesAction( + "test-project", + "gs://test-project-beam", + "gs://test-project-beam/templates/invoicing", + new YearMonth(2017, 10), + dataflow, + response, + emailUtils); } @Test @@ -96,7 +105,8 @@ public class GenerateInvoicesActionTest { new TaskMatcher() .url("/_dr/task/publishInvoices") .method("POST") - .param("jobId", "12345"); + .param("jobId", "12345") + .param("yearMonth", "2017-10"); assertTasksEnqueued("billing", matcher); } @@ -106,5 +116,6 @@ public class GenerateInvoicesActionTest { action.run(); assertThat(response.getStatus()).isEqualTo(500); assertThat(response.getPayload()).isEqualTo("Template launch failed: expected"); + verify(emailUtils).sendAlertEmail("Template Launch failed due to expected"); } } diff --git a/javatests/google/registry/billing/PublishInvoicesActionTest.java b/javatests/google/registry/billing/PublishInvoicesActionTest.java index dae688e6a..d73dfca00 100644 --- a/javatests/google/registry/billing/PublishInvoicesActionTest.java +++ b/javatests/google/registry/billing/PublishInvoicesActionTest.java @@ -34,6 +34,7 @@ import google.registry.testing.AppEngineRule; import google.registry.testing.FakeResponse; import google.registry.testing.TaskQueueHelper.TaskMatcher; import java.io.IOException; +import org.joda.time.YearMonth; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -70,7 +71,8 @@ public class PublishInvoicesActionTest { emailUtils = mock(BillingEmailUtils.class); response = new FakeResponse(); uploadAction = - new PublishInvoicesAction("test-project", "12345", emailUtils, dataflow, response); + new PublishInvoicesAction( + "test-project", "12345", emailUtils, dataflow, response, new YearMonth(2017, 10)); } @Test @@ -78,12 +80,12 @@ public class PublishInvoicesActionTest { expectedJob.setCurrentState("JOB_STATE_DONE"); uploadAction.run(); assertThat(response.getStatus()).isEqualTo(SC_OK); - verify(emailUtils).emailInvoiceLink(); + verify(emailUtils).emailOverallInvoice(); TaskMatcher matcher = new TaskMatcher() .url("/_dr/task/copyDetailReports") .method("POST") - .param("directoryPrefix", "results/"); + .param("yearMonth", "2017-10"); assertTasksEnqueued("retryable-cron-tasks", matcher); } @@ -92,6 +94,7 @@ public class PublishInvoicesActionTest { expectedJob.setCurrentState("JOB_STATE_FAILED"); uploadAction.run(); assertThat(response.getStatus()).isEqualTo(SC_NO_CONTENT); + verify(emailUtils).sendAlertEmail("Dataflow job 12345 ended in status failure."); } @Test @@ -108,5 +111,6 @@ public class PublishInvoicesActionTest { assertThat(response.getStatus()).isEqualTo(SC_INTERNAL_SERVER_ERROR); assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8); assertThat(response.getPayload()).isEqualTo("Template launch failed: expected"); + verify(emailUtils).sendAlertEmail("Publish action failed due to expected"); } }