From c7d44a0ee4753fa35ba7439d4a59f261d089de21 Mon Sep 17 00:00:00 2001 From: sarahcaseybot Date: Fri, 6 Aug 2021 15:56:04 -0400 Subject: [PATCH] Migrate invoicing pipeline to read from Cloud SQL (#1220) * Save entities to Cloud SQL for tests * Fix merge conflict * Filter out non-real registrars and non-invoicing TLDs * Add 1 month filter * Handle cancellations * Add to pipeline * Use database in pipeline * fix formatting * Add a full pipeline test * Fix repo ids in tests * Move query to separate file * Remove unused variables * Remove unnecessary debugging remnant * Reformat sql file * Add jpql issue description * Use DateTimeUtils * Fix license header year * Fix SQL formatting * Use regex pattern * Fix string building * Add test for makeCloudSqlQuery * Add clarifying comment --- .../google/registry/util/DateTimeUtils.java | 9 + .../beam/invoicing/InvoicingPipeline.java | 84 +++- .../invoicing/InvoicingPipelineOptions.java | 5 + .../billing/GenerateInvoicesAction.java | 12 + .../sql/cloud_sql_billing_events.sql | 39 ++ .../beam/invoicing_pipeline_metadata.json | 8 + .../beam/invoicing/InvoicingPipelineTest.java | 398 +++++++++++++++++- .../billing/GenerateInvoicesActionTest.java | 3 + 8 files changed, 531 insertions(+), 27 deletions(-) create mode 100644 core/src/main/resources/google/registry/beam/invoicing/sql/cloud_sql_billing_events.sql diff --git a/common/src/main/java/google/registry/util/DateTimeUtils.java b/common/src/main/java/google/registry/util/DateTimeUtils.java index ca18dfece..a3542e7ff 100644 --- a/common/src/main/java/google/registry/util/DateTimeUtils.java +++ b/common/src/main/java/google/registry/util/DateTimeUtils.java @@ -100,6 +100,15 @@ public class DateTimeUtils { return ZonedDateTime.ofInstant(instant, ZoneId.of(dateTime.getZone().getID()).normalized()); } + /** + * Converts a Joda {@link DateTime} object to an equivalent java.time {@link ZonedDateTime} + * object. + */ + public static ZonedDateTime toZonedDateTime(DateTime dateTime, ZoneId zoneId) { + java.time.Instant instant = java.time.Instant.ofEpochMilli(dateTime.getMillis()); + return ZonedDateTime.ofInstant(instant, zoneId); + } + /** * Converts a java.time {@link ZonedDateTime} object to an equivalent Joda {@link DateTime} * object. diff --git a/core/src/main/java/google/registry/beam/invoicing/InvoicingPipeline.java b/core/src/main/java/google/registry/beam/invoicing/InvoicingPipeline.java index 7806caa51..b2b169d66 100644 --- a/core/src/main/java/google/registry/beam/invoicing/InvoicingPipeline.java +++ b/core/src/main/java/google/registry/beam/invoicing/InvoicingPipeline.java @@ -14,18 +14,29 @@ package google.registry.beam.invoicing; +import static com.google.common.collect.ImmutableSet.toImmutableSet; import static google.registry.beam.BeamUtils.getQueryFromFile; import static org.apache.beam.sdk.values.TypeDescriptors.strings; +import google.registry.beam.common.RegistryJpaIO; +import google.registry.beam.common.RegistryJpaIO.Read; import google.registry.beam.invoicing.BillingEvent.InvoiceGroupingKey; import google.registry.beam.invoicing.BillingEvent.InvoiceGroupingKey.InvoiceGroupingKeyCoder; +import google.registry.model.billing.BillingEvent.Flag; +import google.registry.model.registrar.Registrar; +import google.registry.persistence.PersistenceModule.TransactionIsolationLevel; import google.registry.reporting.billing.BillingModule; +import google.registry.util.DateTimeUtils; +import google.registry.util.DomainNameUtils; import google.registry.util.SqlTemplate; import java.io.Serializable; import java.time.LocalDateTime; import java.time.LocalTime; import java.time.YearMonth; +import java.time.ZoneId; import java.time.format.DateTimeFormatter; +import java.util.Optional; +import java.util.regex.Pattern; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.coders.SerializableCoder; @@ -58,6 +69,9 @@ public class InvoicingPipeline implements Serializable { private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"); + private static final Pattern SQL_COMMENT_REGEX = + Pattern.compile("^\\s*--.*\\n", Pattern.MULTILINE); + private final InvoicingPipelineOptions options; InvoicingPipeline(InvoicingPipelineOptions options) { @@ -71,21 +85,60 @@ public class InvoicingPipeline implements Serializable { } void setupPipeline(Pipeline pipeline) { + options.setIsolationOverride(TransactionIsolationLevel.TRANSACTION_READ_COMMITTED); PCollection billingEvents = - pipeline.apply( - "Read BillingEvents from Bigquery", - BigQueryIO.read(BillingEvent::parseFromRecord) - .fromQuery(makeQuery(options.getYearMonth(), options.getProject())) - .withCoder(SerializableCoder.of(BillingEvent.class)) - .usingStandardSql() - .withoutValidation() - .withTemplateCompatibility()); + options.getDatabase().equals("DATASTORE") + ? readFromBigQuery(options, pipeline) + : readFromCloudSql(options, pipeline); saveInvoiceCsv(billingEvents, options); saveDetailedCsv(billingEvents, options); } + static PCollection readFromBigQuery( + InvoicingPipelineOptions options, Pipeline pipeline) { + return pipeline.apply( + "Read BillingEvents from Bigquery", + BigQueryIO.read(BillingEvent::parseFromRecord) + .fromQuery(makeQuery(options.getYearMonth(), options.getProject())) + .withCoder(SerializableCoder.of(BillingEvent.class)) + .usingStandardSql() + .withoutValidation() + .withTemplateCompatibility()); + } + + static PCollection readFromCloudSql( + InvoicingPipelineOptions options, Pipeline pipeline) { + Read read = + RegistryJpaIO.read( + makeCloudSqlQuery(options.getYearMonth()), false, InvoicingPipeline::parseRow); + + return pipeline.apply("Read BillingEvents from Cloud SQL", read); + } + + private static BillingEvent parseRow(Object[] row) { + google.registry.model.billing.BillingEvent.OneTime oneTime = + (google.registry.model.billing.BillingEvent.OneTime) row[0]; + Registrar registrar = (Registrar) row[1]; + return BillingEvent.create( + oneTime.getId(), + DateTimeUtils.toZonedDateTime(oneTime.getBillingTime(), ZoneId.of("UTC")), + DateTimeUtils.toZonedDateTime(oneTime.getEventTime(), ZoneId.of("UTC")), + registrar.getClientId(), + registrar.getBillingIdentifier().toString(), + registrar.getPoNumber().orElse(""), + DomainNameUtils.getTldFromDomainName(oneTime.getTargetId()), + oneTime.getReason().toString(), + oneTime.getTargetId(), + oneTime.getDomainRepoId(), + Optional.ofNullable(oneTime.getPeriodYears()).orElse(0), + oneTime.getCost().getCurrencyUnit().toString(), + oneTime.getCost().getAmount().doubleValue(), + String.join( + " ", oneTime.getFlags().stream().map(Flag::toString).collect(toImmutableSet()))); + } + /** Transform that converts a {@code BillingEvent} into an invoice CSV row. */ private static class GenerateInvoiceRows extends PTransform, PCollection> { @@ -171,6 +224,21 @@ public class InvoicingPipeline implements Serializable { .build(); } + /** Create the Cloud SQL query for a given yearMonth at runtime. */ + static String makeCloudSqlQuery(String yearMonth) { + YearMonth endMonth = YearMonth.parse(yearMonth).plusMonths(1); + String queryWithComments = + SqlTemplate.create( + getQueryFromFile(InvoicingPipeline.class, "cloud_sql_billing_events.sql")) + .put("FIRST_TIMESTAMP_OF_MONTH", yearMonth.concat("-01")) + .put( + "LAST_TIMESTAMP_OF_MONTH", + String.format("%d-%d-01", endMonth.getYear(), endMonth.getMonthValue())) + .build(); + // Remove the comments from the query string + return SQL_COMMENT_REGEX.matcher(queryWithComments).replaceAll(""); + } + public static void main(String[] args) { PipelineOptionsFactory.register(InvoicingPipelineOptions.class); InvoicingPipelineOptions options = diff --git a/core/src/main/java/google/registry/beam/invoicing/InvoicingPipelineOptions.java b/core/src/main/java/google/registry/beam/invoicing/InvoicingPipelineOptions.java index ff4d5a69d..9a9fab709 100644 --- a/core/src/main/java/google/registry/beam/invoicing/InvoicingPipelineOptions.java +++ b/core/src/main/java/google/registry/beam/invoicing/InvoicingPipelineOptions.java @@ -30,6 +30,11 @@ public interface InvoicingPipelineOptions extends RegistryPipelineOptions { void setInvoiceFilePrefix(String value); + @Description("The database to read data from.") + String getDatabase(); + + void setDatabase(String value); + @Description("The GCS bucket URL for invoices and detailed reports to be uploaded.") String getBillingBucketUrl(); diff --git a/core/src/main/java/google/registry/reporting/billing/GenerateInvoicesAction.java b/core/src/main/java/google/registry/reporting/billing/GenerateInvoicesAction.java index a6770ca2e..1d7c6d146 100644 --- a/core/src/main/java/google/registry/reporting/billing/GenerateInvoicesAction.java +++ b/core/src/main/java/google/registry/reporting/billing/GenerateInvoicesAction.java @@ -15,6 +15,8 @@ package google.registry.reporting.billing; import static google.registry.beam.BeamUtils.createJobName; +import static google.registry.persistence.transaction.TransactionManagerFactory.tm; +import static google.registry.reporting.ReportingModule.DATABASE; import static google.registry.reporting.ReportingUtils.enqueueBeamReportingTask; import static google.registry.reporting.billing.BillingModule.PARAM_SHOULD_PUBLISH; import static google.registry.request.Action.Method.POST; @@ -70,6 +72,7 @@ public class GenerateInvoicesAction implements Runnable { private final Clock clock; private final Response response; private final Dataflow dataflow; + private final String database; @Inject GenerateInvoicesAction( @@ -79,6 +82,7 @@ public class GenerateInvoicesAction implements Runnable { @Config("billingBucketUrl") String billingBucketUrl, @Config("invoiceFilePrefix") String invoiceFilePrefix, @Parameter(PARAM_SHOULD_PUBLISH) boolean shouldPublish, + @Parameter(DATABASE) String database, YearMonth yearMonth, BillingEmailUtils emailUtils, Clock clock, @@ -87,9 +91,15 @@ public class GenerateInvoicesAction implements Runnable { this.projectId = projectId; this.jobRegion = jobRegion; this.stagingBucketUrl = stagingBucketUrl; + // When generating the invoices using Cloud SQL before database cutover, save the reports in a + // separate bucket so that it does not overwrite the Datastore invoices. + if (tm().isOfy() && database.equals("CLOUD_SQL")) { + billingBucketUrl = billingBucketUrl.concat("-sql"); + } this.billingBucketUrl = billingBucketUrl; this.invoiceFilePrefix = invoiceFilePrefix; this.shouldPublish = shouldPublish; + this.database = database; this.yearMonth = yearMonth; this.emailUtils = emailUtils; this.clock = clock; @@ -113,6 +123,8 @@ public class GenerateInvoicesAction implements Runnable { yearMonth.toString("yyyy-MM"), "invoiceFilePrefix", invoiceFilePrefix, + "database", + database, "billingBucketUrl", billingBucketUrl)); LaunchFlexTemplateResponse launchResponse = diff --git a/core/src/main/resources/google/registry/beam/invoicing/sql/cloud_sql_billing_events.sql b/core/src/main/resources/google/registry/beam/invoicing/sql/cloud_sql_billing_events.sql new file mode 100644 index 000000000..c2549f384 --- /dev/null +++ b/core/src/main/resources/google/registry/beam/invoicing/sql/cloud_sql_billing_events.sql @@ -0,0 +1,39 @@ +-- Copyright 2021 The Nomulus Authors. All Rights Reserved. +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. + +-- This query gathers all non-canceled billing events for a given +-- YEAR_MONTH in yyyy-MM format. + +-- This query differs from the one for BigQuery because JPQL does not have all +-- the same functionality as a native query. Since there is no JPQL equivalent +-- for REGEXP_EXTRACT the tld cannot be extracted from the targetId in the +-- BillingEvent table. This requires an additional join with the domain table in +-- order to find the value for the TLD associated with the billing event. Also, +-- JPQL does not allow subqueries in the SELECT and FROM clauses. This prevents +-- the query from being able to filter out certain data early in the query to +-- reduce the size of intermediate results. It may be useful to measure this +-- query's performance and consider switching to using a native query. + +SELECT b, r FROM BillingEvent b +JOIN Registrar r ON b.clientId = r.clientIdentifier +JOIN Domain d ON b.domainRepoId = d.repoId +JOIN Tld t ON t.tldStrId = d.tld +LEFT JOIN BillingCancellation c ON b.id = c.refOneTime.billingId +LEFT JOIN BillingCancellation cr ON b.cancellationMatchingBillingEvent = cr.refRecurring.billingId +WHERE r.billingIdentifier IS NOT NULL +AND r.type = 'REAL' +AND t.invoicingEnabled IS TRUE +AND b.billingTime BETWEEN CAST('%FIRST_TIMESTAMP_OF_MONTH%' AS timestamp) AND CAST('%LAST_TIMESTAMP_OF_MONTH%' AS timestamp) +AND c.id IS NULL +AND cr.id IS NULL diff --git a/core/src/main/resources/google/registry/beam/invoicing_pipeline_metadata.json b/core/src/main/resources/google/registry/beam/invoicing_pipeline_metadata.json index 2b709cbef..971e8112f 100644 --- a/core/src/main/resources/google/registry/beam/invoicing_pipeline_metadata.json +++ b/core/src/main/resources/google/registry/beam/invoicing_pipeline_metadata.json @@ -61,6 +61,14 @@ "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ] + }, + { + "name": "database", + "label": "Database to read from.", + "helpText": "DATASTORE or CLOUD_SQL.", + "regexes": [ + "^DATASTORE|CLOUD_SQL$" + ] } ] } diff --git a/core/src/test/java/google/registry/beam/invoicing/InvoicingPipelineTest.java b/core/src/test/java/google/registry/beam/invoicing/InvoicingPipelineTest.java index 821df35c7..0acc2be4e 100644 --- a/core/src/test/java/google/registry/beam/invoicing/InvoicingPipelineTest.java +++ b/core/src/test/java/google/registry/beam/invoicing/InvoicingPipelineTest.java @@ -14,11 +14,39 @@ package google.registry.beam.invoicing; +import static com.google.common.collect.ImmutableSet.toImmutableSet; import static com.google.common.truth.Truth.assertThat; +import static google.registry.model.registry.Registry.TldState.GENERAL_AVAILABILITY; +import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; +import static google.registry.persistence.transaction.TransactionManagerFactory.removeTmOverrideForTest; +import static google.registry.persistence.transaction.TransactionManagerFactory.setTmForTest; +import static google.registry.testing.DatabaseHelper.createTld; +import static google.registry.testing.DatabaseHelper.newRegistry; +import static google.registry.testing.DatabaseHelper.persistActiveDomain; +import static google.registry.testing.DatabaseHelper.persistNewRegistrar; +import static google.registry.testing.DatabaseHelper.persistResource; +import static google.registry.util.DateTimeUtils.END_OF_TIME; +import static google.registry.util.DateTimeUtils.START_OF_TIME; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.ImmutableSortedMap; import google.registry.beam.TestPipelineExtension; +import google.registry.model.billing.BillingEvent.Cancellation; +import google.registry.model.billing.BillingEvent.Flag; +import google.registry.model.billing.BillingEvent.OneTime; +import google.registry.model.billing.BillingEvent.Reason; +import google.registry.model.billing.BillingEvent.Recurring; +import google.registry.model.domain.DomainBase; +import google.registry.model.domain.DomainHistory; +import google.registry.model.registrar.Registrar; +import google.registry.model.registry.Registry; +import google.registry.model.reporting.HistoryEntry; +import google.registry.persistence.transaction.JpaTestRules; +import google.registry.persistence.transaction.JpaTestRules.JpaIntegrationTestExtension; +import google.registry.testing.DatastoreEntityExtension; +import google.registry.testing.FakeClock; import google.registry.testing.TestDataHelper; import google.registry.util.ResourceUtils; import java.io.File; @@ -26,12 +54,22 @@ import java.nio.file.Files; import java.nio.file.Path; import java.time.ZoneId; import java.time.ZonedDateTime; +import java.util.Arrays; import java.util.Map.Entry; +import java.util.Optional; import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.joda.money.CurrencyUnit; +import org.joda.money.Money; +import org.joda.time.DateTime; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.api.io.TempDir; @@ -61,7 +99,7 @@ class InvoicingPipelineTest { 20.5, ""), BillingEvent.create( - 1, + 2, ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")), ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")), "theRegistrar", @@ -76,7 +114,7 @@ class InvoicingPipelineTest { 20.5, ""), BillingEvent.create( - 1, + 3, ZonedDateTime.of(2017, 10, 2, 0, 0, 0, 0, ZoneId.of("UTC")), ZonedDateTime.of(2017, 9, 29, 0, 0, 0, 0, ZoneId.of("UTC")), "theRegistrar", @@ -88,10 +126,10 @@ class InvoicingPipelineTest { "REPO-ID", 5, "JPY", - 70.75, + 70.0, ""), BillingEvent.create( - 1, + 4, ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")), ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")), "bestdomains", @@ -106,7 +144,7 @@ class InvoicingPipelineTest { 20.5, ""), BillingEvent.create( - 1, + 5, ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")), ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")), "anotherRegistrar", @@ -118,10 +156,10 @@ class InvoicingPipelineTest { "REPO-ID", 1, "USD", - 0, + 0.0, "SUNRISE ANCHOR_TENANT"), BillingEvent.create( - 1, + 6, ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")), ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")), "theRegistrar", @@ -133,10 +171,10 @@ class InvoicingPipelineTest { "REPO-ID", 0, "USD", - 0, + 0.0, ""), BillingEvent.create( - 1, + 7, ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")), ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")), "theRegistrar", @@ -148,49 +186,57 @@ class InvoicingPipelineTest { "REPO-ID", 0, "USD", - 20, + 20.0, "")); private static final ImmutableMap> EXPECTED_DETAILED_REPORT_MAP = ImmutableMap.of( "invoice_details_2017-10_theRegistrar_test.csv", ImmutableList.of( - "1,2017-10-04 00:00:00 UTC,2017-10-04 00:00:00 UTC,theRegistrar,234,," + "2,2017-10-04 00:00:00 UTC,2017-10-04 00:00:00 UTC,theRegistrar,234,," + "test,RENEW,mydomain2.test,REPO-ID,3,USD,20.50,", "1,2017-10-04 00:00:00 UTC,2017-10-04 00:00:00 UTC,theRegistrar,234,," + "test,RENEW,mydomain.test,REPO-ID,3,USD,20.50,", - "1,2017-10-04 00:00:00 UTC,2017-10-04 00:00:00 UTC,theRegistrar,234,," + "7,2017-10-04 00:00:00 UTC,2017-10-04 00:00:00 UTC,theRegistrar,234,," + "test,SERVER_STATUS,update-prohibited.test,REPO-ID,0,USD,20.00,", - "1,2017-10-04 00:00:00 UTC,2017-10-04 00:00:00 UTC,theRegistrar,234,," + "6,2017-10-04 00:00:00 UTC,2017-10-04 00:00:00 UTC,theRegistrar,234,," + "test,SERVER_STATUS,locked.test,REPO-ID,0,USD,0.00,"), "invoice_details_2017-10_theRegistrar_hello.csv", ImmutableList.of( - "1,2017-10-02 00:00:00 UTC,2017-09-29 00:00:00 UTC,theRegistrar,234,," - + "hello,CREATE,mydomain3.hello,REPO-ID,5,JPY,70.75,"), + "3,2017-10-02 00:00:00 UTC,2017-09-29 00:00:00 UTC,theRegistrar,234,," + + "hello,CREATE,mydomain3.hello,REPO-ID,5,JPY,70.00,"), "invoice_details_2017-10_bestdomains_test.csv", ImmutableList.of( - "1,2017-10-04 00:00:00 UTC,2017-10-04 00:00:00 UTC,bestdomains,456,116688," + "4,2017-10-04 00:00:00 UTC,2017-10-04 00:00:00 UTC,bestdomains,456,116688," + "test,RENEW,mydomain4.test,REPO-ID,1,USD,20.50,"), "invoice_details_2017-10_anotherRegistrar_test.csv", ImmutableList.of( - "1,2017-10-04 00:00:00 UTC,2017-10-04 00:00:00 UTC,anotherRegistrar,789,," + "5,2017-10-04 00:00:00 UTC,2017-10-04 00:00:00 UTC,anotherRegistrar,789,," + "test,CREATE,mydomain5.test,REPO-ID,1,USD,0.00,SUNRISE ANCHOR_TENANT")); private static final ImmutableList EXPECTED_INVOICE_OUTPUT = ImmutableList.of( "2017-10-01,2020-09-30,234,41.00,USD,10125,1,PURCHASE,theRegistrar - test,2," + "RENEW | TLD: test | TERM: 3-year,20.50,USD,", - "2017-10-01,2022-09-30,234,70.75,JPY,10125,1,PURCHASE,theRegistrar - hello,1," - + "CREATE | TLD: hello | TERM: 5-year,70.75,JPY,", + "2017-10-01,2022-09-30,234,70.00,JPY,10125,1,PURCHASE,theRegistrar - hello,1," + + "CREATE | TLD: hello | TERM: 5-year,70.00,JPY,", "2017-10-01,,234,20.00,USD,10125,1,PURCHASE,theRegistrar - test,1," + "SERVER_STATUS | TLD: test | TERM: 0-year,20.00,USD,", "2017-10-01,2018-09-30,456,20.50,USD,10125,1,PURCHASE,bestdomains - test,1," + "RENEW | TLD: test | TERM: 1-year,20.50,USD,116688"); + @RegisterExtension + @Order(Order.DEFAULT - 1) + final transient DatastoreEntityExtension datastore = new DatastoreEntityExtension(); + @RegisterExtension final TestPipelineExtension pipeline = TestPipelineExtension.create().enableAbandonedNodeEnforcement(true); + @RegisterExtension + final JpaIntegrationTestExtension database = + new JpaTestRules.Builder().withClock(new FakeClock()).buildIntegrationTestRule(); + @TempDir Path tmpDir; private final InvoicingPipelineOptions options = @@ -218,6 +264,37 @@ class InvoicingPipelineTest { pipeline.run(); } + @Test + void testSuccess_fullSqlPipeline() throws Exception { + setTmForTest(jpaTm()); + setupCloudSql(); + options.setDatabase("CLOUD_SQL"); + InvoicingPipeline invoicingPipeline = new InvoicingPipeline(options); + invoicingPipeline.setupPipeline(pipeline); + pipeline.run(options).waitUntilFinish(); + // Verify invoice CSV + ImmutableList overallInvoice = resultFileContents("REG-INV-2017-10.csv"); + assertThat(overallInvoice.get(0)) + .isEqualTo( + "StartDate,EndDate,ProductAccountKey,Amount,AmountCurrency,BillingProductCode," + + "SalesChannel,LineItemType,UsageGroupingKey,Quantity,Description,UnitPrice," + + "UnitPriceCurrency,PONumber"); + assertThat(overallInvoice.subList(1, overallInvoice.size())) + .containsExactlyElementsIn(EXPECTED_INVOICE_OUTPUT); + removeTmOverrideForTest(); + } + + @Test + void testSuccess_readFromCloudSql() throws Exception { + setTmForTest(jpaTm()); + setupCloudSql(); + PCollection billingEvents = InvoicingPipeline.readFromCloudSql(options, pipeline); + billingEvents = billingEvents.apply(new changeDomainRepo()); + PAssert.that(billingEvents).containsInAnyOrder(INPUT_EVENTS); + pipeline.run().waitUntilFinish(); + removeTmOverrideForTest(); + } + @Test void testSuccess_saveInvoiceCsv() throws Exception { InvoicingPipeline.saveInvoiceCsv(billingEvents, options); @@ -247,6 +324,30 @@ class InvoicingPipelineTest { } } + @Test + void testSuccess_makeCloudSqlQuery() throws Exception { + // Pipeline must be run due to the TestPipelineExtension + pipeline.run().waitUntilFinish(); + // Test that comments are removed from the .sql file correctly + assertThat(InvoicingPipeline.makeCloudSqlQuery("2017-10")) + .isEqualTo( + "\n" + + "SELECT b, r FROM BillingEvent b\n" + + "JOIN Registrar r ON b.clientId = r.clientIdentifier\n" + + "JOIN Domain d ON b.domainRepoId = d.repoId\n" + + "JOIN Tld t ON t.tldStrId = d.tld\n" + + "LEFT JOIN BillingCancellation c ON b.id = c.refOneTime.billingId\n" + + "LEFT JOIN BillingCancellation cr ON b.cancellationMatchingBillingEvent =" + + " cr.refRecurring.billingId\n" + + "WHERE r.billingIdentifier IS NOT NULL\n" + + "AND r.type = 'REAL'\n" + + "AND t.invoicingEnabled IS TRUE\n" + + "AND b.billingTime BETWEEN CAST('2017-10-01' AS timestamp) AND CAST('2017-11-01'" + + " AS timestamp)\n" + + "AND c.id IS NULL\n" + + "AND cr.id IS NULL\n"); + } + /** Returns the text contents of a file under the beamBucket/results directory. */ private ImmutableList resultFileContents(String filename) throws Exception { File resultFile = @@ -256,4 +357,263 @@ class InvoicingPipelineTest { return ImmutableList.copyOf( ResourceUtils.readResourceUtf8(resultFile.toURI().toURL()).split("\n")); } + + private void setupCloudSql() { + // Populate billing events in Cloud SQL to match existing test data for Datastore + persistNewRegistrar("NewRegistrar"); + persistNewRegistrar("TheRegistrar"); + Registrar registrar1 = persistNewRegistrar("theRegistrar"); + registrar1 = registrar1.asBuilder().setBillingIdentifier(234L).build(); + persistResource(registrar1); + Registrar registrar2 = persistNewRegistrar("bestdomains"); + registrar2 = + registrar2 + .asBuilder() + .setBillingIdentifier(456L) + .setPoNumber(Optional.of("116688")) + .build(); + persistResource(registrar2); + Registrar registrar3 = persistNewRegistrar("anotherRegistrar"); + registrar3 = registrar3.asBuilder().setBillingIdentifier(789L).build(); + persistResource(registrar3); + + Registry test = + newRegistry("test", "_TEST", ImmutableSortedMap.of(START_OF_TIME, GENERAL_AVAILABILITY)) + .asBuilder() + .setInvoicingEnabled(true) + .build(); + persistResource(test); + Registry hello = + newRegistry("hello", "_HELLO", ImmutableSortedMap.of(START_OF_TIME, GENERAL_AVAILABILITY)) + .asBuilder() + .setInvoicingEnabled(true) + .build(); + persistResource(hello); + + DomainBase domain1 = persistActiveDomain("mydomain.test"); + DomainBase domain2 = persistActiveDomain("mydomain2.test"); + DomainBase domain3 = persistActiveDomain("mydomain3.hello"); + DomainBase domain4 = persistActiveDomain("mydomain4.test"); + DomainBase domain5 = persistActiveDomain("mydomain5.test"); + DomainBase domain6 = persistActiveDomain("locked.test"); + DomainBase domain7 = persistActiveDomain("update-prohibited.test"); + + persistOneTimeBillingEvent( + 1, domain1, registrar1, Reason.RENEW, 3, Money.of(CurrencyUnit.USD, 20.5)); + persistOneTimeBillingEvent( + 2, domain2, registrar1, Reason.RENEW, 3, Money.of(CurrencyUnit.USD, 20.5)); + persistOneTimeBillingEvent( + 3, + domain3, + registrar1, + Reason.CREATE, + 5, + Money.ofMajor(CurrencyUnit.JPY, 70), + DateTime.parse("2017-09-29T00:00:00.0Z"), + DateTime.parse("2017-10-02T00:00:00.0Z")); + persistOneTimeBillingEvent( + 4, domain4, registrar2, Reason.RENEW, 1, Money.of(CurrencyUnit.USD, 20.5)); + persistOneTimeBillingEvent( + 5, + domain5, + registrar3, + Reason.CREATE, + 1, + Money.of(CurrencyUnit.USD, 0), + DateTime.parse("2017-10-04T00:00:00.0Z"), + DateTime.parse("2017-10-04T00:00:00.0Z"), + Flag.SUNRISE, + Flag.ANCHOR_TENANT); + persistOneTimeBillingEvent( + 6, domain6, registrar1, Reason.SERVER_STATUS, 0, Money.of(CurrencyUnit.USD, 0)); + persistOneTimeBillingEvent( + 7, domain7, registrar1, Reason.SERVER_STATUS, 0, Money.of(CurrencyUnit.USD, 20)); + + // Add billing event for a non-billable registrar + Registrar registrar4 = persistNewRegistrar("noBillRegistrar"); + registrar4 = registrar4.asBuilder().setBillingIdentifier(null).build(); + persistResource(registrar4); + DomainBase domain8 = persistActiveDomain("non-billable.test"); + persistOneTimeBillingEvent( + 8, domain8, registrar4, Reason.RENEW, 3, Money.of(CurrencyUnit.USD, 20.5)); + + // Add billing event for a non-real registrar + Registrar registrar5 = persistNewRegistrar("notRealRegistrar"); + registrar5 = + registrar5 + .asBuilder() + .setIanaIdentifier(null) + .setBillingIdentifier(456L) + .setType(Registrar.Type.OTE) + .build(); + persistResource(registrar5); + DomainBase domain9 = persistActiveDomain("not-real.test"); + persistOneTimeBillingEvent( + 9, domain9, registrar5, Reason.RENEW, 3, Money.of(CurrencyUnit.USD, 20.5)); + + // Add billing event for a non-invoicing TLD + createTld("nobill"); + DomainBase domain10 = persistActiveDomain("test.nobill"); + persistOneTimeBillingEvent( + 10, domain10, registrar1, Reason.RENEW, 3, Money.of(CurrencyUnit.USD, 20.5)); + + // Add billing event before October 2017 + DomainBase domain11 = persistActiveDomain("july.test"); + persistOneTimeBillingEvent( + 11, + domain11, + registrar1, + Reason.CREATE, + 5, + Money.ofMajor(CurrencyUnit.JPY, 70), + DateTime.parse("2017-06-29T00:00:00.0Z"), + DateTime.parse("2017-07-02T00:00:00.0Z")); + + // Add a billing event with a corresponding cancellation + DomainBase domain12 = persistActiveDomain("cancel.test"); + OneTime oneTime = + persistOneTimeBillingEvent( + 12, domain12, registrar1, Reason.RENEW, 3, Money.of(CurrencyUnit.USD, 20.5)); + DomainHistory domainHistory = persistDomainHistory(domain12, registrar1); + + Cancellation cancellation = + new Cancellation() + .asBuilder() + .setId(1) + .setClientId(registrar1.getClientId()) + .setDomainHistoryRevisionId(domainHistory.getId()) + .setEventTime(DateTime.parse("2017-10-05T00:00:00.0Z")) + .setBillingTime(DateTime.parse("2017-10-04T00:00:00.0Z")) + .setOneTimeEventKey(oneTime.createVKey()) + .setTargetId(domain12.getDomainName()) + .setReason(Reason.RENEW) + .setParent(domainHistory) + .build(); + persistResource(cancellation); + + // Add billing event with a corresponding recurring billing event and cancellation + DomainBase domain13 = persistActiveDomain("cancel-recurring.test"); + DomainHistory domainHistoryRecurring = persistDomainHistory(domain13, registrar1); + + Recurring recurring = + new Recurring() + .asBuilder() + .setClientId(registrar1.getClientId()) + .setRecurrenceEndTime(END_OF_TIME) + .setId(1) + .setParent(domainHistoryRecurring) + .setTargetId(domain13.getDomainName()) + .setEventTime(DateTime.parse("2017-10-04T00:00:00.0Z")) + .setReason(Reason.RENEW) + .build(); + persistResource(recurring); + OneTime oneTimeRecurring = + persistOneTimeBillingEvent( + 13, domain13, registrar1, Reason.RENEW, 3, Money.of(CurrencyUnit.USD, 20.5)); + oneTimeRecurring = + oneTimeRecurring + .asBuilder() + .setCancellationMatchingBillingEvent(recurring.createVKey()) + .setFlags(ImmutableSet.of(Flag.SYNTHETIC)) + .setSyntheticCreationTime(DateTime.parse("2017-10-03T00:00:00.0Z")) + .build(); + persistResource(oneTimeRecurring); + + Cancellation cancellationRecurring = + new Cancellation() + .asBuilder() + .setId(2) + .setClientId(registrar1.getClientId()) + .setDomainHistoryRevisionId(domainHistoryRecurring.getId()) + .setEventTime(DateTime.parse("2017-10-05T00:00:00.0Z")) + .setBillingTime(DateTime.parse("2017-10-04T00:00:00.0Z")) + .setRecurringEventKey(recurring.createVKey()) + .setTargetId(domain13.getDomainName()) + .setReason(Reason.RENEW) + .setParent(domainHistoryRecurring) + .build(); + persistResource(cancellationRecurring); + } + + private DomainHistory persistDomainHistory(DomainBase domainBase, Registrar registrar) { + DomainHistory domainHistory = + new DomainHistory.Builder() + .setType(HistoryEntry.Type.DOMAIN_RENEW) + .setModificationTime(DateTime.parse("2017-10-04T00:00:00.0Z")) + .setDomain(domainBase) + .setClientId(registrar.getClientId()) + .build(); + return persistResource(domainHistory); + } + + private OneTime persistOneTimeBillingEvent( + int id, DomainBase domainBase, Registrar registrar, Reason reason, int years, Money money) { + return persistOneTimeBillingEvent( + id, + domainBase, + registrar, + reason, + years, + money, + DateTime.parse("2017-10-04T00:00:00.0Z"), + DateTime.parse("2017-10-04T00:00:00.0Z")); + } + + private OneTime persistOneTimeBillingEvent( + int id, + DomainBase domainBase, + Registrar registrar, + Reason reason, + int years, + Money money, + DateTime eventTime, + DateTime billingTime, + Flag... flags) { + google.registry.model.billing.BillingEvent.OneTime.Builder billingEventBuilder = + new OneTime() + .asBuilder() + .setId(id) + .setBillingTime(billingTime) + .setEventTime(eventTime) + .setClientId(registrar.getClientId()) + .setReason(reason) + .setTargetId(domainBase.getDomainName()) + .setDomainRepoId("REPO-ID") + .setCost(money) + .setFlags(Arrays.stream(flags).collect(toImmutableSet())) + .setParent(persistDomainHistory(domainBase, registrar)); + + if (years > 0) { + billingEventBuilder.setPeriodYears(years); + } + + return persistResource(billingEventBuilder.build()); + } + + private static class changeDomainRepo + extends PTransform, PCollection> { + @Override + public PCollection expand(PCollection input) { + return input.apply( + "Map to invoicing key", + MapElements.into(TypeDescriptor.of(BillingEvent.class)) + .via( + billingEvent -> + BillingEvent.create( + billingEvent.id(), + billingEvent.billingTime(), + billingEvent.eventTime(), + billingEvent.registrarId(), + billingEvent.billingId(), + billingEvent.poNumber(), + billingEvent.tld(), + billingEvent.action(), + billingEvent.domain(), + "REPO-ID", + billingEvent.years(), + billingEvent.currency(), + billingEvent.amount(), + billingEvent.flags()))); + } + } } diff --git a/core/src/test/java/google/registry/reporting/billing/GenerateInvoicesActionTest.java b/core/src/test/java/google/registry/reporting/billing/GenerateInvoicesActionTest.java index a9fcb6109..3c8db1b99 100644 --- a/core/src/test/java/google/registry/reporting/billing/GenerateInvoicesActionTest.java +++ b/core/src/test/java/google/registry/reporting/billing/GenerateInvoicesActionTest.java @@ -53,6 +53,7 @@ class GenerateInvoicesActionTest extends BeamActionTestBase { "billing_bucket", "REG-INV", true, + "DATASTORE", new YearMonth(2017, 10), emailUtils, clock, @@ -82,6 +83,7 @@ class GenerateInvoicesActionTest extends BeamActionTestBase { "billing_bucket", "REG-INV", false, + "DATASTORE", new YearMonth(2017, 10), emailUtils, clock, @@ -105,6 +107,7 @@ class GenerateInvoicesActionTest extends BeamActionTestBase { "billing_bucket", "REG-INV", false, + "DATASTORE", new YearMonth(2017, 10), emailUtils, clock,