google-nomulus/java/google/registry/dns/ReadDnsQueueAction.java
guyben bba975a991 Allow over 1000 dns-updates to be handled at once
The task-queue API only allows reading 1000 tasks at a time, hence the original reason for this limit. We get over that limit by reading (and processing) items from the queue in a loop - 1000 at a time.

This is important because the 1000 dns-updates are shared among all TLDs,
meaning that a TLD with >1000 waiting updates can affect the update latency of
other TLDs.

In addition, partially fixes the bug where if there are more than 1000 updates to paused
/ non-existing TLDs, we completely block all updated to all TLDs.

By partially fixed, I mean "if we have around 1000 updates to paused TLDs, we will read them every time ReadDnsUpdates is called, ignore then, and only then get to the actual updates we want to process".

This works for a number of 1000 updates waiting - but if paused TLDs have tens or hundreds of thousands of updates waiting - this might still choke up other TLDs (not to mention we keep reading / updating 10s or 100s of thousands of tasks in the queue, that's... bad.)

A more thorough fix will come in a future CL, as it requires a more thorough change in the code.

Note that the queue lease command supports a maximum of 10 QPS. Any more than
that - and we get errors / empty results. Hence we limit our QPS to 9 to be on
the safe side.

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=185218684
2018-02-20 15:42:09 -05:00

314 lines
13 KiB
Java

// Copyright 2017 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.dns;
import static com.google.appengine.api.taskqueue.TaskOptions.Builder.withUrl;
import static com.google.common.collect.Sets.difference;
import static google.registry.dns.DnsConstants.DNS_PUBLISH_PUSH_QUEUE_NAME;
import static google.registry.dns.DnsConstants.DNS_TARGET_NAME_PARAM;
import static google.registry.dns.DnsConstants.DNS_TARGET_TYPE_PARAM;
import static java.util.concurrent.TimeUnit.SECONDS;
import com.google.appengine.api.taskqueue.Queue;
import com.google.appengine.api.taskqueue.TaskHandle;
import com.google.appengine.api.taskqueue.TaskOptions;
import com.google.auto.value.AutoValue;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSetMultimap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering;
import google.registry.config.RegistryConfig.Config;
import google.registry.dns.DnsConstants.TargetType;
import google.registry.model.registry.Registries;
import google.registry.model.registry.Registry;
import google.registry.request.Action;
import google.registry.request.Parameter;
import google.registry.request.RequestParameters;
import google.registry.request.auth.Auth;
import google.registry.util.Clock;
import google.registry.util.FormattingLogger;
import google.registry.util.TaskEnqueuer;
import java.io.UnsupportedEncodingException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import javax.inject.Inject;
import javax.inject.Named;
import org.joda.time.DateTime;
import org.joda.time.Duration;
/**
* Action for fanning out DNS refresh tasks by TLD, using data taken from the DNS pull queue.
*
* <h3>Parameters Reference</h3>
*
* <ul>
* <li>{@code jitterSeconds} Randomly delay each task by up to this many seconds.
* </ul>
*/
@Action(
path = "/_dr/cron/readDnsQueue",
automaticallyPrintOk = true,
auth = Auth.AUTH_INTERNAL_ONLY
)
public final class ReadDnsQueueAction implements Runnable {
private static final String PARAM_JITTER_SECONDS = "jitterSeconds";
private static final Random random = new Random();
private static final FormattingLogger logger = FormattingLogger.getLoggerForCallerClass();
/**
* Buffer time since the end of this action until retriable tasks are available again.
*
* <p>We read batches of tasks from the queue in a loop. Any task that will need to be retried has
* to be kept out of the queue for the duration of this action - otherwise we will lease it again
* in a subsequent loop.
*
* <p>The 'requestedMaximumDuration' value is the maximum delay between the first and last calls
* to lease tasks, hence we want the lease duration to be (slightly) longer than that.
* LEASE_PADDING is the value we add to {@link #requestedMaximumDuration} to make sure the lease
* duration is indeed longer.
*/
private static final Duration LEASE_PADDING = Duration.standardMinutes(1);
@Inject @Config("dnsTldUpdateBatchSize") int tldUpdateBatchSize;
@Inject @Config("readDnsQueueActionRuntime") Duration requestedMaximumDuration;
@Inject @Named(DNS_PUBLISH_PUSH_QUEUE_NAME) Queue dnsPublishPushQueue;
@Inject @Parameter(PARAM_JITTER_SECONDS) Optional<Integer> jitterSeconds;
@Inject Clock clock;
@Inject DnsQueue dnsQueue;
@Inject TaskEnqueuer taskEnqueuer;
@Inject ReadDnsQueueAction() {}
/** Container for items we pull out of the DNS pull queue and process for fanout. */
@AutoValue
abstract static class RefreshItem implements Comparable<RefreshItem> {
static RefreshItem create(TargetType type, String name) {
return new AutoValue_ReadDnsQueueAction_RefreshItem(type, name);
}
abstract TargetType type();
abstract String name();
@Override
public int compareTo(RefreshItem other) {
return ComparisonChain.start()
.compare(this.type(), other.type())
.compare(this.name(), other.name())
.result();
}
}
/** Leases all tasks from the pull queue and creates per-tld update actions for them. */
@Override
public void run() {
DateTime requestedEndTime = clock.nowUtc().plus(requestedMaximumDuration);
ImmutableSet<String> tlds = Registries.getTlds();
while (requestedEndTime.isAfterNow()) {
List<TaskHandle> tasks = dnsQueue.leaseTasks(requestedMaximumDuration.plus(LEASE_PADDING));
logger.infofmt("Leased %d DNS update tasks.", tasks.size());
if (!tasks.isEmpty()) {
dispatchTasks(ImmutableSet.copyOf(tasks), tlds);
}
if (tasks.size() < dnsQueue.getLeaseTasksBatchSize()) {
return;
}
}
}
/** A set of tasks grouped based on the action to take on them. */
@AutoValue
abstract static class ClassifiedTasks {
/**
* List of tasks we want to keep in the queue (want to retry in the future).
*
* <p>Normally, any task we lease from the queue will be deleted - either because we are going
* to process it now (these tasks are part of refreshItemsByTld), or because these tasks are
* "corrupt" in some way (don't parse, don't have the required parameters etc.).
*
* <p>Some tasks however are valid, but can't be processed at the moment. These tasks will be
* kept in the queue for future processing.
*
* <p>This includes tasks belonging to paused TLDs (which we want to process once the TLD is
* unpaused) and tasks belonging to (currently) unknown TLDs.
*/
abstract ImmutableSet<TaskHandle> tasksToKeep();
/** The paused TLDs for which we found at least one refresh request. */
abstract ImmutableSet<String> pausedTlds();
/**
* The unknown TLDs for which we found at least one refresh request.
*
* <p>Unknown TLDs might have valid requests because the list of TLDs is heavily cached. Hence,
* when we add a new TLD - it is possible that some instances will not have that TLD in their
* list yet. We don't want to discard these tasks, so we wait a bit and retry them again.
*
* <p>This is less likely for production TLDs but is quite likely for test TLDs where we might
* create a TLD and then use it within seconds.
*/
abstract ImmutableSet<String> unknownTlds();
/**
* All the refresh items we need to actually fulfill, grouped by TLD.
*
* <p>By default, the multimap is ordered - this can be changed with the builder.
*
* <p>The items for each TLD will be grouped together, and domains and hosts will be grouped
* within a TLD.
*
* <p>The grouping and ordering of domains and hosts is not technically necessary, but a
* predictable ordering makes it possible to write detailed tests.
*/
abstract ImmutableSetMultimap<String, RefreshItem> refreshItemsByTld();
static Builder builder() {
Builder builder = new AutoValue_ReadDnsQueueAction_ClassifiedTasks.Builder();
builder
.refreshItemsByTldBuilder()
.orderKeysBy(Ordering.natural())
.orderValuesBy(Ordering.natural());
return builder;
}
@AutoValue.Builder
abstract static class Builder {
abstract ImmutableSet.Builder<TaskHandle> tasksToKeepBuilder();
abstract ImmutableSet.Builder<String> pausedTldsBuilder();
abstract ImmutableSet.Builder<String> unknownTldsBuilder();
abstract ImmutableSetMultimap.Builder<String, RefreshItem> refreshItemsByTldBuilder();
abstract ClassifiedTasks build();
}
}
/**
* Creates per-tld update actions for tasks leased from the pull queue.
*
* <p>Will return "irrelevant" tasks to the queue for future processing. "Irrelevant" tasks are
* tasks for paused TLDs or tasks for TLDs not part of {@link Registries#getTlds()}.
*/
private void dispatchTasks(ImmutableSet<TaskHandle> tasks, ImmutableSet<String> tlds) {
ClassifiedTasks classifiedTasks = classifyTasks(tasks, tlds);
if (!classifiedTasks.pausedTlds().isEmpty()) {
logger.infofmt("The dns-pull queue is paused for TLDs: %s.", classifiedTasks.pausedTlds());
}
if (!classifiedTasks.unknownTlds().isEmpty()) {
logger.warningfmt(
"The dns-pull queue has unknown TLDs: %s.", classifiedTasks.unknownTlds());
}
enqueueUpdates(classifiedTasks.refreshItemsByTld());
if (!classifiedTasks.tasksToKeep().isEmpty()) {
logger.warningfmt(
"Keeping %d DNS update tasks in the queue.", classifiedTasks.tasksToKeep().size());
}
// Delete the tasks we don't want to see again from the queue.
//
// tasksToDelete includes both the tasks that we already fulfilled in this call (were part of
// refreshItemsByTld) and "corrupt" tasks we weren't able to parse correctly (this shouldn't
// happen, and we logged a "severe" error)
//
// We let the lease on the rest of the tasks (the tasks we want to keep) expire on its own. The
// reason we don't release these tasks back to the queue immediately is that we are going to
// immediately read another batch of tasks from the queue - and we don't want to get the same
// tasks again.
ImmutableSet<TaskHandle> tasksToDelete =
difference(tasks, classifiedTasks.tasksToKeep()).immutableCopy();
logger.infofmt("Removing %d DNS update tasks from the queue.", tasksToDelete.size());
dnsQueue.deleteTasks(tasksToDelete.asList());
logger.infofmt("Done processing DNS tasks.");
}
/**
* Classifies the given tasks based on what action we need to take on them.
*
* <p>Note that some tasks might appear in multiple categories (if multiple actions are to be
* taken on them) or in no category (if no action is to be taken on them)
*/
private static ClassifiedTasks classifyTasks(
ImmutableSet<TaskHandle> tasks, ImmutableSet<String> tlds) {
ClassifiedTasks.Builder classifiedTasksBuilder = ClassifiedTasks.builder();
// Read all tasks on the DNS pull queue and load them into the refresh item multimap.
for (TaskHandle task : tasks) {
try {
Map<String, String> params = ImmutableMap.copyOf(task.extractParams());
String tld = params.get(RequestParameters.PARAM_TLD);
if (tld == null) {
logger.severefmt("Discarding invalid DNS refresh request %s; no TLD specified.", task);
} else if (!tlds.contains(tld)) {
classifiedTasksBuilder.tasksToKeepBuilder().add(task);
classifiedTasksBuilder.unknownTldsBuilder().add(tld);
} else if (Registry.get(tld).getDnsPaused()) {
classifiedTasksBuilder.tasksToKeepBuilder().add(task);
classifiedTasksBuilder.pausedTldsBuilder().add(tld);
} else {
String typeString = params.get(DNS_TARGET_TYPE_PARAM);
String name = params.get(DNS_TARGET_NAME_PARAM);
TargetType type = TargetType.valueOf(typeString);
switch (type) {
case DOMAIN:
case HOST:
classifiedTasksBuilder
.refreshItemsByTldBuilder()
.put(tld, RefreshItem.create(type, name));
break;
default:
logger.severefmt("Discarding DNS refresh request %s of type %s.", task, typeString);
break;
}
}
} catch (RuntimeException | UnsupportedEncodingException e) {
logger.severefmt(e, "Discarding invalid DNS refresh request %s.", task);
}
}
return classifiedTasksBuilder.build();
}
private void enqueueUpdates(ImmutableSetMultimap<String, RefreshItem> refreshItemsByTld) {
// Loop through the multimap by TLD and generate refresh tasks for the hosts and domains for
// each configured DNS writer.
for (Map.Entry<String, Collection<RefreshItem>> tldRefreshItemsEntry
: refreshItemsByTld.asMap().entrySet()) {
String tld = tldRefreshItemsEntry.getKey();
for (List<RefreshItem> chunk :
Iterables.partition(tldRefreshItemsEntry.getValue(), tldUpdateBatchSize)) {
for (String dnsWriter : Registry.get(tld).getDnsWriters()) {
TaskOptions options = withUrl(PublishDnsUpdatesAction.PATH)
.countdownMillis(jitterSeconds.isPresent()
? random.nextInt((int) SECONDS.toMillis(jitterSeconds.get()))
: 0)
.param(RequestParameters.PARAM_TLD, tld)
.param(PublishDnsUpdatesAction.PARAM_DNS_WRITER, dnsWriter);
for (RefreshItem refreshItem : chunk) {
options.param(
(refreshItem.type() == TargetType.HOST)
? PublishDnsUpdatesAction.PARAM_HOSTS
: PublishDnsUpdatesAction.PARAM_DOMAINS,
refreshItem.name());
}
taskEnqueuer.enqueue(dnsPublishPushQueue, options);
}
}
}
}
}