diff --git a/java/google/registry/beam/InvoicingPipeline.java b/java/google/registry/beam/InvoicingPipeline.java index cc1804399..f47c0a40c 100644 --- a/java/google/registry/beam/InvoicingPipeline.java +++ b/java/google/registry/beam/InvoicingPipeline.java @@ -22,6 +22,7 @@ import org.apache.beam.runners.dataflow.DataflowRunner; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params; import org.apache.beam.sdk.io.FileBasedSink; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; @@ -82,33 +83,26 @@ public class InvoicingPipeline { .usingStandardSql() .withoutValidation()); - billingEvents.apply( - "Write events to separate CSVs keyed by registrarId_tld pair", - TextIO.writeCustomType() - .to( - InvoicingUtils.makeDestinationFunction(beamBucket + "/results"), - InvoicingUtils.makeEmptyDestinationParams(beamBucket + "/results")) - .withFormatFunction(BillingEvent::toCsv) - .withoutSharding() - .withTempDirectory( - FileBasedSink.convertToFileResourceIfPossible(beamBucket + "/temporary")) - .withHeader(BillingEvent.getHeader())); - - billingEvents - .apply("Generate overall invoice rows", new GenerateInvoiceRows()) - .apply( - "Write overall invoice to CSV", - TextIO.write() - .to(beamBucket + "/results/overall_invoice") - .withHeader(InvoiceGroupingKey.invoiceHeader()) - .withoutSharding() - .withSuffix(".csv")); - + applyTerminalTransforms(billingEvents); p.run(); } + /** + * Applies output transforms to the {@code BillingEvent} source collection. + * + *

This is factored out purely to facilitate testing. + */ + void applyTerminalTransforms(PCollection billingEvents) { + billingEvents.apply( + "Write events to separate CSVs keyed by registrarId_tld pair", writeDetailReports()); + + billingEvents + .apply("Generate overall invoice rows", new GenerateInvoiceRows()) + .apply("Write overall invoice to CSV", writeInvoice()); + } + /** Transform that converts a {@code BillingEvent} into an invoice CSV row. */ - static class GenerateInvoiceRows + private static class GenerateInvoiceRows extends PTransform, PCollection> { @Override public PCollection expand(PCollection input) { @@ -125,4 +119,26 @@ public class InvoicingPipeline { .via((KV kv) -> kv.getKey().toCsv(kv.getValue()))); } } + + /** Returns an IO transform that writes the overall invoice to a single CSV file. */ + private TextIO.Write writeInvoice() { + return TextIO.write() + .to(beamBucket + "/results/overall_invoice") + .withHeader(InvoiceGroupingKey.invoiceHeader()) + .withoutSharding() + .withSuffix(".csv"); + } + + /** Returns an IO transform that writes detail reports to registrar-tld keyed CSV files. */ + private TextIO.TypedWrite writeDetailReports() { + return TextIO.writeCustomType() + .to( + InvoicingUtils.makeDestinationFunction(beamBucket + "/results"), + InvoicingUtils.makeEmptyDestinationParams(beamBucket + "/results")) + .withFormatFunction(BillingEvent::toCsv) + .withoutSharding() + .withTempDirectory(FileBasedSink.convertToFileResourceIfPossible(beamBucket + "/temporary")) + .withHeader(BillingEvent.getHeader()) + .withSuffix(".csv"); + } } diff --git a/javatests/google/registry/beam/BUILD b/javatests/google/registry/beam/BUILD index ac0d923cf..c531f78ae 100644 --- a/javatests/google/registry/beam/BUILD +++ b/javatests/google/registry/beam/BUILD @@ -13,6 +13,7 @@ java_library( resources = glob(["testdata/*"]), deps = [ "//java/google/registry/beam", + "//java/google/registry/util", "//javatests/google/registry/testing", "@com_google_apis_google_api_services_bigquery", "@com_google_dagger", @@ -33,6 +34,7 @@ java_library( GenTestRules( name = "GeneratedTestRules", default_test_size = "small", + medium_tests = ["InvoicingPipelineTest.java"], test_files = glob(["*Test.java"]), deps = [":beam"], ) diff --git a/javatests/google/registry/beam/InvoicingPipelineTest.java b/javatests/google/registry/beam/InvoicingPipelineTest.java index 55302dccc..944172e06 100644 --- a/javatests/google/registry/beam/InvoicingPipelineTest.java +++ b/javatests/google/registry/beam/InvoicingPipelineTest.java @@ -14,20 +14,26 @@ package google.registry.beam; +import static com.google.common.truth.Truth.assertThat; + import com.google.common.collect.ImmutableList; -import google.registry.beam.InvoicingPipeline.GenerateInvoiceRows; +import com.google.common.collect.ImmutableMap; +import google.registry.util.ResourceUtils; +import java.io.File; import java.time.ZoneId; import java.time.ZonedDateTime; +import java.util.Map.Entry; import org.apache.beam.runners.direct.DirectRunner; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.values.PCollection; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -44,79 +50,137 @@ public class InvoicingPipelineTest { } @Rule public final transient TestPipeline p = TestPipeline.fromOptions(pipelineOptions); + @Rule public final TemporaryFolder tempFolder = new TemporaryFolder(); - @Test - public void testGenerateInvoiceRowsFn() throws Exception { - ImmutableList inputRows = + private InvoicingPipeline invoicingPipeline; + + @Before + public void initializePipeline() { + invoicingPipeline = new InvoicingPipeline(); + invoicingPipeline.projectId = "test-project"; + invoicingPipeline.beamBucket = tempFolder.getRoot().getAbsolutePath(); + } + + private ImmutableList getInputEvents() { + return ImmutableList.of( + BillingEvent.create( + 1, + ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")), + ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")), + "theRegistrar", + "234", + "test", + "RENEW", + "mydomain.test", + "REPO-ID", + 3, + "USD", + 20.5, + ""), + BillingEvent.create( + 1, + ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")), + ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")), + "theRegistrar", + "234", + "test", + "RENEW", + "mydomain2.test", + "REPO-ID", + 3, + "USD", + 20.5, + ""), + BillingEvent.create( + 1, + ZonedDateTime.of(2017, 10, 2, 0, 0, 0, 0, ZoneId.of("UTC")), + ZonedDateTime.of(2017, 9, 29, 0, 0, 0, 0, ZoneId.of("UTC")), + "theRegistrar", + "234", + "hello", + "CREATE", + "mydomain3.hello", + "REPO-ID", + 5, + "JPY", + 70.75, + ""), + BillingEvent.create( + 1, + ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")), + ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")), + "googledomains", + "456", + "test", + "RENEW", + "mydomain4.test", + "REPO-ID", + 1, + "USD", + 20.5, + "")); + } + + /** Returns a map from filename to expected contents for detail reports. */ + private ImmutableMap> getExpectedDetailReportMap() { + return ImmutableMap.of( + "theRegistrar_test.csv", ImmutableList.of( - BillingEvent.create( - 1, - ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")), - ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")), - "theRegistrar", - "234", - "test", - "RENEW", - "mydomain.test", - "REPO-ID", - 3, - "USD", - 20.5, - ""), - BillingEvent.create( - 1, - ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")), - ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")), - "theRegistrar", - "234", - "test", - "RENEW", - "mydomain2.test", - "REPO-ID", - 3, - "USD", - 20.5, - ""), - BillingEvent.create( - 1, - ZonedDateTime.of(2017, 10, 2, 0, 0, 0, 0, ZoneId.of("UTC")), - ZonedDateTime.of(2017, 9, 29, 0, 0, 0, 0, ZoneId.of("UTC")), - "theRegistrar", - "234", - "hello", - "CREATE", - "mydomain3.hello", - "REPO-ID", - 5, - "JPY", - 70.75, - ""), - BillingEvent.create( - 1, - ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")), - ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")), - "googledomains", - "456", - "test", - "RENEW", - "mydomain4.test", - "REPO-ID", - 1, - "USD", - 20.5, - "")); + "1,2017-10-04 00:00:00 UTC,2017-10-04 00:00:00 UTC,theRegistrar,234," + + "test,RENEW,mydomain2.test,REPO-ID,3,USD,20.50,", + "1,2017-10-04 00:00:00 UTC,2017-10-04 00:00:00 UTC,theRegistrar,234," + + "test,RENEW,mydomain.test,REPO-ID,3,USD,20.50,"), + "theRegistrar_hello.csv", + ImmutableList.of( + "1,2017-10-02 00:00:00 UTC,2017-09-29 00:00:00 UTC,theRegistrar,234," + + "hello,CREATE,mydomain3.hello,REPO-ID,5,JPY,70.75,"), + "googledomains_test.csv", + ImmutableList.of( + "1,2017-10-04 00:00:00 UTC,2017-10-04 00:00:00 UTC,googledomains,456," + + "test,RENEW,mydomain4.test,REPO-ID,1,USD,20.50,")); + } - PCollection input = p.apply(Create.of(inputRows)); - PCollection output = input.apply(new GenerateInvoiceRows()); - - ImmutableList outputStrings = ImmutableList.of( + private ImmutableList getExpectedInvoiceOutput() { + return ImmutableList.of( "2017-10-01,2020-09-30,234,41.00,USD,10125,1,PURCHASE,theRegistrar - test,2," + "RENEW | TLD: test | TERM: 3-year,20.50,USD,", "2017-10-01,2022-09-30,234,70.75,JPY,10125,1,PURCHASE,theRegistrar - hello,1," + "CREATE | TLD: hello | TERM: 5-year,70.75,JPY,", "2017-10-01,2018-09-30,456,20.50,USD,10125,1,PURCHASE,googledomains - test,1," + "RENEW | TLD: test | TERM: 1-year,20.50,USD,"); - PAssert.that(output).containsInAnyOrder(outputStrings); + } + + @Test + public void testEndToEndPipeline_generatesExpectedFiles() throws Exception { + ImmutableList inputRows = getInputEvents(); + PCollection input = p.apply(Create.of(inputRows)); + invoicingPipeline.applyTerminalTransforms(input); p.run(); + + for (Entry> entry : getExpectedDetailReportMap().entrySet()) { + ImmutableList detailReport = resultFileContents(entry.getKey()); + assertThat(detailReport.get(0)) + .isEqualTo("id,billingTime,eventTime,registrarId,billingId,tld,action," + + "domain,repositoryId,years,currency,amount,flags"); + assertThat(detailReport.subList(1, detailReport.size())) + .containsExactlyElementsIn(entry.getValue()); + } + + ImmutableList overallInvoice = resultFileContents("overall_invoice.csv"); + assertThat(overallInvoice.get(0)) + .isEqualTo( + "StartDate,EndDate,ProductAccountKey,Amount,AmountCurrency,BillingProductCode," + + "SalesChannel,LineItemType,UsageGroupingKey,Quantity,Description,UnitPrice," + + "UnitPriceCurrency,PONumber"); + assertThat(overallInvoice.subList(1, overallInvoice.size())) + .containsExactlyElementsIn(getExpectedInvoiceOutput()); + } + + /** Returns the text contents of a file under the beamBucket/results directory. */ + private ImmutableList resultFileContents(String filename) throws Exception { + File resultFile = + new File(String.format("%s/results/%s", invoicingPipeline.beamBucket, filename)); + return ImmutableList.copyOf( + ResourceUtils.readResourceUtf8(resultFile.toURI().toURL()).split("\n")); } }