From 147d133aefa2a3c996be58f693c70dfdca08c5ec Mon Sep 17 00:00:00 2001 From: sarahcaseybot Date: Fri, 22 Apr 2022 13:00:50 -0400 Subject: [PATCH] Don't fail invoicing on missing PAK (#1595) * Don't fail invoicing on missing PAK * Skip line if missing PAK * Add log check in test --- .../beam/invoicing/InvoicingPipeline.java | 59 +++++++++++-------- .../beam/invoicing/InvoicingPipelineTest.java | 33 +++++++---- 2 files changed, 54 insertions(+), 38 deletions(-) diff --git a/core/src/main/java/google/registry/beam/invoicing/InvoicingPipeline.java b/core/src/main/java/google/registry/beam/invoicing/InvoicingPipeline.java index 9c4e0560c..44ad0a341 100644 --- a/core/src/main/java/google/registry/beam/invoicing/InvoicingPipeline.java +++ b/core/src/main/java/google/registry/beam/invoicing/InvoicingPipeline.java @@ -14,11 +14,11 @@ package google.registry.beam.invoicing; -import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.ImmutableSet.toImmutableSet; import static google.registry.beam.BeamUtils.getQueryFromFile; import static org.apache.beam.sdk.values.TypeDescriptors.strings; +import com.google.common.flogger.FluentLogger; import google.registry.beam.common.RegistryJpaIO; import google.registry.beam.common.RegistryJpaIO.Read; import google.registry.beam.invoicing.BillingEvent.InvoiceGroupingKey; @@ -36,6 +36,7 @@ import java.time.LocalTime; import java.time.YearMonth; import java.time.ZoneId; import java.time.format.DateTimeFormatter; +import java.util.Objects; import java.util.Optional; import java.util.regex.Pattern; import org.apache.beam.sdk.Pipeline; @@ -75,6 +76,8 @@ public class InvoicingPipeline implements Serializable { private static final Pattern SQL_COMMENT_REGEX = Pattern.compile("^\\s*--.*\\n", Pattern.MULTILINE); + private static final FluentLogger logger = FluentLogger.forEnclosingClass(); + private final InvoicingPipelineOptions options; InvoicingPipeline(InvoicingPipelineOptions options) { @@ -115,38 +118,44 @@ public class InvoicingPipeline implements Serializable { InvoicingPipelineOptions options, Pipeline pipeline) { Read read = RegistryJpaIO.read( - makeCloudSqlQuery(options.getYearMonth()), false, InvoicingPipeline::parseRow); + makeCloudSqlQuery(options.getYearMonth()), false, row -> parseRow(row).orElse(null)); - return pipeline.apply("Read BillingEvents from Cloud SQL", read); + PCollection billingEventsWithNulls = + pipeline.apply("Read BillingEvents from Cloud SQL", read); + + // Remove null billing events + return billingEventsWithNulls.apply(Filter.by(Objects::nonNull)); } - private static BillingEvent parseRow(Object[] row) { + private static Optional parseRow(Object[] row) { google.registry.model.billing.BillingEvent.OneTime oneTime = (google.registry.model.billing.BillingEvent.OneTime) row[0]; Registrar registrar = (Registrar) row[1]; CurrencyUnit currency = oneTime.getCost().getCurrencyUnit(); - checkState( - registrar.getBillingAccountMap().containsKey(currency), - "Registrar %s does not have a product account key for the currency unit: %s", - registrar.getRegistrarId(), - currency); + if (!registrar.getBillingAccountMap().containsKey(currency)) { + logger.atSevere().log( + "Registrar %s does not have a product account key for the currency unit: %s", + registrar.getRegistrarId(), currency); + return Optional.empty(); + } - return BillingEvent.create( - oneTime.getId(), - DateTimeUtils.toZonedDateTime(oneTime.getBillingTime(), ZoneId.of("UTC")), - DateTimeUtils.toZonedDateTime(oneTime.getEventTime(), ZoneId.of("UTC")), - registrar.getRegistrarId(), - registrar.getBillingAccountMap().get(currency), - registrar.getPoNumber().orElse(""), - DomainNameUtils.getTldFromDomainName(oneTime.getTargetId()), - oneTime.getReason().toString(), - oneTime.getTargetId(), - oneTime.getDomainRepoId(), - Optional.ofNullable(oneTime.getPeriodYears()).orElse(0), - oneTime.getCost().getCurrencyUnit().toString(), - oneTime.getCost().getAmount().doubleValue(), - String.join( - " ", oneTime.getFlags().stream().map(Flag::toString).collect(toImmutableSet()))); + return Optional.of( + BillingEvent.create( + oneTime.getId(), + DateTimeUtils.toZonedDateTime(oneTime.getBillingTime(), ZoneId.of("UTC")), + DateTimeUtils.toZonedDateTime(oneTime.getEventTime(), ZoneId.of("UTC")), + registrar.getRegistrarId(), + registrar.getBillingAccountMap().get(currency), + registrar.getPoNumber().orElse(""), + DomainNameUtils.getTldFromDomainName(oneTime.getTargetId()), + oneTime.getReason().toString(), + oneTime.getTargetId(), + oneTime.getDomainRepoId(), + Optional.ofNullable(oneTime.getPeriodYears()).orElse(0), + oneTime.getCost().getCurrencyUnit().toString(), + oneTime.getCost().getAmount().doubleValue(), + String.join( + " ", oneTime.getFlags().stream().map(Flag::toString).collect(toImmutableSet())))); } /** Transform that converts a {@code BillingEvent} into an invoice CSV row. */ diff --git a/core/src/test/java/google/registry/beam/invoicing/InvoicingPipelineTest.java b/core/src/test/java/google/registry/beam/invoicing/InvoicingPipelineTest.java index d36083ff3..c5c24b9cd 100644 --- a/core/src/test/java/google/registry/beam/invoicing/InvoicingPipelineTest.java +++ b/core/src/test/java/google/registry/beam/invoicing/InvoicingPipelineTest.java @@ -22,17 +22,19 @@ import static google.registry.testing.DatabaseHelper.newRegistry; import static google.registry.testing.DatabaseHelper.persistActiveDomain; import static google.registry.testing.DatabaseHelper.persistNewRegistrar; import static google.registry.testing.DatabaseHelper.persistResource; +import static google.registry.testing.LogsSubject.assertAboutLogs; import static google.registry.util.DateTimeUtils.END_OF_TIME; import static google.registry.util.DateTimeUtils.START_OF_TIME; +import static java.util.logging.Level.SEVERE; import static org.joda.money.CurrencyUnit.CAD; import static org.joda.money.CurrencyUnit.JPY; import static org.joda.money.CurrencyUnit.USD; -import static org.junit.jupiter.api.Assertions.assertThrows; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSortedMap; +import com.google.common.testing.TestLogHandler; import google.registry.beam.TestPipelineExtension; import google.registry.model.billing.BillingEvent.Cancellation; import google.registry.model.billing.BillingEvent.Flag; @@ -59,7 +61,7 @@ import java.time.ZonedDateTime; import java.util.Arrays; import java.util.Map.Entry; import java.util.Optional; -import org.apache.beam.sdk.Pipeline.PipelineExecutionException; +import java.util.logging.Logger; import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; @@ -252,9 +254,14 @@ class InvoicingPipelineTest { private File billingBucketUrl; private PCollection billingEvents; + private final TestLogHandler logHandler = new TestLogHandler(); + + private final Logger loggerToIntercept = + Logger.getLogger(InvoicingPipeline.class.getCanonicalName()); @BeforeEach void beforeEach() throws Exception { + loggerToIntercept.addHandler(logHandler); billingBucketUrl = Files.createDirectory(tmpDir.resolve(BILLING_BUCKET_URL)).toFile(); options.setBillingBucketUrl(billingBucketUrl.getAbsolutePath()); options.setYearMonth(YEAR_MONTH); @@ -300,8 +307,9 @@ class InvoicingPipelineTest { } @Test - void testFailure_readFromCloudSqlMissingPAK() throws Exception { - Registrar registrar = persistNewRegistrar("TheRegistrar"); + void testSuccess_readFromCloudSqlMissingPAK() throws Exception { + setupCloudSql(); + Registrar registrar = persistNewRegistrar("ARegistrar"); registrar = registrar .asBuilder() @@ -317,17 +325,16 @@ class InvoicingPipelineTest { persistResource(test); DomainBase domain = persistActiveDomain("mycanadiandomain.test"); - persistOneTimeBillingEvent(1, domain, registrar, Reason.RENEW, 3, Money.of(CAD, 20.5)); + persistOneTimeBillingEvent(25, domain, registrar, Reason.RENEW, 3, Money.of(CAD, 20.5)); PCollection billingEvents = InvoicingPipeline.readFromCloudSql(options, pipeline); billingEvents = billingEvents.apply(new ChangeDomainRepo()); - PAssert.that(billingEvents).empty(); - PipelineExecutionException thrown = - assertThrows(PipelineExecutionException.class, () -> pipeline.run().waitUntilFinish()); - assertThat(thrown) - .hasMessageThat() - .contains( - "Registrar TheRegistrar does not have a product account key for the currency unit:" - + " CAD"); + PAssert.that(billingEvents).containsInAnyOrder(INPUT_EVENTS); + pipeline.run().waitUntilFinish(); + assertAboutLogs() + .that(logHandler) + .hasLogAtLevelWithMessage( + SEVERE, + "Registrar ARegistrar does not have a product account key for the currency unit: CAD"); } @Test