Mirror of https://github.com/google/nomulus.git
Use locking on async mapreduces
This ensures that only one instance will run at a time, which should help fix the clogged-up mapreduces we've seen on sandbox. To do this, the UnlockerOutput is introduced; it releases the given Lock after all reducer shards have finished.

This also increases the lease duration of the DNS refresh action from 20 to 240 minutes. 20 minutes isn't long enough: when there are a lot of domains and decent system load, the mapreduce can take longer than that even in the ordinary case.

TESTED=Deployed to alpha and verified that more than one copy of the mapreduce wouldn't run simultaneously, and also that the lock is released when the mapreduce is finished.

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=205887554
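The UnlockerOutput class is introduced elsewhere in this commit and isn't shown in the hunks below, so here is a minimal sketch of the idea, assuming the App Engine MapReduce Output/OutputWriter API: an output whose per-shard writers do nothing, and whose finish() hook (run once after all reducer shards have completed) releases the mapreduce-wide lock. The base-class method signatures, the NoopWriter helper, and the serialVersionUID values are illustrative assumptions, not a copy of the real google.registry.mapreduce.UnlockerOutput.

import com.google.appengine.tools.mapreduce.Output;
import com.google.appengine.tools.mapreduce.OutputWriter;
import google.registry.model.server.Lock;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/** Illustrative sketch only: an Output that releases a Lock when the mapreduce finishes. */
public class UnlockerOutput<O> extends Output<O, Lock> {

  private static final long serialVersionUID = 1L; // placeholder value

  private final Lock lock;

  public UnlockerOutput(Lock lock) {
    this.lock = lock;
  }

  /** Writer that discards all values; the output exists only for its finish() side effect. */
  private static class NoopWriter<O> extends OutputWriter<O> {

    private static final long serialVersionUID = 1L; // placeholder value

    @Override
    public void write(O value) {
      // Deliberately do nothing.
    }
  }

  @Override
  public List<NoopWriter<O>> createWriters(int numShards) {
    // One no-op writer per reducer shard.
    List<NoopWriter<O>> writers = new ArrayList<>();
    for (int i = 0; i < numShards; i++) {
      writers.add(new NoopWriter<O>());
    }
    return writers;
  }

  @Override
  public Lock finish(Collection<? extends OutputWriter<O>> writers) {
    // Called once after all reducer shards have finished; release the mapreduce-wide lock.
    lock.release();
    return lock;
  }
}

In the hunks below, the contact/host deletion mapreduce wires the unlocker in by passing new UnlockerOutput<Void>(lock.get()) as the output of runMapreduce(), while the DNS refresh mapreduce instead hands the Lock to its single reducer and calls lock.release() at the end of reduce().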
parent 0cdbf71daf
commit ded40851d3

11 changed files with 360 additions and 89 deletions
DeleteContactsAndHostsAction.java

@@ -42,7 +42,10 @@ import static google.registry.model.reporting.HistoryEntry.Type.HOST_DELETE_FAIL
import static google.registry.util.PipelineUtils.createJobPath;
import static java.math.RoundingMode.CEILING;
import static java.util.concurrent.TimeUnit.DAYS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.logging.Level.INFO;
import static java.util.logging.Level.SEVERE;
import static org.joda.time.Duration.standardHours;

import com.google.appengine.api.taskqueue.LeaseOptions;
import com.google.appengine.api.taskqueue.Queue;
@@ -65,6 +68,7 @@ import google.registry.flows.async.AsyncFlowMetrics;
import google.registry.flows.async.AsyncFlowMetrics.OperationResult;
import google.registry.flows.async.AsyncFlowMetrics.OperationType;
import google.registry.mapreduce.MapreduceRunner;
import google.registry.mapreduce.UnlockerOutput;
import google.registry.mapreduce.inputs.EppResourceInputs;
import google.registry.mapreduce.inputs.NullInput;
import google.registry.model.EppResource;
@@ -80,21 +84,26 @@ import google.registry.model.poll.PendingActionNotificationResponse.ContactPendi
import google.registry.model.poll.PendingActionNotificationResponse.HostPendingActionNotificationResponse;
import google.registry.model.poll.PollMessage;
import google.registry.model.reporting.HistoryEntry;
import google.registry.model.server.Lock;
import google.registry.model.transfer.TransferStatus;
import google.registry.request.Action;
import google.registry.request.Response;
import google.registry.request.auth.Auth;
import google.registry.util.Clock;
import google.registry.util.NonFinalForTesting;
import google.registry.util.RequestStatusChecker;
import google.registry.util.Retrier;
import google.registry.util.SystemClock;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.logging.Level;
import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.inject.Named;
import org.joda.time.DateTime;
import org.joda.time.Duration;

/**
* A mapreduce that processes batch asynchronous deletions of contact and host resources by mapping
@@ -110,7 +119,7 @@ public class DeleteContactsAndHostsAction implements Runnable {
static final String KIND_CONTACT = getKind(ContactResource.class);
static final String KIND_HOST = getKind(HostResource.class);

private static final long LEASE_MINUTES = 120;
private static final Duration LEASE_LENGTH = standardHours(4);
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private static final int MAX_REDUCE_SHARDS = 50;
private static final int DELETES_PER_SHARD = 5;
@@ -119,20 +128,40 @@ public class DeleteContactsAndHostsAction implements Runnable {
@Inject Clock clock;
@Inject MapreduceRunner mrRunner;
@Inject @Named(QUEUE_ASYNC_DELETE) Queue queue;
@Inject RequestStatusChecker requestStatusChecker;
@Inject Response response;
@Inject Retrier retrier;
@Inject DeleteContactsAndHostsAction() {}

@Override
public void run() {
LeaseOptions options =
LeaseOptions.Builder.withCountLimit(maxLeaseCount()).leasePeriod(LEASE_MINUTES, MINUTES);
List<TaskHandle> tasks = queue.leaseTasks(options);
asyncFlowMetrics.recordContactHostDeletionBatchSize(tasks.size());
if (tasks.isEmpty()) {
response.setPayload("No contact/host deletion tasks in pull queue.");
// Check if the lock can be acquired, and if not, a previous run of this mapreduce is still
// executing, so return early.
Optional<Lock> lock =
Lock.acquire(
DeleteContactsAndHostsAction.class.getSimpleName(),
null,
LEASE_LENGTH,
requestStatusChecker,
false);
if (!lock.isPresent()) {
logRespondAndUnlock(INFO, "Can't acquire lock; aborting.", lock);
return;
}

// Lease the async tasks to process.
LeaseOptions options =
LeaseOptions.Builder.withCountLimit(maxLeaseCount())
.leasePeriod(LEASE_LENGTH.getStandardSeconds(), SECONDS);
List<TaskHandle> tasks = queue.leaseTasks(options);
asyncFlowMetrics.recordContactHostDeletionBatchSize(tasks.size());

// Check if there are no tasks to process, and if so, return early.
if (tasks.isEmpty()) {
logRespondAndUnlock(INFO, "No contact/host deletion tasks in pull queue; finishing.", lock);
return;
}

Multiset<String> kindCounts = HashMultiset.create(2);
ImmutableList.Builder<DeletionRequest> builder = new ImmutableList.Builder<>();
ImmutableList.Builder<Key<? extends EppResource>> resourceKeys = new ImmutableList.Builder<>();
@@ -158,13 +187,13 @@ public class DeleteContactsAndHostsAction implements Runnable {
deleteStaleTasksWithRetry(requestsToDelete);
ImmutableList<DeletionRequest> deletionRequests = builder.build();
if (deletionRequests.isEmpty()) {
logger.atInfo().log("No asynchronous deletions to process because all were already handled.");
response.setPayload("All requested deletions of contacts/hosts have already occurred.");
logRespondAndUnlock(
INFO, "No async deletions to process because all were already handled.", lock);
} else {
logger.atInfo().log(
"Processing asynchronous deletion of %d contacts and %d hosts: %s",
kindCounts.count(KIND_CONTACT), kindCounts.count(KIND_HOST), resourceKeys.build());
runMapreduce(deletionRequests);
runMapreduce(deletionRequests, lock);
}
}

@@ -187,27 +216,35 @@ public class DeleteContactsAndHostsAction implements Runnable {
deletionRequest.requestedTime()));
}

private void runMapreduce(ImmutableList<DeletionRequest> deletionRequests) {
private void runMapreduce(ImmutableList<DeletionRequest> deletionRequests, Optional<Lock> lock) {
try {
int numReducers =
Math.min(MAX_REDUCE_SHARDS, divide(deletionRequests.size(), DELETES_PER_SHARD, CEILING));
response.sendJavaScriptRedirect(createJobPath(mrRunner
.setJobName("Check for EPP resource references and then delete")
.setModuleName("backend")
.setDefaultReduceShards(numReducers)
.runMapreduce(
new DeleteContactsAndHostsMapper(deletionRequests),
new DeleteEppResourceReducer(),
ImmutableList.of(
// Add an extra shard that maps over a null domain. See the mapper code for why.
new NullInput<>(),
EppResourceInputs.createEntityInput(DomainBase.class)))));
response.sendJavaScriptRedirect(
createJobPath(
mrRunner
.setJobName("Check for EPP resource references and then delete")
.setModuleName("backend")
.setDefaultReduceShards(numReducers)
.runMapreduce(
new DeleteContactsAndHostsMapper(deletionRequests),
new DeleteEppResourceReducer(),
ImmutableList.of(
// Add an extra shard that maps over a null domain. See the mapper code
// for why.
new NullInput<>(), EppResourceInputs.createEntityInput(DomainBase.class)),
new UnlockerOutput<Void>(lock.get()))));
} catch (Throwable t) {
logger.atSevere().withCause(t).log(
"Error while kicking off mapreduce to delete contacts/hosts");
logRespondAndUnlock(SEVERE, "Error starting mapreduce to delete contacts/hosts.", lock);
}
}

private void logRespondAndUnlock(Level level, String message, Optional<Lock> lock) {
logger.at(level).log(message);
response.setPayload(message);
lock.ifPresent(Lock::release);
}

/**
* A mapper that iterates over all {@link DomainBase} entities.
*

RefreshDnsOnHostRenameAction.java

@@ -29,7 +29,10 @@ import static google.registry.model.ofy.ObjectifyService.ofy;
import static google.registry.util.DateTimeUtils.latestOf;
import static google.registry.util.PipelineUtils.createJobPath;
import static java.util.concurrent.TimeUnit.DAYS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.logging.Level.INFO;
import static java.util.logging.Level.SEVERE;
import static org.joda.time.Duration.standardHours;

import com.google.appengine.api.taskqueue.LeaseOptions;
import com.google.appengine.api.taskqueue.Queue;
@@ -50,20 +53,25 @@ import google.registry.mapreduce.MapreduceRunner;
import google.registry.mapreduce.inputs.NullInput;
import google.registry.model.domain.DomainResource;
import google.registry.model.host.HostResource;
import google.registry.model.server.Lock;
import google.registry.request.Action;
import google.registry.request.Response;
import google.registry.request.auth.Auth;
import google.registry.util.Clock;
import google.registry.util.NonFinalForTesting;
import google.registry.util.RequestStatusChecker;
import google.registry.util.Retrier;
import google.registry.util.SystemClock;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.logging.Level;
import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.inject.Named;
import org.joda.time.DateTime;
import org.joda.time.Duration;

/** Performs batched DNS refreshes for applicable domains following a host rename. */
@Action(
@@ -73,26 +81,48 @@ import org.joda.time.DateTime;
public class RefreshDnsOnHostRenameAction implements Runnable {

private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private static final long LEASE_MINUTES = 20;
private static final Duration LEASE_LENGTH = standardHours(4);

@Inject AsyncFlowMetrics asyncFlowMetrics;
@Inject Clock clock;
@Inject MapreduceRunner mrRunner;
@Inject @Named(QUEUE_ASYNC_HOST_RENAME) Queue pullQueue;
@Inject RequestStatusChecker requestStatusChecker;
@Inject Response response;
@Inject Retrier retrier;
@Inject RefreshDnsOnHostRenameAction() {}

@Override
public void run() {
LeaseOptions options =
LeaseOptions.Builder.withCountLimit(maxLeaseCount()).leasePeriod(LEASE_MINUTES, MINUTES);
List<TaskHandle> tasks = pullQueue.leaseTasks(options);
asyncFlowMetrics.recordDnsRefreshBatchSize(tasks.size());
if (tasks.isEmpty()) {
response.setPayload("No DNS refresh on host rename tasks to process in pull queue.");
// Check if the lock can be acquired, and if not, a previous run of this mapreduce is still
// executing, so return early.
Optional<Lock> lock =
Lock.acquire(
RefreshDnsOnHostRenameAction.class.getSimpleName(),
null,
LEASE_LENGTH,
requestStatusChecker,
false);

if (!lock.isPresent()) {
logRespondAndUnlock(INFO, "Can't acquire lock; aborting.", lock);
return;
}

// Lease the async tasks to process.
LeaseOptions options =
LeaseOptions.Builder.withCountLimit(maxLeaseCount())
.leasePeriod(LEASE_LENGTH.getStandardSeconds(), SECONDS);
List<TaskHandle> tasks = pullQueue.leaseTasks(options);
asyncFlowMetrics.recordDnsRefreshBatchSize(tasks.size());

// Check if there are no tasks to process, and if so, return early.
if (tasks.isEmpty()) {
logRespondAndUnlock(
INFO, "No DNS refresh on host rename tasks to process in pull queue; finishing.", lock);
return;
}

ImmutableList.Builder<DnsRefreshRequest> requestsBuilder = new ImmutableList.Builder<>();
ImmutableList.Builder<Key<HostResource>> hostKeys = new ImmutableList.Builder<>();
final List<DnsRefreshRequest> requestsToDelete = new ArrayList<>();
@@ -119,34 +149,41 @@ public class RefreshDnsOnHostRenameAction implements Runnable {
requestsToDelete, pullQueue, asyncFlowMetrics, retrier, OperationResult.STALE);
ImmutableList<DnsRefreshRequest> refreshRequests = requestsBuilder.build();
if (refreshRequests.isEmpty()) {
logger.atInfo().log(
"No asynchronous DNS refreshes to process because all renamed hosts are deleted.");
response.setPayload("All requested DNS refreshes are on hosts that were since deleted.");
logRespondAndUnlock(
INFO, "No async DNS refreshes to process because all renamed hosts are deleted.", lock);
} else {
logger.atInfo().log(
"Processing asynchronous DNS refresh for renamed hosts: %s", hostKeys.build());
runMapreduce(refreshRequests);
runMapreduce(refreshRequests, lock);
}
}

private void runMapreduce(ImmutableList<DnsRefreshRequest> refreshRequests) {
private void runMapreduce(ImmutableList<DnsRefreshRequest> refreshRequests, Optional<Lock> lock) {
try {
response.sendJavaScriptRedirect(createJobPath(mrRunner
.setJobName("Enqueue DNS refreshes for domains referencing renamed hosts")
.setModuleName("backend")
.setDefaultReduceShards(1)
.runMapreduce(
new RefreshDnsOnHostRenameMapper(refreshRequests, retrier),
new RefreshDnsOnHostRenameReducer(refreshRequests, retrier),
// Add an extra NullInput so that the reducer always fires exactly once.
ImmutableList.of(
new NullInput<>(), createEntityInput(DomainResource.class)))));
response.sendJavaScriptRedirect(
createJobPath(
mrRunner
.setJobName("Enqueue DNS refreshes for domains referencing renamed hosts")
.setModuleName("backend")
.setDefaultReduceShards(1)
.runMapreduce(
new RefreshDnsOnHostRenameMapper(refreshRequests, retrier),
new RefreshDnsOnHostRenameReducer(refreshRequests, lock.get(), retrier),
// Add an extra NullInput so that the reducer always fires exactly once.
ImmutableList.of(
new NullInput<>(), createEntityInput(DomainResource.class)))));
} catch (Throwable t) {
logger.atSevere().withCause(t).log(
"Error while kicking off mapreduce to refresh DNS for renamed hosts.");
logRespondAndUnlock(
SEVERE, "Error starting mapreduce to refresh DNS for renamed hosts.", lock);
}
}

private void logRespondAndUnlock(Level level, String message, Optional<Lock> lock) {
logger.at(level).log(message);
response.setPayload(message);
lock.ifPresent(Lock::release);
}

/** Map over domains and refresh the DNS of those that reference the renamed hosts. */
public static class RefreshDnsOnHostRenameMapper
extends Mapper<DomainResource, Boolean, Boolean> {
@@ -205,27 +242,34 @@ public class RefreshDnsOnHostRenameAction implements Runnable {
*/
public static class RefreshDnsOnHostRenameReducer extends Reducer<Boolean, Boolean, Void> {

private static final long serialVersionUID = -2850944843275790412L;
private static final long serialVersionUID = 9077366205249562118L;

@NonFinalForTesting
private static AsyncFlowMetrics asyncFlowMetrics = new AsyncFlowMetrics(new SystemClock());

private final Lock lock;
private final Retrier retrier;
private final List<DnsRefreshRequest> refreshRequests;

RefreshDnsOnHostRenameReducer(List<DnsRefreshRequest> refreshRequests, Retrier retrier) {
RefreshDnsOnHostRenameReducer(
List<DnsRefreshRequest> refreshRequests, Lock lock, Retrier retrier) {
this.refreshRequests = refreshRequests;
this.lock = lock;
this.retrier = retrier;
}

@Override
public void reduce(Boolean key, ReducerInput<Boolean> values) {
// The reduce() method is run precisely once, because the NullInput caused the mapper to emit
// a dummy value once.
deleteTasksWithRetry(
refreshRequests,
getQueue(QUEUE_ASYNC_HOST_RENAME),
asyncFlowMetrics,
retrier,
OperationResult.SUCCESS);

lock.release();
}
}