mirror of
https://github.com/google/nomulus.git
synced 2025-05-16 17:37:13 +02:00
Add integration test for billing pipeline IO
This creates an end-to-end test that checks for proper billing pipeline IO writes. The only remaining test would be to add a test for the Bigquery query, but see b/70839142 for why I've deemed that more work than worthwhile. ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=179706561
This commit is contained in:
parent
7d165a08cf
commit
eb07768200
3 changed files with 171 additions and 89 deletions
|
@ -22,6 +22,7 @@ import org.apache.beam.runners.dataflow.DataflowRunner;
|
|||
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
|
||||
import org.apache.beam.sdk.Pipeline;
|
||||
import org.apache.beam.sdk.coders.SerializableCoder;
|
||||
import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
|
||||
import org.apache.beam.sdk.io.FileBasedSink;
|
||||
import org.apache.beam.sdk.io.TextIO;
|
||||
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
|
||||
|
@ -82,33 +83,26 @@ public class InvoicingPipeline {
|
|||
.usingStandardSql()
|
||||
.withoutValidation());
|
||||
|
||||
billingEvents.apply(
|
||||
"Write events to separate CSVs keyed by registrarId_tld pair",
|
||||
TextIO.<BillingEvent>writeCustomType()
|
||||
.to(
|
||||
InvoicingUtils.makeDestinationFunction(beamBucket + "/results"),
|
||||
InvoicingUtils.makeEmptyDestinationParams(beamBucket + "/results"))
|
||||
.withFormatFunction(BillingEvent::toCsv)
|
||||
.withoutSharding()
|
||||
.withTempDirectory(
|
||||
FileBasedSink.convertToFileResourceIfPossible(beamBucket + "/temporary"))
|
||||
.withHeader(BillingEvent.getHeader()));
|
||||
|
||||
billingEvents
|
||||
.apply("Generate overall invoice rows", new GenerateInvoiceRows())
|
||||
.apply(
|
||||
"Write overall invoice to CSV",
|
||||
TextIO.write()
|
||||
.to(beamBucket + "/results/overall_invoice")
|
||||
.withHeader(InvoiceGroupingKey.invoiceHeader())
|
||||
.withoutSharding()
|
||||
.withSuffix(".csv"));
|
||||
|
||||
applyTerminalTransforms(billingEvents);
|
||||
p.run();
|
||||
}
|
||||
|
||||
/**
|
||||
* Applies output transforms to the {@code BillingEvent} source collection.
|
||||
*
|
||||
* <p>This is factored out purely to facilitate testing.
|
||||
*/
|
||||
void applyTerminalTransforms(PCollection<BillingEvent> billingEvents) {
|
||||
billingEvents.apply(
|
||||
"Write events to separate CSVs keyed by registrarId_tld pair", writeDetailReports());
|
||||
|
||||
billingEvents
|
||||
.apply("Generate overall invoice rows", new GenerateInvoiceRows())
|
||||
.apply("Write overall invoice to CSV", writeInvoice());
|
||||
}
|
||||
|
||||
/** Transform that converts a {@code BillingEvent} into an invoice CSV row. */
|
||||
static class GenerateInvoiceRows
|
||||
private static class GenerateInvoiceRows
|
||||
extends PTransform<PCollection<BillingEvent>, PCollection<String>> {
|
||||
@Override
|
||||
public PCollection<String> expand(PCollection<BillingEvent> input) {
|
||||
|
@ -125,4 +119,26 @@ public class InvoicingPipeline {
|
|||
.via((KV<InvoiceGroupingKey, Long> kv) -> kv.getKey().toCsv(kv.getValue())));
|
||||
}
|
||||
}
|
||||
|
||||
/** Returns an IO transform that writes the overall invoice to a single CSV file. */
|
||||
private TextIO.Write writeInvoice() {
|
||||
return TextIO.write()
|
||||
.to(beamBucket + "/results/overall_invoice")
|
||||
.withHeader(InvoiceGroupingKey.invoiceHeader())
|
||||
.withoutSharding()
|
||||
.withSuffix(".csv");
|
||||
}
|
||||
|
||||
/** Returns an IO transform that writes detail reports to registrar-tld keyed CSV files. */
|
||||
private TextIO.TypedWrite<BillingEvent, Params> writeDetailReports() {
|
||||
return TextIO.<BillingEvent>writeCustomType()
|
||||
.to(
|
||||
InvoicingUtils.makeDestinationFunction(beamBucket + "/results"),
|
||||
InvoicingUtils.makeEmptyDestinationParams(beamBucket + "/results"))
|
||||
.withFormatFunction(BillingEvent::toCsv)
|
||||
.withoutSharding()
|
||||
.withTempDirectory(FileBasedSink.convertToFileResourceIfPossible(beamBucket + "/temporary"))
|
||||
.withHeader(BillingEvent.getHeader())
|
||||
.withSuffix(".csv");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -13,6 +13,7 @@ java_library(
|
|||
resources = glob(["testdata/*"]),
|
||||
deps = [
|
||||
"//java/google/registry/beam",
|
||||
"//java/google/registry/util",
|
||||
"//javatests/google/registry/testing",
|
||||
"@com_google_apis_google_api_services_bigquery",
|
||||
"@com_google_dagger",
|
||||
|
@ -33,6 +34,7 @@ java_library(
|
|||
GenTestRules(
|
||||
name = "GeneratedTestRules",
|
||||
default_test_size = "small",
|
||||
medium_tests = ["InvoicingPipelineTest.java"],
|
||||
test_files = glob(["*Test.java"]),
|
||||
deps = [":beam"],
|
||||
)
|
||||
|
|
|
@ -14,20 +14,26 @@
|
|||
|
||||
package google.registry.beam;
|
||||
|
||||
import static com.google.common.truth.Truth.assertThat;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import google.registry.beam.InvoicingPipeline.GenerateInvoiceRows;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import google.registry.util.ResourceUtils;
|
||||
import java.io.File;
|
||||
import java.time.ZoneId;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.Map.Entry;
|
||||
import org.apache.beam.runners.direct.DirectRunner;
|
||||
import org.apache.beam.sdk.options.PipelineOptions;
|
||||
import org.apache.beam.sdk.options.PipelineOptionsFactory;
|
||||
import org.apache.beam.sdk.testing.PAssert;
|
||||
import org.apache.beam.sdk.testing.TestPipeline;
|
||||
import org.apache.beam.sdk.transforms.Create;
|
||||
import org.apache.beam.sdk.values.PCollection;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.JUnit4;
|
||||
|
||||
|
@ -44,79 +50,137 @@ public class InvoicingPipelineTest {
|
|||
}
|
||||
|
||||
@Rule public final transient TestPipeline p = TestPipeline.fromOptions(pipelineOptions);
|
||||
@Rule public final TemporaryFolder tempFolder = new TemporaryFolder();
|
||||
|
||||
@Test
|
||||
public void testGenerateInvoiceRowsFn() throws Exception {
|
||||
ImmutableList<BillingEvent> inputRows =
|
||||
private InvoicingPipeline invoicingPipeline;
|
||||
|
||||
@Before
|
||||
public void initializePipeline() {
|
||||
invoicingPipeline = new InvoicingPipeline();
|
||||
invoicingPipeline.projectId = "test-project";
|
||||
invoicingPipeline.beamBucket = tempFolder.getRoot().getAbsolutePath();
|
||||
}
|
||||
|
||||
private ImmutableList<BillingEvent> getInputEvents() {
|
||||
return ImmutableList.of(
|
||||
BillingEvent.create(
|
||||
1,
|
||||
ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
|
||||
ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
|
||||
"theRegistrar",
|
||||
"234",
|
||||
"test",
|
||||
"RENEW",
|
||||
"mydomain.test",
|
||||
"REPO-ID",
|
||||
3,
|
||||
"USD",
|
||||
20.5,
|
||||
""),
|
||||
BillingEvent.create(
|
||||
1,
|
||||
ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
|
||||
ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
|
||||
"theRegistrar",
|
||||
"234",
|
||||
"test",
|
||||
"RENEW",
|
||||
"mydomain2.test",
|
||||
"REPO-ID",
|
||||
3,
|
||||
"USD",
|
||||
20.5,
|
||||
""),
|
||||
BillingEvent.create(
|
||||
1,
|
||||
ZonedDateTime.of(2017, 10, 2, 0, 0, 0, 0, ZoneId.of("UTC")),
|
||||
ZonedDateTime.of(2017, 9, 29, 0, 0, 0, 0, ZoneId.of("UTC")),
|
||||
"theRegistrar",
|
||||
"234",
|
||||
"hello",
|
||||
"CREATE",
|
||||
"mydomain3.hello",
|
||||
"REPO-ID",
|
||||
5,
|
||||
"JPY",
|
||||
70.75,
|
||||
""),
|
||||
BillingEvent.create(
|
||||
1,
|
||||
ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
|
||||
ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
|
||||
"googledomains",
|
||||
"456",
|
||||
"test",
|
||||
"RENEW",
|
||||
"mydomain4.test",
|
||||
"REPO-ID",
|
||||
1,
|
||||
"USD",
|
||||
20.5,
|
||||
""));
|
||||
}
|
||||
|
||||
/** Returns a map from filename to expected contents for detail reports. */
|
||||
private ImmutableMap<String, ImmutableList<String>> getExpectedDetailReportMap() {
|
||||
return ImmutableMap.of(
|
||||
"theRegistrar_test.csv",
|
||||
ImmutableList.of(
|
||||
BillingEvent.create(
|
||||
1,
|
||||
ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
|
||||
ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
|
||||
"theRegistrar",
|
||||
"234",
|
||||
"test",
|
||||
"RENEW",
|
||||
"mydomain.test",
|
||||
"REPO-ID",
|
||||
3,
|
||||
"USD",
|
||||
20.5,
|
||||
""),
|
||||
BillingEvent.create(
|
||||
1,
|
||||
ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
|
||||
ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
|
||||
"theRegistrar",
|
||||
"234",
|
||||
"test",
|
||||
"RENEW",
|
||||
"mydomain2.test",
|
||||
"REPO-ID",
|
||||
3,
|
||||
"USD",
|
||||
20.5,
|
||||
""),
|
||||
BillingEvent.create(
|
||||
1,
|
||||
ZonedDateTime.of(2017, 10, 2, 0, 0, 0, 0, ZoneId.of("UTC")),
|
||||
ZonedDateTime.of(2017, 9, 29, 0, 0, 0, 0, ZoneId.of("UTC")),
|
||||
"theRegistrar",
|
||||
"234",
|
||||
"hello",
|
||||
"CREATE",
|
||||
"mydomain3.hello",
|
||||
"REPO-ID",
|
||||
5,
|
||||
"JPY",
|
||||
70.75,
|
||||
""),
|
||||
BillingEvent.create(
|
||||
1,
|
||||
ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
|
||||
ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
|
||||
"googledomains",
|
||||
"456",
|
||||
"test",
|
||||
"RENEW",
|
||||
"mydomain4.test",
|
||||
"REPO-ID",
|
||||
1,
|
||||
"USD",
|
||||
20.5,
|
||||
""));
|
||||
"1,2017-10-04 00:00:00 UTC,2017-10-04 00:00:00 UTC,theRegistrar,234,"
|
||||
+ "test,RENEW,mydomain2.test,REPO-ID,3,USD,20.50,",
|
||||
"1,2017-10-04 00:00:00 UTC,2017-10-04 00:00:00 UTC,theRegistrar,234,"
|
||||
+ "test,RENEW,mydomain.test,REPO-ID,3,USD,20.50,"),
|
||||
"theRegistrar_hello.csv",
|
||||
ImmutableList.of(
|
||||
"1,2017-10-02 00:00:00 UTC,2017-09-29 00:00:00 UTC,theRegistrar,234,"
|
||||
+ "hello,CREATE,mydomain3.hello,REPO-ID,5,JPY,70.75,"),
|
||||
"googledomains_test.csv",
|
||||
ImmutableList.of(
|
||||
"1,2017-10-04 00:00:00 UTC,2017-10-04 00:00:00 UTC,googledomains,456,"
|
||||
+ "test,RENEW,mydomain4.test,REPO-ID,1,USD,20.50,"));
|
||||
}
|
||||
|
||||
PCollection<BillingEvent> input = p.apply(Create.of(inputRows));
|
||||
PCollection<String> output = input.apply(new GenerateInvoiceRows());
|
||||
|
||||
ImmutableList<String> outputStrings = ImmutableList.of(
|
||||
private ImmutableList<String> getExpectedInvoiceOutput() {
|
||||
return ImmutableList.of(
|
||||
"2017-10-01,2020-09-30,234,41.00,USD,10125,1,PURCHASE,theRegistrar - test,2,"
|
||||
+ "RENEW | TLD: test | TERM: 3-year,20.50,USD,",
|
||||
"2017-10-01,2022-09-30,234,70.75,JPY,10125,1,PURCHASE,theRegistrar - hello,1,"
|
||||
+ "CREATE | TLD: hello | TERM: 5-year,70.75,JPY,",
|
||||
"2017-10-01,2018-09-30,456,20.50,USD,10125,1,PURCHASE,googledomains - test,1,"
|
||||
+ "RENEW | TLD: test | TERM: 1-year,20.50,USD,");
|
||||
PAssert.that(output).containsInAnyOrder(outputStrings);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEndToEndPipeline_generatesExpectedFiles() throws Exception {
|
||||
ImmutableList<BillingEvent> inputRows = getInputEvents();
|
||||
PCollection<BillingEvent> input = p.apply(Create.of(inputRows));
|
||||
invoicingPipeline.applyTerminalTransforms(input);
|
||||
p.run();
|
||||
|
||||
for (Entry<String, ImmutableList<String>> entry : getExpectedDetailReportMap().entrySet()) {
|
||||
ImmutableList<String> detailReport = resultFileContents(entry.getKey());
|
||||
assertThat(detailReport.get(0))
|
||||
.isEqualTo("id,billingTime,eventTime,registrarId,billingId,tld,action,"
|
||||
+ "domain,repositoryId,years,currency,amount,flags");
|
||||
assertThat(detailReport.subList(1, detailReport.size()))
|
||||
.containsExactlyElementsIn(entry.getValue());
|
||||
}
|
||||
|
||||
ImmutableList<String> overallInvoice = resultFileContents("overall_invoice.csv");
|
||||
assertThat(overallInvoice.get(0))
|
||||
.isEqualTo(
|
||||
"StartDate,EndDate,ProductAccountKey,Amount,AmountCurrency,BillingProductCode,"
|
||||
+ "SalesChannel,LineItemType,UsageGroupingKey,Quantity,Description,UnitPrice,"
|
||||
+ "UnitPriceCurrency,PONumber");
|
||||
assertThat(overallInvoice.subList(1, overallInvoice.size()))
|
||||
.containsExactlyElementsIn(getExpectedInvoiceOutput());
|
||||
}
|
||||
|
||||
/** Returns the text contents of a file under the beamBucket/results directory. */
|
||||
private ImmutableList<String> resultFileContents(String filename) throws Exception {
|
||||
File resultFile =
|
||||
new File(String.format("%s/results/%s", invoicingPipeline.beamBucket, filename));
|
||||
return ImmutableList.copyOf(
|
||||
ResourceUtils.readResourceUtf8(resultFile.toURI().toURL()).split("\n"));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue