mirror of
https://github.com/google/nomulus.git
synced 2025-05-13 16:07:15 +02:00
Cut over to batched async deletion for contacts/hosts
Also consolidates the DNS refresh functionality in AsyncFlowUtils that was being used by HostUpdateFlow into AsyncFlowEnqueuer. TESTED=I threw together some batch scripts to create dozens of contacts on alpha and then request their deletion, and the [] ran fine and successfully deleted them in batches. ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=133714691
This commit is contained in:
parent
65ff6b45d1
commit
2dcac3ca68
11 changed files with 74 additions and 190 deletions
|
@ -14,17 +14,23 @@
|
|||
|
||||
package google.registry.flows.async;
|
||||
|
||||
import static com.google.appengine.api.taskqueue.QueueFactory.getQueue;
|
||||
import static google.registry.flows.async.DeleteContactsAndHostsAction.PARAM_IS_SUPERUSER;
|
||||
import static google.registry.flows.async.DeleteContactsAndHostsAction.PARAM_REQUESTING_CLIENT_ID;
|
||||
import static google.registry.flows.async.DeleteContactsAndHostsAction.PARAM_RESOURCE_KEY;
|
||||
import static google.registry.flows.async.DnsRefreshForHostRenameAction.PARAM_HOST_KEY;
|
||||
import static google.registry.request.Actions.getPathForAction;
|
||||
|
||||
import com.google.appengine.api.taskqueue.Queue;
|
||||
import com.google.appengine.api.taskqueue.RetryOptions;
|
||||
import com.google.appengine.api.taskqueue.TaskOptions;
|
||||
import com.google.appengine.api.taskqueue.TaskOptions.Method;
|
||||
import com.google.appengine.api.taskqueue.TransientFailureException;
|
||||
import com.googlecode.objectify.Key;
|
||||
import google.registry.config.ConfigModule.Config;
|
||||
import google.registry.config.RegistryEnvironment;
|
||||
import google.registry.model.EppResource;
|
||||
import google.registry.model.host.HostResource;
|
||||
import google.registry.util.FormattingLogger;
|
||||
import google.registry.util.Retrier;
|
||||
import java.util.concurrent.Callable;
|
||||
|
@ -32,40 +38,58 @@ import javax.inject.Inject;
|
|||
import javax.inject.Named;
|
||||
import org.joda.time.Duration;
|
||||
|
||||
/** Helper class to enqueue tasks for handling asynchronous deletions to pull queues. */
|
||||
/** Helper class to enqueue tasks for handling asynchronous operations in flows. */
|
||||
public final class AsyncFlowEnqueuer {
|
||||
|
||||
private static final FormattingLogger logger = FormattingLogger.getLoggerForCallerClass();
|
||||
|
||||
@Inject @Config("asyncDeleteFlowMapreduceDelay") Duration asyncDeleteDelay;
|
||||
@Inject @Named("async-delete-pull") Queue queue;
|
||||
@Inject @Named("async-delete-pull") Queue asyncDeletePullQueue;
|
||||
@Inject Retrier retrier;
|
||||
@Inject AsyncFlowEnqueuer() {}
|
||||
|
||||
/**
|
||||
* Enqueues a task to asynchronously delete a contact or host, by key.
|
||||
*
|
||||
* <p>Note that the clientId is of the logged-in registrar that is requesting the deletion, not
|
||||
* necessarily the current owner of the resource.
|
||||
*/
|
||||
/** Enqueues a task to asynchronously delete a contact or host, by key. */
|
||||
public void enqueueAsyncDelete(
|
||||
EppResource resourceToDelete, String clientId, boolean isSuperuser) {
|
||||
EppResource resourceToDelete, String requestingClientId, boolean isSuperuser) {
|
||||
Key<EppResource> resourceKey = Key.create(resourceToDelete);
|
||||
logger.infofmt(
|
||||
"Enqueueing async action to delete %s on behalf of registrar %s.", resourceKey, clientId);
|
||||
final TaskOptions options =
|
||||
"Enqueueing async deletion of %s on behalf of registrar %s.",
|
||||
resourceKey, requestingClientId);
|
||||
final TaskOptions task =
|
||||
TaskOptions.Builder
|
||||
.withMethod(Method.PULL)
|
||||
.countdownMillis(asyncDeleteDelay.getMillis())
|
||||
.param(PARAM_RESOURCE_KEY, resourceKey.getString())
|
||||
.param(PARAM_REQUESTING_CLIENT_ID, clientId)
|
||||
.param(PARAM_REQUESTING_CLIENT_ID, requestingClientId)
|
||||
.param(PARAM_IS_SUPERUSER, Boolean.toString(isSuperuser));
|
||||
// Retry on transient failure exceptions so that the entire flow isn't aborted unnecessarily.
|
||||
addTaskToQueueWithRetry(asyncDeletePullQueue, task);
|
||||
}
|
||||
|
||||
/** Enqueues a task to asynchronously refresh DNS for a host. */
|
||||
public void enqueueAsyncDnsRefresh(HostResource host) {
|
||||
logger.infofmt("Enqueueing async DNS refresh for host %s", Key.create(host));
|
||||
// Aggressively back off if the task fails, to minimize flooding the logs.
|
||||
RetryOptions retryOptions =
|
||||
RetryOptions.Builder.withMinBackoffSeconds(
|
||||
RegistryEnvironment.get().config().getAsyncFlowFailureBackoff().getStandardSeconds());
|
||||
final TaskOptions task =
|
||||
TaskOptions.Builder.withUrl(getPathForAction(DnsRefreshForHostRenameAction.class))
|
||||
.retryOptions(retryOptions)
|
||||
.param(PARAM_HOST_KEY, Key.create(host).getString())
|
||||
.method(Method.GET);
|
||||
addTaskToQueueWithRetry(getQueue("flows-async"), task);
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds a task to a queue with retrying, to avoid aborting the entire flow over a transient issue
|
||||
* enqueuing a task.
|
||||
*/
|
||||
private void addTaskToQueueWithRetry(final Queue queue, final TaskOptions task) {
|
||||
retrier.callWithRetry(new Callable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
queue.add(options);
|
||||
return null;
|
||||
}}, TransientFailureException.class);
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
queue.add(task);
|
||||
return null;
|
||||
}}, TransientFailureException.class);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,64 +0,0 @@
|
|||
// Copyright 2016 The Domain Registry Authors. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package google.registry.flows.async;
|
||||
|
||||
import static google.registry.request.Actions.getPathForAction;
|
||||
|
||||
import com.google.appengine.api.taskqueue.Queue;
|
||||
import com.google.appengine.api.taskqueue.QueueFactory;
|
||||
import com.google.appengine.api.taskqueue.RetryOptions;
|
||||
import com.google.appengine.api.taskqueue.TaskHandle;
|
||||
import com.google.appengine.api.taskqueue.TaskOptions;
|
||||
import com.google.appengine.api.taskqueue.TaskOptions.Method;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import google.registry.config.RegistryEnvironment;
|
||||
import google.registry.util.FormattingLogger;
|
||||
import java.util.Map.Entry;
|
||||
import org.joda.time.Duration;
|
||||
|
||||
/** Utility methods specific to async flows. */
|
||||
// TODO(b/26140521): Delete this class once non-batched async operations are deleted.
|
||||
public final class AsyncFlowUtils {
|
||||
|
||||
private static final FormattingLogger logger = FormattingLogger.getLoggerForCallerClass();
|
||||
|
||||
@VisibleForTesting
|
||||
public static final String ASYNC_FLOW_QUEUE_NAME = "flows-async"; // See queue.xml.
|
||||
|
||||
private AsyncFlowUtils() {}
|
||||
|
||||
/** Enqueues a mapreduce action to perform an async flow operation. */
|
||||
public static TaskHandle enqueueMapreduceAction(
|
||||
Class<? extends Runnable> action,
|
||||
ImmutableMap<String, String> params,
|
||||
Duration executionDelay) {
|
||||
Queue queue = QueueFactory.getQueue(ASYNC_FLOW_QUEUE_NAME);
|
||||
String path = getPathForAction(action);
|
||||
logger.infofmt("Enqueueing async mapreduce action with path %s and params %s", path, params);
|
||||
// Aggressively back off if the task fails, to minimize flooding the logs.
|
||||
RetryOptions retryOptions = RetryOptions.Builder.withMinBackoffSeconds(
|
||||
RegistryEnvironment.get().config().getAsyncFlowFailureBackoff().getStandardSeconds());
|
||||
TaskOptions options = TaskOptions.Builder
|
||||
.withUrl(path)
|
||||
.retryOptions(retryOptions)
|
||||
.countdownMillis(executionDelay.getMillis())
|
||||
.method(Method.GET);
|
||||
for (Entry<String, String> entry : params.entrySet()) {
|
||||
options.param(entry.getKey(), entry.getValue());
|
||||
}
|
||||
return queue.add(options);
|
||||
}
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue