mirror of
https://github.com/google/nomulus.git
synced 2025-04-29 19:47:51 +02:00
Add a beam pipeline to expand recurring billing event (#1881)
This will replace the ExpandRecurringBillingEventsAction, which has a couple of issues: 1) The action starts with too many Recurrings that are later filtered out because their expanded OneTimes are not actually in scope. This is due to the Recurrings not recording its latest expanded event time, and therefore many Recurrings that are not yet due for renewal get included in the initial query. 2) The action works in sequence, which exacerbated the issue in 1) and makes it very slow to run if the window of operation is wider than one day, which in turn makes it impossible to run any catch-up expansions with any significant gap to fill. 3) The action only expands the recurrence when the billing times because due, but most of its logic works on event time, which is 45 days before billing time, making the code hard to reason about and error-prone. This has led to b/258822640 where a premature optimization intended to fix 1) caused some autorenwals to not be expanded correctly when subsequent manual renews within the autorenew grace period closed the original recurrece. As a result, the new pipeline addresses the above issues in the following way: 1) Update the recurrenceLastExpansion field on the Recurring when a new expansion occurs, and narrow down the Recurrings in scope for expansion by only looking for the ones that have not been expanded for more than a year. 2) Make it a Beam pipeline so expansions can happen in parallel. The Recurrings are grouped into batches in order to not overwhelm the database with writes for each expansion. 3) Create new expansions when the event time, as opposed to billing time, is within the operation window. This streamlines the logic and makes it clearer and easier to reason about. This also aligns with how other (cancelllable) operations for which there are accompanying grace periods are handled, when the corresponding data is always speculatively created at event time. Lastly, doing this negates the need to check if the expansion has finished running before generating the monthly invoices, because the billing events are now created not just-in-time, but 45 days in advance. Note that this PR only adds the pipeline. It does not switch the default behavior to using the pipeline, which is still done by ExpandRecurringBillingEventsAction. We will first use this pipeline to generate missing billing events and domain histories caused by b/258822640. This also allows us to test it in production, as it backfills data that will not affect ongoing invoice generation. If anything goes wrong, we can always delete the generated billing events and domain histories, based on the unique "reason" in them. This pipeline can only run after we switch to use SQL sequence based ID allocation, introduced in #1831.
This commit is contained in:
parent
9789cf3b00
commit
2294c77306
29 changed files with 1194 additions and 27 deletions
|
@ -752,9 +752,14 @@ if (environment == 'alpha') {
|
|||
],
|
||||
invoicing :
|
||||
[
|
||||
mainClass: 'google.registry.beam.invoicing.InvoicingPipeline',
|
||||
mainClass: 'google.registry.beam.billing.InvoicingPipeline',
|
||||
metaData : 'google/registry/beam/invoicing_pipeline_metadata.json'
|
||||
],
|
||||
expandBilling :
|
||||
[
|
||||
mainClass: 'google.registry.beam.billing.ExpandRecurringBillingEventsPipeline',
|
||||
metaData : 'google/registry/beam/expand_recurring_billing_events_pipeline_metadata.json'
|
||||
],
|
||||
rde :
|
||||
[
|
||||
mainClass: 'google.registry.beam.rde.RdePipeline',
|
||||
|
|
|
@ -12,7 +12,7 @@
|
|||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package google.registry.beam.invoicing;
|
||||
package google.registry.beam.billing;
|
||||
|
||||
import com.google.auto.value.AutoValue;
|
||||
import com.google.common.base.Joiner;
|
|
@ -0,0 +1,460 @@
|
|||
// Copyright 2022 The Nomulus Authors. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package google.registry.beam.billing;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkArgument;
|
||||
import static com.google.common.collect.Sets.difference;
|
||||
import static google.registry.model.common.Cursor.CursorType.RECURRING_BILLING;
|
||||
import static google.registry.model.domain.Period.Unit.YEARS;
|
||||
import static google.registry.model.reporting.HistoryEntry.Type.DOMAIN_AUTORENEW;
|
||||
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
|
||||
import static google.registry.util.CollectionUtils.union;
|
||||
import static google.registry.util.DateTimeUtils.START_OF_TIME;
|
||||
import static google.registry.util.DateTimeUtils.earliestOf;
|
||||
import static google.registry.util.DateTimeUtils.latestOf;
|
||||
import static org.apache.beam.sdk.values.TypeDescriptors.voids;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.Range;
|
||||
import dagger.Component;
|
||||
import google.registry.beam.common.RegistryJpaIO;
|
||||
import google.registry.config.RegistryConfig.Config;
|
||||
import google.registry.config.RegistryConfig.ConfigModule;
|
||||
import google.registry.flows.custom.CustomLogicFactoryModule;
|
||||
import google.registry.flows.custom.CustomLogicModule;
|
||||
import google.registry.flows.domain.DomainPricingLogic;
|
||||
import google.registry.model.ImmutableObject;
|
||||
import google.registry.model.billing.BillingEvent.Cancellation;
|
||||
import google.registry.model.billing.BillingEvent.Flag;
|
||||
import google.registry.model.billing.BillingEvent.OneTime;
|
||||
import google.registry.model.billing.BillingEvent.Recurring;
|
||||
import google.registry.model.common.Cursor;
|
||||
import google.registry.model.domain.Domain;
|
||||
import google.registry.model.domain.DomainHistory;
|
||||
import google.registry.model.domain.Period;
|
||||
import google.registry.model.reporting.DomainTransactionRecord;
|
||||
import google.registry.model.reporting.DomainTransactionRecord.TransactionReportField;
|
||||
import google.registry.model.tld.Registry;
|
||||
import google.registry.persistence.PersistenceModule.TransactionIsolationLevel;
|
||||
import google.registry.util.Clock;
|
||||
import google.registry.util.SystemClock;
|
||||
import java.io.Serializable;
|
||||
import java.math.BigInteger;
|
||||
import java.util.Set;
|
||||
import javax.inject.Singleton;
|
||||
import org.apache.beam.sdk.Pipeline;
|
||||
import org.apache.beam.sdk.PipelineResult;
|
||||
import org.apache.beam.sdk.coders.KvCoder;
|
||||
import org.apache.beam.sdk.coders.VarIntCoder;
|
||||
import org.apache.beam.sdk.coders.VarLongCoder;
|
||||
import org.apache.beam.sdk.metrics.Counter;
|
||||
import org.apache.beam.sdk.metrics.Metrics;
|
||||
import org.apache.beam.sdk.options.PipelineOptionsFactory;
|
||||
import org.apache.beam.sdk.transforms.Create;
|
||||
import org.apache.beam.sdk.transforms.DoFn;
|
||||
import org.apache.beam.sdk.transforms.GroupIntoBatches;
|
||||
import org.apache.beam.sdk.transforms.MapElements;
|
||||
import org.apache.beam.sdk.transforms.ParDo;
|
||||
import org.apache.beam.sdk.transforms.Wait;
|
||||
import org.apache.beam.sdk.values.KV;
|
||||
import org.apache.beam.sdk.values.PCollection;
|
||||
import org.apache.beam.sdk.values.PDone;
|
||||
import org.joda.time.DateTime;
|
||||
|
||||
/**
|
||||
* Definition of a Dataflow Flex pipeline template, which expands {@link Recurring} to {@link
|
||||
* OneTime} when an autorenew occurs within the given time frame.
|
||||
*
|
||||
* <p>This pipeline works in three stages:
|
||||
*
|
||||
* <ul>
|
||||
* <li>Gather the {@link Recurring}s that are in scope for expansion. The exact condition of
|
||||
* {@link Recurring}s to include can be found in {@link #getRecurringsInScope(Pipeline)}.
|
||||
* <li>Expand the {@link Recurring}s to {@link OneTime} (and corresponding {@link DomainHistory})
|
||||
* that fall within the [{@link #startTime}, {@link #endTime}) window, excluding those that
|
||||
* are already present (to make this pipeline idempotent when running with the same parameters
|
||||
* multiple times, either in parallel or in sequence). The {@link Recurring} is also updated
|
||||
* with the information on when it was last expanded, so it would not be in scope for
|
||||
* expansion until at least a year later.
|
||||
* <li>If the cursor for billing events should be advanced, advance it to {@link #endTime} after
|
||||
* all of the expansions in the previous step is done, only when it is currently at {@link
|
||||
* #startTime}.
|
||||
* </ul>
|
||||
*
|
||||
* <p>Note that the creation of new {@link OneTime} and {@link DomainHistory} is done speculatively
|
||||
* as soon as its event time is in scope for expansion (i.e. within the window of operation). If a
|
||||
* domain is subsequently cancelled during the autorenew grace period, a {@link Cancellation} would
|
||||
* have been created to cancel the {@link OneTime} out. Similarly, a {@link DomainHistory} for the
|
||||
* delete will be created which negates the effect of the speculatively created {@link
|
||||
* DomainHistory}, specifically for the transaction records. Both the {@link OneTime} and {@link
|
||||
* DomainHistory} will only be used (and cancelled out) when the billing time becomes effective,
|
||||
* which is after the grace period, when the cancellations would have been written, if need be. This
|
||||
* is no different from what we do with manual renewals or normal creates, where entities are always
|
||||
* created for the action regardless of whether their effects will be negated later due to
|
||||
* subsequent actions within respective grace periods.
|
||||
*
|
||||
* <p>To stage this template locally, run {@code ./nom_build :core:sBP --environment=alpha \
|
||||
* --pipeline=expandBilling}.
|
||||
*
|
||||
* <p>Then, you can run the staged template via the API client library, gCloud or a raw REST call.
|
||||
*
|
||||
* @see Cancellation#forGracePeriod
|
||||
* @see google.registry.flows.domain.DomainFlowUtils#createCancelingRecords
|
||||
* @see <a href="https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates">Using
|
||||
* Flex Templates</a>
|
||||
*/
|
||||
public class ExpandRecurringBillingEventsPipeline implements Serializable {
|
||||
|
||||
private static final long serialVersionUID = -5827984301386630194L;
|
||||
|
||||
private static final DomainPricingLogic domainPricingLogic;
|
||||
|
||||
private static final int batchSize;
|
||||
|
||||
static {
|
||||
PipelineComponent pipelineComponent =
|
||||
DaggerExpandRecurringBillingEventsPipeline_PipelineComponent.create();
|
||||
domainPricingLogic = pipelineComponent.domainPricingLogic();
|
||||
batchSize = pipelineComponent.batchSize();
|
||||
}
|
||||
|
||||
// Inclusive lower bound of the expansion window.
|
||||
private final DateTime startTime;
|
||||
// Exclusive lower bound of the expansion window.
|
||||
private final DateTime endTime;
|
||||
private final boolean isDryRun;
|
||||
private final boolean advanceCursor;
|
||||
private final Counter recurringsInScopeCounter =
|
||||
Metrics.counter("ExpandBilling", "RecurringsInScope");
|
||||
private final Counter expandedOneTimeCounter =
|
||||
Metrics.counter("ExpandBilling", "ExpandedOneTime");
|
||||
|
||||
ExpandRecurringBillingEventsPipeline(
|
||||
ExpandRecurringBillingEventsPipelineOptions options, Clock clock) {
|
||||
startTime = DateTime.parse(options.getStartTime());
|
||||
endTime = DateTime.parse(options.getEndTime());
|
||||
checkArgument(
|
||||
!endTime.isAfter(clock.nowUtc()),
|
||||
String.format("End time %s must be on or before now.", endTime));
|
||||
checkArgument(
|
||||
startTime.isBefore(endTime),
|
||||
String.format("[%s, %s) is not a valid window of operation.", startTime, endTime));
|
||||
isDryRun = options.getIsDryRun();
|
||||
advanceCursor = options.getAdvanceCursor();
|
||||
}
|
||||
|
||||
private PipelineResult run(Pipeline pipeline) {
|
||||
setupPipeline(pipeline);
|
||||
return pipeline.run();
|
||||
}
|
||||
|
||||
void setupPipeline(Pipeline pipeline) {
|
||||
PCollection<KV<Integer, Long>> recurringIds = getRecurringsInScope(pipeline);
|
||||
PCollection<Void> expanded = expandRecurrings(recurringIds);
|
||||
if (!isDryRun && advanceCursor) {
|
||||
advanceCursor(expanded);
|
||||
}
|
||||
}
|
||||
|
||||
PCollection<KV<Integer, Long>> getRecurringsInScope(Pipeline pipeline) {
|
||||
return pipeline.apply(
|
||||
"Read all Recurrings in scope",
|
||||
// Use native query because JPQL does not support timestamp arithmetics.
|
||||
RegistryJpaIO.read(
|
||||
"SELECT billing_recurrence_id "
|
||||
+ "FROM \"BillingRecurrence\" "
|
||||
// Recurrence should not close before the first event time.
|
||||
+ "WHERE event_time < recurrence_end_time "
|
||||
// First event time should be before end time.
|
||||
+ "AND event_Time < :endTime "
|
||||
// Recurrence should not close before start time.
|
||||
+ "AND :startTime < recurrence_end_time "
|
||||
// Last expansion should happen at least one year before start time.
|
||||
+ "AND recurrence_last_expansion < :oneYearAgo "
|
||||
// The recurrence should not close before next expansion time.
|
||||
+ "AND recurrence_last_expansion + INTERVAL '1 YEAR' < recurrence_end_time",
|
||||
ImmutableMap.of(
|
||||
"endTime",
|
||||
endTime,
|
||||
"startTime",
|
||||
startTime,
|
||||
"oneYearAgo",
|
||||
endTime.minusYears(1)),
|
||||
true,
|
||||
(BigInteger id) -> {
|
||||
// Note that because all elements are mapped to the same dummy key, the next
|
||||
// batching transform will effectively be serial. This however does not matter for
|
||||
// our use case because the elements were obtained from a SQL read query, which
|
||||
// are returned sequentially already. Therefore, having a sequential step to group
|
||||
// them does not reduce overall parallelism of the pipeline, and the batches can
|
||||
// then be distributed to all available workers for further processing, where the
|
||||
// main benefit of parallelism shows. In benchmarking, turning the distribution
|
||||
// of elements in this step resulted in marginal improvement in overall
|
||||
// performance at best without clear indication on why or to which degree. If the
|
||||
// runtime becomes a concern later on, we could consider fine-tuning the sharding
|
||||
// of output elements in this step.
|
||||
//
|
||||
// See: https://stackoverflow.com/a/44956702/791306
|
||||
return KV.of(0, id.longValue());
|
||||
})
|
||||
.withCoder(KvCoder.of(VarIntCoder.of(), VarLongCoder.of())));
|
||||
}
|
||||
|
||||
private PCollection<Void> expandRecurrings(PCollection<KV<Integer, Long>> recurringIds) {
|
||||
return recurringIds
|
||||
.apply(
|
||||
"Group into batches",
|
||||
GroupIntoBatches.<Integer, Long>ofSize(batchSize).withShardedKey())
|
||||
.apply(
|
||||
"Expand and save Recurrings into OneTimes and corresponding DomainHistories",
|
||||
MapElements.into(voids())
|
||||
.via(
|
||||
element -> {
|
||||
Iterable<Long> ids = element.getValue();
|
||||
tm().transact(
|
||||
() -> {
|
||||
ImmutableSet.Builder<ImmutableObject> results =
|
||||
new ImmutableSet.Builder<>();
|
||||
ids.forEach(id -> expandOneRecurring(id, results));
|
||||
if (!isDryRun) {
|
||||
tm().putAll(results.build());
|
||||
}
|
||||
});
|
||||
return null;
|
||||
}));
|
||||
}
|
||||
|
||||
private void expandOneRecurring(Long recurringId, ImmutableSet.Builder<ImmutableObject> results) {
|
||||
Recurring recurring = tm().loadByKey(Recurring.createVKey(recurringId));
|
||||
recurringsInScopeCounter.inc();
|
||||
Domain domain = tm().loadByKey(Domain.createVKey(recurring.getDomainRepoId()));
|
||||
Registry tld = Registry.get(domain.getTld());
|
||||
|
||||
// Determine the complete set of EventTimes this recurring event should expand to within
|
||||
// [max(recurrenceLastExpansion + 1 yr, startTime), min(recurrenceEndTime, endTime)).
|
||||
ImmutableSet<DateTime> eventTimes =
|
||||
ImmutableSet.copyOf(
|
||||
recurring
|
||||
.getRecurrenceTimeOfYear()
|
||||
.getInstancesInRange(
|
||||
Range.closedOpen(
|
||||
latestOf(recurring.getRecurrenceLastExpansion().plusYears(1), startTime),
|
||||
earliestOf(recurring.getRecurrenceEndTime(), endTime))));
|
||||
|
||||
// Find the times for which the OneTime billing event are already created, making this expansion
|
||||
// idempotent. There is no need to match to the domain repo ID as the cancellation matching
|
||||
// billing event itself can only be for a single domain.
|
||||
ImmutableSet<DateTime> existingEventTimes =
|
||||
ImmutableSet.copyOf(
|
||||
tm().query(
|
||||
"SELECT eventTime FROM BillingEvent WHERE cancellationMatchingBillingEvent ="
|
||||
+ " :key",
|
||||
DateTime.class)
|
||||
.setParameter("key", recurring.createVKey())
|
||||
.getResultList());
|
||||
|
||||
Set<DateTime> eventTimesToExpand = difference(eventTimes, existingEventTimes);
|
||||
|
||||
if (eventTimesToExpand.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
DateTime recurrenceLastExpansionTime = recurring.getRecurrenceLastExpansion();
|
||||
|
||||
// Create new OneTime and DomainHistory for EventTimes that needs to be expanded.
|
||||
for (DateTime eventTime : eventTimesToExpand) {
|
||||
recurrenceLastExpansionTime = latestOf(recurrenceLastExpansionTime, eventTime);
|
||||
expandedOneTimeCounter.inc();
|
||||
DateTime billingTime = eventTime.plus(tld.getAutoRenewGracePeriodLength());
|
||||
// Note that the DomainHistory is created as of transaction time, as opposed to event time.
|
||||
// This might be counterintuitive because other DomainHistories are created at the time
|
||||
// mutation events occur, such as in DomainDeleteFlow or DomainRenewFlow. Therefore, it is
|
||||
// possible to have a DomainHistory for a delete during the autorenew grace period with a
|
||||
// modification time before that of the DomainHistory for the autorenew itself. This is not
|
||||
// ideal, but necessary because we save the **current** state of the domain (as of transaction
|
||||
// time) to the DomainHistory , instead of the state of the domain as of event time (which
|
||||
// would required loading the domain from DomainHistory at event time).
|
||||
//
|
||||
// Even though doing the loading is seemly possible, it generally is a bad idea to create
|
||||
// DomainHistories retroactively and in all instances that we create a HistoryEntry we always
|
||||
// set the modification time to the transaction time. It would also violate the invariance
|
||||
// that a DomainHistory with a higher revision ID (which is always allocated with monotonic
|
||||
// increase) always has a later modification time.
|
||||
//
|
||||
// Lastly because the domain entity itself did not change as part of the expansion, we should
|
||||
// not project it to transaction time before saving it in the history, which would require us
|
||||
// to save the projected domain as well. Any changes to the domain itself are handled when
|
||||
// the domain is actually used or explicitly projected and saved. The DomainHistory created
|
||||
// here does not actually affect anything materially (e.g. RDE). We can understand it in such
|
||||
// a way that this history represents not when the domain is autorenewed (at event time), but
|
||||
// when its autorenew billing event is created (at transaction time).
|
||||
DomainHistory historyEntry =
|
||||
new DomainHistory.Builder()
|
||||
.setBySuperuser(false)
|
||||
.setRegistrarId(recurring.getRegistrarId())
|
||||
.setModificationTime(tm().getTransactionTime())
|
||||
.setDomain(domain)
|
||||
.setPeriod(Period.create(1, YEARS))
|
||||
.setReason("Domain autorenewal by ExpandRecurringBillingEventsPipeline")
|
||||
.setRequestedByRegistrar(false)
|
||||
.setType(DOMAIN_AUTORENEW)
|
||||
.setDomainTransactionRecords(
|
||||
// Don't write a domain transaction record if the domain is deleted before billing
|
||||
// time (i.e. within the autorenew grace period). We cannot rely on a negating
|
||||
// DomainHistory created by DomainDeleteFlow because it only cancels transaction
|
||||
// records already present. In this case the domain was deleted before this
|
||||
// pipeline runs to expand the OneTime (which should be rare because this pipeline
|
||||
// should run every day), and no negating transaction records would have been
|
||||
// created when the deletion occurred. Again, there is no need to project the
|
||||
// domain, because if it were deleted before this transaction, its updated delete
|
||||
// time would have already been loaded here.
|
||||
//
|
||||
// We don't compare recurrence end time with billing time because the recurrence
|
||||
// could be caused for other reasons during the grace period, like a manual
|
||||
// renewal, in which case we still want to write the transaction record. Also,
|
||||
// the expansion happens when event time is in scope, which means the billing time
|
||||
// is still 45 days in the future, and the recurrence could have been closed
|
||||
// between now and then.
|
||||
//
|
||||
// A side effect of this logic is that if a transfer occurs within the ARGP, it
|
||||
// would have recorded both a TRANSFER_SUCCESSFUL and a NET_RENEWS_1_YEAR, even
|
||||
// though the transfer would have subsumed the autorenew. There is no perfect
|
||||
// solution for this because even if we expand the recurrence when the billing
|
||||
// event is in scope (as was the case in the old action), we still cannot use
|
||||
// recurrence end time < billing time as an indicator for if a transfer had
|
||||
// occurred during ARGP (see last paragraph, renewals during ARGP also close the
|
||||
// recurrence),therefore we still cannot always be correct when constructing the
|
||||
// transaction records that way (either we miss transfers, or we miss renewals
|
||||
// during ARGP).
|
||||
//
|
||||
// See: DomainFlowUtils#createCancellingRecords
|
||||
domain.getDeletionTime().isBefore(billingTime)
|
||||
? ImmutableSet.of()
|
||||
: ImmutableSet.of(
|
||||
DomainTransactionRecord.create(
|
||||
tld.getTldStr(),
|
||||
// We report this when the autorenew grace period ends.
|
||||
billingTime,
|
||||
TransactionReportField.netRenewsFieldFromYears(1),
|
||||
1)))
|
||||
.build();
|
||||
results.add(historyEntry);
|
||||
|
||||
// It is OK to always create a OneTime, even though the domain might be deleted or transferred
|
||||
// later during autorenew grace period, as a cancellation will always be written out in those
|
||||
// instances.
|
||||
OneTime oneTime =
|
||||
new OneTime.Builder()
|
||||
.setBillingTime(billingTime)
|
||||
.setRegistrarId(recurring.getRegistrarId())
|
||||
// Determine the cost for a one-year renewal.
|
||||
.setCost(
|
||||
domainPricingLogic
|
||||
.getRenewPrice(tld, recurring.getTargetId(), eventTime, 1, recurring)
|
||||
.getRenewCost())
|
||||
.setEventTime(eventTime)
|
||||
.setFlags(union(recurring.getFlags(), Flag.SYNTHETIC))
|
||||
.setDomainHistory(historyEntry)
|
||||
.setPeriodYears(1)
|
||||
.setReason(recurring.getReason())
|
||||
.setSyntheticCreationTime(endTime)
|
||||
.setCancellationMatchingBillingEvent(recurring)
|
||||
.setTargetId(recurring.getTargetId())
|
||||
.build();
|
||||
results.add(oneTime);
|
||||
}
|
||||
results.add(
|
||||
recurring.asBuilder().setRecurrenceLastExpansion(recurrenceLastExpansionTime).build());
|
||||
}
|
||||
|
||||
private PDone advanceCursor(PCollection<Void> persisted) {
|
||||
return PDone.in(
|
||||
persisted
|
||||
.getPipeline()
|
||||
.apply("Create one dummy element", Create.of((Void) null))
|
||||
.apply("Wait for all saves to finish", Wait.on(persisted))
|
||||
// Because only one dummy element is created in the start PCollection, this
|
||||
// transform is guaranteed to only process one element and therefore only run once.
|
||||
// Because the previous step waits for all emissions of voids from the expansion step to
|
||||
// finish, this transform is guaranteed to run only after all expansions are done and
|
||||
// persisted.
|
||||
.apply(
|
||||
"Advance cursor",
|
||||
ParDo.of(
|
||||
new DoFn<Void, Void>() {
|
||||
@ProcessElement
|
||||
public void processElement() {
|
||||
tm().transact(
|
||||
() -> {
|
||||
DateTime currentCursorTime =
|
||||
tm().loadByKeyIfPresent(
|
||||
Cursor.createGlobalVKey(RECURRING_BILLING))
|
||||
.orElse(
|
||||
Cursor.createGlobal(RECURRING_BILLING, START_OF_TIME))
|
||||
.getCursorTime();
|
||||
if (!currentCursorTime.equals(startTime)) {
|
||||
throw new IllegalStateException(
|
||||
String.format(
|
||||
"Current cursor position %s does not match start time"
|
||||
+ " %s.",
|
||||
currentCursorTime, startTime));
|
||||
}
|
||||
tm().put(Cursor.createGlobal(RECURRING_BILLING, endTime));
|
||||
});
|
||||
}
|
||||
}))
|
||||
.getPipeline());
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
PipelineOptionsFactory.register(ExpandRecurringBillingEventsPipelineOptions.class);
|
||||
ExpandRecurringBillingEventsPipelineOptions options =
|
||||
PipelineOptionsFactory.fromArgs(args)
|
||||
.withValidation()
|
||||
.as(ExpandRecurringBillingEventsPipelineOptions.class);
|
||||
// Hardcode the transaction level to be at serializable we do not want concurrent runs of the
|
||||
// pipeline for the same window to create duplicate OneTimes. This ensures that the set of
|
||||
// existing OneTimes do not change by the time new OneTimes are inserted within a transaction.
|
||||
//
|
||||
// Per PostgreSQL, serializable isolation level does not introduce any blocking beyond that
|
||||
// present in repeatable read other than some overhead related to monitoring possible
|
||||
// serializable anomalies. Therefore, in most cases, since each worker of the same job works on
|
||||
// a different set of recurrings, it is not possible for their execution order to affect
|
||||
// serialization outcome, and the performance penalty should be minimum when using serializable
|
||||
// compared to using repeatable read.
|
||||
//
|
||||
// We should pay some attention to the runtime of the job and logs when we run this job daily on
|
||||
// production to check the actual performance impact for using this isolation level (i.e. check
|
||||
// the frequency of occurrence of retried transactions due to serialization errors) to assess
|
||||
// the actual parallelism of the job.
|
||||
//
|
||||
// See: https://www.postgresql.org/docs/current/transaction-iso.html
|
||||
options.setIsolationOverride(TransactionIsolationLevel.TRANSACTION_SERIALIZABLE);
|
||||
Pipeline pipeline = Pipeline.create(options);
|
||||
new ExpandRecurringBillingEventsPipeline(options, new SystemClock()).run(pipeline);
|
||||
}
|
||||
|
||||
@Singleton
|
||||
@Component(
|
||||
modules = {CustomLogicModule.class, CustomLogicFactoryModule.class, ConfigModule.class})
|
||||
interface PipelineComponent {
|
||||
|
||||
DomainPricingLogic domainPricingLogic();
|
||||
|
||||
@Config("jdbcBatchSize")
|
||||
int batchSize();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,49 @@
|
|||
// Copyright 2022 The Nomulus Authors. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package google.registry.beam.billing;
|
||||
|
||||
import google.registry.beam.common.RegistryPipelineOptions;
|
||||
import org.apache.beam.sdk.options.Default;
|
||||
import org.apache.beam.sdk.options.Description;
|
||||
|
||||
public interface ExpandRecurringBillingEventsPipelineOptions extends RegistryPipelineOptions {
|
||||
@Description(
|
||||
"The inclusive lower bound of on the range of event times that will be expanded, in ISO 8601"
|
||||
+ " format")
|
||||
String getStartTime();
|
||||
|
||||
void setStartTime(String startTime);
|
||||
|
||||
@Description(
|
||||
"The exclusive upper bound of on the range of event times that will be expanded, in ISO 8601"
|
||||
+ " format")
|
||||
String getEndTime();
|
||||
|
||||
void setEndTime(String endTime);
|
||||
|
||||
@Description("If true, the expanded billing events and history entries will not be saved.")
|
||||
@Default.Boolean(false)
|
||||
boolean getIsDryRun();
|
||||
|
||||
void setIsDryRun(boolean isDryRun);
|
||||
|
||||
@Description(
|
||||
"If true, set the RECURRING_BILLING global cursor to endTime after saving all expanded"
|
||||
+ " billing events and history entries.")
|
||||
@Default.Boolean(true)
|
||||
boolean getAdvanceCursor();
|
||||
|
||||
void setAdvanceCursor(boolean advanceCursor);
|
||||
}
|
|
@ -12,16 +12,16 @@
|
|||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package google.registry.beam.invoicing;
|
||||
package google.registry.beam.billing;
|
||||
|
||||
import static com.google.common.collect.ImmutableSet.toImmutableSet;
|
||||
import static org.apache.beam.sdk.values.TypeDescriptors.strings;
|
||||
|
||||
import com.google.common.flogger.FluentLogger;
|
||||
import google.registry.beam.billing.BillingEvent.InvoiceGroupingKey;
|
||||
import google.registry.beam.billing.BillingEvent.InvoiceGroupingKey.InvoiceGroupingKeyCoder;
|
||||
import google.registry.beam.common.RegistryJpaIO;
|
||||
import google.registry.beam.common.RegistryJpaIO.Read;
|
||||
import google.registry.beam.invoicing.BillingEvent.InvoiceGroupingKey;
|
||||
import google.registry.beam.invoicing.BillingEvent.InvoiceGroupingKey.InvoiceGroupingKeyCoder;
|
||||
import google.registry.model.billing.BillingEvent.Flag;
|
||||
import google.registry.model.billing.BillingEvent.OneTime;
|
||||
import google.registry.model.registrar.Registrar;
|
|
@ -12,7 +12,7 @@
|
|||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package google.registry.beam.invoicing;
|
||||
package google.registry.beam.billing;
|
||||
|
||||
import google.registry.beam.common.RegistryPipelineOptions;
|
||||
import org.apache.beam.sdk.options.Description;
|
|
@ -71,7 +71,7 @@ public final class RegistryJpaIO {
|
|||
}
|
||||
|
||||
/**
|
||||
* Returns a {@link Read} connector based on the given {@code jpql} query string.
|
||||
* Returns a {@link Read} connector based on the given native or {@code jpql} query string.
|
||||
*
|
||||
* <p>User should take care to prevent sql-injection attacks.
|
||||
*/
|
||||
|
|
|
@ -24,8 +24,10 @@ import java.util.stream.Stream;
|
|||
import javax.annotation.Nullable;
|
||||
import javax.persistence.EntityManager;
|
||||
import javax.persistence.Query;
|
||||
import javax.persistence.TemporalType;
|
||||
import javax.persistence.TypedQuery;
|
||||
import javax.persistence.criteria.CriteriaQuery;
|
||||
import org.joda.time.DateTime;
|
||||
|
||||
/** Interface for query instances used by {@link RegistryJpaIO.Read}. */
|
||||
public interface RegistryQuery<T> extends Serializable {
|
||||
|
@ -57,7 +59,14 @@ public interface RegistryQuery<T> extends Serializable {
|
|||
Query query =
|
||||
nativeQuery ? entityManager.createNativeQuery(sql) : entityManager.createQuery(sql);
|
||||
if (parameters != null) {
|
||||
parameters.forEach(query::setParameter);
|
||||
parameters.forEach(
|
||||
(key, value) -> {
|
||||
if (value instanceof DateTime) {
|
||||
query.setParameter(key, ((DateTime) value).toDate(), TemporalType.TIMESTAMP);
|
||||
} else {
|
||||
query.setParameter(key, value);
|
||||
}
|
||||
});
|
||||
}
|
||||
JpaTransactionManager.setQueryFetchSize(query, QUERY_FETCH_SIZE);
|
||||
@SuppressWarnings("unchecked")
|
||||
|
|
|
@ -170,7 +170,6 @@ import org.joda.time.DateTime;
|
|||
* @see <a href="https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates">Using
|
||||
* Flex Templates</a>
|
||||
*/
|
||||
@SuppressWarnings("ALL")
|
||||
@Singleton
|
||||
public class RdePipeline implements Serializable {
|
||||
|
||||
|
|
|
@ -575,9 +575,9 @@ public final class RegistryConfig {
|
|||
/**
|
||||
* Returns the default job region to run Apache Beam (Cloud Dataflow) jobs in.
|
||||
*
|
||||
* @see google.registry.beam.invoicing.InvoicingPipeline
|
||||
* @see google.registry.beam.billing.InvoicingPipeline
|
||||
* @see google.registry.beam.spec11.Spec11Pipeline
|
||||
* @see google.registry.beam.invoicing.InvoicingPipeline
|
||||
* @see google.registry.beam.billing.InvoicingPipeline
|
||||
*/
|
||||
@Provides
|
||||
@Config("defaultJobRegion")
|
||||
|
@ -655,7 +655,7 @@ public final class RegistryConfig {
|
|||
/**
|
||||
* Returns the URL of the GCS bucket we store invoices and detail reports in.
|
||||
*
|
||||
* @see google.registry.beam.invoicing.InvoicingPipeline
|
||||
* @see google.registry.beam.billing.InvoicingPipeline
|
||||
*/
|
||||
@Provides
|
||||
@Config("billingBucketUrl")
|
||||
|
@ -691,7 +691,7 @@ public final class RegistryConfig {
|
|||
/**
|
||||
* Returns the file prefix for the invoice CSV file.
|
||||
*
|
||||
* @see google.registry.beam.invoicing.InvoicingPipeline
|
||||
* @see google.registry.beam.billing.InvoicingPipeline
|
||||
* @see google.registry.reporting.billing.BillingEmailUtils
|
||||
*/
|
||||
@Provides
|
||||
|
|
|
@ -258,7 +258,7 @@
|
|||
<cron>
|
||||
<url><![CDATA[/_dr/cron/fanout?queue=retryable-cron-tasks&endpoint=/_dr/task/generateInvoices?shouldPublish=true&runInEmpty]]></url>
|
||||
<description>
|
||||
Starts the beam/invoicing/InvoicingPipeline Dataflow template, which creates the overall invoice and
|
||||
Starts the beam/billing/InvoicingPipeline Dataflow template, which creates the overall invoice and
|
||||
detail report CSVs for last month, storing them in gs://[PROJECT-ID]-billing/invoices/yyyy-MM.
|
||||
Upon success, sends an e-mail copy of the invoice to billing personnel, and copies detail
|
||||
reports to the associated registrars' drive folders.
|
||||
|
|
|
@ -1131,7 +1131,7 @@ public class DomainFlowUtils {
|
|||
* hasn't been reported yet and b) matches the predicate 3. Return the transactionRecords under
|
||||
* the most recent HistoryEntry that fits the above criteria, with negated reportAmounts.
|
||||
*/
|
||||
static ImmutableSet<DomainTransactionRecord> createCancelingRecords(
|
||||
public static ImmutableSet<DomainTransactionRecord> createCancelingRecords(
|
||||
Domain domain,
|
||||
final DateTime now,
|
||||
Duration maxSearchPeriod,
|
||||
|
|
|
@ -469,6 +469,7 @@ public abstract class BillingEvent extends ImmutableObject
|
|||
@Index(columnList = "eventTime"),
|
||||
@Index(columnList = "domainRepoId"),
|
||||
@Index(columnList = "recurrenceEndTime"),
|
||||
@Index(columnList = "recurrenceLastExpansion"),
|
||||
@Index(columnList = "recurrence_time_of_year")
|
||||
})
|
||||
@AttributeOverride(name = "id", column = @Column(name = "billing_recurrence_id"))
|
||||
|
@ -481,6 +482,16 @@ public abstract class BillingEvent extends ImmutableObject
|
|||
*/
|
||||
DateTime recurrenceEndTime;
|
||||
|
||||
/**
|
||||
* The most recent {@link DateTime} when this recurrence was expanded.
|
||||
*
|
||||
* <p>We only bother checking recurrences for potential expansion if this is at least one year
|
||||
* in the past. If it's more recent than that, it means that the recurrence was already expanded
|
||||
* too recently to need to be checked again (as domains autorenew each year).
|
||||
*/
|
||||
@Column(nullable = false)
|
||||
DateTime recurrenceLastExpansion;
|
||||
|
||||
/**
|
||||
* The eventTime recurs every year on this [month, day, time] between {@link #eventTime} and
|
||||
* {@link #recurrenceEndTime}, inclusive of the start but not of the end.
|
||||
|
@ -519,6 +530,10 @@ public abstract class BillingEvent extends ImmutableObject
|
|||
return recurrenceEndTime;
|
||||
}
|
||||
|
||||
public DateTime getRecurrenceLastExpansion() {
|
||||
return recurrenceLastExpansion;
|
||||
}
|
||||
|
||||
public TimeOfYear getRecurrenceTimeOfYear() {
|
||||
return recurrenceTimeOfYear;
|
||||
}
|
||||
|
@ -559,6 +574,11 @@ public abstract class BillingEvent extends ImmutableObject
|
|||
return this;
|
||||
}
|
||||
|
||||
public Builder setRecurrenceLastExpansion(DateTime recurrenceLastExpansion) {
|
||||
getInstance().recurrenceLastExpansion = recurrenceLastExpansion;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder setRenewalPriceBehavior(RenewalPriceBehavior renewalPriceBehavior) {
|
||||
getInstance().renewalPriceBehavior = renewalPriceBehavior;
|
||||
return this;
|
||||
|
@ -574,6 +594,12 @@ public abstract class BillingEvent extends ImmutableObject
|
|||
Recurring instance = getInstance();
|
||||
checkNotNull(instance.eventTime);
|
||||
checkNotNull(instance.reason);
|
||||
// Don't require recurrenceLastExpansion to be individually set on every new Recurrence.
|
||||
// The correct default value if not otherwise set is the event time of the recurrence minus
|
||||
// 1 year.
|
||||
instance.recurrenceLastExpansion =
|
||||
Optional.ofNullable(instance.recurrenceLastExpansion)
|
||||
.orElse(instance.eventTime.minusYears(1));
|
||||
checkArgument(
|
||||
instance.renewalPriceBehavior == RenewalPriceBehavior.SPECIFIED
|
||||
^ instance.renewalPrice == null,
|
||||
|
|
|
@ -602,7 +602,7 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager {
|
|||
DateTime transactionTime;
|
||||
|
||||
// The set of entity objects that have been either persisted (via insert()) or merged (via
|
||||
// put()/update()). If the entity manager returns these as a result of a find() or query
|
||||
// put()/update()). If the entity manager returns these as a result of a find() or query
|
||||
// operation, we can not detach them -- detaching removes them from the transaction and causes
|
||||
// them to not be saved to the database -- so we throw an exception instead.
|
||||
Set<Object> objectsToSave = Collections.newSetFromMap(new IdentityHashMap<>());
|
||||
|
|
|
@ -52,7 +52,7 @@ import org.joda.time.YearMonth;
|
|||
* Invokes the {@code InvoicingPipeline} beam template via the REST api, and enqueues the {@link
|
||||
* PublishInvoicesAction} to publish the subsequent output.
|
||||
*
|
||||
* <p>This action runs the {@link google.registry.beam.invoicing.InvoicingPipeline} beam flex
|
||||
* <p>This action runs the {@link google.registry.beam.billing.InvoicingPipeline} beam flex
|
||||
* template. The pipeline then generates invoices for the month and stores them on GCS.
|
||||
*/
|
||||
@Action(
|
||||
|
|
|
@ -39,7 +39,7 @@ import javax.inject.Inject;
|
|||
import org.joda.time.YearMonth;
|
||||
|
||||
/**
|
||||
* Uploads the results of the {@link google.registry.beam.invoicing.InvoicingPipeline}.
|
||||
* Uploads the results of the {@link google.registry.beam.billing.InvoicingPipeline}.
|
||||
*
|
||||
* <p>This relies on the retry semantics in {@code queue.xml} to ensure proper upload, in spite of
|
||||
* fluctuations in generation timing.
|
||||
|
|
|
@ -0,0 +1,51 @@
|
|||
{
|
||||
"name": "Expand Recurring Billings Events for Implicit Auto-Renewals",
|
||||
"description": "An Apache Beam batch pipeline that finds all auto-renewals that have implicitly occurred between the given window and creates the corresponding billing events and hisotry entries.",
|
||||
"parameters": [
|
||||
{
|
||||
"name": "registryEnvironment",
|
||||
"label": "The Registry environment.",
|
||||
"helpText": "The Registry environment.",
|
||||
"is_optional": false,
|
||||
"regexes": [
|
||||
"^PRODUCTION|SANDBOX|CRASH|QA|ALPHA$"
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "startTime",
|
||||
"label": "The inclusive lower bound of the operation window.",
|
||||
"helpText": "The inclusive lower bound of the operation window, in ISO 8601 format.",
|
||||
"is_optional": false
|
||||
},
|
||||
{
|
||||
"name": "endTime",
|
||||
"label": "The exclusive upper bound of the operation window.",
|
||||
"helpText": "The exclusive upper bound of the operation window, in ISO 8601 format.",
|
||||
"is_optional": false
|
||||
},
|
||||
{
|
||||
"name": "shard",
|
||||
"label": "The exclusive upper bound of the operation window.",
|
||||
"helpText": "The exclusive upper bound of the operation window, in ISO 8601 format.",
|
||||
"is_optional": true
|
||||
},
|
||||
{
|
||||
"name": "isDryRun",
|
||||
"label": "Whether this job is a dry run.",
|
||||
"helpText": "If true, no changes will be saved to the database.",
|
||||
"is_optional": true,
|
||||
"regexes": [
|
||||
"^true|false$"
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "advanceCursor",
|
||||
"label": "Whether the BILLING_TIME global cursor should be advanced.",
|
||||
"helpText": "If true, after all expansions are persisted, the cursor will be changed from startTime to endTime.",
|
||||
"is_optional": true,
|
||||
"regexes": [
|
||||
"^true|false$"
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
|
@ -12,12 +12,12 @@
|
|||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package google.registry.beam.invoicing;
|
||||
package google.registry.beam.billing;
|
||||
|
||||
import static com.google.common.truth.Truth.assertThat;
|
||||
|
||||
import google.registry.beam.invoicing.BillingEvent.InvoiceGroupingKey;
|
||||
import google.registry.beam.invoicing.BillingEvent.InvoiceGroupingKey.InvoiceGroupingKeyCoder;
|
||||
import google.registry.beam.billing.BillingEvent.InvoiceGroupingKey;
|
||||
import google.registry.beam.billing.BillingEvent.InvoiceGroupingKey.InvoiceGroupingKeyCoder;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
|
@ -0,0 +1,549 @@
|
|||
// Copyright 2022 The Nomulus Authors. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package google.registry.beam.billing;
|
||||
|
||||
import static com.google.common.collect.ImmutableList.toImmutableList;
|
||||
import static com.google.common.truth.Truth.assertThat;
|
||||
import static google.registry.model.EppResourceUtils.loadByForeignKey;
|
||||
import static google.registry.model.ImmutableObjectSubject.immutableObjectCorrespondence;
|
||||
import static google.registry.model.common.Cursor.CursorType.RECURRING_BILLING;
|
||||
import static google.registry.model.domain.Period.Unit.YEARS;
|
||||
import static google.registry.model.reporting.HistoryEntry.Type.DOMAIN_AUTORENEW;
|
||||
import static google.registry.model.reporting.HistoryEntry.Type.DOMAIN_CREATE;
|
||||
import static google.registry.model.reporting.HistoryEntryDao.loadHistoryObjectsForResource;
|
||||
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
|
||||
import static google.registry.testing.DatabaseHelper.assertBillingEventsForResource;
|
||||
import static google.registry.testing.DatabaseHelper.createTld;
|
||||
import static google.registry.testing.DatabaseHelper.getOnlyHistoryEntryOfType;
|
||||
import static google.registry.testing.DatabaseHelper.persistActiveDomain;
|
||||
import static google.registry.testing.DatabaseHelper.persistPremiumList;
|
||||
import static google.registry.testing.DatabaseHelper.persistResource;
|
||||
import static google.registry.util.DateTimeUtils.END_OF_TIME;
|
||||
import static org.joda.money.CurrencyUnit.USD;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import google.registry.beam.TestPipelineExtension;
|
||||
import google.registry.model.billing.BillingEvent;
|
||||
import google.registry.model.billing.BillingEvent.Flag;
|
||||
import google.registry.model.billing.BillingEvent.OneTime;
|
||||
import google.registry.model.billing.BillingEvent.Reason;
|
||||
import google.registry.model.billing.BillingEvent.Recurring;
|
||||
import google.registry.model.common.Cursor;
|
||||
import google.registry.model.domain.Domain;
|
||||
import google.registry.model.domain.DomainHistory;
|
||||
import google.registry.model.domain.Period;
|
||||
import google.registry.model.reporting.DomainTransactionRecord;
|
||||
import google.registry.model.reporting.DomainTransactionRecord.TransactionReportField;
|
||||
import google.registry.model.tld.Registry;
|
||||
import google.registry.persistence.PersistenceModule.TransactionIsolationLevel;
|
||||
import google.registry.persistence.transaction.JpaTestExtensions;
|
||||
import google.registry.persistence.transaction.JpaTestExtensions.JpaIntegrationTestExtension;
|
||||
import google.registry.testing.FakeClock;
|
||||
import java.util.Arrays;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import org.apache.beam.runners.direct.DirectOptions;
|
||||
import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
|
||||
import org.apache.beam.sdk.options.PipelineOptionsFactory;
|
||||
import org.hibernate.cfg.AvailableSettings;
|
||||
import org.joda.money.Money;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Duration;
|
||||
import org.joda.time.format.DateTimeFormat;
|
||||
import org.joda.time.format.DateTimeFormatter;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.ValueSource;
|
||||
|
||||
/** Unit tests for {@link ExpandRecurringBillingEventsPipeline}. */
|
||||
public class ExpandRecurringBillingEventsPipelineTest {
|
||||
|
||||
private static final DateTimeFormatter DATE_TIME_FORMATTER =
|
||||
DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ");
|
||||
|
||||
private final FakeClock clock = new FakeClock(DateTime.parse("2021-02-02T00:00:05Z"));
|
||||
|
||||
private final DateTime startTime = DateTime.parse("2021-02-01TZ");
|
||||
|
||||
private DateTime endTime = DateTime.parse("2021-02-02TZ");
|
||||
|
||||
private final Cursor cursor = Cursor.createGlobal(RECURRING_BILLING, startTime);
|
||||
|
||||
private Domain domain;
|
||||
|
||||
private Recurring recurring;
|
||||
|
||||
private final TestOptions options = PipelineOptionsFactory.create().as(TestOptions.class);
|
||||
|
||||
@RegisterExtension
|
||||
final JpaIntegrationTestExtension jpa =
|
||||
new JpaTestExtensions.Builder()
|
||||
.withClock(clock)
|
||||
.withProperty(
|
||||
AvailableSettings.ISOLATION,
|
||||
TransactionIsolationLevel.TRANSACTION_SERIALIZABLE.name())
|
||||
.buildIntegrationTestExtension();
|
||||
|
||||
@RegisterExtension
|
||||
final TestPipelineExtension pipeline =
|
||||
TestPipelineExtension.create().enableAbandonedNodeEnforcement(true);
|
||||
|
||||
@BeforeEach
|
||||
void beforeEach() {
|
||||
// Set up the pipeline.
|
||||
options.setStartTime(DATE_TIME_FORMATTER.print(startTime));
|
||||
options.setEndTime(DATE_TIME_FORMATTER.print(endTime));
|
||||
options.setIsDryRun(false);
|
||||
options.setAdvanceCursor(true);
|
||||
tm().transact(() -> tm().put(cursor));
|
||||
|
||||
// Set up the database.
|
||||
createTld("tld");
|
||||
recurring = createDomainAtTime("example.tld", startTime.minusYears(1).plusHours(12));
|
||||
domain = loadByForeignKey(Domain.class, "example.tld", clock.nowUtc()).get();
|
||||
}
|
||||
|
||||
@Test
|
||||
void testFailure_endTimeAfterNow() {
|
||||
options.setEndTime(DATE_TIME_FORMATTER.print(clock.nowUtc().plusMillis(1)));
|
||||
IllegalArgumentException thrown =
|
||||
assertThrows(IllegalArgumentException.class, this::runPipeline);
|
||||
assertThat(thrown)
|
||||
.hasMessageThat()
|
||||
.contains("End time 2021-02-02T00:00:05.001Z must be on or before now");
|
||||
}
|
||||
|
||||
@Test
|
||||
void testFailure_endTimeBeforeStartTime() {
|
||||
options.setEndTime(DATE_TIME_FORMATTER.print(startTime.minusMillis(1)));
|
||||
IllegalArgumentException thrown =
|
||||
assertThrows(IllegalArgumentException.class, this::runPipeline);
|
||||
assertThat(thrown)
|
||||
.hasMessageThat()
|
||||
.contains("[2021-02-01T00:00:00.000Z, 2021-01-31T23:59:59.999Z)");
|
||||
}
|
||||
|
||||
@Test
|
||||
void testSuccess_expandSingleEvent() {
|
||||
runPipeline();
|
||||
|
||||
// Assert about DomainHistory.
|
||||
assertAutoRenewDomainHistories(defaultDomainHistory());
|
||||
|
||||
// Assert about BillingEvents.
|
||||
assertBillingEventsForResource(
|
||||
domain,
|
||||
defaultOneTime(getOnlyAutoRenewHistory()),
|
||||
recurring
|
||||
.asBuilder()
|
||||
.setRecurrenceLastExpansion(domain.getCreationTime().plusYears(1))
|
||||
.build());
|
||||
|
||||
// Assert about Cursor.
|
||||
assertCursorAt(endTime);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testSuccess_expandSingleEvent_deletedDuringGracePeriod() {
|
||||
domain = persistResource(domain.asBuilder().setDeletionTime(endTime.minusHours(2)).build());
|
||||
recurring =
|
||||
persistResource(recurring.asBuilder().setRecurrenceEndTime(endTime.minusHours(2)).build());
|
||||
runPipeline();
|
||||
|
||||
// Assert about DomainHistory, no transaction record should have been written.
|
||||
assertAutoRenewDomainHistories(
|
||||
defaultDomainHistory().asBuilder().setDomainTransactionRecords(ImmutableSet.of()).build());
|
||||
|
||||
// Assert about BillingEvents.
|
||||
assertBillingEventsForResource(
|
||||
domain,
|
||||
defaultOneTime(getOnlyAutoRenewHistory()),
|
||||
recurring
|
||||
.asBuilder()
|
||||
.setRecurrenceLastExpansion(domain.getCreationTime().plusYears(1))
|
||||
.build());
|
||||
|
||||
// Assert about Cursor.
|
||||
assertCursorAt(endTime);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testFailure_expandSingleEvent_cursorNotAtStartTime() {
|
||||
tm().transact(() -> tm().put(Cursor.createGlobal(RECURRING_BILLING, startTime.plusMillis(1))));
|
||||
|
||||
PipelineExecutionException thrown =
|
||||
assertThrows(PipelineExecutionException.class, this::runPipeline);
|
||||
|
||||
assertThat(thrown).hasCauseThat().hasMessageThat().contains("Current cursor position");
|
||||
|
||||
// Assert about DomainHistory.
|
||||
assertAutoRenewDomainHistories(defaultDomainHistory());
|
||||
|
||||
// Assert about BillingEvents.
|
||||
assertBillingEventsForResource(
|
||||
domain,
|
||||
defaultOneTime(getOnlyAutoRenewHistory()),
|
||||
recurring
|
||||
.asBuilder()
|
||||
.setRecurrenceLastExpansion(domain.getCreationTime().plusYears(1))
|
||||
.build());
|
||||
|
||||
// Assert that the cursor did not change.
|
||||
assertCursorAt(startTime.plusMillis(1));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testSuccess_noExpansion_recurrenceClosedBeforeEventTime() {
|
||||
recurring =
|
||||
persistResource(
|
||||
recurring
|
||||
.asBuilder()
|
||||
.setRecurrenceEndTime(recurring.getEventTime().minusDays(1))
|
||||
.build());
|
||||
runPipeline();
|
||||
assertNoExpansionsHappened();
|
||||
}
|
||||
|
||||
@Test
|
||||
void testSuccess_noExpansion_recurrenceClosedBeforeStartTime() {
|
||||
recurring =
|
||||
persistResource(recurring.asBuilder().setRecurrenceEndTime(startTime.minusDays(1)).build());
|
||||
runPipeline();
|
||||
assertNoExpansionsHappened();
|
||||
}
|
||||
|
||||
@Test
|
||||
void testSuccess_noExpansion_recurrenceClosedBeforeNextExpansion() {
|
||||
recurring =
|
||||
persistResource(
|
||||
recurring
|
||||
.asBuilder()
|
||||
.setEventTime(recurring.getEventTime().minusYears(1))
|
||||
.setRecurrenceEndTime(startTime.plusHours(6))
|
||||
.build());
|
||||
runPipeline();
|
||||
assertNoExpansionsHappened();
|
||||
}
|
||||
|
||||
@Test
|
||||
void testSuccess_noExpansion_eventTimeAfterEndTime() {
|
||||
recurring = persistResource(recurring.asBuilder().setEventTime(endTime.plusDays(1)).build());
|
||||
runPipeline();
|
||||
assertNoExpansionsHappened();
|
||||
}
|
||||
|
||||
@Test
|
||||
void testSuccess_noExpansion_LastExpansionLessThanAYearAgo() {
|
||||
recurring =
|
||||
persistResource(
|
||||
recurring
|
||||
.asBuilder()
|
||||
.setRecurrenceLastExpansion(startTime.minusYears(1).plusDays(1))
|
||||
.build());
|
||||
runPipeline();
|
||||
assertNoExpansionsHappened();
|
||||
}
|
||||
|
||||
@Test
|
||||
void testSuccess_noExpansion_oneTimeAlreadyExists() {
|
||||
DomainHistory history = persistResource(defaultDomainHistory());
|
||||
OneTime oneTime = persistResource(defaultOneTime(history));
|
||||
runPipeline();
|
||||
|
||||
// Assert about DomainHistory.
|
||||
assertAutoRenewDomainHistories(history);
|
||||
|
||||
// Assert about BillingEvents. No expansion happened, so last recurrence expansion time is
|
||||
// unchanged.
|
||||
assertBillingEventsForResource(domain, oneTime, recurring);
|
||||
|
||||
// Assert about Cursor.
|
||||
assertCursorAt(endTime);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testSuccess_expandSingleEvent_dryRun() {
|
||||
options.setIsDryRun(true);
|
||||
runPipeline();
|
||||
assertNoExpansionsHappened(true);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testSuccess_expandSingleEvent_doesNotAdvanceCursor() {
|
||||
options.setAdvanceCursor(false);
|
||||
|
||||
runPipeline();
|
||||
// Assert about DomainHistory.
|
||||
assertAutoRenewDomainHistories(defaultDomainHistory());
|
||||
|
||||
// Assert about BillingEvents.
|
||||
assertBillingEventsForResource(
|
||||
domain,
|
||||
defaultOneTime(getOnlyAutoRenewHistory()),
|
||||
recurring
|
||||
.asBuilder()
|
||||
.setRecurrenceLastExpansion(domain.getCreationTime().plusYears(1))
|
||||
.build());
|
||||
|
||||
// Assert that the cursor did not move.
|
||||
assertCursorAt(startTime);
|
||||
}
|
||||
|
||||
// We control the number of threads used in the pipeline to test if the batching behavior works
|
||||
// properly. When two threads are used, the two recurrings are processed in different workers and
|
||||
// should be processed in parallel.
|
||||
@ParameterizedTest
|
||||
@ValueSource(ints = {1, 2})
|
||||
void testSuccess_expandMultipleEvents_multipleDomains(int numOfThreads) {
|
||||
createTld("test");
|
||||
persistResource(
|
||||
Registry.get("test")
|
||||
.asBuilder()
|
||||
.setPremiumList(persistPremiumList("premium", USD, "other,USD 100"))
|
||||
.build());
|
||||
DateTime otherCreateTime = startTime.minusYears(1).plusHours(5);
|
||||
Recurring otherRecurring = createDomainAtTime("other.test", otherCreateTime);
|
||||
Domain otherDomain = loadByForeignKey(Domain.class, "other.test", clock.nowUtc()).get();
|
||||
|
||||
options.setTargetParallelism(numOfThreads);
|
||||
runPipeline();
|
||||
|
||||
// Assert about DomainHistory.
|
||||
DomainHistory history = defaultDomainHistory();
|
||||
DomainHistory otherHistory = defaultDomainHistory(otherDomain);
|
||||
assertAutoRenewDomainHistories(domain, history);
|
||||
assertAutoRenewDomainHistories(otherDomain, otherHistory);
|
||||
|
||||
// Assert about BillingEvents.
|
||||
assertBillingEventsForResource(
|
||||
domain,
|
||||
defaultOneTime(getOnlyAutoRenewHistory()),
|
||||
recurring
|
||||
.asBuilder()
|
||||
.setRecurrenceLastExpansion(domain.getCreationTime().plusYears(1))
|
||||
.build());
|
||||
assertBillingEventsForResource(
|
||||
otherDomain,
|
||||
defaultOneTime(otherDomain, getOnlyAutoRenewHistory(otherDomain), otherRecurring, 100),
|
||||
otherRecurring
|
||||
.asBuilder()
|
||||
.setRecurrenceLastExpansion(otherDomain.getCreationTime().plusYears(1))
|
||||
.build());
|
||||
|
||||
// Assert about Cursor.
|
||||
assertCursorAt(endTime);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testSuccess_expandMultipleEvents_multipleEventTime() {
|
||||
clock.advanceBy(Duration.standardDays(365));
|
||||
endTime = endTime.plusYears(1);
|
||||
options.setEndTime(DATE_TIME_FORMATTER.print(endTime));
|
||||
|
||||
runPipeline();
|
||||
// Assert about DomainHistory.
|
||||
assertAutoRenewDomainHistories(
|
||||
defaultDomainHistory(),
|
||||
defaultDomainHistory()
|
||||
.asBuilder()
|
||||
.setDomainTransactionRecords(
|
||||
ImmutableSet.of(
|
||||
DomainTransactionRecord.create(
|
||||
domain.getTld(),
|
||||
// We report this when the autorenew grace period ends.
|
||||
domain
|
||||
.getCreationTime()
|
||||
.plusYears(2)
|
||||
.plus(Registry.DEFAULT_AUTO_RENEW_GRACE_PERIOD),
|
||||
TransactionReportField.netRenewsFieldFromYears(1),
|
||||
1)))
|
||||
.build());
|
||||
|
||||
// Assert about BillingEvents.
|
||||
ImmutableList<DomainHistory> histories =
|
||||
loadHistoryObjectsForResource(domain.createVKey(), DomainHistory.class).stream()
|
||||
.filter(domainHistory -> DOMAIN_AUTORENEW.equals(domainHistory.getType()))
|
||||
.sorted(
|
||||
Comparator.comparing(
|
||||
h ->
|
||||
h.getDomainTransactionRecords().stream()
|
||||
.findFirst()
|
||||
.get()
|
||||
.getReportingTime()))
|
||||
.collect(toImmutableList());
|
||||
assertBillingEventsForResource(
|
||||
domain,
|
||||
defaultOneTime(histories.get(0)),
|
||||
defaultOneTime(histories.get(1))
|
||||
.asBuilder()
|
||||
.setEventTime(domain.getCreationTime().plusYears(2))
|
||||
.setBillingTime(
|
||||
domain
|
||||
.getCreationTime()
|
||||
.plusYears(2)
|
||||
.plus(Registry.DEFAULT_AUTO_RENEW_GRACE_PERIOD))
|
||||
.build(),
|
||||
recurring
|
||||
.asBuilder()
|
||||
.setRecurrenceLastExpansion(domain.getCreationTime().plusYears(2))
|
||||
.build());
|
||||
|
||||
// Assert about Cursor.
|
||||
assertCursorAt(endTime);
|
||||
}
|
||||
|
||||
private void runPipeline() {
|
||||
ExpandRecurringBillingEventsPipeline expandRecurringBillingEventsPipeline =
|
||||
new ExpandRecurringBillingEventsPipeline(options, clock);
|
||||
expandRecurringBillingEventsPipeline.setupPipeline(pipeline);
|
||||
pipeline.run(options).waitUntilFinish();
|
||||
}
|
||||
|
||||
void assertNoExpansionsHappened() {
|
||||
assertNoExpansionsHappened(false);
|
||||
}
|
||||
|
||||
void assertNoExpansionsHappened(boolean dryRun) {
|
||||
// Only the original domain create history entry is present.
|
||||
List<DomainHistory> persistedHistory =
|
||||
loadHistoryObjectsForResource(domain.createVKey(), DomainHistory.class);
|
||||
assertThat(persistedHistory.size()).isEqualTo(1);
|
||||
assertThat(persistedHistory.get(0).getType()).isEqualTo(DOMAIN_CREATE);
|
||||
|
||||
// Only the original recurrence is present.
|
||||
assertBillingEventsForResource(domain, recurring);
|
||||
|
||||
// If this is not a dry run, the cursor should still be moved even though expansions happened,
|
||||
// because we still successfully processed all the needed expansions (none in this case) in the
|
||||
// window. Therefore,
|
||||
// the cursor should be up-to-date as of end time.
|
||||
assertCursorAt(dryRun ? startTime : endTime);
|
||||
}
|
||||
|
||||
private DomainHistory defaultDomainHistory() {
|
||||
return defaultDomainHistory(domain);
|
||||
}
|
||||
|
||||
private DomainHistory defaultDomainHistory(Domain domain) {
|
||||
return new DomainHistory.Builder()
|
||||
.setBySuperuser(false)
|
||||
.setRegistrarId("TheRegistrar")
|
||||
.setModificationTime(clock.nowUtc())
|
||||
.setDomain(domain)
|
||||
.setPeriod(Period.create(1, YEARS))
|
||||
.setReason("Domain autorenewal by ExpandRecurringBillingEventsPipeline")
|
||||
.setRequestedByRegistrar(false)
|
||||
.setType(DOMAIN_AUTORENEW)
|
||||
.setDomainTransactionRecords(
|
||||
ImmutableSet.of(
|
||||
DomainTransactionRecord.create(
|
||||
domain.getTld(),
|
||||
// We report this when the autorenew grace period ends.
|
||||
domain
|
||||
.getCreationTime()
|
||||
.plusYears(1)
|
||||
.plus(Registry.DEFAULT_AUTO_RENEW_GRACE_PERIOD),
|
||||
TransactionReportField.netRenewsFieldFromYears(1),
|
||||
1)))
|
||||
.build();
|
||||
}
|
||||
|
||||
private OneTime defaultOneTime(DomainHistory history) {
|
||||
return defaultOneTime(domain, history, recurring, 11);
|
||||
}
|
||||
|
||||
private OneTime defaultOneTime(
|
||||
Domain domain, DomainHistory history, Recurring recurring, int cost) {
|
||||
return new BillingEvent.OneTime.Builder()
|
||||
.setBillingTime(
|
||||
domain.getCreationTime().plusYears(1).plus(Registry.DEFAULT_AUTO_RENEW_GRACE_PERIOD))
|
||||
.setRegistrarId("TheRegistrar")
|
||||
.setCost(Money.of(USD, cost))
|
||||
.setEventTime(domain.getCreationTime().plusYears(1))
|
||||
.setFlags(ImmutableSet.of(Flag.AUTO_RENEW, Flag.SYNTHETIC))
|
||||
.setPeriodYears(1)
|
||||
.setReason(Reason.RENEW)
|
||||
.setSyntheticCreationTime(endTime)
|
||||
.setCancellationMatchingBillingEvent(recurring)
|
||||
.setTargetId(domain.getDomainName())
|
||||
.setDomainHistory(history)
|
||||
.build();
|
||||
}
|
||||
|
||||
private void assertAutoRenewDomainHistories(DomainHistory... expected) {
|
||||
assertAutoRenewDomainHistories(domain, expected);
|
||||
}
|
||||
|
||||
private static void assertAutoRenewDomainHistories(Domain domain, DomainHistory... expected) {
|
||||
ImmutableList<DomainHistory> actuals =
|
||||
loadHistoryObjectsForResource(domain.createVKey(), DomainHistory.class).stream()
|
||||
.filter(domainHistory -> DOMAIN_AUTORENEW.equals(domainHistory.getType()))
|
||||
.collect(toImmutableList());
|
||||
assertThat(actuals)
|
||||
.comparingElementsUsing(immutableObjectCorrespondence("resource", "revisionId"))
|
||||
.containsExactlyElementsIn(Arrays.asList(expected));
|
||||
assertThat(
|
||||
actuals.stream()
|
||||
.map(history -> history.getDomainBase().get())
|
||||
.collect(toImmutableList()))
|
||||
.comparingElementsUsing(immutableObjectCorrespondence("nsHosts", "updateTimestamp"))
|
||||
.containsExactlyElementsIn(
|
||||
Arrays.stream(expected)
|
||||
.map(history -> history.getDomainBase().get())
|
||||
.collect(toImmutableList()));
|
||||
}
|
||||
|
||||
private static DomainHistory getOnlyAutoRenewHistory(Domain domain) {
|
||||
return getOnlyHistoryEntryOfType(domain, DOMAIN_AUTORENEW, DomainHistory.class);
|
||||
}
|
||||
|
||||
private DomainHistory getOnlyAutoRenewHistory() {
|
||||
return getOnlyAutoRenewHistory(domain);
|
||||
}
|
||||
|
||||
private static void assertCursorAt(DateTime expectedCursorTime) {
|
||||
Cursor cursor = tm().transact(() -> tm().loadByKey(Cursor.createGlobalVKey(RECURRING_BILLING)));
|
||||
assertThat(cursor).isNotNull();
|
||||
assertThat(cursor.getCursorTime()).isEqualTo(expectedCursorTime);
|
||||
}
|
||||
|
||||
private static Recurring createDomainAtTime(String domainName, DateTime createTime) {
|
||||
Domain domain = persistActiveDomain(domainName, createTime);
|
||||
DomainHistory domainHistory =
|
||||
persistResource(
|
||||
new DomainHistory.Builder()
|
||||
.setRegistrarId(domain.getCreationRegistrarId())
|
||||
.setType(DOMAIN_CREATE)
|
||||
.setModificationTime(createTime)
|
||||
.setDomain(domain)
|
||||
.build());
|
||||
return persistResource(
|
||||
new Recurring.Builder()
|
||||
.setDomainHistory(domainHistory)
|
||||
.setRegistrarId(domain.getCreationRegistrarId())
|
||||
.setEventTime(createTime.plusYears(1))
|
||||
.setFlags(ImmutableSet.of(Flag.AUTO_RENEW))
|
||||
.setReason(Reason.RENEW)
|
||||
.setRecurrenceEndTime(END_OF_TIME)
|
||||
.setTargetId(domain.getDomainName())
|
||||
.build());
|
||||
}
|
||||
|
||||
public interface TestOptions extends ExpandRecurringBillingEventsPipelineOptions, DirectOptions {}
|
||||
}
|
|
@ -12,7 +12,7 @@
|
|||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package google.registry.beam.invoicing;
|
||||
package google.registry.beam.billing;
|
||||
|
||||
import static com.google.common.collect.ImmutableSet.toImmutableSet;
|
||||
import static com.google.common.truth.Truth.assertThat;
|
|
@ -36,12 +36,14 @@ import google.registry.model.contact.Contact;
|
|||
import google.registry.model.domain.Domain;
|
||||
import google.registry.model.domain.GracePeriod;
|
||||
import google.registry.model.eppcommon.StatusValue;
|
||||
import google.registry.persistence.PersistenceModule.TransactionIsolationLevel;
|
||||
import google.registry.persistence.transaction.JpaTestExtensions;
|
||||
import google.registry.persistence.transaction.JpaTestExtensions.JpaIntegrationTestExtension;
|
||||
import google.registry.persistence.transaction.JpaTransactionManager;
|
||||
import google.registry.persistence.transaction.TransactionManagerFactory;
|
||||
import google.registry.testing.FakeClock;
|
||||
import org.apache.beam.sdk.options.PipelineOptionsFactory;
|
||||
import org.hibernate.cfg.Environment;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Duration;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
|
@ -60,7 +62,11 @@ public class ResaveAllEppResourcesPipelineTest {
|
|||
|
||||
@RegisterExtension
|
||||
final JpaIntegrationTestExtension database =
|
||||
new JpaTestExtensions.Builder().withClock(fakeClock).buildIntegrationTestExtension();
|
||||
new JpaTestExtensions.Builder()
|
||||
.withClock(fakeClock)
|
||||
.withProperty(
|
||||
Environment.ISOLATION, TransactionIsolationLevel.TRANSACTION_REPEATABLE_READ.name())
|
||||
.buildIntegrationTestExtension();
|
||||
|
||||
private final ResaveAllEppResourcesPipelineOptions options =
|
||||
PipelineOptionsFactory.create().as(ResaveAllEppResourcesPipelineOptions.class);
|
||||
|
|
|
@ -302,6 +302,8 @@ class DomainTransferApproveFlowTest
|
|||
getGainingClientAutorenewEvent()
|
||||
.asBuilder()
|
||||
.setEventTime(domain.getRegistrationExpirationTime())
|
||||
.setRecurrenceLastExpansion(
|
||||
domain.getRegistrationExpirationTime().minusYears(1))
|
||||
.setDomainHistory(historyEntryTransferApproved)
|
||||
.build()))
|
||||
.toArray(BillingEvent[]::new));
|
||||
|
@ -338,6 +340,8 @@ class DomainTransferApproveFlowTest
|
|||
getGainingClientAutorenewEvent()
|
||||
.asBuilder()
|
||||
.setEventTime(domain.getRegistrationExpirationTime())
|
||||
.setRecurrenceLastExpansion(
|
||||
domain.getRegistrationExpirationTime().minusYears(1))
|
||||
.setDomainHistory(historyEntryTransferApproved)
|
||||
.build()))
|
||||
.toArray(BillingEvent[]::new));
|
||||
|
@ -835,7 +839,7 @@ class DomainTransferApproveFlowTest
|
|||
"tld",
|
||||
"domain_transfer_approve.xml",
|
||||
"domain_transfer_approve_response_zero_period.xml",
|
||||
domain.getRegistrationExpirationTime().plusYears(0));
|
||||
domain.getRegistrationExpirationTime());
|
||||
assertHistoryEntriesDoNotContainTransferBillingEventsOrGracePeriods();
|
||||
}
|
||||
|
||||
|
|
|
@ -296,7 +296,11 @@ class DomainTransferRequestFlowTest
|
|||
.setRecurrenceEndTime(implicitTransferTime)
|
||||
.build();
|
||||
BillingEvent.Recurring gainingClientAutorenew =
|
||||
getGainingClientAutorenewEvent().asBuilder().setEventTime(expectedExpirationTime).build();
|
||||
getGainingClientAutorenewEvent()
|
||||
.asBuilder()
|
||||
.setEventTime(expectedExpirationTime)
|
||||
.setRecurrenceLastExpansion(expectedExpirationTime.minusYears(1))
|
||||
.build();
|
||||
// Construct extra billing events expected by the specific test.
|
||||
ImmutableSet<BillingEvent> extraBillingEvents =
|
||||
Stream.of(extraExpectedBillingEvents)
|
||||
|
|
|
@ -774,6 +774,7 @@ public class BillingEventTest extends EntityTestCase {
|
|||
.setRecurrenceEndTime(END_OF_TIME)));
|
||||
assertThat(recurringEvent.getRenewalPriceBehavior()).isEqualTo(RenewalPriceBehavior.SPECIFIED);
|
||||
assertThat(recurringEvent.getRenewalPrice()).hasValue(Money.of(USD, 100));
|
||||
assertThat(recurringEvent.getRecurrenceLastExpansion()).isEqualTo(now);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -147,7 +147,7 @@ public class JpaTestExtensions {
|
|||
}
|
||||
|
||||
/** Adds the specified property to those used to initialize the transaction manager. */
|
||||
Builder withProperty(String name, String value) {
|
||||
public Builder withProperty(String name, String value) {
|
||||
this.userProperties.put(name, value);
|
||||
return this;
|
||||
}
|
||||
|
|
|
@ -76,6 +76,7 @@
|
|||
reason text not null,
|
||||
domain_name text not null,
|
||||
recurrence_end_time timestamptz,
|
||||
recurrence_last_expansion timestamptz not null,
|
||||
recurrence_time_of_year text,
|
||||
renewal_price_amount numeric(19, 2),
|
||||
renewal_price_currency text,
|
||||
|
@ -774,6 +775,7 @@ create index IDXd3gxhkh0jk694pjvh9pyn7wjc on "BillingRecurrence" (registrar_id);
|
|||
create index IDX6syykou4nkc7hqa5p8r92cpch on "BillingRecurrence" (event_time);
|
||||
create index IDXoqttafcywwdn41um6kwlt0n8b on "BillingRecurrence" (domain_repo_id);
|
||||
create index IDXp3usbtvk0v1m14i5tdp4xnxgc on "BillingRecurrence" (recurrence_end_time);
|
||||
create index IDXp0pxi708hlu4n40qhbtihge8x on "BillingRecurrence" (recurrence_last_expansion);
|
||||
create index IDXjny8wuot75b5e6p38r47wdawu on "BillingRecurrence" (recurrence_time_of_year);
|
||||
create index IDX3y752kr9uh4kh6uig54vemx0l on "Contact" (creation_time);
|
||||
create index IDXtm415d6fe1rr35stm33s5mg18 on "Contact" (current_sponsor_registrar_id);
|
||||
|
|
|
@ -90,8 +90,10 @@ steps:
|
|||
${PROJECT_ID} \
|
||||
google.registry.beam.spec11.Spec11Pipeline \
|
||||
google/registry/beam/spec11_pipeline_metadata.json \
|
||||
google.registry.beam.invoicing.InvoicingPipeline \
|
||||
google.registry.beam.billing.InvoicingPipeline \
|
||||
google/registry/beam/invoicing_pipeline_metadata.json \
|
||||
google.registry.beam.billing.ExpandRecurringBillingEventsPipeline \
|
||||
google/registry/beam/expand_recurring_billing_events_pipeline_metadata.json \
|
||||
google.registry.beam.rde.RdePipeline \
|
||||
google/registry/beam/rde_pipeline_metadata.json \
|
||||
google.registry.beam.resave.ResaveAllEppResourcesPipeline \
|
||||
|
|
Loading…
Add table
Reference in a new issue