Change default beam job region (#1937)

For reasons that I cannot explain, the same expand recurring billing
event pipeline would fail in us-east1 but succeed in us-central1.

See:

https://pantheon.corp.google.com/dataflow/jobs/us-central1/2023-02-09_14_52_24-162498476138221714;graphView=0?project=domain-registry

https://pantheon.corp.google.com/dataflow/jobs/us-east1/2023-02-09_14_26_07-4564782062878841960;graphView=1?project=domain-registry

Also improved the accuracy of the metrics:

It is observed that both counters are consistently higher for the same
start and end times when running in dry run mode. There is no way to
test for consistency when not running in dry run mode, for obvious reasons.

I can make the recurrings in scope counter consistent by not updating it
in a side-effect-causing transaction, but there is no way around the
other counter. It can only be trusted when running in dry run mode,
unfortunately.
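
A minimal sketch of why the second counter undercounts outside dry run mode. This is a toy model in plain Java, not Beam code: the class `RetryCounterSketch` and its methods are hypothetical, the `Set` stands in for the SQL table of expanded OneTimes, and it assumes the first attempt fails after its writes have committed, so its counter increments are dropped while the writes remain.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model (hypothetical, not Beam code) of a retried transform with a SQL side effect. */
class RetryCounterSketch {

  /** Stand-in for the SQL table of already expanded OneTimes. */
  private final Set<String> persisted = new HashSet<>();

  /** Stand-in for a reported counter: only increments from a successful attempt are kept. */
  private long committedCount = 0;

  /** Runs one attempt and returns the number of increments made during it. */
  private long attempt(List<String> eventTimes, boolean isDryRun) {
    long increments = 0;
    for (String eventTime : eventTimes) {
      if (persisted.contains(eventTime)) {
        continue; // Already expanded by an earlier attempt; skipped to stay idempotent.
      }
      increments++;
      if (!isDryRun) {
        persisted.add(eventTime); // The write survives even if this attempt is discarded.
      }
    }
    return increments;
  }

  /** Assumes the first attempt fails after its writes commit, then a retry succeeds. */
  long run(List<String> eventTimes, boolean isDryRun) {
    attempt(eventTimes, isDryRun); // Failed attempt: its increments are thrown away.
    committedCount += attempt(eventTimes, isDryRun); // Successful retry: increments are kept.
    return committedCount;
  }

  int persistedCount() {
    return persisted.size();
  }

  public static void main(String[] args) {
    List<String> eventTimes = List.of("2023-01-01", "2023-01-02", "2023-01-03");
    RetryCounterSketch dryRun = new RetryCounterSketch();
    System.out.println("dry run counter: " + dryRun.run(eventTimes, true));
    RetryCounterSketch realRun = new RetryCounterSketch();
    System.out.println(
        "real run counter: "
            + realRun.run(eventTimes, false)
            + ", rows written: "
            + realRun.persistedCount());
  }
}
```

In this model the dry run reports all three OneTimes, while the real run writes three rows but reports none, because the retry skips everything the discarded attempt already saved.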
Lai Jiang 2023-02-13 15:57:32 -05:00 committed by GitHub
parent b709a0ce48
commit bac9a25800
3 changed files with 12 additions and 15 deletions

@@ -138,9 +138,15 @@ public class ExpandRecurringBillingEventsPipeline implements Serializable {
 private final boolean isDryRun;
 private final boolean advanceCursor;
 private final Counter recurringsInScopeCounter =
-Metrics.counter("ExpandBilling", "RecurringsInScope");
-private final Counter expandedOneTimeCounter =
-Metrics.counter("ExpandBilling", "ExpandedOneTime");
+Metrics.counter("ExpandBilling", "Recurrings in scope for expansion");
+// Note that this counter is only accurate when running in dry run mode. Because SQL persistence
+// is a side effect and not idempotent, a transaction to save OneTimes could be successful but the
+// transform that contains it could still be retried, rolling back the counter increment. The
+// same transform, when retried, would skip the already expanded OneTime, causing this counter to
+// be lower than it should be when not in dry run mode.
+// See: https://beam.apache.org/documentation/programming-guide/#user-code-idempotence
+private final Counter oneTimesToExpandCounter =
+Metrics.counter("ExpandBilling", "OneTimes that would be expanded");
 ExpandRecurringBillingEventsPipeline(
 ExpandRecurringBillingEventsPipelineOptions options, Clock clock) {
@@ -195,6 +201,7 @@ public class ExpandRecurringBillingEventsPipeline implements Serializable {
 endTime.minusYears(1)),
 true,
 (BigInteger id) -> {
+recurringsInScopeCounter.inc();
 // Note that because all elements are mapped to the same dummy key, the next
 // batching transform will effectively be serial. This however does not matter for
 // our use case because the elements were obtained from a SQL read query, which
@@ -271,13 +278,9 @@ public class ExpandRecurringBillingEventsPipeline implements Serializable {
 } catch (IllegalArgumentException e) {
 return;
 }
-recurringsInScopeCounter.inc();
 Domain domain = tm().loadByKey(Domain.createVKey(recurring.getDomainRepoId()));
 Registry tld = Registry.get(domain.getTld());
 // Find the times for which the OneTime billing event are already created, making this expansion
 // idempotent. There is no need to match to the domain repo ID as the cancellation matching
 // billing event itself can only be for a single domain.
@@ -301,7 +304,7 @@ public class ExpandRecurringBillingEventsPipeline implements Serializable {
 // Create new OneTime and DomainHistory for EventTimes that needs to be expanded.
 for (DateTime eventTime : eventTimesToExpand) {
 recurrenceLastExpansionTime = latestOf(recurrenceLastExpansionTime, eventTime);
-expandedOneTimeCounter.inc();
+oneTimesToExpandCounter.inc();
 DateTime billingTime = eventTime.plus(tld.getAutoRenewGracePeriodLength());
 // Note that the DomainHistory is created as of transaction time, as opposed to event time.
 // This might be counterintuitive because other DomainHistories are created at the time

@@ -427,7 +427,7 @@ misc:
 beam:
 # The default region to run Apache Beam (Cloud Dataflow) jobs in.
-defaultJobRegion: us-east1
+defaultJobRegion: us-central1
 # The GCE machine type to use when a job is CPU-intensive (e. g. RDE). Be sure
 # to check the VM CPU quota for the job region. In a massively parallel
 # pipeline this quota can be easily reached and needs to be raised, otherwise

@@ -23,12 +23,6 @@
 "helpText": "The exclusive upper bound of the operation window, in ISO 8601 format.",
 "is_optional": false
 },
-{
-"name": "shard",
-"label": "The exclusive upper bound of the operation window.",
-"helpText": "The exclusive upper bound of the operation window, in ISO 8601 format.",
-"is_optional": true
-},
 {
 "name": "isDryRun",
 "label": "Whether this job is a dry run.",