Add SQL-based DNS refresh processing mechanism (#1971)

This commit is contained in:
Lai Jiang 2023-04-07 17:31:28 -04:00 committed by GitHub
parent a72f408366
commit d02b617e0f
18 changed files with 801 additions and 41 deletions

View file

@ -32,6 +32,8 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedMap;
import dagger.Module;
import dagger.Provides;
import google.registry.dns.ReadDnsRefreshRequestsAction;
import google.registry.model.common.DnsRefreshRequest;
import google.registry.persistence.transaction.JpaTransactionManager;
import google.registry.util.YamlUtils;
import java.lang.annotation.Documented;
@ -294,9 +296,9 @@ public final class RegistryConfig {
/**
* The maximum number of domain and host updates to batch together to send to
* PublishDnsUpdatesAction, to avoid exceeding AppEngine's limits.
* PublishDnsUpdatesAction, to avoid exceeding HTTP request timeout limits.
*
* @see google.registry.dns.ReadDnsQueueAction
* @see google.registry.dns.ReadDnsRefreshRequestsAction
*/
@Provides
@Config("dnsTldUpdateBatchSize")
@ -327,23 +329,30 @@ public final class RegistryConfig {
}
/**
* The requested maximum duration for ReadDnsQueueAction.
* The requested maximum duration for {@link ReadDnsRefreshRequestsAction}.
*
* <p>ReadDnsQueueAction reads update tasks from the dns-pull queue. It will continue reading
* tasks until either the queue is empty, or this duration has passed.
* <p>{@link ReadDnsRefreshRequestsAction} reads refresh requests from {@link DnsRefreshRequest}
* It will continue reading requests until either no requests exist that matche the condition,
* or this duration has passed.
*
* <p>This time is the maximum duration between the first and last attempt to lease tasks from
* the dns-pull queue. The actual running time might be slightly longer, as we process the
* tasks.
* <p>This time is the maximum duration between the first and last attempt to read requests from
* {@link DnsRefreshRequest}. The actual running time might be slightly longer, as we process
* the requests.
*
* <p>This value should be less than the cron-job repeat rate for ReadDnsQueueAction, to make
* sure we don't have multiple ReadDnsActions leasing tasks simultaneously.
* <p>The requests that are read will not be read again by any action until after this period
* has passed, so concurrent runs (or runs that are very close to each other) of {@link
* ReadDnsRefreshRequestsAction} will not keep reading the same requests with the earliest
* request time.
*
* @see google.registry.dns.ReadDnsQueueAction
* <p>Still, this value should ideally be less than the cloud scheduler job repeat rate for
* {@link ReadDnsRefreshRequestsAction}, to not waste resources on multiple actions running at
* the same time.
*
* <p>see google.registry.dns.ReadDnsRefreshRequestsAction
*/
@Provides
@Config("readDnsQueueActionRuntime")
public static Duration provideReadDnsQueueRuntime() {
@Config("readDnsRefreshRequestsActionRuntime")
public static Duration provideReadDnsRefreshRequestsRuntime() {
return Duration.standardSeconds(45);
}

View file

@ -19,6 +19,7 @@ import static google.registry.dns.DnsConstants.DNS_PULL_QUEUE_NAME;
import static google.registry.dns.RefreshDnsOnHostRenameAction.PARAM_HOST_KEY;
import static google.registry.request.RequestParameters.extractEnumParameter;
import static google.registry.request.RequestParameters.extractIntParameter;
import static google.registry.request.RequestParameters.extractOptionalParameter;
import static google.registry.request.RequestParameters.extractRequiredParameter;
import static google.registry.request.RequestParameters.extractSetOfParameters;
@ -48,7 +49,7 @@ public abstract class DnsModule {
public static final String PARAM_DOMAINS = "domains";
public static final String PARAM_HOSTS = "hosts";
public static final String PARAM_PUBLISH_TASK_ENQUEUED = "enqueued";
public static final String PARAM_REFRESH_REQUEST_CREATED = "itemsCreated";
public static final String PARAM_REFRESH_REQUEST_TIME = "requestTime";
@Binds
@DnsWriterZone
@ -83,10 +84,13 @@ public abstract class DnsModule {
return DateTime.parse(extractRequiredParameter(req, PARAM_PUBLISH_TASK_ENQUEUED));
}
// TODO: Retire the old header after DNS pull queue migration.
@Provides
@Parameter(PARAM_REFRESH_REQUEST_CREATED)
@Parameter(PARAM_REFRESH_REQUEST_TIME)
static DateTime provideItemsCreateTime(HttpServletRequest req) {
return DateTime.parse(extractRequiredParameter(req, PARAM_REFRESH_REQUEST_CREATED));
return DateTime.parse(
extractOptionalParameter(req, "itemsCreated")
.orElse(extractRequiredParameter(req, PARAM_REFRESH_REQUEST_TIME)));
}
@Provides

View file

@ -37,6 +37,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.flogger.FluentLogger;
import com.google.common.net.InternetDomainName;
import com.google.common.util.concurrent.RateLimiter;
import google.registry.config.RegistryConfig;
import google.registry.dns.DnsConstants.TargetType;
import google.registry.model.tld.Registries;
import google.registry.util.Clock;
@ -54,13 +55,13 @@ import org.joda.time.Duration;
* <p>This includes a {@link RateLimiter} to limit the {@link Queue#leaseTasks} call rate to 9 QPS,
* to stay under the 10 QPS limit for this function.
*
* <p>Note that overlapping calls to {@link ReadDnsQueueAction} (the only place where
* {@link DnsQueue#leaseTasks} is used) will have different rate limiters, so they could exceed the
* allowed rate. This should be rare though - because {@link DnsQueue#leaseTasks} is only used in
* {@link ReadDnsQueueAction}, which is run as a cron job with running time shorter than the cron
* repeat time - meaning there should never be two instances running at once.
* <p>Note that overlapping calls to {@link ReadDnsQueueAction} (the only place where {@link
* DnsQueue#leaseTasks} is used) will have different rate limiters, so they could exceed the allowed
* rate. This should be rare though - because {@link DnsQueue#leaseTasks} is only used in {@link
* ReadDnsQueueAction}, which is run as a cron job with running time shorter than the cron repeat
* time - meaning there should never be two instances running at once.
*
* @see google.registry.config.RegistryConfig.ConfigModule#provideReadDnsQueueRuntime
* @see RegistryConfig.ConfigModule#provideReadDnsRefreshRequestsRuntime()
*/
public class DnsQueue {

View file

@ -14,15 +14,19 @@
package google.registry.dns;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
import com.google.common.collect.ImmutableList;
import com.google.common.net.InternetDomainName;
import google.registry.dns.DnsConstants.TargetType;
import google.registry.model.common.DatabaseMigrationStateSchedule;
import google.registry.model.common.DatabaseMigrationStateSchedule.MigrationState;
import google.registry.model.common.DnsRefreshRequest;
import google.registry.model.tld.Registries;
import java.util.Collection;
import javax.inject.Inject;
import org.joda.time.DateTime;
import org.joda.time.Duration;
/** Utility class to handle DNS refresh requests. */
@ -67,6 +71,61 @@ public class DnsUtils {
requestDnsRefresh(hostName, TargetType.HOST, Duration.ZERO);
}
/**
* Returns pending DNS update requests that need further processing up to batch size, in ascending
* order of their request time, and updates their processing time to now.
*
* <p>The criteria to pick the requests to include are:
*
* <ul>
* <li>They are for the given TLD.
* <li>Their request time is not in the future.
* <li>The last time they were processed is before the cooldown period.
* </ul>
*/
public ImmutableList<DnsRefreshRequest> readAndUpdateRequestsWithLatestProcessTime(
String tld, Duration cooldown, int batchSize) {
return tm().transact(
() -> {
DateTime transactionTime = tm().getTransactionTime();
ImmutableList<DnsRefreshRequest> requests =
tm().query(
"FROM DnsRefreshRequest WHERE tld = :tld "
+ "AND requestTime <= :now AND lastProcessTime < :cutoffTime "
+ "ORDER BY requestTime ASC, id ASC",
DnsRefreshRequest.class)
.setParameter("tld", tld)
.setParameter("now", transactionTime)
.setParameter("cutoffTime", transactionTime.minus(cooldown))
.setMaxResults(batchSize)
.getResultStream()
// Note that the process time is when the request was last read, batched and
// queued up for publishing, not when it is actually published by the DNS
// writer. This timestamp acts as a cooldown so the same request will not be
// retried too frequently. See DnsRefreshRequest.getLastProcessTime for a
// detailed explaination.
.map(e -> e.updateProcessTime(transactionTime))
.collect(toImmutableList());
tm().updateAll(requests);
return requests;
});
}
/**
* Removes the requests that have been processed.
*
* <p>Note that if a request entity has already been deleted, the method still succeeds without
* error because all we care about is that it no longer exists after the method runs.
*/
public void deleteRequests(Collection<DnsRefreshRequest> requests) {
tm().transact(
() ->
tm().delete(
requests.stream()
.map(DnsRefreshRequest::createVKey)
.collect(toImmutableList())));
}
private boolean usePullQueue() {
return !DatabaseMigrationStateSchedule.getValueAtTime(dnsQueue.getClock().nowUtc())
.equals(MigrationState.DNS_SQL);

View file

@ -22,7 +22,7 @@ import static google.registry.dns.DnsModule.PARAM_HOSTS;
import static google.registry.dns.DnsModule.PARAM_LOCK_INDEX;
import static google.registry.dns.DnsModule.PARAM_NUM_PUBLISH_LOCKS;
import static google.registry.dns.DnsModule.PARAM_PUBLISH_TASK_ENQUEUED;
import static google.registry.dns.DnsModule.PARAM_REFRESH_REQUEST_CREATED;
import static google.registry.dns.DnsModule.PARAM_REFRESH_REQUEST_TIME;
import static google.registry.model.EppResourceUtils.loadByForeignKey;
import static google.registry.request.Action.Method.POST;
import static google.registry.request.RequestParameters.PARAM_TLD;
@ -124,7 +124,7 @@ public final class PublishDnsUpdatesAction implements Runnable, Callable<Void> {
public PublishDnsUpdatesAction(
@Parameter(PARAM_DNS_WRITER) String dnsWriter,
@Parameter(PARAM_PUBLISH_TASK_ENQUEUED) DateTime enqueuedTime,
@Parameter(PARAM_REFRESH_REQUEST_CREATED) DateTime itemsCreateTime,
@Parameter(PARAM_REFRESH_REQUEST_TIME) DateTime itemsCreateTime,
@Parameter(PARAM_LOCK_INDEX) int lockIndex,
@Parameter(PARAM_NUM_PUBLISH_LOCKS) int numPublishLocks,
@Parameter(PARAM_DOMAINS) Set<String> domains,
@ -346,7 +346,7 @@ public final class PublishDnsUpdatesAction implements Runnable, Callable<Void> {
.put(PARAM_LOCK_INDEX, Integer.toString(lockIndex))
.put(PARAM_NUM_PUBLISH_LOCKS, Integer.toString(numPublishLocks))
.put(PARAM_PUBLISH_TASK_ENQUEUED, clock.nowUtc().toString())
.put(PARAM_REFRESH_REQUEST_CREATED, itemsCreateTime.toString())
.put(PARAM_REFRESH_REQUEST_TIME, itemsCreateTime.toString())
.put(PARAM_DOMAINS, Joiner.on(",").join(domains))
.put(PARAM_HOSTS, Joiner.on(",").join(hosts))
.build()));

View file

@ -26,7 +26,7 @@ import static google.registry.dns.DnsModule.PARAM_HOSTS;
import static google.registry.dns.DnsModule.PARAM_LOCK_INDEX;
import static google.registry.dns.DnsModule.PARAM_NUM_PUBLISH_LOCKS;
import static google.registry.dns.DnsModule.PARAM_PUBLISH_TASK_ENQUEUED;
import static google.registry.dns.DnsModule.PARAM_REFRESH_REQUEST_CREATED;
import static google.registry.dns.DnsModule.PARAM_REFRESH_REQUEST_TIME;
import static google.registry.request.RequestParameters.PARAM_TLD;
import static google.registry.util.DomainNameUtils.getSecondLevelDomain;
import static java.nio.charset.StandardCharsets.UTF_8;
@ -109,7 +109,7 @@ public final class ReadDnsQueueAction implements Runnable {
@Inject
ReadDnsQueueAction(
@Config("dnsTldUpdateBatchSize") int tldUpdateBatchSize,
@Config("readDnsQueueActionRuntime") Duration requestedMaximumDuration,
@Config("readDnsRefreshRequestsActionRuntime") Duration requestedMaximumDuration,
@Parameter(PARAM_JITTER_SECONDS) Optional<Integer> jitterSeconds,
Clock clock,
DnsQueue dnsQueue,
@ -379,7 +379,7 @@ public final class ReadDnsQueueAction implements Runnable {
.put(PARAM_LOCK_INDEX, Integer.toString(lockIndex))
.put(PARAM_NUM_PUBLISH_LOCKS, Integer.toString(numPublishLocks))
.put(PARAM_PUBLISH_TASK_ENQUEUED, clock.nowUtc().toString())
.put(PARAM_REFRESH_REQUEST_CREATED, earliestCreateTime.toString())
.put(PARAM_REFRESH_REQUEST_TIME, earliestCreateTime.toString())
.put(
PARAM_DOMAINS,
chunk.stream()

View file

@ -0,0 +1,206 @@
// Copyright 2023 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.dns;
import static com.google.common.collect.ImmutableSetMultimap.toImmutableSetMultimap;
import static google.registry.dns.DnsConstants.DNS_PUBLISH_PUSH_QUEUE_NAME;
import static google.registry.dns.DnsModule.PARAM_DNS_WRITER;
import static google.registry.dns.DnsModule.PARAM_DOMAINS;
import static google.registry.dns.DnsModule.PARAM_HOSTS;
import static google.registry.dns.DnsModule.PARAM_LOCK_INDEX;
import static google.registry.dns.DnsModule.PARAM_NUM_PUBLISH_LOCKS;
import static google.registry.dns.DnsModule.PARAM_PUBLISH_TASK_ENQUEUED;
import static google.registry.dns.DnsModule.PARAM_REFRESH_REQUEST_TIME;
import static google.registry.request.RequestParameters.PARAM_TLD;
import static google.registry.util.DateTimeUtils.END_OF_TIME;
import static google.registry.util.DomainNameUtils.getSecondLevelDomain;
import static java.nio.charset.StandardCharsets.UTF_8;
import com.google.cloud.tasks.v2.Task;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.flogger.FluentLogger;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import google.registry.batch.CloudTasksUtils;
import google.registry.config.RegistryConfig.Config;
import google.registry.dns.DnsConstants.TargetType;
import google.registry.model.common.DnsRefreshRequest;
import google.registry.model.tld.Registry;
import google.registry.request.Action;
import google.registry.request.Action.Service;
import google.registry.request.Parameter;
import google.registry.request.auth.Auth;
import google.registry.util.Clock;
import java.util.Collection;
import java.util.Optional;
import javax.inject.Inject;
import org.joda.time.DateTime;
import org.joda.time.Duration;
/**
* Action for fanning out DNS refresh tasks by TLD, using data taken from {@link DnsRefreshRequest}
* table.
*/
@Action(
service = Service.BACKEND,
path = "/_dr/task/readDnsRefreshRequests",
automaticallyPrintOk = true,
auth = Auth.AUTH_INTERNAL_OR_ADMIN)
public final class ReadDnsRefreshRequestsAction implements Runnable {
// This parameter cannot be named "jitterSeconds", which will be read by TldFanoutAction and not
// be passed to this action.
private static final String PARAM_JITTER_SECONDS = "dnsJitterSeconds";
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private final int tldUpdateBatchSize;
private final Duration requestedMaximumDuration;
private final Optional<Integer> jitterSeconds;
private final String tld;
private final Clock clock;
private final DnsUtils dnsUtils;
private final HashFunction hashFunction;
private final CloudTasksUtils cloudTasksUtils;
@Inject
ReadDnsRefreshRequestsAction(
@Config("dnsTldUpdateBatchSize") int tldUpdateBatchSize,
@Config("readDnsRefreshRequestsActionRuntime") Duration requestedMaximumDuration,
@Parameter(PARAM_JITTER_SECONDS) Optional<Integer> jitterSeconds,
@Parameter(PARAM_TLD) String tld,
Clock clock,
DnsUtils dnsUtils,
HashFunction hashFunction,
CloudTasksUtils cloudTasksUtils) {
this.tldUpdateBatchSize = tldUpdateBatchSize;
this.requestedMaximumDuration = requestedMaximumDuration;
this.jitterSeconds = jitterSeconds;
this.tld = tld;
this.clock = clock;
this.dnsUtils = dnsUtils;
this.hashFunction = hashFunction;
this.cloudTasksUtils = cloudTasksUtils;
}
/**
* Reads requests up to the maximum requested runtime, and enqueues update batches from the these
* requests.
*/
@Override
public void run() {
if (Registry.get(tld).getDnsPaused()) {
logger.atInfo().log("The queue updated is paused for TLD: %s.", tld);
return;
}
DateTime requestedEndTime = clock.nowUtc().plus(requestedMaximumDuration);
// See getLockIndex(), requests are evenly distributed to [1, numDnsPublishLocks], so each
// bucket would be roughly the size of tldUpdateBatchSize.
int processBatchSize = tldUpdateBatchSize * Registry.get(tld).getNumDnsPublishLocks();
while (requestedEndTime.isAfter(clock.nowUtc())) {
ImmutableList<DnsRefreshRequest> requests =
dnsUtils.readAndUpdateRequestsWithLatestProcessTime(
tld, requestedMaximumDuration, processBatchSize);
logger.atInfo().log("Read %d DNS update requests for TLD %s.", requests.size(), tld);
if (!requests.isEmpty()) {
processRequests(requests);
}
if (requests.size() < processBatchSize) {
return;
}
}
}
/**
* Subdivides {@link DnsRefreshRequest} into buckets by lock index, enqueue a Cloud Tasks task per
* bucket, and then delete the requests in each bucket.
*/
void processRequests(Collection<DnsRefreshRequest> requests) {
int numPublishLocks = Registry.get(tld).getNumDnsPublishLocks();
requests.stream()
.collect(
toImmutableSetMultimap(
request -> getLockIndex(numPublishLocks, request), request -> request))
.asMap()
.forEach(
(lockIndex, bucketedRequests) -> {
try {
enqueueUpdates(lockIndex, numPublishLocks, bucketedRequests);
dnsUtils.deleteRequests(bucketedRequests);
logger.atInfo().log(
"Processed %d DNS update requests for TLD %s.", bucketedRequests.size(), tld);
} catch (Exception e) {
// Log but continue to process the next bucket. The failed tasks will NOT be
// deleted and will be retried after the cooldown period has passed.
logger.atSevere().withCause(e).log(
"Error processing DNS update requests: %s", bucketedRequests);
}
});
}
/**
* Returns the lock index for a given {@link DnsRefreshRequest}.
*
* <p>We hash the second level domain for all records, to group in-bailiwick hosts (the only ones
* we refresh DNS for) with their superordinate domains. We use consistent hashing to determine
* the lock index because it gives us [0,N) bucketing properties out of the box, then add 1 to
* make indexes within [1,N].
*/
int getLockIndex(int numPublishLocks, DnsRefreshRequest request) {
String domain = getSecondLevelDomain(request.getName(), tld);
return Hashing.consistentHash(hashFunction.hashString(domain, UTF_8), numPublishLocks) + 1;
}
/** Creates DNS refresh tasks for all writers for the tld within a lock index. */
void enqueueUpdates(int lockIndex, int numPublishLocks, Collection<DnsRefreshRequest> requests) {
ImmutableList.Builder<String> domainsBuilder = new ImmutableList.Builder<>();
ImmutableList.Builder<String> hostsBuilder = new ImmutableList.Builder<>();
DateTime earliestRequestTime = END_OF_TIME;
for (DnsRefreshRequest request : requests) {
if (request.getRequestTime().isBefore(earliestRequestTime)) {
earliestRequestTime = request.getRequestTime();
}
String name = request.getName();
if (request.getType().equals(TargetType.DOMAIN)) {
domainsBuilder.add(name);
} else {
hostsBuilder.add(name);
}
}
ImmutableList<String> domains = domainsBuilder.build();
ImmutableList<String> hosts = hostsBuilder.build();
for (String dnsWriter : Registry.get(tld).getDnsWriters()) {
Task task =
cloudTasksUtils.createPostTaskWithJitter(
PublishDnsUpdatesAction.PATH,
Service.BACKEND,
ImmutableMultimap.<String, String>builder()
.put(PARAM_TLD, tld)
.put(PARAM_DNS_WRITER, dnsWriter)
.put(PARAM_LOCK_INDEX, Integer.toString(lockIndex))
.put(PARAM_NUM_PUBLISH_LOCKS, Integer.toString(numPublishLocks))
.put(PARAM_PUBLISH_TASK_ENQUEUED, clock.nowUtc().toString())
.put(PARAM_REFRESH_REQUEST_TIME, earliestRequestTime.toString())
.put(PARAM_DOMAINS, Joiner.on(',').join(domains))
.put(PARAM_HOSTS, Joiner.on(',').join(hosts))
.build(),
jitterSeconds);
cloudTasksUtils.enqueue(DNS_PUBLISH_PUSH_QUEUE_NAME, task);
logger.atInfo().log(
"Enqueued DNS update request for (TLD %s, lock %d) with %d domains and %d hosts.",
tld, lockIndex, domains.size(), hosts.size());
}
}
}

View file

@ -137,4 +137,14 @@
</description>
<schedule>*/1 * * * *</schedule>
</task>
<task>
<url>
<![CDATA[/_dr/cron/fanout?queue=dns-refresh&forEachRealTld&ForEachTestTld&endpoint=/_dr/task/readDnsRefreshRequests&dnsJitterSeconds=45]]></url>
<name>readDnsRefreshRequests</name>
<description>
Enqueue a ReadDnsRefreshRequestAction for each TLD.
</description>
<schedule>*/1 * * * *</schedule>
</task>
</taskentries>

View file

@ -6,6 +6,12 @@
<mode>pull</mode>
</queue>
<!-- Queue for reading DNS update requests and batching them off to the dns-publish queue. -->
<queue>
<name>dns-refresh</name>
<rate>100/s</rate>
</queue>
<!-- Queue for publishing DNS updates in batches. -->
<queue>
<name>dns-publish</name>

View file

@ -139,6 +139,16 @@
<schedule>*/1 * * * *</schedule>
</task>
<task>
<url>
<![CDATA[/_dr/cron/fanout?queue=dns-refresh&forEachRealTld&ForEachTestTld&endpoint=/_dr/task/readDnsRefreshRequests&dnsJitterSeconds=45]]></url>
<name>readDnsRefreshRequests</name>
<description>
Enqueue a ReadDnsRefreshRequestAction for each TLD.
</description>
<schedule>*/1 * * * *</schedule>
</task>
<task>
<url><![CDATA[/_dr/task/deleteExpiredDomains]]></url>
<name>deleteExpiredDomains</name>

View file

@ -210,6 +210,16 @@
<schedule>*/1 * * * *</schedule>
</task>
<task>
<url>
<![CDATA[/_dr/cron/fanout?queue=dns-refresh&forEachRealTld&ForEachTestTld&endpoint=/_dr/task/readDnsRefreshRequests&dnsJitterSeconds=45]]></url>
<name>readDnsRefreshRequests</name>
<description>
Enqueue a ReadDnsRefreshRequestAction for each TLD.
</description>
<schedule>*/1 * * * *</schedule>
</task>
<task>
<url><![CDATA[/_dr/cron/fanout?queue=retryable-cron-tasks&endpoint=/_dr/task/icannReportingStaging&runInEmpty]]></url>
<name>icannReportingStaging</name>

View file

@ -21,6 +21,16 @@
<schedule>*/1 * * * *</schedule>
</task>
<task>
<url>
<![CDATA[/_dr/cron/fanout?queue=dns-refresh&forEachRealTld&ForEachTestTld&endpoint=/_dr/task/readDnsRefreshRequests&dnsJitterSeconds=45]]></url>
<name>readDnsRefreshRequests</name>
<description>
Enqueue a ReadDnsRefreshRequestAction for each TLD.
</description>
<schedule>*/1 * * * *</schedule>
</task>
<task>
<url><![CDATA[/_dr/cron/fanout?queue=sheet&endpoint=/_dr/task/syncRegistrarsSheet&runInEmpty]]></url>
<name>syncRegistrarsSheet</name>

View file

@ -152,6 +152,16 @@
<schedule>*/1 * * * *</schedule>
</task>
<task>
<url>
<![CDATA[/_dr/cron/fanout?queue=dns-refresh&forEachRealTld&ForEachTestTld&endpoint=/_dr/task/readDnsRefreshRequests&dnsJitterSeconds=45]]></url>
<name>readDnsRefreshRequests</name>
<description>
Enqueue a ReadDnsRefreshRequestAction for each TLD.
</description>
<schedule>*/1 * * * *</schedule>
</task>
<task>
<url><![CDATA[/_dr/task/wipeOutContactHistoryPii]]></url>
<name>wipeOutContactHistoryPii</name>

View file

@ -21,6 +21,7 @@ import static google.registry.util.DateTimeUtils.START_OF_TIME;
import google.registry.dns.DnsConstants.TargetType;
import google.registry.dns.PublishDnsUpdatesAction;
import google.registry.model.ImmutableObject;
import google.registry.persistence.VKey;
import javax.annotation.Nullable;
import javax.persistence.Column;
import javax.persistence.Entity;
@ -89,6 +90,11 @@ public class DnsRefreshRequest extends ImmutableObject {
return lastProcessTime;
}
@Override
public VKey<DnsRefreshRequest> createVKey() {
return VKey.create(DnsRefreshRequest.class, id);
}
protected DnsRefreshRequest() {}
private DnsRefreshRequest(

View file

@ -14,6 +14,7 @@
package google.registry.dns;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.truth.Truth.assertThat;
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
import static google.registry.testing.DatabaseHelper.createTld;
@ -26,6 +27,7 @@ import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering;
@ -36,6 +38,7 @@ import google.registry.model.common.DnsRefreshRequest;
import google.registry.persistence.transaction.JpaTestExtensions;
import google.registry.persistence.transaction.JpaTestExtensions.JpaIntegrationTestExtension;
import google.registry.testing.FakeClock;
import java.util.Comparator;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.junit.jupiter.api.Assertions;
@ -53,8 +56,7 @@ public class DnsUtilsTest {
private final DnsQueue dnsQueue = mock(DnsQueue.class);
private final DnsUtils dnsUtils = new DnsUtils(dnsQueue);
FakeClock clock = new FakeClock(DateTime.parse("2020-02-02T01:23:45Z"));
private final FakeClock clock = new FakeClock(DateTime.parse("2020-02-02T01:23:45Z"));
@RegisterExtension
JpaIntegrationTestExtension jpa =
@ -138,12 +140,133 @@ public class DnsUtilsTest {
assertRequest(request, TargetType.DOMAIN, domainName, tld, clock.nowUtc().plusMinutes(3));
}
@Test
void testSuccess_ProcessRequests() {
ImmutableList<DnsRefreshRequest> requests = processRequests();
DateTime processtime = clock.nowUtc();
assertThat(requests.size()).isEqualTo(4);
assertRequest(
requests.get(0),
TargetType.DOMAIN,
"test2.tld",
"tld",
clock.nowUtc().minusMinutes(4),
processtime);
assertRequest(
requests.get(1),
TargetType.DOMAIN,
"test1.tld",
"tld",
clock.nowUtc().minusMinutes(3),
processtime);
assertRequest(
requests.get(2),
TargetType.HOST,
"ns1.test2.tld",
"tld",
clock.nowUtc().minusMinutes(1),
processtime);
assertRequest(
requests.get(3),
TargetType.DOMAIN,
"test5.tld",
"tld",
clock.nowUtc().minusMinutes(1),
processtime);
requests = loadAllOf(DnsRefreshRequest.class);
assertThat(requests.size()).isEqualTo(7);
// The four processed records should have updated process time in SQL as well.
assertThat(requests.stream().filter(e -> e.getLastProcessTime().equals(processtime)).count())
.isEqualTo(4);
clock.advanceOneMilli();
// Requests within cooldown period not included.
requests =
dnsUtils.readAndUpdateRequestsWithLatestProcessTime("tld", Duration.standardMinutes(1), 4);
assertThat(requests.size()).isEqualTo(1);
assertRequest(
requests.get(0),
TargetType.DOMAIN,
"test6.tld",
"tld",
clock.nowUtc().minusMinutes(1).minusMillis(1),
clock.nowUtc());
}
@Test
void testSuccess_deleteRequests() {
dnsUtils.deleteRequests(processRequests());
ImmutableList<DnsRefreshRequest> remainingRequests =
loadAllOf(DnsRefreshRequest.class).stream()
.sorted(Comparator.comparing(DnsRefreshRequest::getRequestTime))
.collect(toImmutableList());
assertThat(remainingRequests.size()).isEqualTo(3);
assertRequest(
remainingRequests.get(0),
TargetType.DOMAIN,
"something.example",
"example",
clock.nowUtc().minusMinutes(2));
assertRequest(
remainingRequests.get(1),
TargetType.DOMAIN,
"test6.tld",
"tld",
clock.nowUtc().minusMinutes(1));
assertRequest(
remainingRequests.get(2),
TargetType.DOMAIN,
"test4.tld",
"tld",
clock.nowUtc().plusMinutes(1));
tm().transact(() -> tm().delete(remainingRequests.get(2)));
assertThat(loadAllOf(DnsRefreshRequest.class).size()).isEqualTo(2);
// Should not throw even though one of the request is already deleted.
dnsUtils.deleteRequests(remainingRequests);
assertThat(loadAllOf(DnsRefreshRequest.class).size()).isEqualTo(0);
}
private ImmutableList<DnsRefreshRequest> processRequests() {
useDnsSql();
createTld("example");
// Domain Included.
dnsUtils.requestDomainDnsRefresh("test1.tld", Duration.standardMinutes(1));
// This one should be returned before test1.tld, even though it's added later, because of
// the delay specified in test1.tld.
dnsUtils.requestDomainDnsRefresh("test2.tld");
// Not included because the TLD is not under management.
dnsUtils.requestDomainDnsRefresh("something.example", Duration.standardMinutes(2));
clock.advanceBy(Duration.standardMinutes(3));
// Host included.
dnsUtils.requestHostDnsRefresh("ns1.test2.tld");
// Not included because the request time is in the future
dnsUtils.requestDomainDnsRefresh("test4.tld", Duration.standardMinutes(2));
// Included after the previous one. Same request time, order by insertion order (i.e. ID);
dnsUtils.requestDomainDnsRefresh("test5.tld");
// Not included because batch size is exceeded;
dnsUtils.requestDomainDnsRefresh("test6.tld");
clock.advanceBy(Duration.standardMinutes(1));
return dnsUtils.readAndUpdateRequestsWithLatestProcessTime(
"tld", Duration.standardMinutes(1), 4);
}
private static void assertRequest(
DnsRefreshRequest request, TargetType type, String name, String tld, DateTime requestTime) {
assertRequest(request, type, name, tld, requestTime, START_OF_TIME);
}
private static void assertRequest(
DnsRefreshRequest request,
TargetType type,
String name,
String tld,
DateTime requestTime,
DateTime processTime) {
assertThat(request.getType()).isEqualTo(type);
assertThat(request.getName()).isEqualTo(name);
assertThat(request.getTld()).isEqualTo(tld);
assertThat(request.getRequestTime()).isEqualTo(requestTime);
assertThat(request.getLastProcessTime()).isEqualTo(processTime);
}
private void useDnsSql() {

View file

@ -22,7 +22,7 @@ import static google.registry.dns.DnsModule.PARAM_HOSTS;
import static google.registry.dns.DnsModule.PARAM_LOCK_INDEX;
import static google.registry.dns.DnsModule.PARAM_NUM_PUBLISH_LOCKS;
import static google.registry.dns.DnsModule.PARAM_PUBLISH_TASK_ENQUEUED;
import static google.registry.dns.DnsModule.PARAM_REFRESH_REQUEST_CREATED;
import static google.registry.dns.DnsModule.PARAM_REFRESH_REQUEST_TIME;
import static google.registry.dns.PublishDnsUpdatesAction.RETRIES_BEFORE_PERMANENT_FAILURE;
import static google.registry.request.RequestParameters.PARAM_TLD;
import static google.registry.testing.DatabaseHelper.createTld;
@ -302,7 +302,7 @@ public class PublishDnsUpdatesActionTest {
.param(PARAM_LOCK_INDEX, "1")
.param(PARAM_NUM_PUBLISH_LOCKS, "1")
.param(PARAM_PUBLISH_TASK_ENQUEUED, clock.nowUtc().toString())
.param(PARAM_REFRESH_REQUEST_CREATED, clock.nowUtc().minusHours(2).toString())
.param(PARAM_REFRESH_REQUEST_TIME, clock.nowUtc().minusHours(2).toString())
.param(PARAM_DOMAINS, "example1.xn--q9jyb4c,example2.xn--q9jyb4c")
.param(PARAM_HOSTS, "")
.header("content-type", "application/x-www-form-urlencoded"),
@ -313,7 +313,7 @@ public class PublishDnsUpdatesActionTest {
.param(PARAM_LOCK_INDEX, "1")
.param(PARAM_NUM_PUBLISH_LOCKS, "1")
.param(PARAM_PUBLISH_TASK_ENQUEUED, clock.nowUtc().toString())
.param(PARAM_REFRESH_REQUEST_CREATED, clock.nowUtc().minusHours(2).toString())
.param(PARAM_REFRESH_REQUEST_TIME, clock.nowUtc().minusHours(2).toString())
.param(PARAM_DOMAINS, "example3.xn--q9jyb4c,example4.xn--q9jyb4c")
.param(PARAM_HOSTS, "ns1.example.xn--q9jyb4c")
.header("content-type", "application/x-www-form-urlencoded"));
@ -341,7 +341,7 @@ public class PublishDnsUpdatesActionTest {
.param(PARAM_LOCK_INDEX, "1")
.param(PARAM_NUM_PUBLISH_LOCKS, "1")
.param(PARAM_PUBLISH_TASK_ENQUEUED, clock.nowUtc().toString())
.param(PARAM_REFRESH_REQUEST_CREATED, clock.nowUtc().minusHours(2).toString())
.param(PARAM_REFRESH_REQUEST_TIME, clock.nowUtc().minusHours(2).toString())
.param(PARAM_DOMAINS, "example1.xn--q9jyb4c,example2.xn--q9jyb4c")
.param(PARAM_HOSTS, "")
.header("content-type", "application/x-www-form-urlencoded"),
@ -352,7 +352,7 @@ public class PublishDnsUpdatesActionTest {
.param(PARAM_LOCK_INDEX, "1")
.param(PARAM_NUM_PUBLISH_LOCKS, "1")
.param(PARAM_PUBLISH_TASK_ENQUEUED, clock.nowUtc().toString())
.param(PARAM_REFRESH_REQUEST_CREATED, clock.nowUtc().minusHours(2).toString())
.param(PARAM_REFRESH_REQUEST_TIME, clock.nowUtc().minusHours(2).toString())
.param(PARAM_DOMAINS, "example3.xn--q9jyb4c,example4.xn--q9jyb4c,example5.xn--q9jyb4c")
.param(PARAM_HOSTS, "ns1.example.xn--q9jyb4c")
.header("content-type", "application/x-www-form-urlencoded"));
@ -378,7 +378,7 @@ public class PublishDnsUpdatesActionTest {
.param(PARAM_LOCK_INDEX, "1")
.param(PARAM_NUM_PUBLISH_LOCKS, "1")
.param(PARAM_PUBLISH_TASK_ENQUEUED, clock.nowUtc().toString())
.param(PARAM_REFRESH_REQUEST_CREATED, clock.nowUtc().minusHours(2).toString())
.param(PARAM_REFRESH_REQUEST_TIME, clock.nowUtc().minusHours(2).toString())
.param(PARAM_DOMAINS, "example1.xn--q9jyb4c")
.param(PARAM_HOSTS, "")
.header("content-type", "application/x-www-form-urlencoded"),
@ -389,7 +389,7 @@ public class PublishDnsUpdatesActionTest {
.param(PARAM_LOCK_INDEX, "1")
.param(PARAM_NUM_PUBLISH_LOCKS, "1")
.param(PARAM_PUBLISH_TASK_ENQUEUED, clock.nowUtc().toString())
.param(PARAM_REFRESH_REQUEST_CREATED, clock.nowUtc().minusHours(2).toString())
.param(PARAM_REFRESH_REQUEST_TIME, clock.nowUtc().minusHours(2).toString())
.param(PARAM_DOMAINS, "")
.param(PARAM_HOSTS, "ns1.example.xn--q9jyb4c")
.header("content-type", "application/x-www-form-urlencoded"));

View file

@ -144,7 +144,7 @@ public class ReadDnsQueueActionTest {
.url(PublishDnsUpdatesAction.PATH)
.param("tld", tldToDnsWriter.getKey())
.param("dnsWriter", tldToDnsWriter.getValue())
.param("itemsCreated", "3000-01-01T00:00:00.000Z")
.param("requestTime", "3000-01-01T00:00:00.000Z")
.param("enqueued", "3000-01-01T01:00:00.000Z")
// Single-lock TLDs should use lock 1 of 1 by default
.param("lockIndex", "1")
@ -245,7 +245,7 @@ public class ReadDnsQueueActionTest {
DNS_PUBLISH_PUSH_QUEUE_NAME,
new TaskMatcher()
.param("enqueued", "3000-02-05T01:00:00.000Z")
.param("itemsCreated", "3000-02-03T00:00:00.000Z")
.param("requestTime", "3000-02-03T00:00:00.000Z")
.param("tld", "com")
.param("dnsWriter", "comWriter")
.param("domains", "domain1.com,domain2.com,domain3.com")
@ -484,7 +484,7 @@ public class ReadDnsQueueActionTest {
.url(PublishDnsUpdatesAction.PATH)
.param("tld", "multilock.uk")
.param("dnsWriter", "multilockWriter")
.param("itemsCreated", "3000-01-01T00:00:00.000Z")
.param("requestTime", "3000-01-01T00:00:00.000Z")
.param("enqueued", "3000-01-01T01:00:00.000Z")
.param("domains", "hello.multilock.uk")
.param("hosts", "ns1.abc.hello.multilock.uk,ns2.hello.multilock.uk")
@ -493,7 +493,7 @@ public class ReadDnsQueueActionTest {
.url(PublishDnsUpdatesAction.PATH)
.param("tld", "multilock.uk")
.param("dnsWriter", "multilockWriter")
.param("itemsCreated", "3000-01-01T00:00:00.000Z")
.param("requestTime", "3000-01-01T00:00:00.000Z")
.param("enqueued", "3000-01-01T01:00:00.000Z")
.param("domains", "another.multilock.uk")
.param("hosts", "ns3.def.another.multilock.uk,ns4.another.multilock.uk")

View file

@ -0,0 +1,296 @@
// Copyright 2017 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.dns;
import static com.google.common.truth.Truth.assertThat;
import static google.registry.model.common.DatabaseMigrationStateSchedule.MigrationState;
import static google.registry.model.common.DatabaseMigrationStateSchedule.set;
import static google.registry.model.common.DatabaseMigrationStateSchedule.useUncachedForTest;
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
import static google.registry.testing.DatabaseHelper.createTld;
import static google.registry.testing.DatabaseHelper.loadAllOf;
import static google.registry.testing.DatabaseHelper.persistResource;
import static google.registry.testing.DatabaseHelper.persistResources;
import static google.registry.util.DateTimeUtils.START_OF_TIME;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyCollection;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.collect.Ordering;
import google.registry.dns.DnsConstants.TargetType;
import google.registry.model.common.DnsRefreshRequest;
import google.registry.model.common.DnsRefreshRequestTest;
import google.registry.model.tld.Registry;
import google.registry.persistence.transaction.JpaTestExtensions;
import google.registry.persistence.transaction.JpaTestExtensions.JpaIntegrationTestExtension;
import google.registry.testing.CloudTasksHelper;
import google.registry.testing.CloudTasksHelper.TaskMatcher;
import google.registry.testing.FakeClock;
import java.util.Collection;
import java.util.Optional;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Duration;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.ArgumentCaptor;
/** Unit tests for {@link DnsRefreshRequestTest}. */
public class ReadDnsRefreshRequestsActionTest {
private final FakeClock clock = new FakeClock(DateTime.parse("2020-02-02T01:23:45Z"));
private final CloudTasksHelper cloudTasksHelper = new CloudTasksHelper(clock);
private final DnsUtils dnsUtils = new DnsUtils(null);
private final Optional<Integer> jitterSeconds = Optional.of(5);
@RegisterExtension
JpaIntegrationTestExtension jpa =
new JpaTestExtensions.Builder().withClock(clock).buildIntegrationTestExtension();
private final ReadDnsRefreshRequestsAction action =
spy(
new ReadDnsRefreshRequestsAction(
2,
Duration.standardSeconds(10),
jitterSeconds,
"tld",
clock,
dnsUtils,
null,
cloudTasksHelper.getTestCloudTasksUtils()));
private ImmutableList<DnsRefreshRequest> requests;
@BeforeAll
static void beforeAll() {
useUncachedForTest();
}
@BeforeEach
void beforeEach() {
useDnsSql();
persistResource(
createTld("tld")
.asBuilder()
.setDnsWriters(ImmutableSet.of("FooWriter", "BarWriter"))
.setNumDnsPublishLocks(2)
.build());
requests =
new ImmutableList.Builder<DnsRefreshRequest>()
.add(new DnsRefreshRequest(TargetType.DOMAIN, "domain.tld", "tld", clock.nowUtc()))
.add(
new DnsRefreshRequest(
TargetType.HOST, "ns1.domain.tld", "tld", clock.nowUtc().minusMinutes(1)))
.add(
new DnsRefreshRequest(
TargetType.DOMAIN, "future.tld", "tld", clock.nowUtc().plusMinutes(1)))
.build();
clock.advanceBy(Duration.standardMinutes(5));
persistResources(requests);
requests = loadAllOf(DnsRefreshRequest.class);
}
@Test
void testSuccess_runAction_pausedTld() {
persistResource(createTld("tld").asBuilder().setDnsPaused(true).build());
action.run();
verify(action, never()).enqueueUpdates(anyInt(), anyInt(), anyCollection());
verify(action, never()).processRequests(anyCollection());
}
@Test
void testSuccess_runAction_requestTimeInTheFuture() {
clock.setTo(DateTime.parse("2000-01-01T00:00:00Z"));
action.run();
verify(action, never()).enqueueUpdates(anyInt(), anyInt(), anyCollection());
verify(action, never()).processRequests(anyCollection());
}
@Test
void testSuccess_runAction_oneBatch() {
// The read batch size is 2 * 2 = 4. All requests will be in the same bucket, even though the
// bucket size should be roughly 2. But this is as expected because getLockIndex() only evenly
// distributes tasks in a statistical sense.
doReturn(2).when(action).getLockIndex(anyInt(), any(DnsRefreshRequest.class));
action.run();
@SuppressWarnings("unchecked")
ArgumentCaptor<Collection<DnsRefreshRequest>> captor =
ArgumentCaptor.forClass(Collection.class);
verify(action, times(1)).enqueueUpdates(eq(2), eq(2), captor.capture());
assertThat(captor.getValue().size()).isEqualTo(3);
verify(action, times(1)).processRequests(captor.capture());
assertThat(captor.getValue().size()).isEqualTo(3);
assertThat(loadAllOf(DnsRefreshRequest.class).isEmpty()).isTrue();
}
@Test
void testSuccess_runAction_twoBatches() {
// Make the read batch size 2 * 1 = 2.
persistResource(Registry.get("tld").asBuilder().setNumDnsPublishLocks(1).build());
doReturn(1).when(action).getLockIndex(anyInt(), any(DnsRefreshRequest.class));
doAnswer(
invocation -> {
@SuppressWarnings("unchecked")
ImmutableList<DnsRefreshRequest> ans =
(ImmutableList<DnsRefreshRequest>) invocation.callRealMethod();
// The next read should not time out as we only increment by one millisecond, whereas
// the timeout is set to 10 seconds.
clock.advanceOneMilli();
return ans;
})
.when(action)
.processRequests(anyCollection());
action.run();
@SuppressWarnings("unchecked")
ArgumentCaptor<Collection<DnsRefreshRequest>> captor =
ArgumentCaptor.forClass(Collection.class);
verify(action, times(2)).enqueueUpdates(eq(1), eq(1), captor.capture());
assertThat(captor.getAllValues().get(0).size()).isEqualTo(2);
assertThat(captor.getAllValues().get(1).size()).isEqualTo(1);
verify(action, times(2)).processRequests(captor.capture());
assertThat(captor.getAllValues().get(0).size()).isEqualTo(2);
assertThat(captor.getAllValues().get(1).size()).isEqualTo(1);
assertThat(loadAllOf(DnsRefreshRequest.class).isEmpty()).isTrue();
}
@Test
void testSuccess_runAction_timeOutAfterFirstRead() {
// Make the process batch size 2 * 1 = 2.
persistResource(Registry.get("tld").asBuilder().setNumDnsPublishLocks(1).build());
// Both requests in the first batch will be bucketed to the same bucket.
doReturn(1).when(action).getLockIndex(anyInt(), any(DnsRefreshRequest.class));
doAnswer(
invocation -> {
@SuppressWarnings("unchecked")
ImmutableList<DnsRefreshRequest> ans =
(ImmutableList<DnsRefreshRequest>) invocation.callRealMethod();
// After this function is called once, the loop in run() should top when it checks
// if the current time is before the request end time.
clock.advanceBy(Duration.standardHours(1));
return ans;
})
.when(action)
.processRequests(anyCollection());
action.run();
verify(action, times(1)).enqueueUpdates(anyInt(), anyInt(), anyCollection());
verify(action, times(1)).processRequests(anyCollection());
// The third request is left untouched because it is not read;
ImmutableList<DnsRefreshRequest> remainingRequests = loadAllOf(DnsRefreshRequest.class);
assertThat(remainingRequests.size()).isEqualTo(1);
assertThat(remainingRequests.get(0).getLastProcessTime()).isEqualTo(START_OF_TIME);
}
@Test
void testSuccess_processTasks() {
doReturn(2)
.doReturn(1)
.doReturn(2)
.when(action)
.getLockIndex(eq(2), any(DnsRefreshRequest.class));
action.processRequests(requests);
verify(action).enqueueUpdates(2, 2, ImmutableSet.of(requests.get(0), requests.get(2)));
verify(action).enqueueUpdates(1, 2, ImmutableSet.of(requests.get(1)));
assertThat(loadAllOf(DnsRefreshRequest.class)).isEmpty();
}
@Test
void testSuccess_processTasks_enqueueFailed_tasksNotDeleted() {
doReturn(2)
.doReturn(1)
.doReturn(2)
.when(action)
.getLockIndex(eq(2), any(DnsRefreshRequest.class));
doThrow(new RuntimeException("Something went wrong!"))
.when(action)
.enqueueUpdates(eq(2), eq(2), anyCollection());
action.processRequests(requests);
verify(action).enqueueUpdates(2, 2, ImmutableSet.of(requests.get(0), requests.get(2)));
verify(action).enqueueUpdates(1, 2, ImmutableSet.of(requests.get(1)));
assertThat(loadAllOf(DnsRefreshRequest.class).size()).isEqualTo(2);
}
@Test
void testSuccess_enqueueTasks() {
action.enqueueUpdates(2, 3, requests);
cloudTasksHelper.assertTasksEnqueued(
"dns-publish",
new TaskMatcher()
.url("/_dr/task/publishDnsUpdates")
.service("BACKEND")
.param("tld", "tld")
.param("dnsWriter", "FooWriter")
.param("lockIndex", "2")
.param("numPublishLocks", "3")
.param("enqueued", clock.nowUtc().toString())
.param("requestTime", clock.nowUtc().minusMinutes(6).toString())
.param("domains", "domain.tld,future.tld")
.param("hosts", "ns1.domain.tld"),
new TaskMatcher()
.url("/_dr/task/publishDnsUpdates")
.service("BACKEND")
.param("tld", "tld")
.param("dnsWriter", "BarWriter")
.param("lockIndex", "2")
.param("numPublishLocks", "3")
.param("enqueued", clock.nowUtc().toString())
.param("requestTime", clock.nowUtc().minusMinutes(6).toString())
.param("domains", "domain.tld,future.tld")
.param("hosts", "ns1.domain.tld"));
cloudTasksHelper
.getTestTasksFor("dns-publish")
.forEach(
task -> {
DateTime scheduledTime =
new DateTime(task.getScheduleTime().getSeconds() * 1000, DateTimeZone.UTC);
assertThat(new Duration(clock.nowUtc(), scheduledTime))
.isAtMost(Duration.standardSeconds(jitterSeconds.get()));
});
}
private void useDnsSql() {
DateTime currentTime = clock.nowUtc();
clock.setTo(START_OF_TIME);
tm().transact(
() ->
set(
new ImmutableSortedMap.Builder<DateTime, MigrationState>(Ordering.natural())
.put(START_OF_TIME, MigrationState.DATASTORE_ONLY)
.put(START_OF_TIME.plusMillis(1), MigrationState.DATASTORE_PRIMARY)
.put(START_OF_TIME.plusMillis(2), MigrationState.DATASTORE_PRIMARY_NO_ASYNC)
.put(
START_OF_TIME.plusMillis(3), MigrationState.DATASTORE_PRIMARY_READ_ONLY)
.put(START_OF_TIME.plusMillis(4), MigrationState.SQL_PRIMARY_READ_ONLY)
.put(START_OF_TIME.plusMillis(5), MigrationState.SQL_PRIMARY)
.put(START_OF_TIME.plusMillis(6), MigrationState.SQL_ONLY)
.put(START_OF_TIME.plusMillis(7), MigrationState.SEQUENCE_BASED_ALLOCATE_ID)
.put(START_OF_TIME.plusMillis(8), MigrationState.NORDN_SQL)
.put(START_OF_TIME.plusMillis(9), MigrationState.DNS_SQL)
.build()));
clock.setTo(currentTime);
}
}