diff --git a/core/src/main/java/google/registry/beam/billing/ExpandRecurringBillingEventsPipeline.java b/core/src/main/java/google/registry/beam/billing/ExpandRecurringBillingEventsPipeline.java index 327ab7f33..7066583d2 100644 --- a/core/src/main/java/google/registry/beam/billing/ExpandRecurringBillingEventsPipeline.java +++ b/core/src/main/java/google/registry/beam/billing/ExpandRecurringBillingEventsPipeline.java @@ -182,7 +182,7 @@ public class ExpandRecurringBillingEventsPipeline implements Serializable { + "AND event_Time < :endTime " // Recurrence should not close before start time. + "AND :startTime < recurrence_end_time " - // Last expansion should happen at least one year before start time. + // Last expansion should happen at least one year before end time. + "AND recurrence_last_expansion < :oneYearAgo " // The recurrence should not close before next expansion time. + "AND recurrence_last_expansion + INTERVAL '1 YEAR' < recurrence_end_time", @@ -239,20 +239,44 @@ public class ExpandRecurringBillingEventsPipeline implements Serializable { private void expandOneRecurring(Long recurringId, ImmutableSet.Builder results) { Recurring recurring = tm().loadByKey(Recurring.createVKey(recurringId)); + + // Determine the complete set of EventTimes this recurring event should expand to within + // [max(recurrenceLastExpansion + 1 yr, startTime), min(recurrenceEndTime, endTime)). + // + // This range should always be legal for recurrings that are returned from the query. However, + // it is possible that the recurring has changed between when the read transformation occurred + // and now. This could be caused by some out-of-process mutations (such as a domain deletion + // closing out a previously open-ended recurrence), or more subtly, Beam could execute the same + // work multiple times due to transient communication issues between workers and the scheduler. + // Such opportunistic retries are OK for pure functional transformations, but can cause + // unexpected behavior when side effects are executed more than once. For example, the + // recurrence_last_expansion field could be updated by a worker after a success expansion, which + // failed to report the status to the scheduler in time, which in turn scheduled another worker + // to work on the same batch. The second worker would see a new recurrence_last_expansion that + // causes the range to be illegal. + // + // The best way to handle any unexpected behavior is to simply drop the recurring from + // expansion, if its new state still calls for an expansion, it would be picked up the next time + // the pipeline runs. + ImmutableSet eventTimes; + try { + eventTimes = + ImmutableSet.copyOf( + recurring + .getRecurrenceTimeOfYear() + .getInstancesInRange( + Range.closedOpen( + latestOf(recurring.getRecurrenceLastExpansion().plusYears(1), startTime), + earliestOf(recurring.getRecurrenceEndTime(), endTime)))); + } catch (IllegalArgumentException e) { + return; + } + recurringsInScopeCounter.inc(); Domain domain = tm().loadByKey(Domain.createVKey(recurring.getDomainRepoId())); Registry tld = Registry.get(domain.getTld()); - // Determine the complete set of EventTimes this recurring event should expand to within - // [max(recurrenceLastExpansion + 1 yr, startTime), min(recurrenceEndTime, endTime)). - ImmutableSet eventTimes = - ImmutableSet.copyOf( - recurring - .getRecurrenceTimeOfYear() - .getInstancesInRange( - Range.closedOpen( - latestOf(recurring.getRecurrenceLastExpansion().plusYears(1), startTime), - earliestOf(recurring.getRecurrenceEndTime(), endTime)))); + // Find the times for which the OneTime billing event are already created, making this expansion // idempotent. There is no need to match to the domain repo ID as the cancellation matching