Remove duplicate billing events from the invoicing pipeline (#2326)

The Distinct transform identifies duplicates by comparing the serialized
form of each element. By giving BillingEvent a deterministic coder, equal
events always serialize to the same bytes, so Distinct removes every duplicate.
This commit is contained in:
Lai Jiang 2024-02-16 15:43:40 -05:00 committed by GitHub
parent 7d2330c943
commit 08bcf579a5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 132 additions and 71 deletions

View file

@ -21,26 +21,21 @@ import google.registry.reporting.billing.BillingModule;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.io.Serializable;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.DoubleCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter; import org.joda.time.format.DateTimeFormatter;
/** /** A POJO representing a single billable event, parsed from a {@code SchemaAndRecord}. */
* A POJO representing a single billable event, parsed from a {@code SchemaAndRecord}.
*
* <p>This is a trivially serializable class that allows Beam to transform the results of a Cloud
* SQL query into a standard Java representation, giving us the type guarantees and ease of
* manipulation Cloud SQL lacks.
*/
@AutoValue @AutoValue
public abstract class BillingEvent implements Serializable { public abstract class BillingEvent {
private static final long serialVersionUID = -3593088371541450077L;
private static final DateTimeFormatter DATE_TIME_FORMATTER = private static final DateTimeFormatter DATE_TIME_FORMATTER =
DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss zzz"); DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss zzz");
@ -85,7 +80,7 @@ public abstract class BillingEvent implements Serializable {
/** Returns the tld this event was generated for. */ /** Returns the tld this event was generated for. */
abstract String tld(); abstract String tld();
/** Returns the billable action this event was generated for (i.e. RENEW, CREATE, TRANSFER...) */ /** Returns the billable action this event was generated for (i.e., RENEW, CREATE, TRANSFER...) */
abstract String action(); abstract String action();
/** Returns the fully qualified domain name this event was generated for. */ /** Returns the fully qualified domain name this event was generated for. */
@ -97,7 +92,7 @@ public abstract class BillingEvent implements Serializable {
/** Returns the number of years this billing event is made out for. */ /** Returns the number of years this billing event is made out for. */
abstract int years(); abstract int years();
/** Returns the 3-letter currency code for the billing event (i.e. USD or JPY.) */ /** Returns the 3-letter currency code for the billing event (i.e., USD or JPY.) */
abstract String currency(); abstract String currency();
/** Returns the cost associated with this billing event. */ /** Returns the cost associated with this billing event. */
@ -203,9 +198,7 @@ public abstract class BillingEvent implements Serializable {
/** Key for each {@code BillingEvent}, when aggregating for the overall invoice. */ /** Key for each {@code BillingEvent}, when aggregating for the overall invoice. */
@AutoValue @AutoValue
abstract static class InvoiceGroupingKey implements Serializable { abstract static class InvoiceGroupingKey {
private static final long serialVersionUID = -151561764235256205L;
private static final ImmutableList<String> INVOICE_HEADERS = private static final ImmutableList<String> INVOICE_HEADERS =
ImmutableList.of( ImmutableList.of(
@ -277,8 +270,14 @@ public abstract class BillingEvent implements Serializable {
/** Coder that provides deterministic (de)serialization for {@code InvoiceGroupingKey}. */ /** Coder that provides deterministic (de)serialization for {@code InvoiceGroupingKey}. */
static class InvoiceGroupingKeyCoder extends AtomicCoder<InvoiceGroupingKey> { static class InvoiceGroupingKeyCoder extends AtomicCoder<InvoiceGroupingKey> {
private static final Coder<String> stringCoder = StringUtf8Coder.of();
private static final InvoiceGroupingKeyCoder INSTANCE = new InvoiceGroupingKeyCoder();
private static final long serialVersionUID = 6680701524304107547L; public static InvoiceGroupingKeyCoder of() {
return INSTANCE;
}
private InvoiceGroupingKeyCoder() {}
@Override @Override
public void encode(InvoiceGroupingKey value, OutputStream outStream) throws IOException { public void encode(InvoiceGroupingKey value, OutputStream outStream) throws IOException {
@ -295,7 +294,6 @@ public abstract class BillingEvent implements Serializable {
@Override @Override
public InvoiceGroupingKey decode(InputStream inStream) throws IOException { public InvoiceGroupingKey decode(InputStream inStream) throws IOException {
Coder<String> stringCoder = StringUtf8Coder.of();
return new AutoValue_BillingEvent_InvoiceGroupingKey( return new AutoValue_BillingEvent_InvoiceGroupingKey(
stringCoder.decode(inStream), stringCoder.decode(inStream),
stringCoder.decode(inStream), stringCoder.decode(inStream),
@ -308,4 +306,55 @@ public abstract class BillingEvent implements Serializable {
} }
} }
} }
/**
 * Coder that (de)serializes a {@code BillingEvent} one field at a time, always in the same order.
 *
 * <p>The two timestamps are written as text rendered by {@code DATE_TIME_FORMATTER} and parsed
 * back with the same formatter; every other field is delegated to the matching primitive or
 * string coder. The class is a singleton that is only handed out wrapped in a {@link
 * NullableCoder}.
 */
static class BillingEventCoder extends AtomicCoder<BillingEvent> {

  private static final Coder<String> stringCoder = StringUtf8Coder.of();
  private static final Coder<Integer> integerCoder = VarIntCoder.of();
  private static final Coder<Long> longCoder = VarLongCoder.of();
  private static final Coder<Double> doubleCoder = DoubleCoder.of();

  private static final BillingEventCoder INSTANCE = new BillingEventCoder();

  /** Returns the shared coder instance wrapped in a {@link NullableCoder}. */
  static NullableCoder<BillingEvent> ofNullable() {
    return NullableCoder.of(INSTANCE);
  }

  private BillingEventCoder() {}

  /** Writes a single string field to {@code sink}. */
  private static void putString(String text, OutputStream sink) throws IOException {
    stringCoder.encode(text, sink);
  }

  /** Reads a single string field from {@code source}. */
  private static String takeString(InputStream source) throws IOException {
    return stringCoder.decode(source);
  }

  /**
   * Writes every field of {@code value} to {@code outStream}.
   *
   * <p>The field order here defines the wire format and must match {@link #decode}.
   */
  @Override
  public void encode(BillingEvent value, OutputStream outStream) throws IOException {
    longCoder.encode(value.id(), outStream);

    // Timestamps travel as formatted text rather than as raw instants.
    String billingTimeText = DATE_TIME_FORMATTER.print(value.billingTime());
    putString(billingTimeText, outStream);
    String eventTimeText = DATE_TIME_FORMATTER.print(value.eventTime());
    putString(eventTimeText, outStream);

    putString(value.registrarId(), outStream);
    putString(value.billingId(), outStream);
    putString(value.poNumber(), outStream);
    putString(value.tld(), outStream);
    putString(value.action(), outStream);
    putString(value.domain(), outStream);
    putString(value.repositoryId(), outStream);

    integerCoder.encode(value.years(), outStream);
    putString(value.currency(), outStream);
    doubleCoder.encode(value.amount(), outStream);
    putString(value.flags(), outStream);
  }

  /**
   * Reads the fields back from {@code inStream} in the order {@link #encode} wrote them and
   * rebuilds the {@code BillingEvent}.
   */
  @Override
  public BillingEvent decode(InputStream inStream) throws IOException {
    Long id = longCoder.decode(inStream);
    var billingTime = DATE_TIME_FORMATTER.parseDateTime(takeString(inStream));
    var eventTime = DATE_TIME_FORMATTER.parseDateTime(takeString(inStream));
    String registrarId = takeString(inStream);
    String billingId = takeString(inStream);
    String poNumber = takeString(inStream);
    String tld = takeString(inStream);
    String action = takeString(inStream);
    String domain = takeString(inStream);
    String repositoryId = takeString(inStream);
    Integer years = integerCoder.decode(inStream);
    String currency = takeString(inStream);
    Double amount = doubleCoder.decode(inStream);
    String flags = takeString(inStream);

    return new AutoValue_BillingEvent(
        id,
        billingTime,
        eventTime,
        registrarId,
        billingId,
        poNumber,
        tld,
        action,
        domain,
        repositoryId,
        years,
        currency,
        amount,
        flags);
  }
}
} }

View file

@ -18,6 +18,7 @@ import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static org.apache.beam.sdk.values.TypeDescriptors.strings; import static org.apache.beam.sdk.values.TypeDescriptors.strings;
import com.google.common.flogger.FluentLogger; import com.google.common.flogger.FluentLogger;
import google.registry.beam.billing.BillingEvent.BillingEventCoder;
import google.registry.beam.billing.BillingEvent.InvoiceGroupingKey; import google.registry.beam.billing.BillingEvent.InvoiceGroupingKey;
import google.registry.beam.billing.BillingEvent.InvoiceGroupingKey.InvoiceGroupingKeyCoder; import google.registry.beam.billing.BillingEvent.InvoiceGroupingKey.InvoiceGroupingKeyCoder;
import google.registry.beam.common.RegistryJpaIO; import google.registry.beam.common.RegistryJpaIO;
@ -30,6 +31,7 @@ import google.registry.reporting.billing.BillingModule;
import google.registry.util.DomainNameUtils; import google.registry.util.DomainNameUtils;
import google.registry.util.ResourceUtils; import google.registry.util.ResourceUtils;
import google.registry.util.SqlTemplate; import google.registry.util.SqlTemplate;
import java.io.Serial;
import java.io.Serializable; import java.io.Serializable;
import java.time.YearMonth; import java.time.YearMonth;
import java.util.Objects; import java.util.Objects;
@ -37,13 +39,13 @@ import java.util.Optional;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileIO; import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Contextful; import org.apache.beam.sdk.transforms.Contextful;
import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.Filter; import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.PTransform;
@ -65,7 +67,7 @@ import org.joda.money.CurrencyUnit;
*/ */
public class InvoicingPipeline implements Serializable { public class InvoicingPipeline implements Serializable {
private static final long serialVersionUID = 5386330443625580081L; @Serial private static final long serialVersionUID = 5386330443625580081L;
private static final Pattern SQL_COMMENT_REGEX = private static final Pattern SQL_COMMENT_REGEX =
Pattern.compile("^\\s*--.*\\n", Pattern.MULTILINE); Pattern.compile("^\\s*--.*\\n", Pattern.MULTILINE);
@ -97,13 +99,11 @@ public class InvoicingPipeline implements Serializable {
Read<Object[], google.registry.beam.billing.BillingEvent> read = Read<Object[], google.registry.beam.billing.BillingEvent> read =
RegistryJpaIO.<Object[], google.registry.beam.billing.BillingEvent>read( RegistryJpaIO.<Object[], google.registry.beam.billing.BillingEvent>read(
makeCloudSqlQuery(options.getYearMonth()), false, row -> parseRow(row).orElse(null)) makeCloudSqlQuery(options.getYearMonth()), false, row -> parseRow(row).orElse(null))
.withCoder(SerializableCoder.of(google.registry.beam.billing.BillingEvent.class)); .withCoder(BillingEventCoder.ofNullable());
return pipeline
PCollection<google.registry.beam.billing.BillingEvent> billingEventsWithNulls = .apply("Read BillingEvents from Cloud SQL", read)
pipeline.apply("Read BillingEvents from Cloud SQL", read); .apply("Remove null elements", Filter.by(Objects::nonNull))
.apply("Remove duplicates", Distinct.create());
// Remove null billing events
return billingEventsWithNulls.apply(Filter.by(Objects::nonNull));
} }
private static Optional<google.registry.beam.billing.BillingEvent> parseRow(Object[] row) { private static Optional<google.registry.beam.billing.BillingEvent> parseRow(Object[] row) {
@ -142,7 +142,7 @@ public class InvoicingPipeline implements Serializable {
extends PTransform< extends PTransform<
PCollection<google.registry.beam.billing.BillingEvent>, PCollection<String>> { PCollection<google.registry.beam.billing.BillingEvent>, PCollection<String>> {
private static final long serialVersionUID = -8090619008258393728L; @Serial private static final long serialVersionUID = -8090619008258393728L;
@Override @Override
public PCollection<String> expand( public PCollection<String> expand(
@ -152,9 +152,9 @@ public class InvoicingPipeline implements Serializable {
"Map to invoicing key", "Map to invoicing key",
MapElements.into(TypeDescriptor.of(InvoiceGroupingKey.class)) MapElements.into(TypeDescriptor.of(InvoiceGroupingKey.class))
.via(google.registry.beam.billing.BillingEvent::getInvoiceGroupingKey)) .via(google.registry.beam.billing.BillingEvent::getInvoiceGroupingKey))
.setCoder(InvoiceGroupingKeyCoder.of())
.apply( .apply(
"Filter out free events", Filter.by((InvoiceGroupingKey key) -> key.unitPrice() != 0)) "Filter out free events", Filter.by((InvoiceGroupingKey key) -> key.unitPrice() != 0))
.setCoder(new InvoiceGroupingKeyCoder())
.apply("Count occurrences", Count.perElement()) .apply("Count occurrences", Count.perElement())
.apply( .apply(
"Format as CSVs", "Format as CSVs",

View file

@ -16,12 +16,14 @@ package google.registry.beam.billing;
import static com.google.common.truth.Truth.assertThat; import static com.google.common.truth.Truth.assertThat;
import google.registry.beam.billing.BillingEvent.BillingEventCoder;
import google.registry.beam.billing.BillingEvent.InvoiceGroupingKey; import google.registry.beam.billing.BillingEvent.InvoiceGroupingKey;
import google.registry.beam.billing.BillingEvent.InvoiceGroupingKey.InvoiceGroupingKeyCoder; import google.registry.beam.billing.BillingEvent.InvoiceGroupingKey.InvoiceGroupingKeyCoder;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import org.apache.beam.sdk.coders.NullableCoder;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.DateTimeZone; import org.joda.time.DateTimeZone;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
@ -120,13 +122,22 @@ class BillingEventTest {
@Test @Test
void testInvoiceGroupingKeyCoder_deterministicSerialization() throws IOException { void testInvoiceGroupingKeyCoder_deterministicSerialization() throws IOException {
InvoiceGroupingKey invoiceKey = event.getInvoiceGroupingKey(); InvoiceGroupingKey invoiceKey = event.getInvoiceGroupingKey();
InvoiceGroupingKeyCoder coder = new InvoiceGroupingKeyCoder(); InvoiceGroupingKeyCoder coder = InvoiceGroupingKeyCoder.of();
ByteArrayOutputStream outStream = new ByteArrayOutputStream(); ByteArrayOutputStream outStream = new ByteArrayOutputStream();
coder.encode(invoiceKey, outStream); coder.encode(invoiceKey, outStream);
InputStream inStream = new ByteArrayInputStream(outStream.toByteArray()); InputStream inStream = new ByteArrayInputStream(outStream.toByteArray());
assertThat(coder.decode(inStream)).isEqualTo(invoiceKey); assertThat(coder.decode(inStream)).isEqualTo(invoiceKey);
} }
/**
 * Verifies that {@code BillingEventCoder} is deterministic (encoding the same event twice yields
 * identical bytes) and that decoding those bytes reproduces the original event.
 */
@Test
void testBillingEventCoder_deterministicSerialization() throws IOException {
  NullableCoder<BillingEvent> coder = BillingEventCoder.ofNullable();

  // Encode the same event into two independent buffers.
  ByteArrayOutputStream firstOut = new ByteArrayOutputStream();
  coder.encode(event, firstOut);
  ByteArrayOutputStream secondOut = new ByteArrayOutputStream();
  coder.encode(event, secondOut);

  // Determinism: both encodings must be byte-for-byte identical.
  assertThat(secondOut.toByteArray()).isEqualTo(firstOut.toByteArray());

  // Round trip: decoding the bytes gives back an equal event.
  InputStream inStream = new ByteArrayInputStream(firstOut.toByteArray());
  assertThat(coder.decode(inStream)).isEqualTo(event);
}
@Test @Test
void testGetDetailReportHeader() { void testGetDetailReportHeader() {
assertThat(BillingEvent.getHeader()) assertThat(BillingEvent.getHeader())

View file

@ -36,6 +36,7 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedMap; import com.google.common.collect.ImmutableSortedMap;
import com.google.common.testing.TestLogHandler; import com.google.common.testing.TestLogHandler;
import google.registry.beam.TestPipelineExtension; import google.registry.beam.TestPipelineExtension;
import google.registry.beam.billing.BillingEvent.BillingEventCoder;
import google.registry.model.billing.BillingBase.Flag; import google.registry.model.billing.BillingBase.Flag;
import google.registry.model.billing.BillingBase.Reason; import google.registry.model.billing.BillingBase.Reason;
import google.registry.model.billing.BillingCancellation; import google.registry.model.billing.BillingCancellation;
@ -51,13 +52,13 @@ import google.registry.persistence.transaction.JpaTestExtensions.JpaIntegrationT
import google.registry.testing.FakeClock; import google.registry.testing.FakeClock;
import google.registry.util.ResourceUtils; import google.registry.util.ResourceUtils;
import java.io.File; import java.io.File;
import java.io.Serial;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.Arrays; import java.util.Arrays;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Optional; import java.util.Optional;
import java.util.logging.Logger; import java.util.logging.Logger;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.Create;
@ -251,9 +252,7 @@ class InvoicingPipelineTest {
options.setYearMonth(YEAR_MONTH); options.setYearMonth(YEAR_MONTH);
options.setInvoiceFilePrefix(INVOICE_FILE_PREFIX); options.setInvoiceFilePrefix(INVOICE_FILE_PREFIX);
billingEvents = billingEvents =
pipeline.apply( pipeline.apply(Create.of(INPUT_EVENTS).withCoder(BillingEventCoder.ofNullable()));
Create.of(INPUT_EVENTS)
.withCoder(SerializableCoder.of(google.registry.beam.billing.BillingEvent.class)));
} }
@Test @Test
@ -346,26 +345,26 @@ class InvoicingPipelineTest {
@Test @Test
void testSuccess_makeCloudSqlQuery() throws Exception { void testSuccess_makeCloudSqlQuery() throws Exception {
// Pipeline must be run due to the TestPipelineExtension // The Pipeline must run due to TestPipelineExtension's checks.
pipeline.run().waitUntilFinish(); pipeline.run().waitUntilFinish();
// Test that comments are removed from the .sql file correctly // Test that comments are removed from the .sql file correctly
assertThat(InvoicingPipeline.makeCloudSqlQuery("2017-10")) assertThat(InvoicingPipeline.makeCloudSqlQuery("2017-10"))
.isEqualTo( .isEqualTo(
'\n' """
+ "SELECT b, r FROM BillingEvent b\n"
+ "JOIN Registrar r ON b.clientId = r.registrarId\n" SELECT b, r FROM BillingEvent b
+ "JOIN Domain d ON b.domainRepoId = d.repoId\n" JOIN Registrar r ON b.clientId = r.registrarId
+ "JOIN Tld t ON t.tldStr = d.tld\n" JOIN Domain d ON b.domainRepoId = d.repoId
+ "LEFT JOIN BillingCancellation c ON b.id = c.billingEvent\n" JOIN Tld t ON t.tldStr = d.tld
+ "LEFT JOIN BillingCancellation cr ON b.cancellationMatchingBillingEvent =" LEFT JOIN BillingCancellation c ON b.id = c.billingEvent
+ " cr.billingRecurrence\n" LEFT JOIN BillingCancellation cr ON b.cancellationMatchingBillingEvent = cr.billingRecurrence
+ "WHERE r.billingAccountMap IS NOT NULL\n" WHERE r.billingAccountMap IS NOT NULL
+ "AND r.type = 'REAL'\n" AND r.type = 'REAL'
+ "AND t.invoicingEnabled IS TRUE\n" AND t.invoicingEnabled IS TRUE
+ "AND b.billingTime BETWEEN CAST('2017-10-01' AS timestamp) AND CAST('2017-11-01'" AND b.billingTime BETWEEN CAST('2017-10-01' AS timestamp) AND CAST('2017-11-01' AS timestamp)
+ " AS timestamp)\n" AND c.id IS NULL
+ "AND c.id IS NULL\n" AND cr.id IS NULL
+ "AND cr.id IS NULL\n"); """);
} }
/** Returns the text contents of a file under the beamBucket/results directory. */ /** Returns the text contents of a file under the beamBucket/results directory. */
@ -604,31 +603,33 @@ class InvoicingPipelineTest {
PCollection<google.registry.beam.billing.BillingEvent>, PCollection<google.registry.beam.billing.BillingEvent>,
PCollection<google.registry.beam.billing.BillingEvent>> { PCollection<google.registry.beam.billing.BillingEvent>> {
private static final long serialVersionUID = 2695033474967615250L; @Serial private static final long serialVersionUID = 2695033474967615250L;
@Override @Override
public PCollection<google.registry.beam.billing.BillingEvent> expand( public PCollection<google.registry.beam.billing.BillingEvent> expand(
PCollection<google.registry.beam.billing.BillingEvent> input) { PCollection<google.registry.beam.billing.BillingEvent> input) {
return input.apply( return input
"Map to invoicing key", .apply(
MapElements.into(TypeDescriptor.of(google.registry.beam.billing.BillingEvent.class)) "Map to invoicing key",
.via( MapElements.into(TypeDescriptor.of(google.registry.beam.billing.BillingEvent.class))
billingEvent -> .via(
google.registry.beam.billing.BillingEvent.create( billingEvent ->
billingEvent.id(), google.registry.beam.billing.BillingEvent.create(
billingEvent.billingTime(), billingEvent.id(),
billingEvent.eventTime(), billingEvent.billingTime(),
billingEvent.registrarId(), billingEvent.eventTime(),
billingEvent.billingId(), billingEvent.registrarId(),
billingEvent.poNumber(), billingEvent.billingId(),
billingEvent.tld(), billingEvent.poNumber(),
billingEvent.action(), billingEvent.tld(),
billingEvent.domain(), billingEvent.action(),
"REPO-ID", billingEvent.domain(),
billingEvent.years(), "REPO-ID",
billingEvent.currency(), billingEvent.years(),
billingEvent.amount(), billingEvent.currency(),
billingEvent.flags()))); billingEvent.amount(),
billingEvent.flags())))
.setCoder(BillingEventCoder.ofNullable());
} }
} }
} }