From 08bcf579a5db8068b8587ee326cbbb05ddbc3af3 Mon Sep 17 00:00:00 2001 From: Lai Jiang Date: Fri, 16 Feb 2024 15:43:40 -0500 Subject: [PATCH] Remove Duplicate billing events from the invoicing pipeline (#2326) The Distinct transform removes duplicates based on the serialized format of the elements. By providing a deterministic coder, we can guarantee that no duplicates exist. --- .../registry/beam/billing/BillingEvent.java | 85 +++++++++++++++---- .../beam/billing/InvoicingPipeline.java | 22 ++--- .../beam/billing/BillingEventTest.java | 13 ++- .../beam/billing/InvoicingPipelineTest.java | 83 +++++++++--------- 4 files changed, 132 insertions(+), 71 deletions(-) diff --git a/core/src/main/java/google/registry/beam/billing/BillingEvent.java b/core/src/main/java/google/registry/beam/billing/BillingEvent.java index 98f13524b..f91820cb8 100644 --- a/core/src/main/java/google/registry/beam/billing/BillingEvent.java +++ b/core/src/main/java/google/registry/beam/billing/BillingEvent.java @@ -21,26 +21,21 @@ import google.registry.reporting.billing.BillingModule; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.io.Serializable; import java.util.regex.Pattern; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.DoubleCoder; +import org.apache.beam.sdk.coders.NullableCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.coders.VarLongCoder; import org.joda.time.DateTime; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; -/** - * A POJO representing a single billable event, parsed from a {@code SchemaAndRecord}. - * - *

This is a trivially serializable class that allows Beam to transform the results of a Cloud - * SQL query into a standard Java representation, giving us the type guarantees and ease of - * manipulation Cloud SQL lacks. - */ +/** A POJO representing a single billable event, parsed from a {@code SchemaAndRecord}. */ @AutoValue -public abstract class BillingEvent implements Serializable { - - private static final long serialVersionUID = -3593088371541450077L; +public abstract class BillingEvent { private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss zzz"); @@ -85,7 +80,7 @@ public abstract class BillingEvent implements Serializable { /** Returns the tld this event was generated for. */ abstract String tld(); - /** Returns the billable action this event was generated for (i.e. RENEW, CREATE, TRANSFER...) */ + /** Returns the billable action this event was generated for (i.e., RENEW, CREATE, TRANSFER...) */ abstract String action(); /** Returns the fully qualified domain name this event was generated for. */ @@ -97,7 +92,7 @@ public abstract class BillingEvent implements Serializable { /** Returns the number of years this billing event is made out for. */ abstract int years(); - /** Returns the 3-letter currency code for the billing event (i.e. USD or JPY.) */ + /** Returns the 3-letter currency code for the billing event (i.e., USD or JPY.) */ abstract String currency(); /** Returns the cost associated with this billing event. */ @@ -203,9 +198,7 @@ public abstract class BillingEvent implements Serializable { /** Key for each {@code BillingEvent}, when aggregating for the overall invoice. */ @AutoValue - abstract static class InvoiceGroupingKey implements Serializable { - - private static final long serialVersionUID = -151561764235256205L; + abstract static class InvoiceGroupingKey { private static final ImmutableList INVOICE_HEADERS = ImmutableList.of( @@ -277,8 +270,14 @@ public abstract class BillingEvent implements Serializable { /** Coder that provides deterministic (de)serialization for {@code InvoiceGroupingKey}. */ static class InvoiceGroupingKeyCoder extends AtomicCoder { + private static final Coder stringCoder = StringUtf8Coder.of(); + private static final InvoiceGroupingKeyCoder INSTANCE = new InvoiceGroupingKeyCoder(); - private static final long serialVersionUID = 6680701524304107547L; + public static InvoiceGroupingKeyCoder of() { + return INSTANCE; + } + + private InvoiceGroupingKeyCoder() {} @Override public void encode(InvoiceGroupingKey value, OutputStream outStream) throws IOException { @@ -295,7 +294,6 @@ public abstract class BillingEvent implements Serializable { @Override public InvoiceGroupingKey decode(InputStream inStream) throws IOException { - Coder stringCoder = StringUtf8Coder.of(); return new AutoValue_BillingEvent_InvoiceGroupingKey( stringCoder.decode(inStream), stringCoder.decode(inStream), @@ -308,4 +306,55 @@ public abstract class BillingEvent implements Serializable { } } } + + static class BillingEventCoder extends AtomicCoder { + private static final Coder stringCoder = StringUtf8Coder.of(); + private static final Coder integerCoder = VarIntCoder.of(); + private static final Coder longCoder = VarLongCoder.of(); + private static final Coder doubleCoder = DoubleCoder.of(); + private static final BillingEventCoder INSTANCE = new BillingEventCoder(); + + static NullableCoder ofNullable() { + return NullableCoder.of(INSTANCE); + } + + private BillingEventCoder() {} + + @Override + public void encode(BillingEvent value, OutputStream outStream) throws IOException { + longCoder.encode(value.id(), outStream); + stringCoder.encode(DATE_TIME_FORMATTER.print(value.billingTime()), outStream); + stringCoder.encode(DATE_TIME_FORMATTER.print(value.eventTime()), outStream); + stringCoder.encode(value.registrarId(), outStream); + stringCoder.encode(value.billingId(), outStream); + stringCoder.encode(value.poNumber(), outStream); + stringCoder.encode(value.tld(), outStream); + stringCoder.encode(value.action(), outStream); + stringCoder.encode(value.domain(), outStream); + stringCoder.encode(value.repositoryId(), outStream); + integerCoder.encode(value.years(), outStream); + stringCoder.encode(value.currency(), outStream); + doubleCoder.encode(value.amount(), outStream); + stringCoder.encode(value.flags(), outStream); + } + + @Override + public BillingEvent decode(InputStream inStream) throws IOException { + return new AutoValue_BillingEvent( + longCoder.decode(inStream), + DATE_TIME_FORMATTER.parseDateTime(stringCoder.decode(inStream)), + DATE_TIME_FORMATTER.parseDateTime(stringCoder.decode(inStream)), + stringCoder.decode(inStream), + stringCoder.decode(inStream), + stringCoder.decode(inStream), + stringCoder.decode(inStream), + stringCoder.decode(inStream), + stringCoder.decode(inStream), + stringCoder.decode(inStream), + integerCoder.decode(inStream), + stringCoder.decode(inStream), + doubleCoder.decode(inStream), + stringCoder.decode(inStream)); + } + } } diff --git a/core/src/main/java/google/registry/beam/billing/InvoicingPipeline.java b/core/src/main/java/google/registry/beam/billing/InvoicingPipeline.java index af199fb08..7b375daa3 100644 --- a/core/src/main/java/google/registry/beam/billing/InvoicingPipeline.java +++ b/core/src/main/java/google/registry/beam/billing/InvoicingPipeline.java @@ -18,6 +18,7 @@ import static com.google.common.collect.ImmutableSet.toImmutableSet; import static org.apache.beam.sdk.values.TypeDescriptors.strings; import com.google.common.flogger.FluentLogger; +import google.registry.beam.billing.BillingEvent.BillingEventCoder; import google.registry.beam.billing.BillingEvent.InvoiceGroupingKey; import google.registry.beam.billing.BillingEvent.InvoiceGroupingKey.InvoiceGroupingKeyCoder; import google.registry.beam.common.RegistryJpaIO; @@ -30,6 +31,7 @@ import google.registry.reporting.billing.BillingModule; import google.registry.util.DomainNameUtils; import google.registry.util.ResourceUtils; import google.registry.util.SqlTemplate; +import java.io.Serial; import java.io.Serializable; import java.time.YearMonth; import java.util.Objects; @@ -37,13 +39,13 @@ import java.util.Optional; import java.util.regex.Pattern; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.FileIO; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Contextful; import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.Distinct; import org.apache.beam.sdk.transforms.Filter; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; @@ -65,7 +67,7 @@ import org.joda.money.CurrencyUnit; */ public class InvoicingPipeline implements Serializable { - private static final long serialVersionUID = 5386330443625580081L; + @Serial private static final long serialVersionUID = 5386330443625580081L; private static final Pattern SQL_COMMENT_REGEX = Pattern.compile("^\\s*--.*\\n", Pattern.MULTILINE); @@ -97,13 +99,11 @@ public class InvoicingPipeline implements Serializable { Read read = RegistryJpaIO.read( makeCloudSqlQuery(options.getYearMonth()), false, row -> parseRow(row).orElse(null)) - .withCoder(SerializableCoder.of(google.registry.beam.billing.BillingEvent.class)); - - PCollection billingEventsWithNulls = - pipeline.apply("Read BillingEvents from Cloud SQL", read); - - // Remove null billing events - return billingEventsWithNulls.apply(Filter.by(Objects::nonNull)); + .withCoder(BillingEventCoder.ofNullable()); + return pipeline + .apply("Read BillingEvents from Cloud SQL", read) + .apply("Remove null elements", Filter.by(Objects::nonNull)) + .apply("Remove duplicates", Distinct.create()); } private static Optional parseRow(Object[] row) { @@ -142,7 +142,7 @@ public class InvoicingPipeline implements Serializable { extends PTransform< PCollection, PCollection> { - private static final long serialVersionUID = -8090619008258393728L; + @Serial private static final long serialVersionUID = -8090619008258393728L; @Override public PCollection expand( @@ -152,9 +152,9 @@ public class InvoicingPipeline implements Serializable { "Map to invoicing key", MapElements.into(TypeDescriptor.of(InvoiceGroupingKey.class)) .via(google.registry.beam.billing.BillingEvent::getInvoiceGroupingKey)) + .setCoder(InvoiceGroupingKeyCoder.of()) .apply( "Filter out free events", Filter.by((InvoiceGroupingKey key) -> key.unitPrice() != 0)) - .setCoder(new InvoiceGroupingKeyCoder()) .apply("Count occurrences", Count.perElement()) .apply( "Format as CSVs", diff --git a/core/src/test/java/google/registry/beam/billing/BillingEventTest.java b/core/src/test/java/google/registry/beam/billing/BillingEventTest.java index b86d6017b..4b01682c1 100644 --- a/core/src/test/java/google/registry/beam/billing/BillingEventTest.java +++ b/core/src/test/java/google/registry/beam/billing/BillingEventTest.java @@ -16,12 +16,14 @@ package google.registry.beam.billing; import static com.google.common.truth.Truth.assertThat; +import google.registry.beam.billing.BillingEvent.BillingEventCoder; import google.registry.beam.billing.BillingEvent.InvoiceGroupingKey; import google.registry.beam.billing.BillingEvent.InvoiceGroupingKey.InvoiceGroupingKeyCoder; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; +import org.apache.beam.sdk.coders.NullableCoder; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.junit.jupiter.api.BeforeEach; @@ -120,13 +122,22 @@ class BillingEventTest { @Test void testInvoiceGroupingKeyCoder_deterministicSerialization() throws IOException { InvoiceGroupingKey invoiceKey = event.getInvoiceGroupingKey(); - InvoiceGroupingKeyCoder coder = new InvoiceGroupingKeyCoder(); + InvoiceGroupingKeyCoder coder = InvoiceGroupingKeyCoder.of(); ByteArrayOutputStream outStream = new ByteArrayOutputStream(); coder.encode(invoiceKey, outStream); InputStream inStream = new ByteArrayInputStream(outStream.toByteArray()); assertThat(coder.decode(inStream)).isEqualTo(invoiceKey); } + @Test + void testBillingEventCoder_deterministicSerialization() throws IOException { + NullableCoder coder = BillingEventCoder.ofNullable(); + ByteArrayOutputStream outStream = new ByteArrayOutputStream(); + coder.encode(event, outStream); + InputStream inStream = new ByteArrayInputStream(outStream.toByteArray()); + assertThat(coder.decode(inStream)).isEqualTo(event); + } + @Test void testGetDetailReportHeader() { assertThat(BillingEvent.getHeader()) diff --git a/core/src/test/java/google/registry/beam/billing/InvoicingPipelineTest.java b/core/src/test/java/google/registry/beam/billing/InvoicingPipelineTest.java index b8f06ea14..f3ec47135 100644 --- a/core/src/test/java/google/registry/beam/billing/InvoicingPipelineTest.java +++ b/core/src/test/java/google/registry/beam/billing/InvoicingPipelineTest.java @@ -36,6 +36,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSortedMap; import com.google.common.testing.TestLogHandler; import google.registry.beam.TestPipelineExtension; +import google.registry.beam.billing.BillingEvent.BillingEventCoder; import google.registry.model.billing.BillingBase.Flag; import google.registry.model.billing.BillingBase.Reason; import google.registry.model.billing.BillingCancellation; @@ -51,13 +52,13 @@ import google.registry.persistence.transaction.JpaTestExtensions.JpaIntegrationT import google.registry.testing.FakeClock; import google.registry.util.ResourceUtils; import java.io.File; +import java.io.Serial; import java.nio.file.Files; import java.nio.file.Path; import java.util.Arrays; import java.util.Map.Entry; import java.util.Optional; import java.util.logging.Logger; -import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.Create; @@ -251,9 +252,7 @@ class InvoicingPipelineTest { options.setYearMonth(YEAR_MONTH); options.setInvoiceFilePrefix(INVOICE_FILE_PREFIX); billingEvents = - pipeline.apply( - Create.of(INPUT_EVENTS) - .withCoder(SerializableCoder.of(google.registry.beam.billing.BillingEvent.class))); + pipeline.apply(Create.of(INPUT_EVENTS).withCoder(BillingEventCoder.ofNullable())); } @Test @@ -346,26 +345,26 @@ class InvoicingPipelineTest { @Test void testSuccess_makeCloudSqlQuery() throws Exception { - // Pipeline must be run due to the TestPipelineExtension + // The Pipeline must run due to TestPipelineExtension's checks. pipeline.run().waitUntilFinish(); // Test that comments are removed from the .sql file correctly assertThat(InvoicingPipeline.makeCloudSqlQuery("2017-10")) .isEqualTo( - '\n' - + "SELECT b, r FROM BillingEvent b\n" - + "JOIN Registrar r ON b.clientId = r.registrarId\n" - + "JOIN Domain d ON b.domainRepoId = d.repoId\n" - + "JOIN Tld t ON t.tldStr = d.tld\n" - + "LEFT JOIN BillingCancellation c ON b.id = c.billingEvent\n" - + "LEFT JOIN BillingCancellation cr ON b.cancellationMatchingBillingEvent =" - + " cr.billingRecurrence\n" - + "WHERE r.billingAccountMap IS NOT NULL\n" - + "AND r.type = 'REAL'\n" - + "AND t.invoicingEnabled IS TRUE\n" - + "AND b.billingTime BETWEEN CAST('2017-10-01' AS timestamp) AND CAST('2017-11-01'" - + " AS timestamp)\n" - + "AND c.id IS NULL\n" - + "AND cr.id IS NULL\n"); + """ + + SELECT b, r FROM BillingEvent b + JOIN Registrar r ON b.clientId = r.registrarId + JOIN Domain d ON b.domainRepoId = d.repoId + JOIN Tld t ON t.tldStr = d.tld + LEFT JOIN BillingCancellation c ON b.id = c.billingEvent + LEFT JOIN BillingCancellation cr ON b.cancellationMatchingBillingEvent = cr.billingRecurrence + WHERE r.billingAccountMap IS NOT NULL + AND r.type = 'REAL' + AND t.invoicingEnabled IS TRUE + AND b.billingTime BETWEEN CAST('2017-10-01' AS timestamp) AND CAST('2017-11-01' AS timestamp) + AND c.id IS NULL + AND cr.id IS NULL + """); } /** Returns the text contents of a file under the beamBucket/results directory. */ @@ -604,31 +603,33 @@ class InvoicingPipelineTest { PCollection, PCollection> { - private static final long serialVersionUID = 2695033474967615250L; + @Serial private static final long serialVersionUID = 2695033474967615250L; @Override public PCollection expand( PCollection input) { - return input.apply( - "Map to invoicing key", - MapElements.into(TypeDescriptor.of(google.registry.beam.billing.BillingEvent.class)) - .via( - billingEvent -> - google.registry.beam.billing.BillingEvent.create( - billingEvent.id(), - billingEvent.billingTime(), - billingEvent.eventTime(), - billingEvent.registrarId(), - billingEvent.billingId(), - billingEvent.poNumber(), - billingEvent.tld(), - billingEvent.action(), - billingEvent.domain(), - "REPO-ID", - billingEvent.years(), - billingEvent.currency(), - billingEvent.amount(), - billingEvent.flags()))); + return input + .apply( + "Map to invoicing key", + MapElements.into(TypeDescriptor.of(google.registry.beam.billing.BillingEvent.class)) + .via( + billingEvent -> + google.registry.beam.billing.BillingEvent.create( + billingEvent.id(), + billingEvent.billingTime(), + billingEvent.eventTime(), + billingEvent.registrarId(), + billingEvent.billingId(), + billingEvent.poNumber(), + billingEvent.tld(), + billingEvent.action(), + billingEvent.domain(), + "REPO-ID", + billingEvent.years(), + billingEvent.currency(), + billingEvent.amount(), + billingEvent.flags()))) + .setCoder(BillingEventCoder.ofNullable()); } } }