diff --git a/java/google/registry/config/RegistryConfig.java b/java/google/registry/config/RegistryConfig.java
index f1c3024c1..1e0d347a8 100644
--- a/java/google/registry/config/RegistryConfig.java
+++ b/java/google/registry/config/RegistryConfig.java
@@ -265,29 +265,48 @@ public final class RegistryConfig {
}
/**
- * The maximum interval (seconds) to lease tasks from the dns-pull queue.
+ * The maximum time we allow publishDnsUpdates to run.
+ *
+ *
This is the maximum lock duration for publishing the DNS updates, meaning it should allow
+ * the various DnsWriters to publish and commit an entire batch (with a maximum number of items
+ * set by provideDnsTldUpdateBatchSize).
+ *
+ *
Any update that takes longer than this timeout will be killed and retried from scratch.
+ * Hence, a timeout that's too short can result in batches that retry over and over again,
+ * failing forever.
+ *
+ *
If there are lock contention issues, they should be solved by changing the batch sizes or
+ * the cron job rate, NOT by making this value smaller.
*
- * @see google.registry.dns.ReadDnsQueueAction
* @see google.registry.dns.PublishDnsUpdatesAction
*/
@Provides
- @Config("dnsWriteLockTimeout")
- public static Duration provideDnsWriteLockTimeout() {
- /*
- * This is the maximum lock duration for publishing the DNS updates, meaning it should allow
- * the various DnsWriters to publish and commit an entire batch (with a maximum number of
- * items set by provideDnsTldUpdateBatchSize).
- *
- * Any update that takes longer than this timeout will be killed and retried from scratch.
- * Hence, a timeout that's too short can result in batches that retry over and over again,
- * failing forever.
- *
- * If there are lock contention issues, they should be solved by changing the batch sizes
- * or the cron job rate, NOT by making this value smaller.
- */
+ @Config("publishDnsUpdatesLockDuration")
+ public static Duration providePublishDnsUpdatesLockDuration() {
return Duration.standardMinutes(3);
}
+ /**
+ * The requested maximum duration for ReadDnsQueueAction.
+ *
+ *
ReadDnsQueueAction reads update tasks from the dns-pull queue. It will continue reading
+ * tasks until either the queue is empty, or this duration has passed.
+ *
+ *
This time is the maximum duration between the first and last attempt to lease tasks from
+ * the dns-pull queue. The actual running time might be slightly longer, as we process the
+ * tasks.
+ *
+ *
This value should be less than the cron-job repeat rate for ReadDnsQueueAction, to make
+ * sure we don't have multiple ReadDnsActions leasing tasks simultaneously.
+ *
+ * @see google.registry.dns.ReadDnsQueueAction
+ */
+ @Provides
+ @Config("readDnsQueueActionRuntime")
+ public static Duration provideReadDnsQueueRuntime() {
+ return Duration.standardSeconds(45);
+ }
+
/**
* Returns the default time to live for DNS A and AAAA records.
*
diff --git a/java/google/registry/dns/DnsQueue.java b/java/google/registry/dns/DnsQueue.java
index 8a349ba99..d2d6b09d3 100644
--- a/java/google/registry/dns/DnsQueue.java
+++ b/java/google/registry/dns/DnsQueue.java
@@ -34,25 +34,44 @@ import com.google.apphosting.api.DeadlineExceededException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.net.InternetDomainName;
+import com.google.common.util.concurrent.RateLimiter;
import google.registry.dns.DnsConstants.TargetType;
import google.registry.model.registry.Registries;
import google.registry.util.FormattingLogger;
import google.registry.util.NonFinalForTesting;
import java.util.List;
import java.util.Optional;
-import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import javax.inject.Inject;
import javax.inject.Named;
import org.joda.time.Duration;
-/** Methods for manipulating the queue used for DNS write tasks. */
+/**
+ * Methods for manipulating the queue used for DNS write tasks.
+ *
+ *
This includes a {@link RateLimiter} to limit the {@link Queue#leaseTasks} call rate to 9 QPS,
+ * to stay under the 10 QPS limit for this function.
+ *
+ *
Note that overlapping calls to {@link ReadDnsQueueAction} (the only place where
+ * {@link DnsQueue#leaseTasks} is used) will have different rate limiters, so they could exceed the
+ * allowed rate. This should be rare though - because {@link DnsQueue#leaseTasks} is only used in
+ * {@link ReadDnsQueueAction}, which is run as a cron job with running time shorter than the cron
+ * repeat time - meaning there should never be two instances running at once.
+ *
+ * @see google.registry.config.RegistryConfig#provideReadDnsQueueRuntime
+ */
public class DnsQueue {
private static final FormattingLogger logger = FormattingLogger.getLoggerForCallerClass();
private final Queue queue;
+ // Queue.leaseTasks is limited to 10 requests per second as per
+ // https://cloud.google.com/appengine/docs/standard/java/javadoc/com/google/appengine/api/taskqueue/Queue.html
+ // "If you generate more than 10 LeaseTasks requests per second, only the first 10 requests will
+ // return results. The others will return no results."
+ private static final RateLimiter rateLimiter = RateLimiter.create(9);
+
@Inject
public DnsQueue(@Named(DNS_PULL_QUEUE_NAME) Queue queue) {
this.queue = queue;
@@ -112,9 +131,21 @@ public class DnsQueue {
return addToQueue(TargetType.ZONE, fullyQualifiedZoneName, fullyQualifiedZoneName);
}
+ /**
+ * Returns the maximum number of tasks that can be leased with {@link #leaseTasks}.
+ *
+ *
If this many tasks are returned, then there might be more tasks still waiting in the queue.
+ *
+ *
If less than this number of tasks are returned, then there are no more items in the queue.
+ */
+ public long getLeaseTasksBatchSize() {
+ return leaseTasksBatchSize;
+ }
+
/** Returns handles for a batch of tasks, leased for the specified duration. */
public List leaseTasks(Duration leaseDuration) {
try {
+ rateLimiter.acquire();
int numTasks = queue.fetchStatistics().getNumTasks();
logger.logfmt(
(numTasks >= leaseTasksBatchSize) ? Level.WARNING : Level.INFO,
@@ -128,17 +159,6 @@ public class DnsQueue {
}
}
- /** Reduce the task lease time to zero, making it immediately available to be leased again. */
- public void dropTaskLease(TaskHandle task) {
- try {
- queue.modifyTaskLease(task, 0, TimeUnit.SECONDS);
- } catch (IllegalStateException e) {
- logger.warningfmt(e, "Failed dropping expired lease: %s", task.getName());
- } catch (TransientFailureException | DeadlineExceededException e) {
- logger.severe(e, "Failed dropping task leases too fast");
- }
- }
-
/** Delete the task, removing it from the queue permanently. */
public void deleteTask(TaskHandle task) {
try {
diff --git a/java/google/registry/dns/PublishDnsUpdatesAction.java b/java/google/registry/dns/PublishDnsUpdatesAction.java
index eba62fd3a..52f66bfb6 100644
--- a/java/google/registry/dns/PublishDnsUpdatesAction.java
+++ b/java/google/registry/dns/PublishDnsUpdatesAction.java
@@ -58,7 +58,7 @@ public final class PublishDnsUpdatesAction implements Runnable, Callable {
@Inject DnsQueue dnsQueue;
@Inject DnsWriterProxy dnsWriterProxy;
@Inject DnsMetrics dnsMetrics;
- @Inject @Config("dnsWriteLockTimeout") Duration timeout;
+ @Inject @Config("publishDnsUpdatesLockDuration") Duration timeout;
/**
* The DNS writer to use for this batch.
diff --git a/java/google/registry/dns/ReadDnsQueueAction.java b/java/google/registry/dns/ReadDnsQueueAction.java
index 8fd64c7d2..66c5f688c 100644
--- a/java/google/registry/dns/ReadDnsQueueAction.java
+++ b/java/google/registry/dns/ReadDnsQueueAction.java
@@ -19,7 +19,6 @@ import static com.google.common.collect.Sets.difference;
import static google.registry.dns.DnsConstants.DNS_PUBLISH_PUSH_QUEUE_NAME;
import static google.registry.dns.DnsConstants.DNS_TARGET_NAME_PARAM;
import static google.registry.dns.DnsConstants.DNS_TARGET_TYPE_PARAM;
-import static google.registry.model.registry.Registries.getTlds;
import static java.util.concurrent.TimeUnit.SECONDS;
import com.google.appengine.api.taskqueue.Queue;
@@ -27,31 +26,31 @@ import com.google.appengine.api.taskqueue.TaskHandle;
import com.google.appengine.api.taskqueue.TaskOptions;
import com.google.auto.value.AutoValue;
import com.google.common.collect.ComparisonChain;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableSetMultimap;
import com.google.common.collect.Iterables;
-import com.google.common.collect.SortedSetMultimap;
-import com.google.common.collect.TreeMultimap;
+import com.google.common.collect.Ordering;
import google.registry.config.RegistryConfig.Config;
import google.registry.dns.DnsConstants.TargetType;
+import google.registry.model.registry.Registries;
import google.registry.model.registry.Registry;
import google.registry.request.Action;
import google.registry.request.Parameter;
import google.registry.request.RequestParameters;
import google.registry.request.auth.Auth;
+import google.registry.util.Clock;
import google.registry.util.FormattingLogger;
import google.registry.util.TaskEnqueuer;
import java.io.UnsupportedEncodingException;
import java.util.Collection;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
-import java.util.Set;
import javax.inject.Inject;
import javax.inject.Named;
+import org.joda.time.DateTime;
import org.joda.time.Duration;
/**
@@ -74,10 +73,25 @@ public final class ReadDnsQueueAction implements Runnable {
private static final Random random = new Random();
private static final FormattingLogger logger = FormattingLogger.getLoggerForCallerClass();
+ /**
+ * Buffer time since the end of this action until retriable tasks are available again.
+ *
+ * We read batches of tasks from the queue in a loop. Any task that will need to be retried has
+ * to be kept out of the queue for the duration of this action - otherwise we will lease it again
+ * in a subsequent loop.
+ *
+ *
The 'requestedMaximumDuration' value is the maximum delay between the first and last calls
+ * to lease tasks, hence we want the lease duration to be (slightly) longer than that.
+ * LEASE_PADDING is the value we add to {@link #requestedMaximumDuration} to make sure the lease
+ * duration is indeed longer.
+ */
+ private static final Duration LEASE_PADDING = Duration.standardMinutes(1);
+
@Inject @Config("dnsTldUpdateBatchSize") int tldUpdateBatchSize;
- @Inject @Config("dnsWriteLockTimeout") Duration writeLockTimeout;
+ @Inject @Config("readDnsQueueActionRuntime") Duration requestedMaximumDuration;
@Inject @Named(DNS_PUBLISH_PUSH_QUEUE_NAME) Queue dnsPublishPushQueue;
@Inject @Parameter(PARAM_JITTER_SECONDS) Optional jitterSeconds;
+ @Inject Clock clock;
@Inject DnsQueue dnsQueue;
@Inject TaskEnqueuer taskEnqueuer;
@Inject ReadDnsQueueAction() {}
@@ -105,36 +119,148 @@ public final class ReadDnsQueueAction implements Runnable {
/** Leases all tasks from the pull queue and creates per-tld update actions for them. */
@Override
public void run() {
- Set tldsOfInterest = getTlds();
-
- List tasks = dnsQueue.leaseTasks(writeLockTimeout);
- if (tasks.isEmpty()) {
- return;
+ DateTime requestedEndTime = clock.nowUtc().plus(requestedMaximumDuration);
+ ImmutableSet tlds = Registries.getTlds();
+ while (requestedEndTime.isAfterNow()) {
+ List tasks = dnsQueue.leaseTasks(requestedMaximumDuration.plus(LEASE_PADDING));
+ logger.infofmt("Leased %d DNS update tasks.", tasks.size());
+ if (!tasks.isEmpty()) {
+ dispatchTasks(ImmutableSet.copyOf(tasks), tlds);
+ }
+ if (tasks.size() < dnsQueue.getLeaseTasksBatchSize()) {
+ return;
+ }
}
- logger.infofmt("Leased %d DNS update tasks.", tasks.size());
- // Normally, all tasks will be deleted from the pull queue. But some might have to remain if
- // we are not interested in the associated TLD, or if the TLD is paused. Remember which these
- // are.
- Set tasksToKeep = new HashSet<>();
- // The paused TLDs for which we found at least one refresh request.
- Set pausedTlds = new HashSet<>();
- // Create a sorted multimap into which we will insert the refresh items, so that the items for
- // each TLD will be grouped together, and domains and hosts will be grouped within a TLD. The
- // grouping and ordering of domains and hosts is not technically necessary, but a predictable
- // ordering makes it possible to write detailed tests.
- SortedSetMultimap refreshItemMultimap = TreeMultimap.create();
+ }
+
+ /** A set of tasks grouped based on the action to take on them. */
+ @AutoValue
+ abstract static class ClassifiedTasks {
+
+ /**
+ * List of tasks we want to keep in the queue (want to retry in the future).
+ *
+ * Normally, any task we lease from the queue will be deleted - either because we are going
+ * to process it now (these tasks are part of refreshItemsByTld), or because these tasks are
+ * "corrupt" in some way (don't parse, don't have the required parameters etc.).
+ *
+ *
Some tasks however are valid, but can't be processed at the moment. These tasks will be
+ * kept in the queue for future processing.
+ *
+ *
This includes tasks belonging to paused TLDs (which we want to process once the TLD is
+ * unpaused) and tasks belonging to (currently) unknown TLDs.
+ */
+ abstract ImmutableSet tasksToKeep();
+
+ /** The paused TLDs for which we found at least one refresh request. */
+ abstract ImmutableSet pausedTlds();
+
+ /**
+ * The unknown TLDs for which we found at least one refresh request.
+ *
+ * Unknown TLDs might have valid requests because the list of TLDs is heavily cached. Hence,
+ * when we add a new TLD - it is possible that some instances will not have that TLD in their
+ * list yet. We don't want to discard these tasks, so we wait a bit and retry them again.
+ *
+ *
This is less likely for production TLDs but is quite likely for test TLDs where we might
+ * create a TLD and then use it within seconds.
+ */
+ abstract ImmutableSet unknownTlds();
+
+ /**
+ * All the refresh items we need to actually fulfill, grouped by TLD.
+ *
+ * By default, the multimap is ordered - this can be changed with the builder.
+ *
+ *
The items for each TLD will be grouped together, and domains and hosts will be grouped
+ * within a TLD.
+ *
+ *
The grouping and ordering of domains and hosts is not technically necessary, but a
+ * predictable ordering makes it possible to write detailed tests.
+ */
+ abstract ImmutableSetMultimap refreshItemsByTld();
+
+ static Builder builder() {
+ Builder builder = new AutoValue_ReadDnsQueueAction_ClassifiedTasks.Builder();
+ builder
+ .refreshItemsByTldBuilder()
+ .orderKeysBy(Ordering.natural())
+ .orderValuesBy(Ordering.natural());
+ return builder;
+ }
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract ImmutableSet.Builder tasksToKeepBuilder();
+ abstract ImmutableSet.Builder pausedTldsBuilder();
+ abstract ImmutableSet.Builder unknownTldsBuilder();
+ abstract ImmutableSetMultimap.Builder refreshItemsByTldBuilder();
+
+ abstract ClassifiedTasks build();
+ }
+ }
+
+ /**
+ * Creates per-tld update actions for tasks leased from the pull queue.
+ *
+ * Will return "irrelevant" tasks to the queue for future processing. "Irrelevant" tasks are
+ * tasks for paused TLDs or tasks for TLDs not part of {@link Registries#getTlds()}.
+ */
+ private void dispatchTasks(ImmutableSet tasks, ImmutableSet tlds) {
+ ClassifiedTasks classifiedTasks = classifyTasks(tasks, tlds);
+ if (!classifiedTasks.pausedTlds().isEmpty()) {
+ logger.infofmt("The dns-pull queue is paused for TLDs: %s.", classifiedTasks.pausedTlds());
+ }
+ if (!classifiedTasks.unknownTlds().isEmpty()) {
+ logger.warningfmt(
+ "The dns-pull queue has unknown TLDs: %s.", classifiedTasks.unknownTlds());
+ }
+ enqueueUpdates(classifiedTasks.refreshItemsByTld());
+ if (!classifiedTasks.tasksToKeep().isEmpty()) {
+ logger.warningfmt(
+ "Keeping %d DNS update tasks in the queue.", classifiedTasks.tasksToKeep().size());
+ }
+ // Delete the tasks we don't want to see again from the queue.
+ //
+ // tasksToDelete includes both the tasks that we already fulfilled in this call (were part of
+ // refreshItemsByTld) and "corrupt" tasks we weren't able to parse correctly (this shouldn't
+ // happen, and we logged a "severe" error)
+ //
+ // We let the lease on the rest of the tasks (the tasks we want to keep) expire on its own. The
+ // reason we don't release these tasks back to the queue immediately is that we are going to
+ // immediately read another batch of tasks from the queue - and we don't want to get the same
+ // tasks again.
+ ImmutableSet tasksToDelete =
+ difference(tasks, classifiedTasks.tasksToKeep()).immutableCopy();
+ logger.infofmt("Removing %d DNS update tasks from the queue.", tasksToDelete.size());
+ dnsQueue.deleteTasks(tasksToDelete.asList());
+ logger.infofmt("Done processing DNS tasks.");
+ }
+
+ /**
+ * Classifies the given tasks based on what action we need to take on them.
+ *
+ * Note that some tasks might appear in multiple categories (if multiple actions are to be
+ * taken on them) or in no category (if no action is to be taken on them)
+ */
+ private static ClassifiedTasks classifyTasks(
+ ImmutableSet tasks, ImmutableSet tlds) {
+
+ ClassifiedTasks.Builder classifiedTasksBuilder = ClassifiedTasks.builder();
+
// Read all tasks on the DNS pull queue and load them into the refresh item multimap.
for (TaskHandle task : tasks) {
try {
Map params = ImmutableMap.copyOf(task.extractParams());
String tld = params.get(RequestParameters.PARAM_TLD);
if (tld == null) {
- logger.severe("Discarding invalid DNS refresh request; no TLD specified.");
- } else if (!tldsOfInterest.contains(tld)) {
- tasksToKeep.add(task);
+ logger.severefmt("Discarding invalid DNS refresh request %s; no TLD specified.", task);
+ } else if (!tlds.contains(tld)) {
+ classifiedTasksBuilder.tasksToKeepBuilder().add(task);
+ classifiedTasksBuilder.unknownTldsBuilder().add(tld);
} else if (Registry.get(tld).getDnsPaused()) {
- tasksToKeep.add(task);
- pausedTlds.add(tld);
+ classifiedTasksBuilder.tasksToKeepBuilder().add(task);
+ classifiedTasksBuilder.pausedTldsBuilder().add(tld);
} else {
String typeString = params.get(DNS_TARGET_TYPE_PARAM);
String name = params.get(DNS_TARGET_NAME_PARAM);
@@ -142,24 +268,27 @@ public final class ReadDnsQueueAction implements Runnable {
switch (type) {
case DOMAIN:
case HOST:
- refreshItemMultimap.put(tld, RefreshItem.create(type, name));
+ classifiedTasksBuilder
+ .refreshItemsByTldBuilder()
+ .put(tld, RefreshItem.create(type, name));
break;
default:
- logger.severefmt("Discarding DNS refresh request of type %s.", typeString);
+ logger.severefmt("Discarding DNS refresh request %s of type %s.", task, typeString);
break;
}
}
} catch (RuntimeException | UnsupportedEncodingException e) {
- logger.severefmt(e, "Discarding invalid DNS refresh request (task %s).", task);
+ logger.severefmt(e, "Discarding invalid DNS refresh request %s.", task);
}
}
- if (!pausedTlds.isEmpty()) {
- logger.infofmt("The dns-pull queue is paused for TLDs: %s.", pausedTlds);
- }
+ return classifiedTasksBuilder.build();
+ }
+
+ private void enqueueUpdates(ImmutableSetMultimap refreshItemsByTld) {
// Loop through the multimap by TLD and generate refresh tasks for the hosts and domains for
// each configured DNS writer.
for (Map.Entry> tldRefreshItemsEntry
- : refreshItemMultimap.asMap().entrySet()) {
+ : refreshItemsByTld.asMap().entrySet()) {
String tld = tldRefreshItemsEntry.getKey();
for (List chunk :
Iterables.partition(tldRefreshItemsEntry.getValue(), tldUpdateBatchSize)) {
@@ -181,14 +310,5 @@ public final class ReadDnsQueueAction implements Runnable {
}
}
}
- Set tasksToDelete = difference(ImmutableSet.copyOf(tasks), tasksToKeep);
- // Either delete or drop the lease of each task.
- logger.infofmt("Deleting %d DNS update tasks.", tasksToDelete.size());
- dnsQueue.deleteTasks(ImmutableList.copyOf(tasksToDelete));
- logger.infofmt("Dropping %d DNS update tasks.", tasksToKeep.size());
- for (TaskHandle task : tasksToKeep) {
- dnsQueue.dropTaskLease(task);
- }
- logger.infofmt("Done processing DNS tasks.");
}
}
diff --git a/javatests/google/registry/dns/ReadDnsQueueActionTest.java b/javatests/google/registry/dns/ReadDnsQueueActionTest.java
index 55d72880b..05a8062e0 100644
--- a/javatests/google/registry/dns/ReadDnsQueueActionTest.java
+++ b/javatests/google/registry/dns/ReadDnsQueueActionTest.java
@@ -15,15 +15,20 @@
package google.registry.dns;
import static com.google.appengine.api.taskqueue.QueueFactory.getQueue;
+import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Lists.transform;
+import static com.google.common.truth.Truth.assertThat;
+import static com.google.common.truth.Truth8.assertThat;
import static google.registry.dns.DnsConstants.DNS_PUBLISH_PUSH_QUEUE_NAME;
import static google.registry.dns.DnsConstants.DNS_PULL_QUEUE_NAME;
import static google.registry.dns.DnsConstants.DNS_TARGET_NAME_PARAM;
import static google.registry.dns.DnsConstants.DNS_TARGET_TYPE_PARAM;
+import static google.registry.request.RequestParameters.PARAM_TLD;
import static google.registry.testing.DatastoreHelper.createTlds;
import static google.registry.testing.DatastoreHelper.persistResource;
import static google.registry.testing.TaskQueueHelper.assertNoTasksEnqueued;
import static google.registry.testing.TaskQueueHelper.assertTasksEnqueued;
+import static google.registry.testing.TaskQueueHelper.getQueuedParams;
import com.google.appengine.api.taskqueue.QueueFactory;
import com.google.appengine.api.taskqueue.TaskOptions;
@@ -45,6 +50,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
import java.util.Optional;
+import java.util.stream.IntStream;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Duration;
@@ -100,7 +106,8 @@ public class ReadDnsQueueActionTest {
private void run() throws Exception {
ReadDnsQueueAction action = new ReadDnsQueueAction();
action.tldUpdateBatchSize = TEST_TLD_UPDATE_BATCH_SIZE;
- action.writeLockTimeout = Duration.standardSeconds(10);
+ action.requestedMaximumDuration = Duration.standardSeconds(10);
+ action.clock = clock;
action.dnsQueue = dnsQueue;
action.dnsPublishPushQueue = QueueFactory.getQueue(DNS_PUBLISH_PUSH_QUEUE_NAME);
action.taskEnqueuer = new TaskEnqueuer(new Retrier(null, 1));
@@ -119,6 +126,12 @@ public class ReadDnsQueueActionTest {
return options.param("tld", tld);
}
+ private static TaskMatcher createDomainRefreshTaskMatcher(String name) {
+ return new TaskMatcher()
+ .param(DNS_TARGET_NAME_PARAM, name)
+ .param(DNS_TARGET_TYPE_PARAM, TargetType.DOMAIN.toString());
+ }
+
private void assertTldsEnqueuedInPushQueue(ImmutableMultimap tldsToDnsWriters)
throws Exception {
assertTasksEnqueued(
@@ -158,6 +171,30 @@ public class ReadDnsQueueActionTest {
ImmutableMultimap.of("com", "comWriter", "net", "netWriter", "example", "exampleWriter"));
}
+ @Test
+ public void testSuccess_moreUpdatesThanQueueBatchSize() throws Exception {
+ // The task queue has a batch size of 1000 (that's the maximum number of items you can lease at
+ // once).
+ ImmutableList domains =
+ IntStream.range(0, 1500)
+ .mapToObj(i -> String.format("domain_%04d.com", i))
+ .collect(toImmutableList());
+ domains.forEach(dnsQueue::addDomainRefreshTask);
+ run();
+ assertNoTasksEnqueued(DNS_PULL_QUEUE_NAME);
+ ImmutableList> queuedParams =
+ getQueuedParams(DNS_PUBLISH_PUSH_QUEUE_NAME);
+ // ReadDnsQueueAction batches items per TLD in batches of size 100.
+ // So for 1500 items in the DNS queue, we expect 15 items in the push queue
+ assertThat(queuedParams).hasSize(15);
+ // Check all the expected domains are indeed enqueued
+ assertThat(
+ queuedParams
+ .stream()
+ .flatMap(params -> params.get("domains").stream()))
+ .containsExactlyElementsIn(domains);
+ }
+
@Test
public void testSuccess_twoDnsWriters() throws Exception {
persistResource(
@@ -172,13 +209,117 @@ public class ReadDnsQueueActionTest {
}
@Test
- public void testSuccess_oneTldPaused() throws Exception {
+ public void testSuccess_oneTldPaused_returnedToQueue() throws Exception {
persistResource(Registry.get("net").asBuilder().setDnsPaused(true).build());
dnsQueue.addDomainRefreshTask("domain.com");
dnsQueue.addDomainRefreshTask("domain.net");
dnsQueue.addDomainRefreshTask("domain.example");
run();
- assertTasksEnqueued(DNS_PULL_QUEUE_NAME, new TaskMatcher());
+ assertTasksEnqueued(DNS_PULL_QUEUE_NAME, createDomainRefreshTaskMatcher("domain.net"));
+ assertTldsEnqueuedInPushQueue(
+ ImmutableMultimap.of("com", "comWriter", "example", "exampleWriter"));
+ }
+
+ @Test
+ public void testSuccess_oneTldUnknown_returnedToQueue() throws Exception {
+ dnsQueue.addDomainRefreshTask("domain.com");
+ dnsQueue.addDomainRefreshTask("domain.example");
+ QueueFactory.getQueue(DNS_PULL_QUEUE_NAME)
+ .add(
+ TaskOptions.Builder.withDefaults()
+ .method(Method.PULL)
+ .param(DNS_TARGET_TYPE_PARAM, TargetType.DOMAIN.toString())
+ .param(DNS_TARGET_NAME_PARAM, "domain.unknown")
+ .param(PARAM_TLD, "unknown"));
+ run();
+ assertTasksEnqueued(DNS_PULL_QUEUE_NAME, createDomainRefreshTaskMatcher("domain.unknown"));
+ assertTldsEnqueuedInPushQueue(
+ ImmutableMultimap.of("com", "comWriter", "example", "exampleWriter"));
+ }
+
+ @Test
+ public void testSuccess_corruptTaskTldMismatch_published() throws Exception {
+ // TODO(mcilwain): what's the correct action to take in this case?
+ dnsQueue.addDomainRefreshTask("domain.com");
+ dnsQueue.addDomainRefreshTask("domain.example");
+ QueueFactory.getQueue(DNS_PULL_QUEUE_NAME)
+ .add(
+ TaskOptions.Builder.withDefaults()
+ .method(Method.PULL)
+ .param(DNS_TARGET_TYPE_PARAM, TargetType.DOMAIN.toString())
+ .param(DNS_TARGET_NAME_PARAM, "domain.wrongtld")
+ .param(PARAM_TLD, "net"));
+ run();
+ assertNoTasksEnqueued(DNS_PULL_QUEUE_NAME);
+ assertTldsEnqueuedInPushQueue(
+ ImmutableMultimap.of("com", "comWriter", "example", "exampleWriter", "net", "netWriter"));
+ }
+
+ @Test
+ public void testSuccess_corruptTaskNoTld_discarded() throws Exception {
+ dnsQueue.addDomainRefreshTask("domain.com");
+ dnsQueue.addDomainRefreshTask("domain.example");
+ QueueFactory.getQueue(DNS_PULL_QUEUE_NAME)
+ .add(
+ TaskOptions.Builder.withDefaults()
+ .method(Method.PULL)
+ .param(DNS_TARGET_TYPE_PARAM, TargetType.DOMAIN.toString())
+ .param(DNS_TARGET_NAME_PARAM, "domain.net"));
+ run();
+ // The corrupt task isn't in the pull queue, but also isn't in the push queue
+ assertNoTasksEnqueued(DNS_PULL_QUEUE_NAME);
+ assertTldsEnqueuedInPushQueue(
+ ImmutableMultimap.of("com", "comWriter", "example", "exampleWriter"));
+ }
+
+ @Test
+ public void testSuccess_corruptTaskNoName_discarded() throws Exception {
+ dnsQueue.addDomainRefreshTask("domain.com");
+ dnsQueue.addDomainRefreshTask("domain.example");
+ QueueFactory.getQueue(DNS_PULL_QUEUE_NAME)
+ .add(
+ TaskOptions.Builder.withDefaults()
+ .method(Method.PULL)
+ .param(DNS_TARGET_TYPE_PARAM, TargetType.DOMAIN.toString())
+ .param(PARAM_TLD, "net"));
+ run();
+ // The corrupt task isn't in the pull queue, but also isn't in the push queue
+ assertNoTasksEnqueued(DNS_PULL_QUEUE_NAME);
+ assertTldsEnqueuedInPushQueue(
+ ImmutableMultimap.of("com", "comWriter", "example", "exampleWriter"));
+ }
+
+ @Test
+ public void testSuccess_corruptTaskNoType_discarded() throws Exception {
+ dnsQueue.addDomainRefreshTask("domain.com");
+ dnsQueue.addDomainRefreshTask("domain.example");
+ QueueFactory.getQueue(DNS_PULL_QUEUE_NAME)
+ .add(
+ TaskOptions.Builder.withDefaults()
+ .method(Method.PULL)
+ .param(DNS_TARGET_NAME_PARAM, "domain.net")
+ .param(PARAM_TLD, "net"));
+ run();
+ // The corrupt task isn't in the pull queue, but also isn't in the push queue
+ assertNoTasksEnqueued(DNS_PULL_QUEUE_NAME);
+ assertTldsEnqueuedInPushQueue(
+ ImmutableMultimap.of("com", "comWriter", "example", "exampleWriter"));
+ }
+
+ @Test
+ public void testSuccess_corruptTaskWrongType_discarded() throws Exception {
+ dnsQueue.addDomainRefreshTask("domain.com");
+ dnsQueue.addDomainRefreshTask("domain.example");
+ QueueFactory.getQueue(DNS_PULL_QUEUE_NAME)
+ .add(
+ TaskOptions.Builder.withDefaults()
+ .method(Method.PULL)
+ .param(DNS_TARGET_TYPE_PARAM, "Wrong type")
+ .param(DNS_TARGET_NAME_PARAM, "domain.net")
+ .param(PARAM_TLD, "net"));
+ run();
+ // The corrupt task isn't in the pull queue, but also isn't in the push queue
+ assertNoTasksEnqueued(DNS_PULL_QUEUE_NAME);
assertTldsEnqueuedInPushQueue(
ImmutableMultimap.of("com", "comWriter", "example", "exampleWriter"));
}
diff --git a/javatests/google/registry/testing/BUILD b/javatests/google/registry/testing/BUILD
index 41ec72877..7b71f99fd 100644
--- a/javatests/google/registry/testing/BUILD
+++ b/javatests/google/registry/testing/BUILD
@@ -22,6 +22,7 @@ java_library(
exports = ["//third_party/junit"],
deps = [
"//java/google/registry/config",
+ "//java/google/registry/dns",
"//java/google/registry/dns:constants",
"//java/google/registry/dns/writer",
"//java/google/registry/flows",
diff --git a/javatests/google/registry/testing/TaskQueueHelper.java b/javatests/google/registry/testing/TaskQueueHelper.java
index f3496784e..8d3b4d89f 100644
--- a/javatests/google/registry/testing/TaskQueueHelper.java
+++ b/javatests/google/registry/testing/TaskQueueHelper.java
@@ -18,6 +18,7 @@ import static com.google.appengine.tools.development.testing.LocalTaskQueueTestC
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Predicates.in;
import static com.google.common.base.Predicates.not;
+import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Iterables.getFirst;
import static com.google.common.collect.Multisets.containsOccurrences;
import static com.google.common.truth.Truth.assertThat;
@@ -258,6 +259,15 @@ public class TaskQueueHelper {
}
}
+ public static ImmutableList> getQueuedParams(String queueName) {
+ return getQueueInfo(queueName)
+ .getTaskInfo()
+ .stream()
+ .map(MatchableTaskInfo::new)
+ .map(taskInfo -> ImmutableMultimap.copyOf(taskInfo.params))
+ .collect(toImmutableList());
+ }
+
/** Empties the task queue. */
public static void clearTaskQueue(String queueName) throws Exception {
getLocalTaskQueue().flushQueue(queueName);