Add sharded DNS publishing capability

This enables sharded DNS publishing on a per-TLD basis. Instead of a TLD-wide lock, the sharded scheme locks each update on the shard number, allowing parallel writes to DNS.

We allow N (the number of shards) to be 0 or 1 for no sharding, and N > 1 for an N-way sharding scheme. Unless explicitly set, all TLDs default to a numShards of 0, so we don't have to reload all registry objects explicitly.

WARNING: Upon deployment, this will change the lock name used by PublishDnsUpdatesAction from "<TLD> Dns Updates" to "<TLD> Dns Updates shard 0". This may cause concurrency issues if the underlying DNSWriter is not parallel-write tolerant. Currently all production usages are ZonemanWriter, which is parallel-tolerant, so no issues are expected.

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=187525655
This commit is contained in:
larryruili 2018-03-01 13:33:46 -08:00 committed by jianglai
parent 24799b394d
commit fa989e754b
16 changed files with 474 additions and 64 deletions

View file

@ -15,11 +15,14 @@
package google.registry.dns;
import static com.google.appengine.api.taskqueue.TaskOptions.Builder.withUrl;
import static com.google.common.collect.ImmutableSetMultimap.toImmutableSetMultimap;
import static com.google.common.collect.Sets.difference;
import static google.registry.dns.DnsConstants.DNS_PUBLISH_PUSH_QUEUE_NAME;
import static google.registry.dns.DnsConstants.DNS_TARGET_CREATE_TIME_PARAM;
import static google.registry.dns.DnsConstants.DNS_TARGET_NAME_PARAM;
import static google.registry.dns.DnsConstants.DNS_TARGET_TYPE_PARAM;
import static google.registry.util.DomainNameUtils.getSecondLevelDomain;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.concurrent.TimeUnit.SECONDS;
import com.google.appengine.api.taskqueue.Queue;
@ -32,6 +35,8 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSetMultimap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import google.registry.config.RegistryConfig.Config;
import google.registry.dns.DnsConstants.TargetType;
import google.registry.model.registry.Registries;
@ -95,6 +100,7 @@ public final class ReadDnsQueueAction implements Runnable {
@Inject @Parameter(PARAM_JITTER_SECONDS) Optional<Integer> jitterSeconds;
@Inject Clock clock;
@Inject DnsQueue dnsQueue;
@Inject HashFunction hashFunction;
@Inject TaskEnqueuer taskEnqueuer;
@Inject ReadDnsQueueAction() {}
@ -220,7 +226,7 @@ public final class ReadDnsQueueAction implements Runnable {
logger.warningfmt(
"The dns-pull queue has unknown TLDs: %s.", classifiedTasks.unknownTlds());
}
enqueueUpdates(classifiedTasks.refreshItemsByTld());
bucketRefreshItems(classifiedTasks.refreshItemsByTld());
if (!classifiedTasks.tasksToKeep().isEmpty()) {
logger.warningfmt(
"Keeping %d DNS update tasks in the queue.", classifiedTasks.tasksToKeep().size());
@ -297,40 +303,85 @@ public final class ReadDnsQueueAction implements Runnable {
return classifiedTasksBuilder.build();
}
private void enqueueUpdates(ImmutableSetMultimap<String, RefreshItem> refreshItemsByTld) {
/**
* Subdivides the tld to {@link RefreshItem} multimap into buckets by lock index, if applicable.
*
* <p>If the tld has numDnsPublishLocks <= 1, we enqueue all updates on the default lock 1 of 1.
*/
private void bucketRefreshItems(ImmutableSetMultimap<String, RefreshItem> refreshItemsByTld) {
// Loop through the multimap by TLD and generate refresh tasks for the hosts and domains for
// each configured DNS writer.
for (Map.Entry<String, Collection<RefreshItem>> tldRefreshItemsEntry
: refreshItemsByTld.asMap().entrySet()) {
String tld = tldRefreshItemsEntry.getKey();
for (List<RefreshItem> chunk :
Iterables.partition(tldRefreshItemsEntry.getValue(), tldUpdateBatchSize)) {
DateTime earliestCreateTime =
chunk.stream().map(RefreshItem::creationTime).min(Comparator.naturalOrder()).get();
for (String dnsWriter : Registry.get(tld).getDnsWriters()) {
TaskOptions options =
withUrl(PublishDnsUpdatesAction.PATH)
.countdownMillis(
jitterSeconds.isPresent()
? random.nextInt((int) SECONDS.toMillis(jitterSeconds.get()))
: 0)
.param(RequestParameters.PARAM_TLD, tld)
.param(PublishDnsUpdatesAction.PARAM_DNS_WRITER, dnsWriter)
.param(
PublishDnsUpdatesAction.PARAM_PUBLISH_TASK_ENQUEUED,
clock.nowUtc().toString())
.param(
PublishDnsUpdatesAction.PARAM_REFRESH_REQUEST_CREATED,
earliestCreateTime.toString());
for (RefreshItem refreshItem : chunk) {
options.param(
(refreshItem.type() == TargetType.HOST)
? PublishDnsUpdatesAction.PARAM_HOSTS
: PublishDnsUpdatesAction.PARAM_DOMAINS,
refreshItem.name());
}
taskEnqueuer.enqueue(dnsPublishPushQueue, options);
int numPublishLocks = Registry.get(tld).getNumDnsPublishLocks();
// 1 lock or fewer means no sharding (a single TLD-wide lock): enqueue everything under lock 1 of 1
if (numPublishLocks <= 1) {
enqueueUpdates(tld, 1, 1, tldRefreshItemsEntry.getValue());
} else {
tldRefreshItemsEntry
.getValue()
.stream()
.collect(
toImmutableSetMultimap(
refreshItem -> getLockIndex(tld, numPublishLocks, refreshItem),
refreshItem -> refreshItem))
.asMap()
.entrySet()
.forEach(
entry -> enqueueUpdates(tld, entry.getKey(), numPublishLocks, entry.getValue()));
}
}
}
/**
 * Computes the 1-based lock index under which a refresh item is published.
 *
 * <p>The hash input is the second-level domain of the item's name, for every record type. This
 * groups in-bailiwick hosts (the only hosts we refresh DNS for) into the same bucket as their
 * superordinate domains. Consistent hashing yields a bucket in [0,N); adding 1 shifts the result
 * into [1,N].
 *
 * @param tld the TLD the refresh item belongs to
 * @param numPublishLocks the number of buckets (N) to hash into
 * @param refreshItem the host or domain refresh item to bucket
 * @return the lock index, within [1,numPublishLocks]
 */
private int getLockIndex(String tld, int numPublishLocks, RefreshItem refreshItem) {
  String secondLevelDomain = getSecondLevelDomain(refreshItem.name(), tld);
  int zeroBasedBucket =
      Hashing.consistentHash(
          hashFunction.hashString(secondLevelDomain, UTF_8), numPublishLocks);
  return zeroBasedBucket + 1;
}
/**
 * Creates DNS refresh tasks for all writers for the tld within a lock index and batches large
 * updates into smaller chunks.
 *
 * <p>The items are partitioned into chunks of {@code tldUpdateBatchSize}. For each chunk, one
 * {@link PublishDnsUpdatesAction} task per DNS writer of the TLD is enqueued on the DNS publish
 * push queue, carrying the lock index, the number of publish locks, the enqueue time, the earliest
 * creation time among the chunk's items, and the host/domain names of the chunk.
 *
 * <p>If a jitter is configured, each task is delayed by a random number of milliseconds below the
 * jitter. A jitter that is zero or negative is treated as "no jitter" rather than failing.
 *
 * @param tld the TLD whose refresh items are being enqueued
 * @param lockIndex the lock index these items are enqueued under
 * @param numPublishLocks the total number of publish locks for the TLD
 * @param items the host and domain refresh items to enqueue under {@code lockIndex}
 */
private void enqueueUpdates(
    String tld, int lockIndex, int numPublishLocks, Collection<RefreshItem> items) {
  for (List<RefreshItem> chunk : Iterables.partition(items, tldUpdateBatchSize)) {
    // Chunks produced by partition() are never empty, so min() always has a value.
    DateTime earliestCreateTime =
        chunk.stream().map(RefreshItem::creationTime).min(Comparator.naturalOrder()).get();
    for (String dnsWriter : Registry.get(tld).getDnsWriters()) {
      TaskOptions options =
          withUrl(PublishDnsUpdatesAction.PATH)
              .countdownMillis(
                  jitterSeconds
                      // Clamp before narrowing so a very large jitter cannot overflow the int.
                      .map(
                          seconds ->
                              (int) Math.min(SECONDS.toMillis(seconds), Integer.MAX_VALUE))
                      // Random.nextInt(bound) throws IllegalArgumentException unless bound > 0,
                      // so a zero or negative jitter falls through to the 0 default below.
                      .filter(jitterMillis -> jitterMillis > 0)
                      .map(jitterMillis -> random.nextInt(jitterMillis))
                      .orElse(0))
              .param(RequestParameters.PARAM_TLD, tld)
              .param(PublishDnsUpdatesAction.PARAM_DNS_WRITER, dnsWriter)
              .param(PublishDnsUpdatesAction.PARAM_LOCK_INDEX, Integer.toString(lockIndex))
              .param(
                  PublishDnsUpdatesAction.PARAM_NUM_PUBLISH_LOCKS,
                  Integer.toString(numPublishLocks))
              .param(
                  PublishDnsUpdatesAction.PARAM_PUBLISH_TASK_ENQUEUED, clock.nowUtc().toString())
              .param(
                  PublishDnsUpdatesAction.PARAM_REFRESH_REQUEST_CREATED,
                  earliestCreateTime.toString());
      // Hosts and domains travel in separate repeated parameters of the same task.
      for (RefreshItem refreshItem : chunk) {
        options.param(
            (refreshItem.type() == TargetType.HOST)
                ? PublishDnsUpdatesAction.PARAM_HOSTS
                : PublishDnsUpdatesAction.PARAM_DOMAINS,
            refreshItem.name());
      }
      taskEnqueuer.enqueue(dnsPublishPushQueue, options);
    }
  }
}