mirror of
https://github.com/google/nomulus.git
synced 2025-07-22 18:55:58 +02:00
Update ExpandRecurringBillingEventsAction to use the beam pipeline (#1907)
Due to the way the beam pipeline is designed, it will expand an recurring billing event when its event time is in scope for expansion, instead of billing time. This means that the one time will be generated 45 days earlier. This would negate the need to check if the expansion is finished when generating monthly invoices. We will need to backfill the past 45 days of onetimes before the new code is deployed. As an illustration, with the old code, a cursor time of 2023-01-17 means that all auto-renewals whose billing time is before 2023-01-17 were created, which corresponds to an effective cursor time of 2022-12-03 (45 days before 2023-01-17) for event time. This cursor will need to be brought to 2023-01-17 to ensure that there is no gap in generated event times when switching to use the new code.
This commit is contained in:
parent
5210ac0419
commit
8088efb25c
10 changed files with 239 additions and 1482 deletions
|
@ -105,10 +105,22 @@ public class BatchModule {
|
|||
}
|
||||
|
||||
@Provides
|
||||
@Parameter(ExpandRecurringBillingEventsAction.PARAM_CURSOR_TIME)
|
||||
static Optional<DateTime> provideCursorTime(HttpServletRequest req) {
|
||||
@Parameter(ExpandRecurringBillingEventsAction.PARAM_START_TIME)
|
||||
static Optional<DateTime> provideStartTime(HttpServletRequest req) {
|
||||
return extractOptionalDatetimeParameter(
|
||||
req, ExpandRecurringBillingEventsAction.PARAM_CURSOR_TIME);
|
||||
req, ExpandRecurringBillingEventsAction.PARAM_START_TIME);
|
||||
}
|
||||
|
||||
@Provides
|
||||
@Parameter(ExpandRecurringBillingEventsAction.PARAM_END_TIME)
|
||||
static Optional<DateTime> provideEndTime(HttpServletRequest req) {
|
||||
return extractOptionalDatetimeParameter(req, ExpandRecurringBillingEventsAction.PARAM_END_TIME);
|
||||
}
|
||||
|
||||
@Provides
|
||||
@Parameter(ExpandRecurringBillingEventsAction.PARAM_ADVANCE_CURSOR)
|
||||
static boolean provideAdvanceCursor(HttpServletRequest req) {
|
||||
return extractBooleanParameter(req, ExpandRecurringBillingEventsAction.PARAM_ADVANCE_CURSOR);
|
||||
}
|
||||
|
||||
@Provides
|
||||
|
|
|
@ -15,58 +15,39 @@
|
|||
package google.registry.batch;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkArgument;
|
||||
import static com.google.common.collect.ImmutableSet.toImmutableSet;
|
||||
import static com.google.common.collect.Sets.difference;
|
||||
import static com.google.common.collect.Sets.newHashSet;
|
||||
import static google.registry.batch.BatchModule.PARAM_DRY_RUN;
|
||||
import static google.registry.beam.BeamUtils.createJobName;
|
||||
import static google.registry.model.common.Cursor.CursorType.RECURRING_BILLING;
|
||||
import static google.registry.model.domain.Period.Unit.YEARS;
|
||||
import static google.registry.model.reporting.HistoryEntry.Type.DOMAIN_AUTORENEW;
|
||||
import static google.registry.persistence.transaction.QueryComposer.Comparator.EQ;
|
||||
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
|
||||
import static google.registry.util.CollectionUtils.union;
|
||||
import static google.registry.util.DateTimeUtils.START_OF_TIME;
|
||||
import static google.registry.util.DateTimeUtils.earliestOf;
|
||||
import static google.registry.util.DomainNameUtils.getTldFromDomainName;
|
||||
import static javax.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
|
||||
import static javax.servlet.http.HttpServletResponse.SC_OK;
|
||||
|
||||
import com.google.auto.value.AutoValue;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.Range;
|
||||
import com.google.common.collect.Streams;
|
||||
import com.google.api.services.dataflow.Dataflow;
|
||||
import com.google.api.services.dataflow.model.LaunchFlexTemplateParameter;
|
||||
import com.google.api.services.dataflow.model.LaunchFlexTemplateRequest;
|
||||
import com.google.api.services.dataflow.model.LaunchFlexTemplateResponse;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.flogger.FluentLogger;
|
||||
import google.registry.beam.billing.ExpandRecurringBillingEventsPipeline;
|
||||
import google.registry.config.RegistryConfig.Config;
|
||||
import google.registry.flows.domain.DomainPricingLogic;
|
||||
import google.registry.model.ImmutableObject;
|
||||
import google.registry.model.billing.BillingEvent;
|
||||
import google.registry.model.billing.BillingEvent.Flag;
|
||||
import google.registry.config.RegistryEnvironment;
|
||||
import google.registry.model.billing.BillingEvent.OneTime;
|
||||
import google.registry.model.billing.BillingEvent.Recurring;
|
||||
import google.registry.model.common.Cursor;
|
||||
import google.registry.model.domain.Domain;
|
||||
import google.registry.model.domain.DomainHistory;
|
||||
import google.registry.model.domain.Period;
|
||||
import google.registry.model.reporting.DomainTransactionRecord;
|
||||
import google.registry.model.reporting.DomainTransactionRecord.TransactionReportField;
|
||||
import google.registry.model.tld.Registry;
|
||||
import google.registry.persistence.VKey;
|
||||
import google.registry.request.Action;
|
||||
import google.registry.request.Parameter;
|
||||
import google.registry.request.Response;
|
||||
import google.registry.request.auth.Auth;
|
||||
import google.registry.util.Clock;
|
||||
import java.util.List;
|
||||
import java.io.IOException;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import javax.inject.Inject;
|
||||
import org.joda.time.DateTime;
|
||||
|
||||
/**
|
||||
* An action that expands {@link Recurring} billing events into synthetic {@link OneTime} events.
|
||||
*
|
||||
* <p>The cursor used throughout this action (overridden if necessary using the parameter {@code
|
||||
* cursorTime}) represents the inclusive lower bound on the range of billing times that will be
|
||||
* expanded as a result of the job (the exclusive upper bound being the execution time of the job).
|
||||
* An action that kicks off a {@link ExpandRecurringBillingEventsPipeline} dataflow job to expand
|
||||
* {@link Recurring} billing events into synthetic {@link OneTime} events.
|
||||
*/
|
||||
@Action(
|
||||
service = Action.Service.BACKEND,
|
||||
|
@ -74,303 +55,109 @@ import org.joda.time.DateTime;
|
|||
auth = Auth.AUTH_INTERNAL_OR_ADMIN)
|
||||
public class ExpandRecurringBillingEventsAction implements Runnable {
|
||||
|
||||
public static final String PARAM_CURSOR_TIME = "cursorTime";
|
||||
public static final String PARAM_START_TIME = "startTime";
|
||||
public static final String PARAM_END_TIME = "endTime";
|
||||
public static final String PARAM_ADVANCE_CURSOR = "advanceCursor";
|
||||
|
||||
private static final String PIPELINE_NAME = "expand_recurring_billing_events_pipeline";
|
||||
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
|
||||
|
||||
@Inject Clock clock;
|
||||
|
||||
@Inject
|
||||
@Config("jdbcBatchSize")
|
||||
int batchSize;
|
||||
@Parameter(PARAM_DRY_RUN)
|
||||
boolean isDryRun;
|
||||
|
||||
@Inject @Parameter(PARAM_DRY_RUN) boolean isDryRun;
|
||||
@Inject @Parameter(PARAM_CURSOR_TIME) Optional<DateTime> cursorTimeParam;
|
||||
@Inject
|
||||
@Parameter(PARAM_ADVANCE_CURSOR)
|
||||
boolean advanceCursor;
|
||||
|
||||
@Inject
|
||||
@Parameter(PARAM_START_TIME)
|
||||
Optional<DateTime> startTimeParam;
|
||||
|
||||
@Inject
|
||||
@Parameter(PARAM_END_TIME)
|
||||
Optional<DateTime> endTimeParam;
|
||||
|
||||
@Inject
|
||||
@Config("projectId")
|
||||
String projectId;
|
||||
|
||||
@Inject
|
||||
@Config("defaultJobRegion")
|
||||
String jobRegion;
|
||||
|
||||
@Inject
|
||||
@Config("beamStagingBucketUrl")
|
||||
String stagingBucketUrl;
|
||||
|
||||
@Inject Dataflow dataflow;
|
||||
|
||||
@Inject DomainPricingLogic domainPricingLogic;
|
||||
@Inject Response response;
|
||||
@Inject ExpandRecurringBillingEventsAction() {}
|
||||
|
||||
@Inject
|
||||
ExpandRecurringBillingEventsAction() {}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
DateTime executeTime = clock.nowUtc();
|
||||
DateTime persistedCursorTime =
|
||||
tm().transact(
|
||||
() ->
|
||||
tm().loadByKeyIfPresent(Cursor.createGlobalVKey(RECURRING_BILLING))
|
||||
.orElse(Cursor.createGlobal(RECURRING_BILLING, START_OF_TIME))
|
||||
.getCursorTime());
|
||||
DateTime cursorTime = cursorTimeParam.orElse(persistedCursorTime);
|
||||
checkArgument(!(isDryRun && advanceCursor), "Cannot advance the cursor in a dry run.");
|
||||
DateTime endTime = endTimeParam.orElse(clock.nowUtc());
|
||||
checkArgument(
|
||||
cursorTime.isBefore(executeTime), "Cursor time must be earlier than execution time.");
|
||||
logger.atInfo().log(
|
||||
"Running Recurring billing event expansion for billing time range [%s, %s).",
|
||||
cursorTime, executeTime);
|
||||
expandSqlBillingEventsInBatches(executeTime, cursorTime, persistedCursorTime);
|
||||
}
|
||||
|
||||
private void expandSqlBillingEventsInBatches(
|
||||
DateTime executeTime, DateTime cursorTime, DateTime persistedCursorTime) {
|
||||
int totalBillingEventsSaved = 0;
|
||||
long maxProcessedRecurrenceId = 0;
|
||||
SqlBatchResults sqlBatchResults;
|
||||
|
||||
do {
|
||||
final long prevMaxProcessedRecurrenceId = maxProcessedRecurrenceId;
|
||||
sqlBatchResults =
|
||||
tm().transact(
|
||||
() -> {
|
||||
Set<String> expandedDomains = newHashSet();
|
||||
int batchBillingEventsSaved = 0;
|
||||
long maxRecurrenceId = prevMaxProcessedRecurrenceId;
|
||||
List<Recurring> recurrings =
|
||||
tm().query(
|
||||
"FROM BillingRecurrence "
|
||||
+ "WHERE eventTime <= :executeTime "
|
||||
+ "AND eventTime < recurrenceEndTime "
|
||||
+ "AND id > :maxProcessedRecurrenceId "
|
||||
+ "AND recurrenceEndTime > :adjustedCursorTime "
|
||||
+ "ORDER BY id ASC",
|
||||
Recurring.class)
|
||||
.setParameter("executeTime", executeTime)
|
||||
.setParameter("maxProcessedRecurrenceId", prevMaxProcessedRecurrenceId)
|
||||
.setParameter(
|
||||
"adjustedCursorTime",
|
||||
cursorTime.minus(Registry.DEFAULT_AUTO_RENEW_GRACE_PERIOD))
|
||||
.setMaxResults(batchSize)
|
||||
.getResultList();
|
||||
for (Recurring recurring : recurrings) {
|
||||
if (expandedDomains.contains(recurring.getTargetId())) {
|
||||
// On the off chance this batch contains multiple recurrences for the same
|
||||
// domain (which is actually possible if a given domain is quickly renewed
|
||||
// multiple times in a row), then short-circuit after the first one is
|
||||
// processed that involves actually expanding a billing event. This is
|
||||
// necessary because otherwise we get an "Inserted/updated object reloaded"
|
||||
// error from Hibernate when those billing events would be loaded
|
||||
// inside a transaction where they were already written. Note, there is no
|
||||
// actual further work to be done in this case anyway, not unless it has
|
||||
// somehow been over a year since this action last ran successfully (and if
|
||||
// that were somehow true, the remaining billing events would still be
|
||||
// expanded on subsequent runs).
|
||||
continue;
|
||||
}
|
||||
int billingEventsSaved =
|
||||
expandBillingEvent(
|
||||
recurring, executeTime, cursorTime, isDryRun, domainPricingLogic);
|
||||
batchBillingEventsSaved += billingEventsSaved;
|
||||
if (billingEventsSaved > 0) {
|
||||
expandedDomains.add(recurring.getTargetId());
|
||||
}
|
||||
maxRecurrenceId = Math.max(maxRecurrenceId, recurring.getId());
|
||||
}
|
||||
return SqlBatchResults.create(
|
||||
batchBillingEventsSaved,
|
||||
maxRecurrenceId,
|
||||
maxRecurrenceId > prevMaxProcessedRecurrenceId);
|
||||
});
|
||||
totalBillingEventsSaved += sqlBatchResults.batchBillingEventsSaved();
|
||||
maxProcessedRecurrenceId = sqlBatchResults.maxProcessedRecurrenceId();
|
||||
if (sqlBatchResults.batchBillingEventsSaved() > 0) {
|
||||
logger.atInfo().log(
|
||||
"Saved %d billing events in batch (%d total) with max recurrence id %d.",
|
||||
sqlBatchResults.batchBillingEventsSaved(),
|
||||
totalBillingEventsSaved,
|
||||
maxProcessedRecurrenceId);
|
||||
} else {
|
||||
// If we're churning through a lot of no-op recurrences that don't need expanding (yet?),
|
||||
// then only log one no-op every so often as a good balance between letting the user track
|
||||
// that the action is still running while also not spamming the logs incessantly.
|
||||
logger.atInfo().atMostEvery(3, TimeUnit.MINUTES).log(
|
||||
"Processed up to max recurrence id %d (no billing events saved recently).",
|
||||
maxProcessedRecurrenceId);
|
||||
}
|
||||
} while (sqlBatchResults.shouldContinue());
|
||||
|
||||
if (!isDryRun) {
|
||||
logger.atInfo().log("Saved %d total OneTime billing events.", totalBillingEventsSaved);
|
||||
} else {
|
||||
logger.atInfo().log(
|
||||
"Generated %d total OneTime billing events (dry run).", totalBillingEventsSaved);
|
||||
}
|
||||
logger.atInfo().log(
|
||||
"Recurring event expansion %s complete for billing event range [%s, %s).",
|
||||
isDryRun ? "(dry run) " : "", cursorTime, executeTime);
|
||||
tm().transact(
|
||||
() -> {
|
||||
// Check for the unlikely scenario where the cursor has been altered during the
|
||||
// expansion.
|
||||
DateTime currentCursorTime =
|
||||
tm().loadByKeyIfPresent(Cursor.createGlobalVKey(RECURRING_BILLING))
|
||||
.orElse(Cursor.createGlobal(RECURRING_BILLING, START_OF_TIME))
|
||||
.getCursorTime();
|
||||
if (!currentCursorTime.equals(persistedCursorTime)) {
|
||||
throw new IllegalStateException(
|
||||
!endTime.isAfter(clock.nowUtc()), "End time (%s) must be at or before now", endTime);
|
||||
DateTime startTime =
|
||||
startTimeParam.orElse(
|
||||
tm().transact(
|
||||
() ->
|
||||
tm().loadByKeyIfPresent(Cursor.createGlobalVKey(RECURRING_BILLING))
|
||||
.orElse(Cursor.createGlobal(RECURRING_BILLING, START_OF_TIME))
|
||||
.getCursorTime()));
|
||||
checkArgument(
|
||||
startTime.isBefore(endTime),
|
||||
String.format("Start time (%s) must be before end time (%s)", startTime, endTime));
|
||||
LaunchFlexTemplateParameter launchParameter =
|
||||
new LaunchFlexTemplateParameter()
|
||||
.setJobName(
|
||||
createJobName(
|
||||
String.format(
|
||||
"Current cursor position %s does not match persisted cursor position %s.",
|
||||
currentCursorTime, persistedCursorTime));
|
||||
}
|
||||
if (!isDryRun) {
|
||||
tm().put(Cursor.createGlobal(RECURRING_BILLING, executeTime));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@AutoValue
|
||||
abstract static class SqlBatchResults {
|
||||
abstract int batchBillingEventsSaved();
|
||||
|
||||
abstract long maxProcessedRecurrenceId();
|
||||
|
||||
abstract boolean shouldContinue();
|
||||
|
||||
static SqlBatchResults create(
|
||||
int batchBillingEventsSaved, long maxProcessedRecurrenceId, boolean shouldContinue) {
|
||||
return new AutoValue_ExpandRecurringBillingEventsAction_SqlBatchResults(
|
||||
batchBillingEventsSaved, maxProcessedRecurrenceId, shouldContinue);
|
||||
"expand-billing-%s", startTime.toString("yyyy-MM-dd't'HH-mm-ss'z'")),
|
||||
clock))
|
||||
.setContainerSpecGcsPath(
|
||||
String.format("%s/%s_metadata.json", stagingBucketUrl, PIPELINE_NAME))
|
||||
.setParameters(
|
||||
new ImmutableMap.Builder<String, String>()
|
||||
.put("registryEnvironment", RegistryEnvironment.get().name())
|
||||
.put("startTime", startTime.toString("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"))
|
||||
.put("endTime", endTime.toString("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"))
|
||||
.put("isDryRun", Boolean.toString(isDryRun))
|
||||
.put("advanceCursor", Boolean.toString(advanceCursor))
|
||||
.build());
|
||||
logger.atInfo().log(
|
||||
"Launching recurring billing event expansion pipeline for event time range [%s, %s)%s.",
|
||||
startTime,
|
||||
endTime,
|
||||
isDryRun ? " in dry run mode" : advanceCursor ? "" : " without advancing the cursor");
|
||||
try {
|
||||
LaunchFlexTemplateResponse launchResponse =
|
||||
dataflow
|
||||
.projects()
|
||||
.locations()
|
||||
.flexTemplates()
|
||||
.launch(
|
||||
projectId,
|
||||
jobRegion,
|
||||
new LaunchFlexTemplateRequest().setLaunchParameter(launchParameter))
|
||||
.execute();
|
||||
logger.atInfo().log("Got response: %s", launchResponse.getJob().toPrettyString());
|
||||
response.setStatus(SC_OK);
|
||||
response.setPayload(
|
||||
String.format(
|
||||
"Launched recurring billing event expansion pipeline: %s",
|
||||
launchResponse.getJob().getId()));
|
||||
} catch (IOException e) {
|
||||
logger.atWarning().withCause(e).log("Pipeline Launch failed");
|
||||
response.setStatus(SC_INTERNAL_SERVER_ERROR);
|
||||
response.setPayload(String.format("Pipeline launch failed: %s", e.getMessage()));
|
||||
}
|
||||
}
|
||||
|
||||
private static int expandBillingEvent(
|
||||
Recurring recurring,
|
||||
DateTime executeTime,
|
||||
DateTime cursorTime,
|
||||
boolean isDryRun,
|
||||
DomainPricingLogic domainPricingLogic) {
|
||||
ImmutableSet.Builder<OneTime> syntheticOneTimesBuilder = new ImmutableSet.Builder<>();
|
||||
final Registry tld = Registry.get(getTldFromDomainName(recurring.getTargetId()));
|
||||
|
||||
// Determine the complete set of times at which this recurring event should
|
||||
// occur (up to and including the runtime of the action).
|
||||
Iterable<DateTime> eventTimes =
|
||||
recurring
|
||||
.getRecurrenceTimeOfYear()
|
||||
.getInstancesInRange(
|
||||
Range.closed(
|
||||
recurring.getEventTime(),
|
||||
earliestOf(recurring.getRecurrenceEndTime(), executeTime)));
|
||||
|
||||
// Convert these event times to billing times
|
||||
final ImmutableSet<DateTime> billingTimes =
|
||||
getBillingTimesInScope(eventTimes, cursorTime, executeTime, tld);
|
||||
|
||||
VKey<Domain> domainKey = VKey.create(Domain.class, recurring.getDomainRepoId());
|
||||
Iterable<OneTime> oneTimesForDomain;
|
||||
oneTimesForDomain =
|
||||
tm().createQueryComposer(OneTime.class)
|
||||
.where("domainRepoId", EQ, recurring.getDomainRepoId())
|
||||
.list();
|
||||
|
||||
// Determine the billing times that already have OneTime events persisted.
|
||||
ImmutableSet<DateTime> existingBillingTimes =
|
||||
getExistingBillingTimes(oneTimesForDomain, recurring);
|
||||
|
||||
ImmutableSet.Builder<DomainHistory> historyEntriesBuilder = new ImmutableSet.Builder<>();
|
||||
// Create synthetic OneTime events for all billing times that do not yet have
|
||||
// an event persisted.
|
||||
for (DateTime billingTime : difference(billingTimes, existingBillingTimes)) {
|
||||
// Construct a new HistoryEntry that parents over the OneTime
|
||||
DomainHistory historyEntry =
|
||||
new DomainHistory.Builder()
|
||||
.setBySuperuser(false)
|
||||
.setRegistrarId(recurring.getRegistrarId())
|
||||
.setModificationTime(tm().getTransactionTime())
|
||||
.setDomain(tm().loadByKey(domainKey))
|
||||
.setPeriod(Period.create(1, YEARS))
|
||||
.setReason("Domain autorenewal by ExpandRecurringBillingEventsAction")
|
||||
.setRequestedByRegistrar(false)
|
||||
.setType(DOMAIN_AUTORENEW)
|
||||
// Note: the following statement seems to not be entirely correct as manual renewal
|
||||
// during the autorenew grace period also closes out the existing recurrence, but in
|
||||
// that instance the autorenew history entry should still have the transaction records
|
||||
// for obvious reasons. It can be argued the history entry should always have the
|
||||
// transaction record, regardless of what happens afterward. If the domain is deleted
|
||||
// later during the autorenew grace period, another history entry for the delete would
|
||||
// record that mutation separately, but the previous autorenew should not have its
|
||||
// history entry retroactively altered, or in this case have the transaction records
|
||||
// omitted when its created belatedly (when billing time is in scope). However, since
|
||||
// we will be rewriting this action and only want to do the absolute minimum change to
|
||||
// fix it for now, we will leave the current logic in place to avoid any unnecessary
|
||||
// complications.
|
||||
//
|
||||
// Don't write a domain transaction record if the recurrence was
|
||||
// ended prior to the billing time (i.e. a domain was deleted
|
||||
// during the autorenew grace period).
|
||||
.setDomainTransactionRecords(
|
||||
recurring.getRecurrenceEndTime().isBefore(billingTime)
|
||||
? ImmutableSet.of()
|
||||
: ImmutableSet.of(
|
||||
DomainTransactionRecord.create(
|
||||
tld.getTldStr(),
|
||||
// We report this when the autorenew grace period
|
||||
// ends
|
||||
billingTime,
|
||||
TransactionReportField.netRenewsFieldFromYears(1),
|
||||
1)))
|
||||
.build();
|
||||
historyEntriesBuilder.add(historyEntry);
|
||||
|
||||
DateTime eventTime = billingTime.minus(tld.getAutoRenewGracePeriodLength());
|
||||
|
||||
syntheticOneTimesBuilder.add(
|
||||
new OneTime.Builder()
|
||||
.setBillingTime(billingTime)
|
||||
.setRegistrarId(recurring.getRegistrarId())
|
||||
// Determine the cost for a one-year renewal.
|
||||
.setCost(
|
||||
domainPricingLogic
|
||||
.getRenewPrice(tld, recurring.getTargetId(), eventTime, 1, recurring)
|
||||
.getRenewCost())
|
||||
.setEventTime(eventTime)
|
||||
.setFlags(union(recurring.getFlags(), Flag.SYNTHETIC))
|
||||
.setDomainHistory(historyEntry)
|
||||
.setPeriodYears(1)
|
||||
.setReason(recurring.getReason())
|
||||
.setSyntheticCreationTime(executeTime)
|
||||
.setCancellationMatchingBillingEvent(recurring)
|
||||
.setTargetId(recurring.getTargetId())
|
||||
.build());
|
||||
}
|
||||
Set<DomainHistory> historyEntries = historyEntriesBuilder.build();
|
||||
Set<OneTime> syntheticOneTimes = syntheticOneTimesBuilder.build();
|
||||
if (!isDryRun) {
|
||||
ImmutableSet<ImmutableObject> entitiesToSave =
|
||||
new ImmutableSet.Builder<ImmutableObject>()
|
||||
.addAll(historyEntries)
|
||||
.addAll(syntheticOneTimes)
|
||||
.build();
|
||||
tm().putAll(entitiesToSave);
|
||||
}
|
||||
return syntheticOneTimes.size();
|
||||
}
|
||||
|
||||
/**
|
||||
* Filters a set of {@link DateTime}s down to event times that are in scope for a particular
|
||||
* action run, given the cursor time and the action execution time.
|
||||
*/
|
||||
protected static ImmutableSet<DateTime> getBillingTimesInScope(
|
||||
Iterable<DateTime> eventTimes,
|
||||
DateTime cursorTime,
|
||||
DateTime executeTime,
|
||||
final Registry tld) {
|
||||
return Streams.stream(eventTimes)
|
||||
.map(eventTime -> eventTime.plus(tld.getAutoRenewGracePeriodLength()))
|
||||
.filter(Range.closedOpen(cursorTime, executeTime))
|
||||
.collect(toImmutableSet());
|
||||
}
|
||||
|
||||
/**
|
||||
* Determines an {@link ImmutableSet} of {@link DateTime}s that have already been persisted for a
|
||||
* given recurring billing event.
|
||||
*/
|
||||
private static ImmutableSet<DateTime> getExistingBillingTimes(
|
||||
Iterable<BillingEvent.OneTime> oneTimesForDomain,
|
||||
final BillingEvent.Recurring recurringEvent) {
|
||||
return Streams.stream(oneTimesForDomain)
|
||||
.filter(
|
||||
billingEvent ->
|
||||
recurringEvent
|
||||
.createVKey()
|
||||
.equals(billingEvent.getCancellationMatchingBillingEvent()))
|
||||
.map(OneTime::getBillingTime)
|
||||
.collect(toImmutableSet());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -148,7 +148,7 @@ public class ExpandRecurringBillingEventsPipeline implements Serializable {
|
|||
endTime = DateTime.parse(options.getEndTime());
|
||||
checkArgument(
|
||||
!endTime.isAfter(clock.nowUtc()),
|
||||
String.format("End time %s must be on or before now.", endTime));
|
||||
String.format("End time %s must be at or before now.", endTime));
|
||||
checkArgument(
|
||||
startTime.isBefore(endTime),
|
||||
String.format("[%s, %s) is not a valid window of operation.", startTime, endTime));
|
||||
|
|
|
@ -89,7 +89,7 @@
|
|||
</cron>
|
||||
|
||||
<cron>
|
||||
<url><![CDATA[/_dr/task/expandRecurringBillingEvents]]></url>
|
||||
<url><![CDATA[/_dr/task/expandRecurringBillingEvents?advanceCursor]]></url>
|
||||
<description>
|
||||
This job runs an action that creates synthetic OneTime billing events from Recurring billing
|
||||
events. Events are created for all instances of Recurring billing events that should exist
|
||||
|
|
|
@ -132,7 +132,7 @@
|
|||
</cron>
|
||||
|
||||
<cron>
|
||||
<url><![CDATA[/_dr/task/expandRecurringBillingEvents]]></url>
|
||||
<url><![CDATA[/_dr/task/expandRecurringBillingEvents?advanceCursor]]></url>
|
||||
<description>
|
||||
This job runs an action that creates synthetic OneTime billing events from Recurring billing
|
||||
events. Events are created for all instances of Recurring billing events that should exist
|
||||
|
|
|
@ -107,7 +107,7 @@
|
|||
</cron>
|
||||
|
||||
<cron>
|
||||
<url><![CDATA[/_dr/task/expandRecurringBillingEvents]]></url>
|
||||
<url><![CDATA[/_dr/task/expandRecurringBillingEvents?advanceCursor]]></url>
|
||||
<description>
|
||||
This job runs an action that creates synthetic OneTime billing events from Recurring billing
|
||||
events. Events are created for all instances of Recurring billing events that should exist
|
||||
|
|
|
@ -15,10 +15,7 @@
|
|||
package google.registry.reporting.billing;
|
||||
|
||||
import static google.registry.beam.BeamUtils.createJobName;
|
||||
import static google.registry.model.common.Cursor.CursorType.RECURRING_BILLING;
|
||||
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
|
||||
import static google.registry.request.Action.Method.POST;
|
||||
import static google.registry.util.DateTimeUtils.START_OF_TIME;
|
||||
import static javax.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
|
||||
import static javax.servlet.http.HttpServletResponse.SC_OK;
|
||||
|
||||
|
@ -32,7 +29,6 @@ import com.google.common.flogger.FluentLogger;
|
|||
import com.google.common.net.MediaType;
|
||||
import google.registry.config.RegistryConfig.Config;
|
||||
import google.registry.config.RegistryEnvironment;
|
||||
import google.registry.model.common.Cursor;
|
||||
import google.registry.persistence.PersistenceModule;
|
||||
import google.registry.reporting.ReportingModule;
|
||||
import google.registry.request.Action;
|
||||
|
@ -44,7 +40,6 @@ import google.registry.util.Clock;
|
|||
import google.registry.util.CloudTasksUtils;
|
||||
import java.io.IOException;
|
||||
import javax.inject.Inject;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Duration;
|
||||
import org.joda.time.YearMonth;
|
||||
|
||||
|
@ -113,19 +108,6 @@ public class GenerateInvoicesAction implements Runnable {
|
|||
response.setContentType(MediaType.PLAIN_TEXT_UTF_8);
|
||||
logger.atInfo().log("Launching invoicing pipeline for %s.", yearMonth);
|
||||
try {
|
||||
DateTime currentCursorTime =
|
||||
tm().transact(
|
||||
() ->
|
||||
tm().loadByKeyIfPresent(Cursor.createGlobalVKey(RECURRING_BILLING))
|
||||
.orElse(Cursor.createGlobal(RECURRING_BILLING, START_OF_TIME))
|
||||
.getCursorTime());
|
||||
|
||||
if (!YearMonth.fromDateFields(currentCursorTime.toDate()).isAfter(yearMonth)) {
|
||||
throw new IllegalStateException(
|
||||
"Latest billing events expansion cycle hasn't finished yet, terminating invoicing"
|
||||
+ " pipeline");
|
||||
}
|
||||
|
||||
LaunchFlexTemplateParameter parameter =
|
||||
new LaunchFlexTemplateParameter()
|
||||
.setJobName(createJobName("invoicing", clock))
|
||||
|
|
File diff suppressed because it is too large
Load diff
|
@ -126,7 +126,7 @@ public class ExpandRecurringBillingEventsPipelineTest {
|
|||
assertThrows(IllegalArgumentException.class, this::runPipeline);
|
||||
assertThat(thrown)
|
||||
.hasMessageThat()
|
||||
.contains("End time 2021-02-02T00:00:05.001Z must be on or before now");
|
||||
.contains("End time 2021-02-02T00:00:05.001Z must be at or before now");
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -15,8 +15,6 @@
|
|||
package google.registry.reporting.billing;
|
||||
|
||||
import static com.google.common.truth.Truth.assertThat;
|
||||
import static google.registry.model.common.Cursor.CursorType.RECURRING_BILLING;
|
||||
import static google.registry.testing.DatabaseHelper.persistResource;
|
||||
import static javax.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
|
||||
import static javax.servlet.http.HttpServletResponse.SC_OK;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
@ -26,7 +24,6 @@ import static org.mockito.Mockito.when;
|
|||
import com.google.cloud.tasks.v2.HttpMethod;
|
||||
import com.google.common.net.MediaType;
|
||||
import google.registry.beam.BeamActionTestBase;
|
||||
import google.registry.model.common.Cursor;
|
||||
import google.registry.persistence.transaction.JpaTestExtensions;
|
||||
import google.registry.persistence.transaction.JpaTestExtensions.JpaIntegrationTestExtension;
|
||||
import google.registry.reporting.ReportingModule;
|
||||
|
@ -35,10 +32,8 @@ import google.registry.testing.CloudTasksHelper.TaskMatcher;
|
|||
import google.registry.testing.FakeClock;
|
||||
import google.registry.util.CloudTasksUtils;
|
||||
import java.io.IOException;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Duration;
|
||||
import org.joda.time.YearMonth;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||
|
||||
|
@ -55,13 +50,6 @@ class GenerateInvoicesActionTest extends BeamActionTestBase {
|
|||
private CloudTasksUtils cloudTasksUtils = cloudTasksHelper.getTestCloudTasksUtils();
|
||||
private GenerateInvoicesAction action;
|
||||
|
||||
@BeforeEach
|
||||
@Override
|
||||
protected void beforeEach() throws Exception {
|
||||
super.beforeEach();
|
||||
persistResource(Cursor.createGlobal(RECURRING_BILLING, DateTime.parse("2017-11-30TZ")));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testLaunchTemplateJob_withPublish() throws Exception {
|
||||
action =
|
||||
|
@ -142,55 +130,4 @@ class GenerateInvoicesActionTest extends BeamActionTestBase {
|
|||
verify(emailUtils).sendAlertEmail("Pipeline Launch failed due to Pipeline error");
|
||||
cloudTasksHelper.assertNoTasksEnqueued("beam-reporting");
|
||||
}
|
||||
|
||||
@Test
|
||||
void testFailsToGenerateInvoicesNotExpandedBillingEvents() throws Exception {
|
||||
persistResource(Cursor.createGlobal(RECURRING_BILLING, DateTime.parse("2017-10-30TZ")));
|
||||
action =
|
||||
new GenerateInvoicesAction(
|
||||
"test-project",
|
||||
"test-region",
|
||||
"staging_bucket",
|
||||
"billing_bucket",
|
||||
"REG-INV",
|
||||
false,
|
||||
new YearMonth(2017, 10),
|
||||
emailUtils,
|
||||
cloudTasksUtils,
|
||||
clock,
|
||||
response,
|
||||
dataflow);
|
||||
action.run();
|
||||
assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8);
|
||||
assertThat(response.getStatus()).isEqualTo(SC_INTERNAL_SERVER_ERROR);
|
||||
assertThat(response.getPayload())
|
||||
.isEqualTo(
|
||||
"Pipeline launch failed: Latest billing events expansion cycle hasn't finished yet,"
|
||||
+ " terminating invoicing pipeline");
|
||||
cloudTasksHelper.assertNoTasksEnqueued("beam-reporting");
|
||||
}
|
||||
|
||||
@Test
|
||||
void testSucceedsToGenerateInvoicesFirstDayOfTheYear() throws Exception {
|
||||
persistResource(Cursor.createGlobal(RECURRING_BILLING, DateTime.parse("2017-01-01T13:15:00Z")));
|
||||
action =
|
||||
new GenerateInvoicesAction(
|
||||
"test-project",
|
||||
"test-region",
|
||||
"staging_bucket",
|
||||
"billing_bucket",
|
||||
"REG-INV",
|
||||
false,
|
||||
new YearMonth(2016, 12),
|
||||
emailUtils,
|
||||
cloudTasksUtils,
|
||||
clock,
|
||||
response,
|
||||
dataflow);
|
||||
action.run();
|
||||
assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8);
|
||||
assertThat(response.getStatus()).isEqualTo(SC_OK);
|
||||
assertThat(response.getPayload()).isEqualTo("Launched invoicing pipeline: jobid");
|
||||
cloudTasksHelper.assertNoTasksEnqueued("beam-reporting");
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue