diff --git a/core/src/main/java/google/registry/beam/billing/ExpandRecurringBillingEventsPipeline.java b/core/src/main/java/google/registry/beam/billing/ExpandRecurringBillingEventsPipeline.java index 7066583d2..8a4fabfea 100644 --- a/core/src/main/java/google/registry/beam/billing/ExpandRecurringBillingEventsPipeline.java +++ b/core/src/main/java/google/registry/beam/billing/ExpandRecurringBillingEventsPipeline.java @@ -138,9 +138,15 @@ public class ExpandRecurringBillingEventsPipeline implements Serializable { private final boolean isDryRun; private final boolean advanceCursor; private final Counter recurringsInScopeCounter = - Metrics.counter("ExpandBilling", "RecurringsInScope"); - private final Counter expandedOneTimeCounter = - Metrics.counter("ExpandBilling", "ExpandedOneTime"); + Metrics.counter("ExpandBilling", "Recurrings in scope for expansion"); + // Note that this counter is only accurate when running in dry run mode. Because SQL persistence + // is a side effect and not idempotent, a transaction to save OneTimes could be successful but the + // transform that contains it could be still be retried, rolling back the counter increment. The + // same transform, when retried, would skip the already expanded OneTime, causing this counter to + // be lower than it should be when not in dry run mode. + // See: https://beam.apache.org/documentation/programming-guide/#user-code-idempotence + private final Counter oneTimesToExpandCounter = + Metrics.counter("ExpandBilling", "OneTimes that would be expanded"); ExpandRecurringBillingEventsPipeline( ExpandRecurringBillingEventsPipelineOptions options, Clock clock) { @@ -195,6 +201,7 @@ public class ExpandRecurringBillingEventsPipeline implements Serializable { endTime.minusYears(1)), true, (BigInteger id) -> { + recurringsInScopeCounter.inc(); // Note that because all elements are mapped to the same dummy key, the next // batching transform will effectively be serial. This however does not matter for // our use case because the elements were obtained from a SQL read query, which @@ -271,13 +278,9 @@ public class ExpandRecurringBillingEventsPipeline implements Serializable { } catch (IllegalArgumentException e) { return; } - - recurringsInScopeCounter.inc(); Domain domain = tm().loadByKey(Domain.createVKey(recurring.getDomainRepoId())); Registry tld = Registry.get(domain.getTld()); - - // Find the times for which the OneTime billing event are already created, making this expansion // idempotent. There is no need to match to the domain repo ID as the cancellation matching // billing event itself can only be for a single domain. @@ -301,7 +304,7 @@ public class ExpandRecurringBillingEventsPipeline implements Serializable { // Create new OneTime and DomainHistory for EventTimes that needs to be expanded. for (DateTime eventTime : eventTimesToExpand) { recurrenceLastExpansionTime = latestOf(recurrenceLastExpansionTime, eventTime); - expandedOneTimeCounter.inc(); + oneTimesToExpandCounter.inc(); DateTime billingTime = eventTime.plus(tld.getAutoRenewGracePeriodLength()); // Note that the DomainHistory is created as of transaction time, as opposed to event time. // This might be counterintuitive because other DomainHistories are created at the time diff --git a/core/src/main/java/google/registry/config/files/default-config.yaml b/core/src/main/java/google/registry/config/files/default-config.yaml index ae41e6e5c..cb6951fa1 100644 --- a/core/src/main/java/google/registry/config/files/default-config.yaml +++ b/core/src/main/java/google/registry/config/files/default-config.yaml @@ -427,7 +427,7 @@ misc: beam: # The default region to run Apache Beam (Cloud Dataflow) jobs in. - defaultJobRegion: us-east1 + defaultJobRegion: us-central1 # The GCE machine type to use when a job is CPU-intensive (e. g. RDE). Be sure # to check the VM CPU quota for the job region. In a massively parallel # pipeline this quota can be easily reached and needs to be raised, otherwise diff --git a/core/src/main/resources/google/registry/beam/expand_recurring_billing_events_pipeline_metadata.json b/core/src/main/resources/google/registry/beam/expand_recurring_billing_events_pipeline_metadata.json index 7519abd3b..d34e4acb0 100644 --- a/core/src/main/resources/google/registry/beam/expand_recurring_billing_events_pipeline_metadata.json +++ b/core/src/main/resources/google/registry/beam/expand_recurring_billing_events_pipeline_metadata.json @@ -23,12 +23,6 @@ "helpText": "The exclusive upper bound of the operation window, in ISO 8601 format.", "is_optional": false }, - { - "name": "shard", - "label": "The exclusive upper bound of the operation window.", - "helpText": "The exclusive upper bound of the operation window, in ISO 8601 format.", - "is_optional": true - }, { "name": "isDryRun", "label": "Whether this job is a dry run.",