mirror of
https://github.com/google/nomulus.git
synced 2025-05-02 04:57:51 +02:00
Added: - dns/update_latency, which measures the time from when a DNS update was added to the pull queue until that update is committed to the DnsWriter. - It doesn't check that, after being committed, the update was actually published in the DNS. - dns/publish_queue_delay, which measures the time from the initial insertion into the push queue until a publishDnsUpdate action was handled. It is measured both for successes (which is what we care about) and for various failures (which are important because the delay for the successful publishDnsUpdate will be greater than that of any of the previous failures). ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=185995678
337 lines
14 KiB
Java
337 lines
14 KiB
Java
// Copyright 2017 The Nomulus Authors. All Rights Reserved.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package google.registry.dns;
|
|
|
|
import static com.google.appengine.api.taskqueue.TaskOptions.Builder.withUrl;
|
|
import static com.google.common.collect.Sets.difference;
|
|
import static google.registry.dns.DnsConstants.DNS_PUBLISH_PUSH_QUEUE_NAME;
|
|
import static google.registry.dns.DnsConstants.DNS_TARGET_CREATE_TIME_PARAM;
|
|
import static google.registry.dns.DnsConstants.DNS_TARGET_NAME_PARAM;
|
|
import static google.registry.dns.DnsConstants.DNS_TARGET_TYPE_PARAM;
|
|
import static java.util.concurrent.TimeUnit.SECONDS;
|
|
|
|
import com.google.appengine.api.taskqueue.Queue;
|
|
import com.google.appengine.api.taskqueue.TaskHandle;
|
|
import com.google.appengine.api.taskqueue.TaskOptions;
|
|
import com.google.auto.value.AutoValue;
|
|
import com.google.common.collect.ComparisonChain;
|
|
import com.google.common.collect.ImmutableMap;
|
|
import com.google.common.collect.ImmutableSet;
|
|
import com.google.common.collect.ImmutableSetMultimap;
|
|
import com.google.common.collect.Iterables;
|
|
import com.google.common.collect.Ordering;
|
|
import google.registry.config.RegistryConfig.Config;
|
|
import google.registry.dns.DnsConstants.TargetType;
|
|
import google.registry.model.registry.Registries;
|
|
import google.registry.model.registry.Registry;
|
|
import google.registry.request.Action;
|
|
import google.registry.request.Parameter;
|
|
import google.registry.request.RequestParameters;
|
|
import google.registry.request.auth.Auth;
|
|
import google.registry.util.Clock;
|
|
import google.registry.util.FormattingLogger;
|
|
import google.registry.util.TaskEnqueuer;
|
|
import java.io.UnsupportedEncodingException;
|
|
import java.util.Collection;
|
|
import java.util.Comparator;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Optional;
|
|
import java.util.Random;
|
|
import javax.inject.Inject;
|
|
import javax.inject.Named;
|
|
import org.joda.time.DateTime;
|
|
import org.joda.time.Duration;
|
|
|
|
/**
|
|
* Action for fanning out DNS refresh tasks by TLD, using data taken from the DNS pull queue.
|
|
*
|
|
* <h3>Parameters Reference</h3>
|
|
*
|
|
* <ul>
|
|
* <li>{@code jitterSeconds} Randomly delay each task by up to this many seconds.
|
|
* </ul>
|
|
*/
|
|
@Action(
|
|
path = "/_dr/cron/readDnsQueue",
|
|
automaticallyPrintOk = true,
|
|
auth = Auth.AUTH_INTERNAL_ONLY
|
|
)
|
|
public final class ReadDnsQueueAction implements Runnable {
|
|
|
|
private static final String PARAM_JITTER_SECONDS = "jitterSeconds";
|
|
private static final Random random = new Random();
|
|
private static final FormattingLogger logger = FormattingLogger.getLoggerForCallerClass();
|
|
|
|
/**
|
|
* Buffer time since the end of this action until retriable tasks are available again.
|
|
*
|
|
* <p>We read batches of tasks from the queue in a loop. Any task that will need to be retried has
|
|
* to be kept out of the queue for the duration of this action - otherwise we will lease it again
|
|
* in a subsequent loop.
|
|
*
|
|
* <p>The 'requestedMaximumDuration' value is the maximum delay between the first and last calls
|
|
* to lease tasks, hence we want the lease duration to be (slightly) longer than that.
|
|
* LEASE_PADDING is the value we add to {@link #requestedMaximumDuration} to make sure the lease
|
|
* duration is indeed longer.
|
|
*/
|
|
private static final Duration LEASE_PADDING = Duration.standardMinutes(1);
|
|
|
|
@Inject @Config("dnsTldUpdateBatchSize") int tldUpdateBatchSize;
|
|
@Inject @Config("readDnsQueueActionRuntime") Duration requestedMaximumDuration;
|
|
@Inject @Named(DNS_PUBLISH_PUSH_QUEUE_NAME) Queue dnsPublishPushQueue;
|
|
@Inject @Parameter(PARAM_JITTER_SECONDS) Optional<Integer> jitterSeconds;
|
|
@Inject Clock clock;
|
|
@Inject DnsQueue dnsQueue;
|
|
@Inject TaskEnqueuer taskEnqueuer;
|
|
@Inject ReadDnsQueueAction() {}
|
|
|
|
/** Container for items we pull out of the DNS pull queue and process for fanout. */
|
|
@AutoValue
|
|
abstract static class RefreshItem implements Comparable<RefreshItem> {
|
|
static RefreshItem create(TargetType type, String name, DateTime creationTime) {
|
|
return new AutoValue_ReadDnsQueueAction_RefreshItem(type, name, creationTime);
|
|
}
|
|
|
|
abstract TargetType type();
|
|
|
|
abstract String name();
|
|
|
|
abstract DateTime creationTime();
|
|
|
|
@Override
|
|
public int compareTo(RefreshItem other) {
|
|
return ComparisonChain.start()
|
|
.compare(this.type(), other.type())
|
|
.compare(this.name(), other.name())
|
|
.compare(this.creationTime(), other.creationTime())
|
|
.result();
|
|
}
|
|
}
|
|
|
|
/** Leases all tasks from the pull queue and creates per-tld update actions for them. */
|
|
@Override
|
|
public void run() {
|
|
DateTime requestedEndTime = clock.nowUtc().plus(requestedMaximumDuration);
|
|
ImmutableSet<String> tlds = Registries.getTlds();
|
|
while (requestedEndTime.isAfterNow()) {
|
|
List<TaskHandle> tasks = dnsQueue.leaseTasks(requestedMaximumDuration.plus(LEASE_PADDING));
|
|
logger.infofmt("Leased %d DNS update tasks.", tasks.size());
|
|
if (!tasks.isEmpty()) {
|
|
dispatchTasks(ImmutableSet.copyOf(tasks), tlds);
|
|
}
|
|
if (tasks.size() < dnsQueue.getLeaseTasksBatchSize()) {
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
|
|
/** A set of tasks grouped based on the action to take on them. */
|
|
@AutoValue
|
|
abstract static class ClassifiedTasks {
|
|
|
|
/**
|
|
* List of tasks we want to keep in the queue (want to retry in the future).
|
|
*
|
|
* <p>Normally, any task we lease from the queue will be deleted - either because we are going
|
|
* to process it now (these tasks are part of refreshItemsByTld), or because these tasks are
|
|
* "corrupt" in some way (don't parse, don't have the required parameters etc.).
|
|
*
|
|
* <p>Some tasks however are valid, but can't be processed at the moment. These tasks will be
|
|
* kept in the queue for future processing.
|
|
*
|
|
* <p>This includes tasks belonging to paused TLDs (which we want to process once the TLD is
|
|
* unpaused) and tasks belonging to (currently) unknown TLDs.
|
|
*/
|
|
abstract ImmutableSet<TaskHandle> tasksToKeep();
|
|
|
|
/** The paused TLDs for which we found at least one refresh request. */
|
|
abstract ImmutableSet<String> pausedTlds();
|
|
|
|
/**
|
|
* The unknown TLDs for which we found at least one refresh request.
|
|
*
|
|
* <p>Unknown TLDs might have valid requests because the list of TLDs is heavily cached. Hence,
|
|
* when we add a new TLD - it is possible that some instances will not have that TLD in their
|
|
* list yet. We don't want to discard these tasks, so we wait a bit and retry them again.
|
|
*
|
|
* <p>This is less likely for production TLDs but is quite likely for test TLDs where we might
|
|
* create a TLD and then use it within seconds.
|
|
*/
|
|
abstract ImmutableSet<String> unknownTlds();
|
|
|
|
/**
|
|
* All the refresh items we need to actually fulfill, grouped by TLD.
|
|
*
|
|
* <p>By default, the multimap is ordered - this can be changed with the builder.
|
|
*
|
|
* <p>The items for each TLD will be grouped together, and domains and hosts will be grouped
|
|
* within a TLD.
|
|
*
|
|
* <p>The grouping and ordering of domains and hosts is not technically necessary, but a
|
|
* predictable ordering makes it possible to write detailed tests.
|
|
*/
|
|
abstract ImmutableSetMultimap<String, RefreshItem> refreshItemsByTld();
|
|
|
|
static Builder builder() {
|
|
Builder builder = new AutoValue_ReadDnsQueueAction_ClassifiedTasks.Builder();
|
|
builder
|
|
.refreshItemsByTldBuilder()
|
|
.orderKeysBy(Ordering.natural())
|
|
.orderValuesBy(Ordering.natural());
|
|
return builder;
|
|
}
|
|
|
|
@AutoValue.Builder
|
|
abstract static class Builder {
|
|
abstract ImmutableSet.Builder<TaskHandle> tasksToKeepBuilder();
|
|
abstract ImmutableSet.Builder<String> pausedTldsBuilder();
|
|
abstract ImmutableSet.Builder<String> unknownTldsBuilder();
|
|
abstract ImmutableSetMultimap.Builder<String, RefreshItem> refreshItemsByTldBuilder();
|
|
|
|
abstract ClassifiedTasks build();
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Creates per-tld update actions for tasks leased from the pull queue.
|
|
*
|
|
* <p>Will return "irrelevant" tasks to the queue for future processing. "Irrelevant" tasks are
|
|
* tasks for paused TLDs or tasks for TLDs not part of {@link Registries#getTlds()}.
|
|
*/
|
|
private void dispatchTasks(ImmutableSet<TaskHandle> tasks, ImmutableSet<String> tlds) {
|
|
ClassifiedTasks classifiedTasks = classifyTasks(tasks, tlds, clock.nowUtc());
|
|
if (!classifiedTasks.pausedTlds().isEmpty()) {
|
|
logger.infofmt("The dns-pull queue is paused for TLDs: %s.", classifiedTasks.pausedTlds());
|
|
}
|
|
if (!classifiedTasks.unknownTlds().isEmpty()) {
|
|
logger.warningfmt(
|
|
"The dns-pull queue has unknown TLDs: %s.", classifiedTasks.unknownTlds());
|
|
}
|
|
enqueueUpdates(classifiedTasks.refreshItemsByTld());
|
|
if (!classifiedTasks.tasksToKeep().isEmpty()) {
|
|
logger.warningfmt(
|
|
"Keeping %d DNS update tasks in the queue.", classifiedTasks.tasksToKeep().size());
|
|
}
|
|
// Delete the tasks we don't want to see again from the queue.
|
|
//
|
|
// tasksToDelete includes both the tasks that we already fulfilled in this call (were part of
|
|
// refreshItemsByTld) and "corrupt" tasks we weren't able to parse correctly (this shouldn't
|
|
// happen, and we logged a "severe" error)
|
|
//
|
|
// We let the lease on the rest of the tasks (the tasks we want to keep) expire on its own. The
|
|
// reason we don't release these tasks back to the queue immediately is that we are going to
|
|
// immediately read another batch of tasks from the queue - and we don't want to get the same
|
|
// tasks again.
|
|
ImmutableSet<TaskHandle> tasksToDelete =
|
|
difference(tasks, classifiedTasks.tasksToKeep()).immutableCopy();
|
|
logger.infofmt("Removing %d DNS update tasks from the queue.", tasksToDelete.size());
|
|
dnsQueue.deleteTasks(tasksToDelete.asList());
|
|
logger.infofmt("Done processing DNS tasks.");
|
|
}
|
|
|
|
/**
|
|
* Classifies the given tasks based on what action we need to take on them.
|
|
*
|
|
* <p>Note that some tasks might appear in multiple categories (if multiple actions are to be
|
|
* taken on them) or in no category (if no action is to be taken on them)
|
|
*/
|
|
private static ClassifiedTasks classifyTasks(
|
|
ImmutableSet<TaskHandle> tasks, ImmutableSet<String> tlds, DateTime now) {
|
|
|
|
ClassifiedTasks.Builder classifiedTasksBuilder = ClassifiedTasks.builder();
|
|
|
|
// Read all tasks on the DNS pull queue and load them into the refresh item multimap.
|
|
for (TaskHandle task : tasks) {
|
|
try {
|
|
Map<String, String> params = ImmutableMap.copyOf(task.extractParams());
|
|
// We allow 'null' create-time for the transition period - and during that time we set the
|
|
// create-time to "now".
|
|
//
|
|
// TODO(b/73343464):remove support for null create-time once transition is over.
|
|
DateTime creationTime =
|
|
Optional.ofNullable(params.get(DNS_TARGET_CREATE_TIME_PARAM))
|
|
.map(DateTime::parse)
|
|
.orElse(now);
|
|
String tld = params.get(RequestParameters.PARAM_TLD);
|
|
if (tld == null) {
|
|
logger.severefmt("Discarding invalid DNS refresh request %s; no TLD specified.", task);
|
|
} else if (!tlds.contains(tld)) {
|
|
classifiedTasksBuilder.tasksToKeepBuilder().add(task);
|
|
classifiedTasksBuilder.unknownTldsBuilder().add(tld);
|
|
} else if (Registry.get(tld).getDnsPaused()) {
|
|
classifiedTasksBuilder.tasksToKeepBuilder().add(task);
|
|
classifiedTasksBuilder.pausedTldsBuilder().add(tld);
|
|
} else {
|
|
String typeString = params.get(DNS_TARGET_TYPE_PARAM);
|
|
String name = params.get(DNS_TARGET_NAME_PARAM);
|
|
TargetType type = TargetType.valueOf(typeString);
|
|
switch (type) {
|
|
case DOMAIN:
|
|
case HOST:
|
|
classifiedTasksBuilder
|
|
.refreshItemsByTldBuilder()
|
|
.put(tld, RefreshItem.create(type, name, creationTime));
|
|
break;
|
|
default:
|
|
logger.severefmt("Discarding DNS refresh request %s of type %s.", task, typeString);
|
|
break;
|
|
}
|
|
}
|
|
} catch (RuntimeException | UnsupportedEncodingException e) {
|
|
logger.severefmt(e, "Discarding invalid DNS refresh request %s.", task);
|
|
}
|
|
}
|
|
return classifiedTasksBuilder.build();
|
|
}
|
|
|
|
private void enqueueUpdates(ImmutableSetMultimap<String, RefreshItem> refreshItemsByTld) {
|
|
// Loop through the multimap by TLD and generate refresh tasks for the hosts and domains for
|
|
// each configured DNS writer.
|
|
for (Map.Entry<String, Collection<RefreshItem>> tldRefreshItemsEntry
|
|
: refreshItemsByTld.asMap().entrySet()) {
|
|
String tld = tldRefreshItemsEntry.getKey();
|
|
for (List<RefreshItem> chunk :
|
|
Iterables.partition(tldRefreshItemsEntry.getValue(), tldUpdateBatchSize)) {
|
|
DateTime earliestCreateTime =
|
|
chunk.stream().map(RefreshItem::creationTime).min(Comparator.naturalOrder()).get();
|
|
for (String dnsWriter : Registry.get(tld).getDnsWriters()) {
|
|
TaskOptions options =
|
|
withUrl(PublishDnsUpdatesAction.PATH)
|
|
.countdownMillis(
|
|
jitterSeconds.isPresent()
|
|
? random.nextInt((int) SECONDS.toMillis(jitterSeconds.get()))
|
|
: 0)
|
|
.param(RequestParameters.PARAM_TLD, tld)
|
|
.param(PublishDnsUpdatesAction.PARAM_DNS_WRITER, dnsWriter)
|
|
.param(
|
|
PublishDnsUpdatesAction.PARAM_PUBLISH_TASK_ENQUEUED,
|
|
clock.nowUtc().toString())
|
|
.param(
|
|
PublishDnsUpdatesAction.PARAM_REFRESH_REQUEST_CREATED,
|
|
earliestCreateTime.toString());
|
|
for (RefreshItem refreshItem : chunk) {
|
|
options.param(
|
|
(refreshItem.type() == TargetType.HOST)
|
|
? PublishDnsUpdatesAction.PARAM_HOSTS
|
|
: PublishDnsUpdatesAction.PARAM_DOMAINS,
|
|
refreshItem.name());
|
|
}
|
|
taskEnqueuer.enqueue(dnsPublishPushQueue, options);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|