diff --git a/core/src/main/java/google/registry/beam/billing/BillingEvent.java b/core/src/main/java/google/registry/beam/billing/BillingEvent.java
index 98f13524b..f91820cb8 100644
--- a/core/src/main/java/google/registry/beam/billing/BillingEvent.java
+++ b/core/src/main/java/google/registry/beam/billing/BillingEvent.java
@@ -21,26 +21,21 @@ import google.registry.reporting.billing.BillingModule;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.io.Serializable;
import java.util.regex.Pattern;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.DoubleCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
-/**
- * A POJO representing a single billable event, parsed from a {@code SchemaAndRecord}.
- *
- *
This is a trivially serializable class that allows Beam to transform the results of a Cloud
- * SQL query into a standard Java representation, giving us the type guarantees and ease of
- * manipulation Cloud SQL lacks.
- */
+/** A POJO representing a single billable event, parsed from a {@code SchemaAndRecord}. */
@AutoValue
-public abstract class BillingEvent implements Serializable {
-
- private static final long serialVersionUID = -3593088371541450077L;
+public abstract class BillingEvent {
private static final DateTimeFormatter DATE_TIME_FORMATTER =
DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss zzz");
@@ -85,7 +80,7 @@ public abstract class BillingEvent implements Serializable {
/** Returns the tld this event was generated for. */
abstract String tld();
- /** Returns the billable action this event was generated for (i.e. RENEW, CREATE, TRANSFER...) */
+ /** Returns the billable action this event was generated for (i.e., RENEW, CREATE, TRANSFER...) */
abstract String action();
/** Returns the fully qualified domain name this event was generated for. */
@@ -97,7 +92,7 @@ public abstract class BillingEvent implements Serializable {
/** Returns the number of years this billing event is made out for. */
abstract int years();
- /** Returns the 3-letter currency code for the billing event (i.e. USD or JPY.) */
+ /** Returns the 3-letter currency code for the billing event (i.e., USD or JPY.) */
abstract String currency();
/** Returns the cost associated with this billing event. */
@@ -203,9 +198,7 @@ public abstract class BillingEvent implements Serializable {
/** Key for each {@code BillingEvent}, when aggregating for the overall invoice. */
@AutoValue
- abstract static class InvoiceGroupingKey implements Serializable {
-
- private static final long serialVersionUID = -151561764235256205L;
+ abstract static class InvoiceGroupingKey {
private static final ImmutableList INVOICE_HEADERS =
ImmutableList.of(
@@ -277,8 +270,14 @@ public abstract class BillingEvent implements Serializable {
/** Coder that provides deterministic (de)serialization for {@code InvoiceGroupingKey}. */
static class InvoiceGroupingKeyCoder extends AtomicCoder {
+ private static final Coder stringCoder = StringUtf8Coder.of();
+ private static final InvoiceGroupingKeyCoder INSTANCE = new InvoiceGroupingKeyCoder();
- private static final long serialVersionUID = 6680701524304107547L;
+ public static InvoiceGroupingKeyCoder of() {
+ return INSTANCE;
+ }
+
+ private InvoiceGroupingKeyCoder() {}
@Override
public void encode(InvoiceGroupingKey value, OutputStream outStream) throws IOException {
@@ -295,7 +294,6 @@ public abstract class BillingEvent implements Serializable {
@Override
public InvoiceGroupingKey decode(InputStream inStream) throws IOException {
- Coder stringCoder = StringUtf8Coder.of();
return new AutoValue_BillingEvent_InvoiceGroupingKey(
stringCoder.decode(inStream),
stringCoder.decode(inStream),
@@ -308,4 +306,55 @@ public abstract class BillingEvent implements Serializable {
}
}
}
+
+ static class BillingEventCoder extends AtomicCoder {
+ private static final Coder stringCoder = StringUtf8Coder.of();
+ private static final Coder integerCoder = VarIntCoder.of();
+ private static final Coder longCoder = VarLongCoder.of();
+ private static final Coder doubleCoder = DoubleCoder.of();
+ private static final BillingEventCoder INSTANCE = new BillingEventCoder();
+
+ static NullableCoder ofNullable() {
+ return NullableCoder.of(INSTANCE);
+ }
+
+ private BillingEventCoder() {}
+
+ @Override
+ public void encode(BillingEvent value, OutputStream outStream) throws IOException {
+ longCoder.encode(value.id(), outStream);
+ stringCoder.encode(DATE_TIME_FORMATTER.print(value.billingTime()), outStream);
+ stringCoder.encode(DATE_TIME_FORMATTER.print(value.eventTime()), outStream);
+ stringCoder.encode(value.registrarId(), outStream);
+ stringCoder.encode(value.billingId(), outStream);
+ stringCoder.encode(value.poNumber(), outStream);
+ stringCoder.encode(value.tld(), outStream);
+ stringCoder.encode(value.action(), outStream);
+ stringCoder.encode(value.domain(), outStream);
+ stringCoder.encode(value.repositoryId(), outStream);
+ integerCoder.encode(value.years(), outStream);
+ stringCoder.encode(value.currency(), outStream);
+ doubleCoder.encode(value.amount(), outStream);
+ stringCoder.encode(value.flags(), outStream);
+ }
+
+ @Override
+ public BillingEvent decode(InputStream inStream) throws IOException {
+ return new AutoValue_BillingEvent(
+ longCoder.decode(inStream),
+ DATE_TIME_FORMATTER.parseDateTime(stringCoder.decode(inStream)),
+ DATE_TIME_FORMATTER.parseDateTime(stringCoder.decode(inStream)),
+ stringCoder.decode(inStream),
+ stringCoder.decode(inStream),
+ stringCoder.decode(inStream),
+ stringCoder.decode(inStream),
+ stringCoder.decode(inStream),
+ stringCoder.decode(inStream),
+ stringCoder.decode(inStream),
+ integerCoder.decode(inStream),
+ stringCoder.decode(inStream),
+ doubleCoder.decode(inStream),
+ stringCoder.decode(inStream));
+ }
+ }
}
diff --git a/core/src/main/java/google/registry/beam/billing/InvoicingPipeline.java b/core/src/main/java/google/registry/beam/billing/InvoicingPipeline.java
index af199fb08..7b375daa3 100644
--- a/core/src/main/java/google/registry/beam/billing/InvoicingPipeline.java
+++ b/core/src/main/java/google/registry/beam/billing/InvoicingPipeline.java
@@ -18,6 +18,7 @@ import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static org.apache.beam.sdk.values.TypeDescriptors.strings;
import com.google.common.flogger.FluentLogger;
+import google.registry.beam.billing.BillingEvent.BillingEventCoder;
import google.registry.beam.billing.BillingEvent.InvoiceGroupingKey;
import google.registry.beam.billing.BillingEvent.InvoiceGroupingKey.InvoiceGroupingKeyCoder;
import google.registry.beam.common.RegistryJpaIO;
@@ -30,6 +31,7 @@ import google.registry.reporting.billing.BillingModule;
import google.registry.util.DomainNameUtils;
import google.registry.util.ResourceUtils;
import google.registry.util.SqlTemplate;
+import java.io.Serial;
import java.io.Serializable;
import java.time.YearMonth;
import java.util.Objects;
@@ -37,13 +39,13 @@ import java.util.Optional;
import java.util.regex.Pattern;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Contextful;
import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
@@ -65,7 +67,7 @@ import org.joda.money.CurrencyUnit;
*/
public class InvoicingPipeline implements Serializable {
- private static final long serialVersionUID = 5386330443625580081L;
+ @Serial private static final long serialVersionUID = 5386330443625580081L;
private static final Pattern SQL_COMMENT_REGEX =
Pattern.compile("^\\s*--.*\\n", Pattern.MULTILINE);
@@ -97,13 +99,11 @@ public class InvoicingPipeline implements Serializable {
Read