mirror of
https://github.com/google/nomulus.git
synced 2025-08-29 04:23:15 +02:00
Don't fail invoicing on missing PAK (#1595)
* Don't fail invoicing on missing PAK * Skip line if missing PAK * Add log check in test
This commit is contained in:
parent
c2e1f2e640
commit
147d133aef
2 changed files with 54 additions and 38 deletions
|
@ -14,11 +14,11 @@
|
||||||
|
|
||||||
package google.registry.beam.invoicing;
|
package google.registry.beam.invoicing;
|
||||||
|
|
||||||
import static com.google.common.base.Preconditions.checkState;
|
|
||||||
import static com.google.common.collect.ImmutableSet.toImmutableSet;
|
import static com.google.common.collect.ImmutableSet.toImmutableSet;
|
||||||
import static google.registry.beam.BeamUtils.getQueryFromFile;
|
import static google.registry.beam.BeamUtils.getQueryFromFile;
|
||||||
import static org.apache.beam.sdk.values.TypeDescriptors.strings;
|
import static org.apache.beam.sdk.values.TypeDescriptors.strings;
|
||||||
|
|
||||||
|
import com.google.common.flogger.FluentLogger;
|
||||||
import google.registry.beam.common.RegistryJpaIO;
|
import google.registry.beam.common.RegistryJpaIO;
|
||||||
import google.registry.beam.common.RegistryJpaIO.Read;
|
import google.registry.beam.common.RegistryJpaIO.Read;
|
||||||
import google.registry.beam.invoicing.BillingEvent.InvoiceGroupingKey;
|
import google.registry.beam.invoicing.BillingEvent.InvoiceGroupingKey;
|
||||||
|
@ -36,6 +36,7 @@ import java.time.LocalTime;
|
||||||
import java.time.YearMonth;
|
import java.time.YearMonth;
|
||||||
import java.time.ZoneId;
|
import java.time.ZoneId;
|
||||||
import java.time.format.DateTimeFormatter;
|
import java.time.format.DateTimeFormatter;
|
||||||
|
import java.util.Objects;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
import org.apache.beam.sdk.Pipeline;
|
import org.apache.beam.sdk.Pipeline;
|
||||||
|
@ -75,6 +76,8 @@ public class InvoicingPipeline implements Serializable {
|
||||||
private static final Pattern SQL_COMMENT_REGEX =
|
private static final Pattern SQL_COMMENT_REGEX =
|
||||||
Pattern.compile("^\\s*--.*\\n", Pattern.MULTILINE);
|
Pattern.compile("^\\s*--.*\\n", Pattern.MULTILINE);
|
||||||
|
|
||||||
|
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
|
||||||
|
|
||||||
private final InvoicingPipelineOptions options;
|
private final InvoicingPipelineOptions options;
|
||||||
|
|
||||||
InvoicingPipeline(InvoicingPipelineOptions options) {
|
InvoicingPipeline(InvoicingPipelineOptions options) {
|
||||||
|
@ -115,23 +118,29 @@ public class InvoicingPipeline implements Serializable {
|
||||||
InvoicingPipelineOptions options, Pipeline pipeline) {
|
InvoicingPipelineOptions options, Pipeline pipeline) {
|
||||||
Read<Object[], BillingEvent> read =
|
Read<Object[], BillingEvent> read =
|
||||||
RegistryJpaIO.read(
|
RegistryJpaIO.read(
|
||||||
makeCloudSqlQuery(options.getYearMonth()), false, InvoicingPipeline::parseRow);
|
makeCloudSqlQuery(options.getYearMonth()), false, row -> parseRow(row).orElse(null));
|
||||||
|
|
||||||
return pipeline.apply("Read BillingEvents from Cloud SQL", read);
|
PCollection<BillingEvent> billingEventsWithNulls =
|
||||||
|
pipeline.apply("Read BillingEvents from Cloud SQL", read);
|
||||||
|
|
||||||
|
// Remove null billing events
|
||||||
|
return billingEventsWithNulls.apply(Filter.by(Objects::nonNull));
|
||||||
}
|
}
|
||||||
|
|
||||||
private static BillingEvent parseRow(Object[] row) {
|
private static Optional<BillingEvent> parseRow(Object[] row) {
|
||||||
google.registry.model.billing.BillingEvent.OneTime oneTime =
|
google.registry.model.billing.BillingEvent.OneTime oneTime =
|
||||||
(google.registry.model.billing.BillingEvent.OneTime) row[0];
|
(google.registry.model.billing.BillingEvent.OneTime) row[0];
|
||||||
Registrar registrar = (Registrar) row[1];
|
Registrar registrar = (Registrar) row[1];
|
||||||
CurrencyUnit currency = oneTime.getCost().getCurrencyUnit();
|
CurrencyUnit currency = oneTime.getCost().getCurrencyUnit();
|
||||||
checkState(
|
if (!registrar.getBillingAccountMap().containsKey(currency)) {
|
||||||
registrar.getBillingAccountMap().containsKey(currency),
|
logger.atSevere().log(
|
||||||
"Registrar %s does not have a product account key for the currency unit: %s",
|
"Registrar %s does not have a product account key for the currency unit: %s",
|
||||||
registrar.getRegistrarId(),
|
registrar.getRegistrarId(), currency);
|
||||||
currency);
|
return Optional.empty();
|
||||||
|
}
|
||||||
|
|
||||||
return BillingEvent.create(
|
return Optional.of(
|
||||||
|
BillingEvent.create(
|
||||||
oneTime.getId(),
|
oneTime.getId(),
|
||||||
DateTimeUtils.toZonedDateTime(oneTime.getBillingTime(), ZoneId.of("UTC")),
|
DateTimeUtils.toZonedDateTime(oneTime.getBillingTime(), ZoneId.of("UTC")),
|
||||||
DateTimeUtils.toZonedDateTime(oneTime.getEventTime(), ZoneId.of("UTC")),
|
DateTimeUtils.toZonedDateTime(oneTime.getEventTime(), ZoneId.of("UTC")),
|
||||||
|
@ -146,7 +155,7 @@ public class InvoicingPipeline implements Serializable {
|
||||||
oneTime.getCost().getCurrencyUnit().toString(),
|
oneTime.getCost().getCurrencyUnit().toString(),
|
||||||
oneTime.getCost().getAmount().doubleValue(),
|
oneTime.getCost().getAmount().doubleValue(),
|
||||||
String.join(
|
String.join(
|
||||||
" ", oneTime.getFlags().stream().map(Flag::toString).collect(toImmutableSet())));
|
" ", oneTime.getFlags().stream().map(Flag::toString).collect(toImmutableSet()))));
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Transform that converts a {@code BillingEvent} into an invoice CSV row. */
|
/** Transform that converts a {@code BillingEvent} into an invoice CSV row. */
|
||||||
|
|
|
@ -22,17 +22,19 @@ import static google.registry.testing.DatabaseHelper.newRegistry;
|
||||||
import static google.registry.testing.DatabaseHelper.persistActiveDomain;
|
import static google.registry.testing.DatabaseHelper.persistActiveDomain;
|
||||||
import static google.registry.testing.DatabaseHelper.persistNewRegistrar;
|
import static google.registry.testing.DatabaseHelper.persistNewRegistrar;
|
||||||
import static google.registry.testing.DatabaseHelper.persistResource;
|
import static google.registry.testing.DatabaseHelper.persistResource;
|
||||||
|
import static google.registry.testing.LogsSubject.assertAboutLogs;
|
||||||
import static google.registry.util.DateTimeUtils.END_OF_TIME;
|
import static google.registry.util.DateTimeUtils.END_OF_TIME;
|
||||||
import static google.registry.util.DateTimeUtils.START_OF_TIME;
|
import static google.registry.util.DateTimeUtils.START_OF_TIME;
|
||||||
|
import static java.util.logging.Level.SEVERE;
|
||||||
import static org.joda.money.CurrencyUnit.CAD;
|
import static org.joda.money.CurrencyUnit.CAD;
|
||||||
import static org.joda.money.CurrencyUnit.JPY;
|
import static org.joda.money.CurrencyUnit.JPY;
|
||||||
import static org.joda.money.CurrencyUnit.USD;
|
import static org.joda.money.CurrencyUnit.USD;
|
||||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
|
||||||
|
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
import com.google.common.collect.ImmutableSet;
|
import com.google.common.collect.ImmutableSet;
|
||||||
import com.google.common.collect.ImmutableSortedMap;
|
import com.google.common.collect.ImmutableSortedMap;
|
||||||
|
import com.google.common.testing.TestLogHandler;
|
||||||
import google.registry.beam.TestPipelineExtension;
|
import google.registry.beam.TestPipelineExtension;
|
||||||
import google.registry.model.billing.BillingEvent.Cancellation;
|
import google.registry.model.billing.BillingEvent.Cancellation;
|
||||||
import google.registry.model.billing.BillingEvent.Flag;
|
import google.registry.model.billing.BillingEvent.Flag;
|
||||||
|
@ -59,7 +61,7 @@ import java.time.ZonedDateTime;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
|
import java.util.logging.Logger;
|
||||||
import org.apache.beam.sdk.coders.SerializableCoder;
|
import org.apache.beam.sdk.coders.SerializableCoder;
|
||||||
import org.apache.beam.sdk.options.PipelineOptionsFactory;
|
import org.apache.beam.sdk.options.PipelineOptionsFactory;
|
||||||
import org.apache.beam.sdk.testing.PAssert;
|
import org.apache.beam.sdk.testing.PAssert;
|
||||||
|
@ -252,9 +254,14 @@ class InvoicingPipelineTest {
|
||||||
|
|
||||||
private File billingBucketUrl;
|
private File billingBucketUrl;
|
||||||
private PCollection<BillingEvent> billingEvents;
|
private PCollection<BillingEvent> billingEvents;
|
||||||
|
private final TestLogHandler logHandler = new TestLogHandler();
|
||||||
|
|
||||||
|
private final Logger loggerToIntercept =
|
||||||
|
Logger.getLogger(InvoicingPipeline.class.getCanonicalName());
|
||||||
|
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
void beforeEach() throws Exception {
|
void beforeEach() throws Exception {
|
||||||
|
loggerToIntercept.addHandler(logHandler);
|
||||||
billingBucketUrl = Files.createDirectory(tmpDir.resolve(BILLING_BUCKET_URL)).toFile();
|
billingBucketUrl = Files.createDirectory(tmpDir.resolve(BILLING_BUCKET_URL)).toFile();
|
||||||
options.setBillingBucketUrl(billingBucketUrl.getAbsolutePath());
|
options.setBillingBucketUrl(billingBucketUrl.getAbsolutePath());
|
||||||
options.setYearMonth(YEAR_MONTH);
|
options.setYearMonth(YEAR_MONTH);
|
||||||
|
@ -300,8 +307,9 @@ class InvoicingPipelineTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void testFailure_readFromCloudSqlMissingPAK() throws Exception {
|
void testSuccess_readFromCloudSqlMissingPAK() throws Exception {
|
||||||
Registrar registrar = persistNewRegistrar("TheRegistrar");
|
setupCloudSql();
|
||||||
|
Registrar registrar = persistNewRegistrar("ARegistrar");
|
||||||
registrar =
|
registrar =
|
||||||
registrar
|
registrar
|
||||||
.asBuilder()
|
.asBuilder()
|
||||||
|
@ -317,17 +325,16 @@ class InvoicingPipelineTest {
|
||||||
persistResource(test);
|
persistResource(test);
|
||||||
DomainBase domain = persistActiveDomain("mycanadiandomain.test");
|
DomainBase domain = persistActiveDomain("mycanadiandomain.test");
|
||||||
|
|
||||||
persistOneTimeBillingEvent(1, domain, registrar, Reason.RENEW, 3, Money.of(CAD, 20.5));
|
persistOneTimeBillingEvent(25, domain, registrar, Reason.RENEW, 3, Money.of(CAD, 20.5));
|
||||||
PCollection<BillingEvent> billingEvents = InvoicingPipeline.readFromCloudSql(options, pipeline);
|
PCollection<BillingEvent> billingEvents = InvoicingPipeline.readFromCloudSql(options, pipeline);
|
||||||
billingEvents = billingEvents.apply(new ChangeDomainRepo());
|
billingEvents = billingEvents.apply(new ChangeDomainRepo());
|
||||||
PAssert.that(billingEvents).empty();
|
PAssert.that(billingEvents).containsInAnyOrder(INPUT_EVENTS);
|
||||||
PipelineExecutionException thrown =
|
pipeline.run().waitUntilFinish();
|
||||||
assertThrows(PipelineExecutionException.class, () -> pipeline.run().waitUntilFinish());
|
assertAboutLogs()
|
||||||
assertThat(thrown)
|
.that(logHandler)
|
||||||
.hasMessageThat()
|
.hasLogAtLevelWithMessage(
|
||||||
.contains(
|
SEVERE,
|
||||||
"Registrar TheRegistrar does not have a product account key for the currency unit:"
|
"Registrar ARegistrar does not have a product account key for the currency unit: CAD");
|
||||||
+ " CAD");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue