diff --git a/core/src/main/java/google/registry/config/RegistryConfig.java b/core/src/main/java/google/registry/config/RegistryConfig.java index 4fb9bb7f1..d083f8688 100644 --- a/core/src/main/java/google/registry/config/RegistryConfig.java +++ b/core/src/main/java/google/registry/config/RegistryConfig.java @@ -32,6 +32,8 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSortedMap; import dagger.Module; import dagger.Provides; +import google.registry.dns.ReadDnsRefreshRequestsAction; +import google.registry.model.common.DnsRefreshRequest; import google.registry.persistence.transaction.JpaTransactionManager; import google.registry.util.YamlUtils; import java.lang.annotation.Documented; @@ -294,9 +296,9 @@ public final class RegistryConfig { /** * The maximum number of domain and host updates to batch together to send to - * PublishDnsUpdatesAction, to avoid exceeding AppEngine's limits. + * PublishDnsUpdatesAction, to avoid exceeding HTTP request timeout limits. * - * @see google.registry.dns.ReadDnsQueueAction + * @see google.registry.dns.ReadDnsRefreshRequestsAction */ @Provides @Config("dnsTldUpdateBatchSize") @@ -327,23 +329,30 @@ public final class RegistryConfig { } /** - * The requested maximum duration for ReadDnsQueueAction. + * The requested maximum duration for {@link ReadDnsRefreshRequestsAction}. * - *

ReadDnsQueueAction reads update tasks from the dns-pull queue. It will continue reading - * tasks until either the queue is empty, or this duration has passed. + *

{@link ReadDnsRefreshRequestsAction} reads refresh requests from {@link DnsRefreshRequest} + * It will continue reading requests until either no requests exist that matche the condition, + * or this duration has passed. * - *

This time is the maximum duration between the first and last attempt to lease tasks from - * the dns-pull queue. The actual running time might be slightly longer, as we process the - * tasks. + *

This time is the maximum duration between the first and last attempt to read requests from + * {@link DnsRefreshRequest}. The actual running time might be slightly longer, as we process + * the requests. * - *

This value should be less than the cron-job repeat rate for ReadDnsQueueAction, to make - * sure we don't have multiple ReadDnsActions leasing tasks simultaneously. + *

The requests that are read will not be read again by any action until after this period + * has passed, so concurrent runs (or runs that are very close to each other) of {@link + * ReadDnsRefreshRequestsAction} will not keep reading the same requests with the earliest + * request time. * - * @see google.registry.dns.ReadDnsQueueAction + *

Still, this value should ideally be less than the cloud scheduler job repeat rate for + * {@link ReadDnsRefreshRequestsAction}, to not waste resources on multiple actions running at + * the same time. + * + *

see google.registry.dns.ReadDnsRefreshRequestsAction */ @Provides - @Config("readDnsQueueActionRuntime") - public static Duration provideReadDnsQueueRuntime() { + @Config("readDnsRefreshRequestsActionRuntime") + public static Duration provideReadDnsRefreshRequestsRuntime() { return Duration.standardSeconds(45); } diff --git a/core/src/main/java/google/registry/dns/DnsModule.java b/core/src/main/java/google/registry/dns/DnsModule.java index 4b182eb2a..eda74b297 100644 --- a/core/src/main/java/google/registry/dns/DnsModule.java +++ b/core/src/main/java/google/registry/dns/DnsModule.java @@ -19,6 +19,7 @@ import static google.registry.dns.DnsConstants.DNS_PULL_QUEUE_NAME; import static google.registry.dns.RefreshDnsOnHostRenameAction.PARAM_HOST_KEY; import static google.registry.request.RequestParameters.extractEnumParameter; import static google.registry.request.RequestParameters.extractIntParameter; +import static google.registry.request.RequestParameters.extractOptionalParameter; import static google.registry.request.RequestParameters.extractRequiredParameter; import static google.registry.request.RequestParameters.extractSetOfParameters; @@ -48,7 +49,7 @@ public abstract class DnsModule { public static final String PARAM_DOMAINS = "domains"; public static final String PARAM_HOSTS = "hosts"; public static final String PARAM_PUBLISH_TASK_ENQUEUED = "enqueued"; - public static final String PARAM_REFRESH_REQUEST_CREATED = "itemsCreated"; + public static final String PARAM_REFRESH_REQUEST_TIME = "requestTime"; @Binds @DnsWriterZone @@ -83,10 +84,13 @@ public abstract class DnsModule { return DateTime.parse(extractRequiredParameter(req, PARAM_PUBLISH_TASK_ENQUEUED)); } + // TODO: Retire the old header after DNS pull queue migration. @Provides - @Parameter(PARAM_REFRESH_REQUEST_CREATED) + @Parameter(PARAM_REFRESH_REQUEST_TIME) static DateTime provideItemsCreateTime(HttpServletRequest req) { - return DateTime.parse(extractRequiredParameter(req, PARAM_REFRESH_REQUEST_CREATED)); + return DateTime.parse( + extractOptionalParameter(req, "itemsCreated") + .orElse(extractRequiredParameter(req, PARAM_REFRESH_REQUEST_TIME))); } @Provides diff --git a/core/src/main/java/google/registry/dns/DnsQueue.java b/core/src/main/java/google/registry/dns/DnsQueue.java index 36a39fca5..a54b4f53f 100644 --- a/core/src/main/java/google/registry/dns/DnsQueue.java +++ b/core/src/main/java/google/registry/dns/DnsQueue.java @@ -37,6 +37,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.flogger.FluentLogger; import com.google.common.net.InternetDomainName; import com.google.common.util.concurrent.RateLimiter; +import google.registry.config.RegistryConfig; import google.registry.dns.DnsConstants.TargetType; import google.registry.model.tld.Registries; import google.registry.util.Clock; @@ -54,13 +55,13 @@ import org.joda.time.Duration; *

This includes a {@link RateLimiter} to limit the {@link Queue#leaseTasks} call rate to 9 QPS, * to stay under the 10 QPS limit for this function. * - *

Note that overlapping calls to {@link ReadDnsQueueAction} (the only place where - * {@link DnsQueue#leaseTasks} is used) will have different rate limiters, so they could exceed the - * allowed rate. This should be rare though - because {@link DnsQueue#leaseTasks} is only used in - * {@link ReadDnsQueueAction}, which is run as a cron job with running time shorter than the cron - * repeat time - meaning there should never be two instances running at once. + *

Note that overlapping calls to {@link ReadDnsQueueAction} (the only place where {@link + * DnsQueue#leaseTasks} is used) will have different rate limiters, so they could exceed the allowed + * rate. This should be rare though - because {@link DnsQueue#leaseTasks} is only used in {@link + * ReadDnsQueueAction}, which is run as a cron job with running time shorter than the cron repeat + * time - meaning there should never be two instances running at once. * - * @see google.registry.config.RegistryConfig.ConfigModule#provideReadDnsQueueRuntime + * @see RegistryConfig.ConfigModule#provideReadDnsRefreshRequestsRuntime() */ public class DnsQueue { diff --git a/core/src/main/java/google/registry/dns/DnsUtils.java b/core/src/main/java/google/registry/dns/DnsUtils.java index f14b1950e..eda0ba2a5 100644 --- a/core/src/main/java/google/registry/dns/DnsUtils.java +++ b/core/src/main/java/google/registry/dns/DnsUtils.java @@ -14,15 +14,19 @@ package google.registry.dns; +import static com.google.common.collect.ImmutableList.toImmutableList; import static google.registry.persistence.transaction.TransactionManagerFactory.tm; +import com.google.common.collect.ImmutableList; import com.google.common.net.InternetDomainName; import google.registry.dns.DnsConstants.TargetType; import google.registry.model.common.DatabaseMigrationStateSchedule; import google.registry.model.common.DatabaseMigrationStateSchedule.MigrationState; import google.registry.model.common.DnsRefreshRequest; import google.registry.model.tld.Registries; +import java.util.Collection; import javax.inject.Inject; +import org.joda.time.DateTime; import org.joda.time.Duration; /** Utility class to handle DNS refresh requests. */ @@ -67,6 +71,61 @@ public class DnsUtils { requestDnsRefresh(hostName, TargetType.HOST, Duration.ZERO); } + /** + * Returns pending DNS update requests that need further processing up to batch size, in ascending + * order of their request time, and updates their processing time to now. + * + *

The criteria to pick the requests to include are: + * + *

+ */ + public ImmutableList readAndUpdateRequestsWithLatestProcessTime( + String tld, Duration cooldown, int batchSize) { + return tm().transact( + () -> { + DateTime transactionTime = tm().getTransactionTime(); + ImmutableList requests = + tm().query( + "FROM DnsRefreshRequest WHERE tld = :tld " + + "AND requestTime <= :now AND lastProcessTime < :cutoffTime " + + "ORDER BY requestTime ASC, id ASC", + DnsRefreshRequest.class) + .setParameter("tld", tld) + .setParameter("now", transactionTime) + .setParameter("cutoffTime", transactionTime.minus(cooldown)) + .setMaxResults(batchSize) + .getResultStream() + // Note that the process time is when the request was last read, batched and + // queued up for publishing, not when it is actually published by the DNS + // writer. This timestamp acts as a cooldown so the same request will not be + // retried too frequently. See DnsRefreshRequest.getLastProcessTime for a + // detailed explaination. + .map(e -> e.updateProcessTime(transactionTime)) + .collect(toImmutableList()); + tm().updateAll(requests); + return requests; + }); + } + + /** + * Removes the requests that have been processed. + * + *

Note that if a request entity has already been deleted, the method still succeeds without + * error because all we care about is that it no longer exists after the method runs. + */ + public void deleteRequests(Collection requests) { + tm().transact( + () -> + tm().delete( + requests.stream() + .map(DnsRefreshRequest::createVKey) + .collect(toImmutableList()))); + } + private boolean usePullQueue() { return !DatabaseMigrationStateSchedule.getValueAtTime(dnsQueue.getClock().nowUtc()) .equals(MigrationState.DNS_SQL); diff --git a/core/src/main/java/google/registry/dns/PublishDnsUpdatesAction.java b/core/src/main/java/google/registry/dns/PublishDnsUpdatesAction.java index bb3b7666a..da058a586 100644 --- a/core/src/main/java/google/registry/dns/PublishDnsUpdatesAction.java +++ b/core/src/main/java/google/registry/dns/PublishDnsUpdatesAction.java @@ -22,7 +22,7 @@ import static google.registry.dns.DnsModule.PARAM_HOSTS; import static google.registry.dns.DnsModule.PARAM_LOCK_INDEX; import static google.registry.dns.DnsModule.PARAM_NUM_PUBLISH_LOCKS; import static google.registry.dns.DnsModule.PARAM_PUBLISH_TASK_ENQUEUED; -import static google.registry.dns.DnsModule.PARAM_REFRESH_REQUEST_CREATED; +import static google.registry.dns.DnsModule.PARAM_REFRESH_REQUEST_TIME; import static google.registry.model.EppResourceUtils.loadByForeignKey; import static google.registry.request.Action.Method.POST; import static google.registry.request.RequestParameters.PARAM_TLD; @@ -124,7 +124,7 @@ public final class PublishDnsUpdatesAction implements Runnable, Callable { public PublishDnsUpdatesAction( @Parameter(PARAM_DNS_WRITER) String dnsWriter, @Parameter(PARAM_PUBLISH_TASK_ENQUEUED) DateTime enqueuedTime, - @Parameter(PARAM_REFRESH_REQUEST_CREATED) DateTime itemsCreateTime, + @Parameter(PARAM_REFRESH_REQUEST_TIME) DateTime itemsCreateTime, @Parameter(PARAM_LOCK_INDEX) int lockIndex, @Parameter(PARAM_NUM_PUBLISH_LOCKS) int numPublishLocks, @Parameter(PARAM_DOMAINS) Set domains, @@ -346,7 +346,7 @@ public final class PublishDnsUpdatesAction implements Runnable, Callable { .put(PARAM_LOCK_INDEX, Integer.toString(lockIndex)) .put(PARAM_NUM_PUBLISH_LOCKS, Integer.toString(numPublishLocks)) .put(PARAM_PUBLISH_TASK_ENQUEUED, clock.nowUtc().toString()) - .put(PARAM_REFRESH_REQUEST_CREATED, itemsCreateTime.toString()) + .put(PARAM_REFRESH_REQUEST_TIME, itemsCreateTime.toString()) .put(PARAM_DOMAINS, Joiner.on(",").join(domains)) .put(PARAM_HOSTS, Joiner.on(",").join(hosts)) .build())); diff --git a/core/src/main/java/google/registry/dns/ReadDnsQueueAction.java b/core/src/main/java/google/registry/dns/ReadDnsQueueAction.java index 7a87facfc..9c99880ae 100644 --- a/core/src/main/java/google/registry/dns/ReadDnsQueueAction.java +++ b/core/src/main/java/google/registry/dns/ReadDnsQueueAction.java @@ -26,7 +26,7 @@ import static google.registry.dns.DnsModule.PARAM_HOSTS; import static google.registry.dns.DnsModule.PARAM_LOCK_INDEX; import static google.registry.dns.DnsModule.PARAM_NUM_PUBLISH_LOCKS; import static google.registry.dns.DnsModule.PARAM_PUBLISH_TASK_ENQUEUED; -import static google.registry.dns.DnsModule.PARAM_REFRESH_REQUEST_CREATED; +import static google.registry.dns.DnsModule.PARAM_REFRESH_REQUEST_TIME; import static google.registry.request.RequestParameters.PARAM_TLD; import static google.registry.util.DomainNameUtils.getSecondLevelDomain; import static java.nio.charset.StandardCharsets.UTF_8; @@ -109,7 +109,7 @@ public final class ReadDnsQueueAction implements Runnable { @Inject ReadDnsQueueAction( @Config("dnsTldUpdateBatchSize") int tldUpdateBatchSize, - @Config("readDnsQueueActionRuntime") Duration requestedMaximumDuration, + @Config("readDnsRefreshRequestsActionRuntime") Duration requestedMaximumDuration, @Parameter(PARAM_JITTER_SECONDS) Optional jitterSeconds, Clock clock, DnsQueue dnsQueue, @@ -379,7 +379,7 @@ public final class ReadDnsQueueAction implements Runnable { .put(PARAM_LOCK_INDEX, Integer.toString(lockIndex)) .put(PARAM_NUM_PUBLISH_LOCKS, Integer.toString(numPublishLocks)) .put(PARAM_PUBLISH_TASK_ENQUEUED, clock.nowUtc().toString()) - .put(PARAM_REFRESH_REQUEST_CREATED, earliestCreateTime.toString()) + .put(PARAM_REFRESH_REQUEST_TIME, earliestCreateTime.toString()) .put( PARAM_DOMAINS, chunk.stream() diff --git a/core/src/main/java/google/registry/dns/ReadDnsRefreshRequestsAction.java b/core/src/main/java/google/registry/dns/ReadDnsRefreshRequestsAction.java new file mode 100644 index 000000000..c4b57bd4d --- /dev/null +++ b/core/src/main/java/google/registry/dns/ReadDnsRefreshRequestsAction.java @@ -0,0 +1,206 @@ +// Copyright 2023 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.dns; + +import static com.google.common.collect.ImmutableSetMultimap.toImmutableSetMultimap; +import static google.registry.dns.DnsConstants.DNS_PUBLISH_PUSH_QUEUE_NAME; +import static google.registry.dns.DnsModule.PARAM_DNS_WRITER; +import static google.registry.dns.DnsModule.PARAM_DOMAINS; +import static google.registry.dns.DnsModule.PARAM_HOSTS; +import static google.registry.dns.DnsModule.PARAM_LOCK_INDEX; +import static google.registry.dns.DnsModule.PARAM_NUM_PUBLISH_LOCKS; +import static google.registry.dns.DnsModule.PARAM_PUBLISH_TASK_ENQUEUED; +import static google.registry.dns.DnsModule.PARAM_REFRESH_REQUEST_TIME; +import static google.registry.request.RequestParameters.PARAM_TLD; +import static google.registry.util.DateTimeUtils.END_OF_TIME; +import static google.registry.util.DomainNameUtils.getSecondLevelDomain; +import static java.nio.charset.StandardCharsets.UTF_8; + +import com.google.cloud.tasks.v2.Task; +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMultimap; +import com.google.common.flogger.FluentLogger; +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hashing; +import google.registry.batch.CloudTasksUtils; +import google.registry.config.RegistryConfig.Config; +import google.registry.dns.DnsConstants.TargetType; +import google.registry.model.common.DnsRefreshRequest; +import google.registry.model.tld.Registry; +import google.registry.request.Action; +import google.registry.request.Action.Service; +import google.registry.request.Parameter; +import google.registry.request.auth.Auth; +import google.registry.util.Clock; +import java.util.Collection; +import java.util.Optional; +import javax.inject.Inject; +import org.joda.time.DateTime; +import org.joda.time.Duration; + +/** + * Action for fanning out DNS refresh tasks by TLD, using data taken from {@link DnsRefreshRequest} + * table. + */ +@Action( + service = Service.BACKEND, + path = "/_dr/task/readDnsRefreshRequests", + automaticallyPrintOk = true, + auth = Auth.AUTH_INTERNAL_OR_ADMIN) +public final class ReadDnsRefreshRequestsAction implements Runnable { + + // This parameter cannot be named "jitterSeconds", which will be read by TldFanoutAction and not + // be passed to this action. + private static final String PARAM_JITTER_SECONDS = "dnsJitterSeconds"; + private static final FluentLogger logger = FluentLogger.forEnclosingClass(); + + private final int tldUpdateBatchSize; + private final Duration requestedMaximumDuration; + private final Optional jitterSeconds; + private final String tld; + private final Clock clock; + private final DnsUtils dnsUtils; + private final HashFunction hashFunction; + private final CloudTasksUtils cloudTasksUtils; + + @Inject + ReadDnsRefreshRequestsAction( + @Config("dnsTldUpdateBatchSize") int tldUpdateBatchSize, + @Config("readDnsRefreshRequestsActionRuntime") Duration requestedMaximumDuration, + @Parameter(PARAM_JITTER_SECONDS) Optional jitterSeconds, + @Parameter(PARAM_TLD) String tld, + Clock clock, + DnsUtils dnsUtils, + HashFunction hashFunction, + CloudTasksUtils cloudTasksUtils) { + this.tldUpdateBatchSize = tldUpdateBatchSize; + this.requestedMaximumDuration = requestedMaximumDuration; + this.jitterSeconds = jitterSeconds; + this.tld = tld; + this.clock = clock; + this.dnsUtils = dnsUtils; + this.hashFunction = hashFunction; + this.cloudTasksUtils = cloudTasksUtils; + } + + /** + * Reads requests up to the maximum requested runtime, and enqueues update batches from the these + * requests. + */ + @Override + public void run() { + if (Registry.get(tld).getDnsPaused()) { + logger.atInfo().log("The queue updated is paused for TLD: %s.", tld); + return; + } + DateTime requestedEndTime = clock.nowUtc().plus(requestedMaximumDuration); + // See getLockIndex(), requests are evenly distributed to [1, numDnsPublishLocks], so each + // bucket would be roughly the size of tldUpdateBatchSize. + int processBatchSize = tldUpdateBatchSize * Registry.get(tld).getNumDnsPublishLocks(); + while (requestedEndTime.isAfter(clock.nowUtc())) { + ImmutableList requests = + dnsUtils.readAndUpdateRequestsWithLatestProcessTime( + tld, requestedMaximumDuration, processBatchSize); + logger.atInfo().log("Read %d DNS update requests for TLD %s.", requests.size(), tld); + if (!requests.isEmpty()) { + processRequests(requests); + } + if (requests.size() < processBatchSize) { + return; + } + } + } + /** + * Subdivides {@link DnsRefreshRequest} into buckets by lock index, enqueue a Cloud Tasks task per + * bucket, and then delete the requests in each bucket. + */ + void processRequests(Collection requests) { + int numPublishLocks = Registry.get(tld).getNumDnsPublishLocks(); + requests.stream() + .collect( + toImmutableSetMultimap( + request -> getLockIndex(numPublishLocks, request), request -> request)) + .asMap() + .forEach( + (lockIndex, bucketedRequests) -> { + try { + enqueueUpdates(lockIndex, numPublishLocks, bucketedRequests); + dnsUtils.deleteRequests(bucketedRequests); + logger.atInfo().log( + "Processed %d DNS update requests for TLD %s.", bucketedRequests.size(), tld); + } catch (Exception e) { + // Log but continue to process the next bucket. The failed tasks will NOT be + // deleted and will be retried after the cooldown period has passed. + logger.atSevere().withCause(e).log( + "Error processing DNS update requests: %s", bucketedRequests); + } + }); + } + + /** + * Returns the lock index for a given {@link DnsRefreshRequest}. + * + *

We hash the second level domain for all records, to group in-bailiwick hosts (the only ones + * we refresh DNS for) with their superordinate domains. We use consistent hashing to determine + * the lock index because it gives us [0,N) bucketing properties out of the box, then add 1 to + * make indexes within [1,N]. + */ + int getLockIndex(int numPublishLocks, DnsRefreshRequest request) { + String domain = getSecondLevelDomain(request.getName(), tld); + return Hashing.consistentHash(hashFunction.hashString(domain, UTF_8), numPublishLocks) + 1; + } + + /** Creates DNS refresh tasks for all writers for the tld within a lock index. */ + void enqueueUpdates(int lockIndex, int numPublishLocks, Collection requests) { + ImmutableList.Builder domainsBuilder = new ImmutableList.Builder<>(); + ImmutableList.Builder hostsBuilder = new ImmutableList.Builder<>(); + DateTime earliestRequestTime = END_OF_TIME; + for (DnsRefreshRequest request : requests) { + if (request.getRequestTime().isBefore(earliestRequestTime)) { + earliestRequestTime = request.getRequestTime(); + } + String name = request.getName(); + if (request.getType().equals(TargetType.DOMAIN)) { + domainsBuilder.add(name); + } else { + hostsBuilder.add(name); + } + } + ImmutableList domains = domainsBuilder.build(); + ImmutableList hosts = hostsBuilder.build(); + for (String dnsWriter : Registry.get(tld).getDnsWriters()) { + Task task = + cloudTasksUtils.createPostTaskWithJitter( + PublishDnsUpdatesAction.PATH, + Service.BACKEND, + ImmutableMultimap.builder() + .put(PARAM_TLD, tld) + .put(PARAM_DNS_WRITER, dnsWriter) + .put(PARAM_LOCK_INDEX, Integer.toString(lockIndex)) + .put(PARAM_NUM_PUBLISH_LOCKS, Integer.toString(numPublishLocks)) + .put(PARAM_PUBLISH_TASK_ENQUEUED, clock.nowUtc().toString()) + .put(PARAM_REFRESH_REQUEST_TIME, earliestRequestTime.toString()) + .put(PARAM_DOMAINS, Joiner.on(',').join(domains)) + .put(PARAM_HOSTS, Joiner.on(',').join(hosts)) + .build(), + jitterSeconds); + cloudTasksUtils.enqueue(DNS_PUBLISH_PUSH_QUEUE_NAME, task); + logger.atInfo().log( + "Enqueued DNS update request for (TLD %s, lock %d) with %d domains and %d hosts.", + tld, lockIndex, domains.size(), hosts.size()); + } + } +} diff --git a/core/src/main/java/google/registry/env/alpha/default/WEB-INF/cloud-scheduler-tasks.xml b/core/src/main/java/google/registry/env/alpha/default/WEB-INF/cloud-scheduler-tasks.xml index 6fdee14c2..405d14397 100644 --- a/core/src/main/java/google/registry/env/alpha/default/WEB-INF/cloud-scheduler-tasks.xml +++ b/core/src/main/java/google/registry/env/alpha/default/WEB-INF/cloud-scheduler-tasks.xml @@ -137,4 +137,14 @@ */1 * * * * + + + + + readDnsRefreshRequests + + Enqueue a ReadDnsRefreshRequestAction for each TLD. + + */1 * * * * + diff --git a/core/src/main/java/google/registry/env/common/default/WEB-INF/queue.xml b/core/src/main/java/google/registry/env/common/default/WEB-INF/queue.xml index c39ffc327..ff780cff4 100644 --- a/core/src/main/java/google/registry/env/common/default/WEB-INF/queue.xml +++ b/core/src/main/java/google/registry/env/common/default/WEB-INF/queue.xml @@ -6,6 +6,12 @@ pull + + + dns-refresh + 100/s + + dns-publish diff --git a/core/src/main/java/google/registry/env/crash/default/WEB-INF/cloud-scheduler-tasks.xml b/core/src/main/java/google/registry/env/crash/default/WEB-INF/cloud-scheduler-tasks.xml index 6b0d28187..dec54c16b 100644 --- a/core/src/main/java/google/registry/env/crash/default/WEB-INF/cloud-scheduler-tasks.xml +++ b/core/src/main/java/google/registry/env/crash/default/WEB-INF/cloud-scheduler-tasks.xml @@ -139,6 +139,16 @@ */1 * * * * + + + + readDnsRefreshRequests + + Enqueue a ReadDnsRefreshRequestAction for each TLD. + + */1 * * * * + + deleteExpiredDomains diff --git a/core/src/main/java/google/registry/env/production/default/WEB-INF/cloud-scheduler-tasks.xml b/core/src/main/java/google/registry/env/production/default/WEB-INF/cloud-scheduler-tasks.xml index 715443fae..c63e53992 100644 --- a/core/src/main/java/google/registry/env/production/default/WEB-INF/cloud-scheduler-tasks.xml +++ b/core/src/main/java/google/registry/env/production/default/WEB-INF/cloud-scheduler-tasks.xml @@ -210,6 +210,16 @@ */1 * * * * + + + + readDnsRefreshRequests + + Enqueue a ReadDnsRefreshRequestAction for each TLD. + + */1 * * * * + + icannReportingStaging diff --git a/core/src/main/java/google/registry/env/qa/default/WEB-INF/cloud-scheduler-tasks.xml b/core/src/main/java/google/registry/env/qa/default/WEB-INF/cloud-scheduler-tasks.xml index 39db7da50..695bbfc44 100644 --- a/core/src/main/java/google/registry/env/qa/default/WEB-INF/cloud-scheduler-tasks.xml +++ b/core/src/main/java/google/registry/env/qa/default/WEB-INF/cloud-scheduler-tasks.xml @@ -21,6 +21,16 @@ */1 * * * * + + + + readDnsRefreshRequests + + Enqueue a ReadDnsRefreshRequestAction for each TLD. + + */1 * * * * + + syncRegistrarsSheet diff --git a/core/src/main/java/google/registry/env/sandbox/default/WEB-INF/cloud-scheduler-tasks.xml b/core/src/main/java/google/registry/env/sandbox/default/WEB-INF/cloud-scheduler-tasks.xml index 63c9a9ffc..e57d2bd21 100644 --- a/core/src/main/java/google/registry/env/sandbox/default/WEB-INF/cloud-scheduler-tasks.xml +++ b/core/src/main/java/google/registry/env/sandbox/default/WEB-INF/cloud-scheduler-tasks.xml @@ -152,6 +152,16 @@ */1 * * * * + + + + readDnsRefreshRequests + + Enqueue a ReadDnsRefreshRequestAction for each TLD. + + */1 * * * * + + wipeOutContactHistoryPii diff --git a/core/src/main/java/google/registry/model/common/DnsRefreshRequest.java b/core/src/main/java/google/registry/model/common/DnsRefreshRequest.java index 8d38cc311..c5717e757 100644 --- a/core/src/main/java/google/registry/model/common/DnsRefreshRequest.java +++ b/core/src/main/java/google/registry/model/common/DnsRefreshRequest.java @@ -21,6 +21,7 @@ import static google.registry.util.DateTimeUtils.START_OF_TIME; import google.registry.dns.DnsConstants.TargetType; import google.registry.dns.PublishDnsUpdatesAction; import google.registry.model.ImmutableObject; +import google.registry.persistence.VKey; import javax.annotation.Nullable; import javax.persistence.Column; import javax.persistence.Entity; @@ -89,6 +90,11 @@ public class DnsRefreshRequest extends ImmutableObject { return lastProcessTime; } + @Override + public VKey createVKey() { + return VKey.create(DnsRefreshRequest.class, id); + } + protected DnsRefreshRequest() {} private DnsRefreshRequest( diff --git a/core/src/test/java/google/registry/dns/DnsUtilsTest.java b/core/src/test/java/google/registry/dns/DnsUtilsTest.java index 8469cbaa9..778ebc0e9 100644 --- a/core/src/test/java/google/registry/dns/DnsUtilsTest.java +++ b/core/src/test/java/google/registry/dns/DnsUtilsTest.java @@ -14,6 +14,7 @@ package google.registry.dns; +import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.truth.Truth.assertThat; import static google.registry.persistence.transaction.TransactionManagerFactory.tm; import static google.registry.testing.DatabaseHelper.createTld; @@ -26,6 +27,7 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSortedMap; import com.google.common.collect.Iterables; import com.google.common.collect.Ordering; @@ -36,6 +38,7 @@ import google.registry.model.common.DnsRefreshRequest; import google.registry.persistence.transaction.JpaTestExtensions; import google.registry.persistence.transaction.JpaTestExtensions.JpaIntegrationTestExtension; import google.registry.testing.FakeClock; +import java.util.Comparator; import org.joda.time.DateTime; import org.joda.time.Duration; import org.junit.jupiter.api.Assertions; @@ -53,8 +56,7 @@ public class DnsUtilsTest { private final DnsQueue dnsQueue = mock(DnsQueue.class); private final DnsUtils dnsUtils = new DnsUtils(dnsQueue); - - FakeClock clock = new FakeClock(DateTime.parse("2020-02-02T01:23:45Z")); + private final FakeClock clock = new FakeClock(DateTime.parse("2020-02-02T01:23:45Z")); @RegisterExtension JpaIntegrationTestExtension jpa = @@ -138,12 +140,133 @@ public class DnsUtilsTest { assertRequest(request, TargetType.DOMAIN, domainName, tld, clock.nowUtc().plusMinutes(3)); } + @Test + void testSuccess_ProcessRequests() { + ImmutableList requests = processRequests(); + DateTime processtime = clock.nowUtc(); + assertThat(requests.size()).isEqualTo(4); + assertRequest( + requests.get(0), + TargetType.DOMAIN, + "test2.tld", + "tld", + clock.nowUtc().minusMinutes(4), + processtime); + assertRequest( + requests.get(1), + TargetType.DOMAIN, + "test1.tld", + "tld", + clock.nowUtc().minusMinutes(3), + processtime); + assertRequest( + requests.get(2), + TargetType.HOST, + "ns1.test2.tld", + "tld", + clock.nowUtc().minusMinutes(1), + processtime); + assertRequest( + requests.get(3), + TargetType.DOMAIN, + "test5.tld", + "tld", + clock.nowUtc().minusMinutes(1), + processtime); + requests = loadAllOf(DnsRefreshRequest.class); + assertThat(requests.size()).isEqualTo(7); + // The four processed records should have updated process time in SQL as well. + assertThat(requests.stream().filter(e -> e.getLastProcessTime().equals(processtime)).count()) + .isEqualTo(4); + clock.advanceOneMilli(); + + // Requests within cooldown period not included. + requests = + dnsUtils.readAndUpdateRequestsWithLatestProcessTime("tld", Duration.standardMinutes(1), 4); + assertThat(requests.size()).isEqualTo(1); + assertRequest( + requests.get(0), + TargetType.DOMAIN, + "test6.tld", + "tld", + clock.nowUtc().minusMinutes(1).minusMillis(1), + clock.nowUtc()); + } + + @Test + void testSuccess_deleteRequests() { + dnsUtils.deleteRequests(processRequests()); + ImmutableList remainingRequests = + loadAllOf(DnsRefreshRequest.class).stream() + .sorted(Comparator.comparing(DnsRefreshRequest::getRequestTime)) + .collect(toImmutableList()); + assertThat(remainingRequests.size()).isEqualTo(3); + assertRequest( + remainingRequests.get(0), + TargetType.DOMAIN, + "something.example", + "example", + clock.nowUtc().minusMinutes(2)); + assertRequest( + remainingRequests.get(1), + TargetType.DOMAIN, + "test6.tld", + "tld", + clock.nowUtc().minusMinutes(1)); + assertRequest( + remainingRequests.get(2), + TargetType.DOMAIN, + "test4.tld", + "tld", + clock.nowUtc().plusMinutes(1)); + tm().transact(() -> tm().delete(remainingRequests.get(2))); + assertThat(loadAllOf(DnsRefreshRequest.class).size()).isEqualTo(2); + // Should not throw even though one of the request is already deleted. + dnsUtils.deleteRequests(remainingRequests); + assertThat(loadAllOf(DnsRefreshRequest.class).size()).isEqualTo(0); + } + + private ImmutableList processRequests() { + useDnsSql(); + createTld("example"); + // Domain Included. + dnsUtils.requestDomainDnsRefresh("test1.tld", Duration.standardMinutes(1)); + // This one should be returned before test1.tld, even though it's added later, because of + // the delay specified in test1.tld. + dnsUtils.requestDomainDnsRefresh("test2.tld"); + // Not included because the TLD is not under management. + dnsUtils.requestDomainDnsRefresh("something.example", Duration.standardMinutes(2)); + clock.advanceBy(Duration.standardMinutes(3)); + // Host included. + dnsUtils.requestHostDnsRefresh("ns1.test2.tld"); + // Not included because the request time is in the future + dnsUtils.requestDomainDnsRefresh("test4.tld", Duration.standardMinutes(2)); + // Included after the previous one. Same request time, order by insertion order (i.e. ID); + dnsUtils.requestDomainDnsRefresh("test5.tld"); + // Not included because batch size is exceeded; + dnsUtils.requestDomainDnsRefresh("test6.tld"); + clock.advanceBy(Duration.standardMinutes(1)); + return dnsUtils.readAndUpdateRequestsWithLatestProcessTime( + "tld", Duration.standardMinutes(1), 4); + } + private static void assertRequest( DnsRefreshRequest request, TargetType type, String name, String tld, DateTime requestTime) { + assertRequest(request, type, name, tld, requestTime, START_OF_TIME); + } + + private static void assertRequest( + DnsRefreshRequest request, + TargetType type, + String name, + String tld, + DateTime requestTime, + DateTime processTime) { assertThat(request.getType()).isEqualTo(type); assertThat(request.getName()).isEqualTo(name); assertThat(request.getTld()).isEqualTo(tld); assertThat(request.getRequestTime()).isEqualTo(requestTime); + assertThat(request.getLastProcessTime()).isEqualTo(processTime); } private void useDnsSql() { diff --git a/core/src/test/java/google/registry/dns/PublishDnsUpdatesActionTest.java b/core/src/test/java/google/registry/dns/PublishDnsUpdatesActionTest.java index ba20118ac..b40170888 100644 --- a/core/src/test/java/google/registry/dns/PublishDnsUpdatesActionTest.java +++ b/core/src/test/java/google/registry/dns/PublishDnsUpdatesActionTest.java @@ -22,7 +22,7 @@ import static google.registry.dns.DnsModule.PARAM_HOSTS; import static google.registry.dns.DnsModule.PARAM_LOCK_INDEX; import static google.registry.dns.DnsModule.PARAM_NUM_PUBLISH_LOCKS; import static google.registry.dns.DnsModule.PARAM_PUBLISH_TASK_ENQUEUED; -import static google.registry.dns.DnsModule.PARAM_REFRESH_REQUEST_CREATED; +import static google.registry.dns.DnsModule.PARAM_REFRESH_REQUEST_TIME; import static google.registry.dns.PublishDnsUpdatesAction.RETRIES_BEFORE_PERMANENT_FAILURE; import static google.registry.request.RequestParameters.PARAM_TLD; import static google.registry.testing.DatabaseHelper.createTld; @@ -302,7 +302,7 @@ public class PublishDnsUpdatesActionTest { .param(PARAM_LOCK_INDEX, "1") .param(PARAM_NUM_PUBLISH_LOCKS, "1") .param(PARAM_PUBLISH_TASK_ENQUEUED, clock.nowUtc().toString()) - .param(PARAM_REFRESH_REQUEST_CREATED, clock.nowUtc().minusHours(2).toString()) + .param(PARAM_REFRESH_REQUEST_TIME, clock.nowUtc().minusHours(2).toString()) .param(PARAM_DOMAINS, "example1.xn--q9jyb4c,example2.xn--q9jyb4c") .param(PARAM_HOSTS, "") .header("content-type", "application/x-www-form-urlencoded"), @@ -313,7 +313,7 @@ public class PublishDnsUpdatesActionTest { .param(PARAM_LOCK_INDEX, "1") .param(PARAM_NUM_PUBLISH_LOCKS, "1") .param(PARAM_PUBLISH_TASK_ENQUEUED, clock.nowUtc().toString()) - .param(PARAM_REFRESH_REQUEST_CREATED, clock.nowUtc().minusHours(2).toString()) + .param(PARAM_REFRESH_REQUEST_TIME, clock.nowUtc().minusHours(2).toString()) .param(PARAM_DOMAINS, "example3.xn--q9jyb4c,example4.xn--q9jyb4c") .param(PARAM_HOSTS, "ns1.example.xn--q9jyb4c") .header("content-type", "application/x-www-form-urlencoded")); @@ -341,7 +341,7 @@ public class PublishDnsUpdatesActionTest { .param(PARAM_LOCK_INDEX, "1") .param(PARAM_NUM_PUBLISH_LOCKS, "1") .param(PARAM_PUBLISH_TASK_ENQUEUED, clock.nowUtc().toString()) - .param(PARAM_REFRESH_REQUEST_CREATED, clock.nowUtc().minusHours(2).toString()) + .param(PARAM_REFRESH_REQUEST_TIME, clock.nowUtc().minusHours(2).toString()) .param(PARAM_DOMAINS, "example1.xn--q9jyb4c,example2.xn--q9jyb4c") .param(PARAM_HOSTS, "") .header("content-type", "application/x-www-form-urlencoded"), @@ -352,7 +352,7 @@ public class PublishDnsUpdatesActionTest { .param(PARAM_LOCK_INDEX, "1") .param(PARAM_NUM_PUBLISH_LOCKS, "1") .param(PARAM_PUBLISH_TASK_ENQUEUED, clock.nowUtc().toString()) - .param(PARAM_REFRESH_REQUEST_CREATED, clock.nowUtc().minusHours(2).toString()) + .param(PARAM_REFRESH_REQUEST_TIME, clock.nowUtc().minusHours(2).toString()) .param(PARAM_DOMAINS, "example3.xn--q9jyb4c,example4.xn--q9jyb4c,example5.xn--q9jyb4c") .param(PARAM_HOSTS, "ns1.example.xn--q9jyb4c") .header("content-type", "application/x-www-form-urlencoded")); @@ -378,7 +378,7 @@ public class PublishDnsUpdatesActionTest { .param(PARAM_LOCK_INDEX, "1") .param(PARAM_NUM_PUBLISH_LOCKS, "1") .param(PARAM_PUBLISH_TASK_ENQUEUED, clock.nowUtc().toString()) - .param(PARAM_REFRESH_REQUEST_CREATED, clock.nowUtc().minusHours(2).toString()) + .param(PARAM_REFRESH_REQUEST_TIME, clock.nowUtc().minusHours(2).toString()) .param(PARAM_DOMAINS, "example1.xn--q9jyb4c") .param(PARAM_HOSTS, "") .header("content-type", "application/x-www-form-urlencoded"), @@ -389,7 +389,7 @@ public class PublishDnsUpdatesActionTest { .param(PARAM_LOCK_INDEX, "1") .param(PARAM_NUM_PUBLISH_LOCKS, "1") .param(PARAM_PUBLISH_TASK_ENQUEUED, clock.nowUtc().toString()) - .param(PARAM_REFRESH_REQUEST_CREATED, clock.nowUtc().minusHours(2).toString()) + .param(PARAM_REFRESH_REQUEST_TIME, clock.nowUtc().minusHours(2).toString()) .param(PARAM_DOMAINS, "") .param(PARAM_HOSTS, "ns1.example.xn--q9jyb4c") .header("content-type", "application/x-www-form-urlencoded")); diff --git a/core/src/test/java/google/registry/dns/ReadDnsQueueActionTest.java b/core/src/test/java/google/registry/dns/ReadDnsQueueActionTest.java index 1fe4aaaab..9bcf6d3cb 100644 --- a/core/src/test/java/google/registry/dns/ReadDnsQueueActionTest.java +++ b/core/src/test/java/google/registry/dns/ReadDnsQueueActionTest.java @@ -144,7 +144,7 @@ public class ReadDnsQueueActionTest { .url(PublishDnsUpdatesAction.PATH) .param("tld", tldToDnsWriter.getKey()) .param("dnsWriter", tldToDnsWriter.getValue()) - .param("itemsCreated", "3000-01-01T00:00:00.000Z") + .param("requestTime", "3000-01-01T00:00:00.000Z") .param("enqueued", "3000-01-01T01:00:00.000Z") // Single-lock TLDs should use lock 1 of 1 by default .param("lockIndex", "1") @@ -245,7 +245,7 @@ public class ReadDnsQueueActionTest { DNS_PUBLISH_PUSH_QUEUE_NAME, new TaskMatcher() .param("enqueued", "3000-02-05T01:00:00.000Z") - .param("itemsCreated", "3000-02-03T00:00:00.000Z") + .param("requestTime", "3000-02-03T00:00:00.000Z") .param("tld", "com") .param("dnsWriter", "comWriter") .param("domains", "domain1.com,domain2.com,domain3.com") @@ -484,7 +484,7 @@ public class ReadDnsQueueActionTest { .url(PublishDnsUpdatesAction.PATH) .param("tld", "multilock.uk") .param("dnsWriter", "multilockWriter") - .param("itemsCreated", "3000-01-01T00:00:00.000Z") + .param("requestTime", "3000-01-01T00:00:00.000Z") .param("enqueued", "3000-01-01T01:00:00.000Z") .param("domains", "hello.multilock.uk") .param("hosts", "ns1.abc.hello.multilock.uk,ns2.hello.multilock.uk") @@ -493,7 +493,7 @@ public class ReadDnsQueueActionTest { .url(PublishDnsUpdatesAction.PATH) .param("tld", "multilock.uk") .param("dnsWriter", "multilockWriter") - .param("itemsCreated", "3000-01-01T00:00:00.000Z") + .param("requestTime", "3000-01-01T00:00:00.000Z") .param("enqueued", "3000-01-01T01:00:00.000Z") .param("domains", "another.multilock.uk") .param("hosts", "ns3.def.another.multilock.uk,ns4.another.multilock.uk") diff --git a/core/src/test/java/google/registry/dns/ReadDnsRefreshRequestsActionTest.java b/core/src/test/java/google/registry/dns/ReadDnsRefreshRequestsActionTest.java new file mode 100644 index 000000000..6d0b0a71b --- /dev/null +++ b/core/src/test/java/google/registry/dns/ReadDnsRefreshRequestsActionTest.java @@ -0,0 +1,296 @@ +// Copyright 2017 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.dns; + +import static com.google.common.truth.Truth.assertThat; +import static google.registry.model.common.DatabaseMigrationStateSchedule.MigrationState; +import static google.registry.model.common.DatabaseMigrationStateSchedule.set; +import static google.registry.model.common.DatabaseMigrationStateSchedule.useUncachedForTest; +import static google.registry.persistence.transaction.TransactionManagerFactory.tm; +import static google.registry.testing.DatabaseHelper.createTld; +import static google.registry.testing.DatabaseHelper.loadAllOf; +import static google.registry.testing.DatabaseHelper.persistResource; +import static google.registry.testing.DatabaseHelper.persistResources; +import static google.registry.util.DateTimeUtils.START_OF_TIME; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyCollection; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.ImmutableSortedMap; +import com.google.common.collect.Ordering; +import google.registry.dns.DnsConstants.TargetType; +import google.registry.model.common.DnsRefreshRequest; +import google.registry.model.common.DnsRefreshRequestTest; +import google.registry.model.tld.Registry; +import google.registry.persistence.transaction.JpaTestExtensions; +import google.registry.persistence.transaction.JpaTestExtensions.JpaIntegrationTestExtension; +import google.registry.testing.CloudTasksHelper; +import google.registry.testing.CloudTasksHelper.TaskMatcher; +import google.registry.testing.FakeClock; +import java.util.Collection; +import java.util.Optional; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.joda.time.Duration; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.mockito.ArgumentCaptor; + +/** Unit tests for {@link DnsRefreshRequestTest}. */ +public class ReadDnsRefreshRequestsActionTest { + + private final FakeClock clock = new FakeClock(DateTime.parse("2020-02-02T01:23:45Z")); + private final CloudTasksHelper cloudTasksHelper = new CloudTasksHelper(clock); + private final DnsUtils dnsUtils = new DnsUtils(null); + private final Optional jitterSeconds = Optional.of(5); + + @RegisterExtension + JpaIntegrationTestExtension jpa = + new JpaTestExtensions.Builder().withClock(clock).buildIntegrationTestExtension(); + + private final ReadDnsRefreshRequestsAction action = + spy( + new ReadDnsRefreshRequestsAction( + 2, + Duration.standardSeconds(10), + jitterSeconds, + "tld", + clock, + dnsUtils, + null, + cloudTasksHelper.getTestCloudTasksUtils())); + + private ImmutableList requests; + + @BeforeAll + static void beforeAll() { + useUncachedForTest(); + } + + @BeforeEach + void beforeEach() { + useDnsSql(); + persistResource( + createTld("tld") + .asBuilder() + .setDnsWriters(ImmutableSet.of("FooWriter", "BarWriter")) + .setNumDnsPublishLocks(2) + .build()); + requests = + new ImmutableList.Builder() + .add(new DnsRefreshRequest(TargetType.DOMAIN, "domain.tld", "tld", clock.nowUtc())) + .add( + new DnsRefreshRequest( + TargetType.HOST, "ns1.domain.tld", "tld", clock.nowUtc().minusMinutes(1))) + .add( + new DnsRefreshRequest( + TargetType.DOMAIN, "future.tld", "tld", clock.nowUtc().plusMinutes(1))) + .build(); + clock.advanceBy(Duration.standardMinutes(5)); + persistResources(requests); + requests = loadAllOf(DnsRefreshRequest.class); + } + + @Test + void testSuccess_runAction_pausedTld() { + persistResource(createTld("tld").asBuilder().setDnsPaused(true).build()); + action.run(); + verify(action, never()).enqueueUpdates(anyInt(), anyInt(), anyCollection()); + verify(action, never()).processRequests(anyCollection()); + } + + @Test + void testSuccess_runAction_requestTimeInTheFuture() { + clock.setTo(DateTime.parse("2000-01-01T00:00:00Z")); + action.run(); + verify(action, never()).enqueueUpdates(anyInt(), anyInt(), anyCollection()); + verify(action, never()).processRequests(anyCollection()); + } + + @Test + void testSuccess_runAction_oneBatch() { + // The read batch size is 2 * 2 = 4. All requests will be in the same bucket, even though the + // bucket size should be roughly 2. But this is as expected because getLockIndex() only evenly + // distributes tasks in a statistical sense. + doReturn(2).when(action).getLockIndex(anyInt(), any(DnsRefreshRequest.class)); + action.run(); + @SuppressWarnings("unchecked") + ArgumentCaptor> captor = + ArgumentCaptor.forClass(Collection.class); + verify(action, times(1)).enqueueUpdates(eq(2), eq(2), captor.capture()); + assertThat(captor.getValue().size()).isEqualTo(3); + verify(action, times(1)).processRequests(captor.capture()); + assertThat(captor.getValue().size()).isEqualTo(3); + assertThat(loadAllOf(DnsRefreshRequest.class).isEmpty()).isTrue(); + } + + @Test + void testSuccess_runAction_twoBatches() { + // Make the read batch size 2 * 1 = 2. + persistResource(Registry.get("tld").asBuilder().setNumDnsPublishLocks(1).build()); + doReturn(1).when(action).getLockIndex(anyInt(), any(DnsRefreshRequest.class)); + doAnswer( + invocation -> { + @SuppressWarnings("unchecked") + ImmutableList ans = + (ImmutableList) invocation.callRealMethod(); + // The next read should not time out as we only increment by one millisecond, whereas + // the timeout is set to 10 seconds. + clock.advanceOneMilli(); + return ans; + }) + .when(action) + .processRequests(anyCollection()); + action.run(); + @SuppressWarnings("unchecked") + ArgumentCaptor> captor = + ArgumentCaptor.forClass(Collection.class); + verify(action, times(2)).enqueueUpdates(eq(1), eq(1), captor.capture()); + assertThat(captor.getAllValues().get(0).size()).isEqualTo(2); + assertThat(captor.getAllValues().get(1).size()).isEqualTo(1); + verify(action, times(2)).processRequests(captor.capture()); + assertThat(captor.getAllValues().get(0).size()).isEqualTo(2); + assertThat(captor.getAllValues().get(1).size()).isEqualTo(1); + assertThat(loadAllOf(DnsRefreshRequest.class).isEmpty()).isTrue(); + } + + @Test + void testSuccess_runAction_timeOutAfterFirstRead() { + // Make the process batch size 2 * 1 = 2. + persistResource(Registry.get("tld").asBuilder().setNumDnsPublishLocks(1).build()); + // Both requests in the first batch will be bucketed to the same bucket. + doReturn(1).when(action).getLockIndex(anyInt(), any(DnsRefreshRequest.class)); + doAnswer( + invocation -> { + @SuppressWarnings("unchecked") + ImmutableList ans = + (ImmutableList) invocation.callRealMethod(); + // After this function is called once, the loop in run() should top when it checks + // if the current time is before the request end time. + clock.advanceBy(Duration.standardHours(1)); + return ans; + }) + .when(action) + .processRequests(anyCollection()); + action.run(); + verify(action, times(1)).enqueueUpdates(anyInt(), anyInt(), anyCollection()); + verify(action, times(1)).processRequests(anyCollection()); + // The third request is left untouched because it is not read; + ImmutableList remainingRequests = loadAllOf(DnsRefreshRequest.class); + assertThat(remainingRequests.size()).isEqualTo(1); + assertThat(remainingRequests.get(0).getLastProcessTime()).isEqualTo(START_OF_TIME); + } + + @Test + void testSuccess_processTasks() { + doReturn(2) + .doReturn(1) + .doReturn(2) + .when(action) + .getLockIndex(eq(2), any(DnsRefreshRequest.class)); + action.processRequests(requests); + verify(action).enqueueUpdates(2, 2, ImmutableSet.of(requests.get(0), requests.get(2))); + verify(action).enqueueUpdates(1, 2, ImmutableSet.of(requests.get(1))); + assertThat(loadAllOf(DnsRefreshRequest.class)).isEmpty(); + } + + @Test + void testSuccess_processTasks_enqueueFailed_tasksNotDeleted() { + doReturn(2) + .doReturn(1) + .doReturn(2) + .when(action) + .getLockIndex(eq(2), any(DnsRefreshRequest.class)); + doThrow(new RuntimeException("Something went wrong!")) + .when(action) + .enqueueUpdates(eq(2), eq(2), anyCollection()); + action.processRequests(requests); + verify(action).enqueueUpdates(2, 2, ImmutableSet.of(requests.get(0), requests.get(2))); + verify(action).enqueueUpdates(1, 2, ImmutableSet.of(requests.get(1))); + assertThat(loadAllOf(DnsRefreshRequest.class).size()).isEqualTo(2); + } + + @Test + void testSuccess_enqueueTasks() { + action.enqueueUpdates(2, 3, requests); + cloudTasksHelper.assertTasksEnqueued( + "dns-publish", + new TaskMatcher() + .url("/_dr/task/publishDnsUpdates") + .service("BACKEND") + .param("tld", "tld") + .param("dnsWriter", "FooWriter") + .param("lockIndex", "2") + .param("numPublishLocks", "3") + .param("enqueued", clock.nowUtc().toString()) + .param("requestTime", clock.nowUtc().minusMinutes(6).toString()) + .param("domains", "domain.tld,future.tld") + .param("hosts", "ns1.domain.tld"), + new TaskMatcher() + .url("/_dr/task/publishDnsUpdates") + .service("BACKEND") + .param("tld", "tld") + .param("dnsWriter", "BarWriter") + .param("lockIndex", "2") + .param("numPublishLocks", "3") + .param("enqueued", clock.nowUtc().toString()) + .param("requestTime", clock.nowUtc().minusMinutes(6).toString()) + .param("domains", "domain.tld,future.tld") + .param("hosts", "ns1.domain.tld")); + cloudTasksHelper + .getTestTasksFor("dns-publish") + .forEach( + task -> { + DateTime scheduledTime = + new DateTime(task.getScheduleTime().getSeconds() * 1000, DateTimeZone.UTC); + assertThat(new Duration(clock.nowUtc(), scheduledTime)) + .isAtMost(Duration.standardSeconds(jitterSeconds.get())); + }); + } + + private void useDnsSql() { + DateTime currentTime = clock.nowUtc(); + clock.setTo(START_OF_TIME); + tm().transact( + () -> + set( + new ImmutableSortedMap.Builder(Ordering.natural()) + .put(START_OF_TIME, MigrationState.DATASTORE_ONLY) + .put(START_OF_TIME.plusMillis(1), MigrationState.DATASTORE_PRIMARY) + .put(START_OF_TIME.plusMillis(2), MigrationState.DATASTORE_PRIMARY_NO_ASYNC) + .put( + START_OF_TIME.plusMillis(3), MigrationState.DATASTORE_PRIMARY_READ_ONLY) + .put(START_OF_TIME.plusMillis(4), MigrationState.SQL_PRIMARY_READ_ONLY) + .put(START_OF_TIME.plusMillis(5), MigrationState.SQL_PRIMARY) + .put(START_OF_TIME.plusMillis(6), MigrationState.SQL_ONLY) + .put(START_OF_TIME.plusMillis(7), MigrationState.SEQUENCE_BASED_ALLOCATE_ID) + .put(START_OF_TIME.plusMillis(8), MigrationState.NORDN_SQL) + .put(START_OF_TIME.plusMillis(9), MigrationState.DNS_SQL) + .build())); + clock.setTo(currentTime); + } +}