Fix expand recurring billing event pipeline (#1928)

This commit is contained in:
Lai Jiang 2023-02-06 11:33:57 -05:00 committed by GitHub
parent c6e77276b6
commit 706cf69b7f

View file

@ -182,7 +182,7 @@ public class ExpandRecurringBillingEventsPipeline implements Serializable {
+ "AND event_Time < :endTime "
// Recurrence should not close before start time.
+ "AND :startTime < recurrence_end_time "
// Last expansion should happen at least one year before start time.
// Last expansion should happen at least one year before end time.
+ "AND recurrence_last_expansion < :oneYearAgo "
// The recurrence should not close before next expansion time.
+ "AND recurrence_last_expansion + INTERVAL '1 YEAR' < recurrence_end_time",
@ -239,20 +239,44 @@ public class ExpandRecurringBillingEventsPipeline implements Serializable {
private void expandOneRecurring(Long recurringId, ImmutableSet.Builder<ImmutableObject> results) {
Recurring recurring = tm().loadByKey(Recurring.createVKey(recurringId));
// Determine the complete set of EventTimes this recurring event should expand to within
// [max(recurrenceLastExpansion + 1 yr, startTime), min(recurrenceEndTime, endTime)).
//
// This range should always be legal for recurrings that are returned from the query. However,
// it is possible that the recurring has changed between when the read transformation occurred
// and now. This could be caused by some out-of-process mutations (such as a domain deletion
// closing out a previously open-ended recurrence), or more subtly, Beam could execute the same
// work multiple times due to transient communication issues between workers and the scheduler.
// Such opportunistic retries are OK for pure functional transformations, but can cause
// unexpected behavior when side effects are executed more than once. For example, the
// recurrence_last_expansion field could be updated by a worker after a success expansion, which
// failed to report the status to the scheduler in time, which in turn scheduled another worker
// to work on the same batch. The second worker would see a new recurrence_last_expansion that
// causes the range to be illegal.
//
// The best way to handle any unexpected behavior is to simply drop the recurring from
// expansion, if its new state still calls for an expansion, it would be picked up the next time
// the pipeline runs.
ImmutableSet<DateTime> eventTimes;
try {
eventTimes =
ImmutableSet.copyOf(
recurring
.getRecurrenceTimeOfYear()
.getInstancesInRange(
Range.closedOpen(
latestOf(recurring.getRecurrenceLastExpansion().plusYears(1), startTime),
earliestOf(recurring.getRecurrenceEndTime(), endTime))));
} catch (IllegalArgumentException e) {
return;
}
recurringsInScopeCounter.inc();
Domain domain = tm().loadByKey(Domain.createVKey(recurring.getDomainRepoId()));
Registry tld = Registry.get(domain.getTld());
// Determine the complete set of EventTimes this recurring event should expand to within
// [max(recurrenceLastExpansion + 1 yr, startTime), min(recurrenceEndTime, endTime)).
ImmutableSet<DateTime> eventTimes =
ImmutableSet.copyOf(
recurring
.getRecurrenceTimeOfYear()
.getInstancesInRange(
Range.closedOpen(
latestOf(recurring.getRecurrenceLastExpansion().plusYears(1), startTime),
earliestOf(recurring.getRecurrenceEndTime(), endTime))));
// Find the times for which the OneTime billing event are already created, making this expansion
// idempotent. There is no need to match to the domain repo ID as the cancellation matching