Add metrics for async batch operation processing

We want to know how long it's actually taking to process asynchronous
contact/host deletions and DNS refreshes on host renames. This adds
instrumentation. Five metrics are recorded as follows:

* An incrementable metric for each async task processed (split out by
  type of task and result).
* Two event metrics for processing time between when a task is enqueued
  and when it is processed -- tracked separately for contact/host
  deletion and DNS refresh on host rename.
* Two event metrics for batch size every time the two mapreduces are
  run (this is usually 0). Tracked separately for contact/host deletion
  and DNS refresh on host rename.

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=157001310
This commit is contained in:
mcilwain 2017-05-24 10:00:14 -07:00 committed by Ben McIlwain
parent 1adeb57fea
commit bb67841884
14 changed files with 671 additions and 154 deletions

View file

@ -18,7 +18,9 @@ import static com.google.appengine.api.taskqueue.QueueConstants.maxLeaseCount;
import static com.google.appengine.api.taskqueue.QueueFactory.getQueue;
import static com.google.common.base.Preconditions.checkNotNull;
import static google.registry.flows.async.AsyncFlowEnqueuer.PARAM_HOST_KEY;
import static google.registry.flows.async.AsyncFlowEnqueuer.PARAM_REQUESTED_TIME;
import static google.registry.flows.async.AsyncFlowEnqueuer.QUEUE_ASYNC_HOST_RENAME;
import static google.registry.flows.async.AsyncFlowMetrics.OperationType.DNS_REFRESH;
import static google.registry.mapreduce.inputs.EppResourceInputs.createEntityInput;
import static google.registry.model.EppResourceUtils.isActive;
import static google.registry.model.EppResourceUtils.isDeleted;
@ -27,6 +29,7 @@ import static google.registry.util.DateTimeUtils.latestOf;
import static google.registry.util.PipelineUtils.createJobPath;
import static java.util.concurrent.TimeUnit.DAYS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static org.joda.time.DateTimeZone.UTC;
import com.google.appengine.api.taskqueue.LeaseOptions;
import com.google.appengine.api.taskqueue.Queue;
@ -36,11 +39,14 @@ import com.google.appengine.tools.mapreduce.Mapper;
import com.google.appengine.tools.mapreduce.Reducer;
import com.google.appengine.tools.mapreduce.ReducerInput;
import com.google.auto.value.AutoValue;
import com.google.common.base.Optional;
import com.google.common.base.Function;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.googlecode.objectify.Key;
import google.registry.dns.DnsQueue;
import google.registry.flows.async.AsyncFlowMetrics;
import google.registry.flows.async.AsyncFlowMetrics.OperationResult;
import google.registry.mapreduce.MapreduceRunner;
import google.registry.mapreduce.inputs.NullInput;
import google.registry.model.domain.DomainResource;
@ -49,7 +55,9 @@ import google.registry.request.Action;
import google.registry.request.Response;
import google.registry.util.Clock;
import google.registry.util.FormattingLogger;
import google.registry.util.NonFinalForTesting;
import google.registry.util.Retrier;
import google.registry.util.SystemClock;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
@ -66,6 +74,7 @@ public class RefreshDnsOnHostRenameAction implements Runnable {
private static final FormattingLogger logger = FormattingLogger.getLoggerForCallerClass();
private static final long LEASE_MINUTES = 20;
@Inject AsyncFlowMetrics asyncFlowMetrics;
@Inject Clock clock;
@Inject MapreduceRunner mrRunner;
@Inject @Named(QUEUE_ASYNC_HOST_RENAME) Queue pullQueue;
@ -78,24 +87,24 @@ public class RefreshDnsOnHostRenameAction implements Runnable {
LeaseOptions options =
LeaseOptions.Builder.withCountLimit(maxLeaseCount()).leasePeriod(LEASE_MINUTES, MINUTES);
List<TaskHandle> tasks = pullQueue.leaseTasks(options);
asyncFlowMetrics.recordDnsRefreshBatchSize(tasks.size());
if (tasks.isEmpty()) {
response.setPayload("No DNS refresh on host rename tasks to process in pull queue.");
return;
}
ImmutableList.Builder<DnsRefreshRequest> requestsBuilder = new ImmutableList.Builder<>();
ImmutableList.Builder<Key<HostResource>> hostKeys = new ImmutableList.Builder<>();
final List<TaskHandle> tasksToDelete = new ArrayList<>();
final List<DnsRefreshRequest> requestsToDelete = new ArrayList<>();
for (TaskHandle task : tasks) {
try {
Optional<DnsRefreshRequest> request =
DnsRefreshRequest.createFromTask(task, clock.nowUtc());
if (request.isPresent()) {
requestsBuilder.add(request.get());
hostKeys.add(request.get().hostKey());
DnsRefreshRequest request = DnsRefreshRequest.createFromTask(task, clock.nowUtc());
if (request.isRefreshNeeded()) {
requestsBuilder.add(request);
hostKeys.add(request.hostKey());
} else {
// Skip hosts that are deleted.
tasksToDelete.add(task);
requestsToDelete.add(request);
}
} catch (Exception e) {
logger.severefmt(
@ -105,7 +114,8 @@ public class RefreshDnsOnHostRenameAction implements Runnable {
}
}
deleteTasksWithRetry(tasksToDelete, pullQueue, retrier);
deleteTasksWithRetry(
requestsToDelete, pullQueue, asyncFlowMetrics, retrier, OperationResult.STALE);
ImmutableList<DnsRefreshRequest> refreshRequests = requestsBuilder.build();
if (refreshRequests.isEmpty()) {
logger.info(
@ -113,12 +123,11 @@ public class RefreshDnsOnHostRenameAction implements Runnable {
response.setPayload("All requested DNS refreshes are on hosts that were since deleted.");
} else {
logger.infofmt("Processing asynchronous DNS refresh for renamed hosts: %s", hostKeys.build());
runMapreduce(refreshRequests, tasks);
runMapreduce(refreshRequests);
}
}
private void runMapreduce(
ImmutableList<DnsRefreshRequest> refreshRequests, List<TaskHandle> tasks) {
private void runMapreduce(ImmutableList<DnsRefreshRequest> refreshRequests) {
try {
response.sendJavaScriptRedirect(createJobPath(mrRunner
.setJobName("Enqueue DNS refreshes for domains referencing renamed hosts")
@ -126,7 +135,7 @@ public class RefreshDnsOnHostRenameAction implements Runnable {
.setDefaultReduceShards(1)
.runMapreduce(
new RefreshDnsOnHostRenameMapper(refreshRequests, retrier),
new RefreshDnsOnHostRenameReducer(tasks, retrier),
new RefreshDnsOnHostRenameReducer(refreshRequests, retrier),
// Add an extra NullInput so that the reducer always fires exactly once.
ImmutableList.of(
new NullInput<DomainResource>(), createEntityInput(DomainResource.class)))));
@ -199,26 +208,48 @@ public class RefreshDnsOnHostRenameAction implements Runnable {
private static final long serialVersionUID = -2850944843275790412L;
private final Retrier retrier;
private final List<TaskHandle> tasks;
@NonFinalForTesting
private static AsyncFlowMetrics asyncFlowMetrics = new AsyncFlowMetrics(new SystemClock());
RefreshDnsOnHostRenameReducer(List<TaskHandle> tasks, Retrier retrier) {
this.tasks = tasks;
private final Retrier retrier;
private final List<DnsRefreshRequest> refreshRequests;
RefreshDnsOnHostRenameReducer(List<DnsRefreshRequest> refreshRequests, Retrier retrier) {
this.refreshRequests = refreshRequests;
this.retrier = retrier;
}
@Override
public void reduce(Boolean key, ReducerInput<Boolean> values) {
deleteTasksWithRetry(tasks, getQueue(QUEUE_ASYNC_HOST_RENAME), retrier);
deleteTasksWithRetry(
refreshRequests,
getQueue(QUEUE_ASYNC_HOST_RENAME),
asyncFlowMetrics,
retrier,
OperationResult.SUCCESS);
}
}
/** Deletes a list of tasks from the given queue using a retrier. */
private static void deleteTasksWithRetry(
final List<TaskHandle> tasks, final Queue queue, Retrier retrier) {
if (tasks.isEmpty()) {
final List<DnsRefreshRequest> refreshRequests,
final Queue queue,
AsyncFlowMetrics asyncFlowMetrics,
Retrier retrier,
OperationResult result) {
if (refreshRequests.isEmpty()) {
return;
}
final List<TaskHandle> tasks =
FluentIterable.from(refreshRequests)
.transform(
new Function<DnsRefreshRequest, TaskHandle>() {
@Override
public TaskHandle apply(DnsRefreshRequest refreshRequest) {
return refreshRequest.task();
}
})
.toList();
retrier.callWithRetry(
new Callable<Void>() {
@Override
@ -226,35 +257,60 @@ public class RefreshDnsOnHostRenameAction implements Runnable {
queue.deleteTask(tasks);
return null;
}}, TransientFailureException.class);
for (DnsRefreshRequest refreshRequest : refreshRequests) {
asyncFlowMetrics.recordAsyncFlowResult(DNS_REFRESH, result, refreshRequest.requestedTime());
}
}
/** A class that encapsulates the values of a request to refresh DNS for a renamed host. */
@AutoValue
abstract static class DnsRefreshRequest implements Serializable {
private static final long serialVersionUID = 2188894914017230887L;
private static final long serialVersionUID = 1772812852271288622L;
abstract Key<HostResource> hostKey();
abstract DateTime lastUpdateTime();
abstract DateTime requestedTime();
abstract boolean isRefreshNeeded();
abstract TaskHandle task();
@AutoValue.Builder
abstract static class Builder {
abstract Builder setHostKey(Key<HostResource> hostKey);
abstract Builder setLastUpdateTime(DateTime lastUpdateTime);
abstract Builder setRequestedTime(DateTime requestedTime);
abstract Builder setIsRefreshNeeded(boolean isRefreshNeeded);
abstract Builder setTask(TaskHandle task);
abstract DnsRefreshRequest build();
}
/**
* Returns a packaged-up {@link DnsRefreshRequest} parsed from a task queue task, or absent if
* the host specified is already deleted.
* Returns a packaged-up {@link DnsRefreshRequest} parsed from a task queue task.
*/
static Optional<DnsRefreshRequest> createFromTask(TaskHandle task, DateTime now)
throws Exception {
static DnsRefreshRequest createFromTask(TaskHandle task, DateTime now) throws Exception {
ImmutableMap<String, String> params = ImmutableMap.copyOf(task.extractParams());
Key<HostResource> hostKey =
Key.create(checkNotNull(params.get(PARAM_HOST_KEY), "Host to refresh not specified"));
HostResource host =
checkNotNull(ofy().load().key(hostKey).now(), "Host to refresh doesn't exist");
if (isDeleted(host, latestOf(now, host.getUpdateAutoTimestamp().getTimestamp()))) {
boolean isHostDeleted =
isDeleted(host, latestOf(now, host.getUpdateAutoTimestamp().getTimestamp()));
if (isHostDeleted) {
logger.infofmt("Host %s is already deleted, not refreshing DNS.", hostKey);
return Optional.absent();
}
return Optional.<DnsRefreshRequest>of(
new AutoValue_RefreshDnsOnHostRenameAction_DnsRefreshRequest(
hostKey, host.getUpdateAutoTimestamp().getTimestamp()));
return new AutoValue_RefreshDnsOnHostRenameAction_DnsRefreshRequest.Builder()
.setHostKey(hostKey)
.setLastUpdateTime(host.getUpdateAutoTimestamp().getTimestamp())
// TODO(b/38386090): After push is completed and all old tasks are processed, change to:
// .setRequestedTime(DateTime.parse(
// checkNotNull(params.get(PARAM_REQUESTED_TIME), "Requested time not specified")))
.setRequestedTime(
(params.containsKey(PARAM_REQUESTED_TIME))
? DateTime.parse(params.get(PARAM_REQUESTED_TIME))
: DateTime.now(UTC))
.setIsRefreshNeeded(!isHostDeleted)
.setTask(task)
.build();
}
}
}