Add capability to sync DNS using multiple writers if configured

This is written in such a way that it can safely handle task items in the
old format so long as the DNS writer to use for the given TLD is unambiguous
(which it is for now, until we allow multiple DNS writers to be configured).

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=162293412
This commit is contained in:
mcilwain 2017-07-17 17:05:47 -07:00 committed by Ben McIlwain
parent c88a776741
commit 4a921973ea
6 changed files with 131 additions and 60 deletions

View file

@ -72,17 +72,17 @@ import org.joda.time.Duration;
)
public final class ReadDnsQueueAction implements Runnable {
public static final String KEEP_TASKS_PARAM = "keepTasks";
public static final String PARAM_KEEP_TASKS = "keepTasks";
private static final String JITTER_SECONDS_PARAM = "jitterSeconds";
private static final String PARAM_JITTER_SECONDS = "jitterSeconds";
private static final Random random = new Random();
private static final FormattingLogger logger = FormattingLogger.getLoggerForCallerClass();
@Inject @Config("dnsTldUpdateBatchSize") int tldUpdateBatchSize;
@Inject @Config("dnsWriteLockTimeout") Duration writeLockTimeout;
@Inject @Named(DNS_PUBLISH_PUSH_QUEUE_NAME) Queue dnsPublishPushQueue;
@Inject @Parameter(JITTER_SECONDS_PARAM) Optional<Integer> jitterSeconds;
@Inject @Parameter(KEEP_TASKS_PARAM) boolean keepTasks;
@Inject @Parameter(PARAM_JITTER_SECONDS) Optional<Integer> jitterSeconds;
@Inject @Parameter(PARAM_KEEP_TASKS) boolean keepTasks;
@Inject DnsQueue dnsQueue;
@Inject TaskEnqueuer taskEnqueuer;
@Inject ReadDnsQueueAction() {}
@ -161,24 +161,29 @@ public final class ReadDnsQueueAction implements Runnable {
if (!pausedTlds.isEmpty()) {
logger.infofmt("the dns-pull queue is paused for tlds: %s", pausedTlds);
}
// Loop through the multimap by TLD and generate refresh tasks for the hosts and domains.
// Loop through the multimap by TLD and generate refresh tasks for the hosts and domains for
// each configured DNS writer.
for (Map.Entry<String, Collection<RefreshItem>> tldRefreshItemsEntry
: refreshItemMultimap.asMap().entrySet()) {
for (List<RefreshItem> chunk : Iterables.partition(
tldRefreshItemsEntry.getValue(), tldUpdateBatchSize)) {
TaskOptions options = withUrl(PublishDnsUpdatesAction.PATH)
.countdownMillis(jitterSeconds.isPresent()
? random.nextInt((int) SECONDS.toMillis(jitterSeconds.get()))
: 0)
.param(RequestParameters.PARAM_TLD, tldRefreshItemsEntry.getKey());
for (RefreshItem refreshItem : chunk) {
options.param(
(refreshItem.type() == TargetType.HOST)
? PublishDnsUpdatesAction.HOSTS_PARAM
: PublishDnsUpdatesAction.DOMAINS_PARAM,
refreshItem.name());
String tld = tldRefreshItemsEntry.getKey();
for (List<RefreshItem> chunk :
Iterables.partition(tldRefreshItemsEntry.getValue(), tldUpdateBatchSize)) {
for (String dnsWriter : Registry.get(tld).getDnsWriters()) {
TaskOptions options = withUrl(PublishDnsUpdatesAction.PATH)
.countdownMillis(jitterSeconds.isPresent()
? random.nextInt((int) SECONDS.toMillis(jitterSeconds.get()))
: 0)
.param(RequestParameters.PARAM_TLD, tld)
.param(PublishDnsUpdatesAction.PARAM_DNS_WRITER, dnsWriter);
for (RefreshItem refreshItem : chunk) {
options.param(
(refreshItem.type() == TargetType.HOST)
? PublishDnsUpdatesAction.PARAM_HOSTS
: PublishDnsUpdatesAction.PARAM_DOMAINS,
refreshItem.name());
}
taskEnqueuer.enqueue(dnsPublishPushQueue, options);
}
taskEnqueuer.enqueue(dnsPublishPushQueue, options);
}
}
Set<TaskHandle> tasksToDelete = difference(ImmutableSet.copyOf(tasks), tasksToKeep);