diff --git a/java/google/registry/flows/async/AsyncFlowEnqueuer.java b/java/google/registry/flows/async/AsyncFlowEnqueuer.java index c8c09c158..04d21014a 100644 --- a/java/google/registry/flows/async/AsyncFlowEnqueuer.java +++ b/java/google/registry/flows/async/AsyncFlowEnqueuer.java @@ -14,17 +14,23 @@ package google.registry.flows.async; +import static com.google.appengine.api.taskqueue.QueueFactory.getQueue; import static google.registry.flows.async.DeleteContactsAndHostsAction.PARAM_IS_SUPERUSER; import static google.registry.flows.async.DeleteContactsAndHostsAction.PARAM_REQUESTING_CLIENT_ID; import static google.registry.flows.async.DeleteContactsAndHostsAction.PARAM_RESOURCE_KEY; +import static google.registry.flows.async.DnsRefreshForHostRenameAction.PARAM_HOST_KEY; +import static google.registry.request.Actions.getPathForAction; import com.google.appengine.api.taskqueue.Queue; +import com.google.appengine.api.taskqueue.RetryOptions; import com.google.appengine.api.taskqueue.TaskOptions; import com.google.appengine.api.taskqueue.TaskOptions.Method; import com.google.appengine.api.taskqueue.TransientFailureException; import com.googlecode.objectify.Key; import google.registry.config.ConfigModule.Config; +import google.registry.config.RegistryEnvironment; import google.registry.model.EppResource; +import google.registry.model.host.HostResource; import google.registry.util.FormattingLogger; import google.registry.util.Retrier; import java.util.concurrent.Callable; @@ -32,40 +38,58 @@ import javax.inject.Inject; import javax.inject.Named; import org.joda.time.Duration; -/** Helper class to enqueue tasks for handling asynchronous deletions to pull queues. */ +/** Helper class to enqueue tasks for handling asynchronous operations in flows. */ public final class AsyncFlowEnqueuer { private static final FormattingLogger logger = FormattingLogger.getLoggerForCallerClass(); @Inject @Config("asyncDeleteFlowMapreduceDelay") Duration asyncDeleteDelay; - @Inject @Named("async-delete-pull") Queue queue; + @Inject @Named("async-delete-pull") Queue asyncDeletePullQueue; @Inject Retrier retrier; @Inject AsyncFlowEnqueuer() {} - /** - * Enqueues a task to asynchronously delete a contact or host, by key. - * - *

Note that the clientId is of the logged-in registrar that is requesting the deletion, not - * necessarily the current owner of the resource. - */ + /** Enqueues a task to asynchronously delete a contact or host, by key. */ public void enqueueAsyncDelete( - EppResource resourceToDelete, String clientId, boolean isSuperuser) { + EppResource resourceToDelete, String requestingClientId, boolean isSuperuser) { Key resourceKey = Key.create(resourceToDelete); logger.infofmt( - "Enqueueing async action to delete %s on behalf of registrar %s.", resourceKey, clientId); - final TaskOptions options = + "Enqueueing async deletion of %s on behalf of registrar %s.", + resourceKey, requestingClientId); + final TaskOptions task = TaskOptions.Builder .withMethod(Method.PULL) .countdownMillis(asyncDeleteDelay.getMillis()) .param(PARAM_RESOURCE_KEY, resourceKey.getString()) - .param(PARAM_REQUESTING_CLIENT_ID, clientId) + .param(PARAM_REQUESTING_CLIENT_ID, requestingClientId) .param(PARAM_IS_SUPERUSER, Boolean.toString(isSuperuser)); - // Retry on transient failure exceptions so that the entire flow isn't aborted unnecessarily. + addTaskToQueueWithRetry(asyncDeletePullQueue, task); + } + + /** Enqueues a task to asynchronously refresh DNS for a host. */ + public void enqueueAsyncDnsRefresh(HostResource host) { + logger.infofmt("Enqueueing async DNS refresh for host %s", Key.create(host)); + // Aggressively back off if the task fails, to minimize flooding the logs. + RetryOptions retryOptions = + RetryOptions.Builder.withMinBackoffSeconds( + RegistryEnvironment.get().config().getAsyncFlowFailureBackoff().getStandardSeconds()); + final TaskOptions task = + TaskOptions.Builder.withUrl(getPathForAction(DnsRefreshForHostRenameAction.class)) + .retryOptions(retryOptions) + .param(PARAM_HOST_KEY, Key.create(host).getString()) + .method(Method.GET); + addTaskToQueueWithRetry(getQueue("flows-async"), task); + } + + /** + * Adds a task to a queue with retrying, to avoid aborting the entire flow over a transient issue + * enqueuing a task. + */ + private void addTaskToQueueWithRetry(final Queue queue, final TaskOptions task) { retrier.callWithRetry(new Callable() { - @Override - public Void call() throws Exception { - queue.add(options); - return null; - }}, TransientFailureException.class); + @Override + public Void call() throws Exception { + queue.add(task); + return null; + }}, TransientFailureException.class); } } diff --git a/java/google/registry/flows/async/AsyncFlowUtils.java b/java/google/registry/flows/async/AsyncFlowUtils.java deleted file mode 100644 index ab5fe027c..000000000 --- a/java/google/registry/flows/async/AsyncFlowUtils.java +++ /dev/null @@ -1,64 +0,0 @@ -// Copyright 2016 The Domain Registry Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package google.registry.flows.async; - -import static google.registry.request.Actions.getPathForAction; - -import com.google.appengine.api.taskqueue.Queue; -import com.google.appengine.api.taskqueue.QueueFactory; -import com.google.appengine.api.taskqueue.RetryOptions; -import com.google.appengine.api.taskqueue.TaskHandle; -import com.google.appengine.api.taskqueue.TaskOptions; -import com.google.appengine.api.taskqueue.TaskOptions.Method; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableMap; -import google.registry.config.RegistryEnvironment; -import google.registry.util.FormattingLogger; -import java.util.Map.Entry; -import org.joda.time.Duration; - -/** Utility methods specific to async flows. */ -// TODO(b/26140521): Delete this class once non-batched async operations are deleted. -public final class AsyncFlowUtils { - - private static final FormattingLogger logger = FormattingLogger.getLoggerForCallerClass(); - - @VisibleForTesting - public static final String ASYNC_FLOW_QUEUE_NAME = "flows-async"; // See queue.xml. - - private AsyncFlowUtils() {} - - /** Enqueues a mapreduce action to perform an async flow operation. */ - public static TaskHandle enqueueMapreduceAction( - Class action, - ImmutableMap params, - Duration executionDelay) { - Queue queue = QueueFactory.getQueue(ASYNC_FLOW_QUEUE_NAME); - String path = getPathForAction(action); - logger.infofmt("Enqueueing async mapreduce action with path %s and params %s", path, params); - // Aggressively back off if the task fails, to minimize flooding the logs. - RetryOptions retryOptions = RetryOptions.Builder.withMinBackoffSeconds( - RegistryEnvironment.get().config().getAsyncFlowFailureBackoff().getStandardSeconds()); - TaskOptions options = TaskOptions.Builder - .withUrl(path) - .retryOptions(retryOptions) - .countdownMillis(executionDelay.getMillis()) - .method(Method.GET); - for (Entry entry : params.entrySet()) { - options.param(entry.getKey(), entry.getValue()); - } - return queue.add(options); - } -} diff --git a/java/google/registry/flows/contact/ContactDeleteFlow.java b/java/google/registry/flows/contact/ContactDeleteFlow.java index 0b6965797..71a3bb134 100644 --- a/java/google/registry/flows/contact/ContactDeleteFlow.java +++ b/java/google/registry/flows/contact/ContactDeleteFlow.java @@ -24,7 +24,6 @@ import static google.registry.model.ofy.ObjectifyService.ofy; import com.google.common.base.Function; import com.google.common.base.Optional; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.googlecode.objectify.Key; import google.registry.config.ConfigModule.Config; @@ -34,9 +33,6 @@ import google.registry.flows.FlowModule.TargetId; import google.registry.flows.LoggedInFlow; import google.registry.flows.TransactionalFlow; import google.registry.flows.async.AsyncFlowEnqueuer; -import google.registry.flows.async.AsyncFlowUtils; -import google.registry.flows.async.DeleteContactResourceAction; -import google.registry.flows.async.DeleteEppResourceAction; import google.registry.flows.exceptions.ResourceToMutateDoesNotExistException; import google.registry.model.contact.ContactResource; import google.registry.model.domain.DomainBase; @@ -96,18 +92,7 @@ public class ContactDeleteFlow extends LoggedInFlow implements TransactionalFlow if (!isSuperuser) { verifyResourceOwnership(clientId, existingResource); } - AsyncFlowUtils.enqueueMapreduceAction( - DeleteContactResourceAction.class, - ImmutableMap.of( - DeleteEppResourceAction.PARAM_RESOURCE_KEY, - Key.create(existingResource).getString(), - DeleteEppResourceAction.PARAM_REQUESTING_CLIENT_ID, - clientId, - DeleteEppResourceAction.PARAM_IS_SUPERUSER, - Boolean.toString(isSuperuser)), - mapreduceDelay); - // TODO(b/26140521): Switch over to batch async operations as follows: - // asyncFlowEnqueuer.enqueueAsyncDelete(existingResource, getClientId(), isSuperuser); + asyncFlowEnqueuer.enqueueAsyncDelete(existingResource, clientId, isSuperuser); ContactResource newResource = existingResource.asBuilder().addStatusValue(StatusValue.PENDING_DELETE).build(); historyBuilder diff --git a/java/google/registry/flows/host/HostDeleteFlow.java b/java/google/registry/flows/host/HostDeleteFlow.java index c45c02edc..426e3d4f1 100644 --- a/java/google/registry/flows/host/HostDeleteFlow.java +++ b/java/google/registry/flows/host/HostDeleteFlow.java @@ -24,7 +24,6 @@ import static google.registry.model.ofy.ObjectifyService.ofy; import com.google.common.base.Function; import com.google.common.base.Optional; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.googlecode.objectify.Key; import google.registry.config.ConfigModule.Config; @@ -32,9 +31,7 @@ import google.registry.flows.EppException; import google.registry.flows.FlowModule.ClientId; import google.registry.flows.LoggedInFlow; import google.registry.flows.TransactionalFlow; -import google.registry.flows.async.AsyncFlowUtils; -import google.registry.flows.async.DeleteEppResourceAction; -import google.registry.flows.async.DeleteHostResourceAction; +import google.registry.flows.async.AsyncFlowEnqueuer; import google.registry.flows.exceptions.ResourceToMutateDoesNotExistException; import google.registry.model.domain.DomainBase; import google.registry.model.domain.metadata.MetadataExtension; @@ -71,6 +68,7 @@ public class HostDeleteFlow extends LoggedInFlow implements TransactionalFlow { return domain.getNameservers(); }}; + @Inject AsyncFlowEnqueuer asyncFlowEnqueuer; @Inject ResourceCommand resourceCommand; @Inject Optional authInfo; @Inject @ClientId String clientId; @@ -97,16 +95,7 @@ public class HostDeleteFlow extends LoggedInFlow implements TransactionalFlow { if (!isSuperuser) { verifyResourceOwnership(clientId, existingResource); } - AsyncFlowUtils.enqueueMapreduceAction( - DeleteHostResourceAction.class, - ImmutableMap.of( - DeleteEppResourceAction.PARAM_RESOURCE_KEY, - Key.create(existingResource).getString(), - DeleteEppResourceAction.PARAM_REQUESTING_CLIENT_ID, - clientId, - DeleteEppResourceAction.PARAM_IS_SUPERUSER, - Boolean.toString(isSuperuser)), - mapreduceDelay); + asyncFlowEnqueuer.enqueueAsyncDelete(existingResource, clientId, isSuperuser); HostResource newResource = existingResource.asBuilder().addStatusValue(StatusValue.PENDING_DELETE).build(); historyBuilder diff --git a/java/google/registry/flows/host/HostUpdateFlow.java b/java/google/registry/flows/host/HostUpdateFlow.java index 782c805a1..018eba814 100644 --- a/java/google/registry/flows/host/HostUpdateFlow.java +++ b/java/google/registry/flows/host/HostUpdateFlow.java @@ -28,7 +28,6 @@ import static google.registry.model.ofy.ObjectifyService.ofy; import static google.registry.util.CollectionUtils.isNullOrEmpty; import com.google.common.base.Optional; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; import com.googlecode.objectify.Key; @@ -41,8 +40,7 @@ import google.registry.flows.EppException.StatusProhibitsOperationException; import google.registry.flows.FlowModule.ClientId; import google.registry.flows.LoggedInFlow; import google.registry.flows.TransactionalFlow; -import google.registry.flows.async.AsyncFlowUtils; -import google.registry.flows.async.DnsRefreshForHostRenameAction; +import google.registry.flows.async.AsyncFlowEnqueuer; import google.registry.flows.exceptions.AddRemoveSameValueEppException; import google.registry.flows.exceptions.ResourceHasClientUpdateProhibitedException; import google.registry.flows.exceptions.ResourceToMutateDoesNotExistException; @@ -62,7 +60,6 @@ import google.registry.model.index.ForeignKeyIndex; import google.registry.model.reporting.HistoryEntry; import java.util.Objects; import javax.inject.Inject; -import org.joda.time.Duration; /** * An EPP flow that updates a host resource. @@ -96,6 +93,7 @@ public class HostUpdateFlow extends LoggedInFlow implements TransactionalFlow { @Inject Optional authInfo; @Inject @ClientId String clientId; @Inject HistoryEntry.Builder historyBuilder; + @Inject AsyncFlowEnqueuer asyncFlowEnqueuer; @Inject HostUpdateFlow() {} @Override @@ -225,12 +223,7 @@ public class HostUpdateFlow extends LoggedInFlow implements TransactionalFlow { } // We must also enqueue updates for all domains that use this host as their nameserver so // that their NS records can be updated to point at the new name. - AsyncFlowUtils.enqueueMapreduceAction( - DnsRefreshForHostRenameAction.class, - ImmutableMap.of( - DnsRefreshForHostRenameAction.PARAM_HOST_KEY, - Key.create(existingResource).getString()), - Duration.ZERO); + asyncFlowEnqueuer.enqueueAsyncDnsRefresh(existingResource); } } diff --git a/javatests/google/registry/flows/ResourceFlowTestCase.java b/javatests/google/registry/flows/ResourceFlowTestCase.java index 6f7d1101f..960d8428d 100644 --- a/javatests/google/registry/flows/ResourceFlowTestCase.java +++ b/javatests/google/registry/flows/ResourceFlowTestCase.java @@ -18,6 +18,7 @@ import static com.google.common.truth.Truth.assertThat; import static google.registry.model.EppResourceUtils.loadByUniqueId; import static google.registry.model.ofy.ObjectifyService.ofy; import static google.registry.model.tmch.ClaimsListShardTest.createTestClaimsListShard; +import static google.registry.testing.TaskQueueHelper.assertTasksEnqueued; import com.google.common.base.Predicate; import com.google.common.collect.FluentIterable; @@ -34,8 +35,10 @@ import google.registry.model.index.EppResourceIndexBucket; import google.registry.model.tmch.ClaimsListShard.ClaimsListRevision; import google.registry.model.tmch.ClaimsListShard.ClaimsListSingleton; import google.registry.testing.ExceptionRule; +import google.registry.testing.TaskQueueHelper.TaskMatcher; import google.registry.util.TypeUtils.TypeInstantiator; import org.joda.time.DateTime; +import org.joda.time.Duration; import org.junit.Rule; import org.junit.Test; @@ -136,4 +139,18 @@ public abstract class ResourceFlowTestCase void assertAsyncDeletionTaskEnqueued( + T resource, String requestingClientId, boolean isSuperuser) throws Exception { + String expectedPayload = + String.format( + "resourceKey=%s&requestingClientId=%s&isSuperuser=%s", + Key.create(resource).getString(), requestingClientId, Boolean.toString(isSuperuser)); + assertTasksEnqueued( + "async-delete-pull", + new TaskMatcher() + .etaDelta(Duration.standardSeconds(75), Duration.standardSeconds(105)) // expected: 90 + .payload(expectedPayload)); + } } diff --git a/javatests/google/registry/flows/async/DeleteContactsAndHostsActionTest.java b/javatests/google/registry/flows/async/DeleteContactsAndHostsActionTest.java index e64211f24..6b11abd8a 100644 --- a/javatests/google/registry/flows/async/DeleteContactsAndHostsActionTest.java +++ b/javatests/google/registry/flows/async/DeleteContactsAndHostsActionTest.java @@ -126,7 +126,7 @@ public class DeleteContactsAndHostsActionTest public void setup() throws Exception { enqueuer = new AsyncFlowEnqueuer(); enqueuer.asyncDeleteDelay = Duration.ZERO; - enqueuer.queue = QueueFactory.getQueue(QUEUE_ASYNC_DELETE); + enqueuer.asyncDeletePullQueue = QueueFactory.getQueue(QUEUE_ASYNC_DELETE); enqueuer.retrier = new Retrier(new FakeSleeper(clock), 1); action = new DeleteContactsAndHostsAction(); diff --git a/javatests/google/registry/flows/contact/ContactDeleteFlowTest.java b/javatests/google/registry/flows/contact/ContactDeleteFlowTest.java index ec2ea08a8..dba86026e 100644 --- a/javatests/google/registry/flows/contact/ContactDeleteFlowTest.java +++ b/javatests/google/registry/flows/contact/ContactDeleteFlowTest.java @@ -14,8 +14,6 @@ package google.registry.flows.contact; -import static google.registry.flows.async.AsyncFlowUtils.ASYNC_FLOW_QUEUE_NAME; -import static google.registry.request.Actions.getPathForAction; import static google.registry.testing.ContactResourceSubject.assertAboutContacts; import static google.registry.testing.DatastoreHelper.assertNoBillingEvents; import static google.registry.testing.DatastoreHelper.createTld; @@ -24,22 +22,16 @@ import static google.registry.testing.DatastoreHelper.newDomainResource; import static google.registry.testing.DatastoreHelper.persistActiveContact; import static google.registry.testing.DatastoreHelper.persistDeletedContact; import static google.registry.testing.DatastoreHelper.persistResource; -import static google.registry.testing.TaskQueueHelper.assertTasksEnqueued; import com.google.common.collect.ImmutableSet; -import com.googlecode.objectify.Key; import google.registry.flows.ResourceFlowTestCase; import google.registry.flows.ResourceFlowUtils.ResourceNotOwnedException; -import google.registry.flows.async.DeleteContactResourceAction; -import google.registry.flows.async.DeleteEppResourceAction; import google.registry.flows.exceptions.ResourceStatusProhibitsOperationException; import google.registry.flows.exceptions.ResourceToDeleteIsReferencedException; import google.registry.flows.exceptions.ResourceToMutateDoesNotExistException; import google.registry.model.contact.ContactResource; import google.registry.model.eppcommon.StatusValue; import google.registry.model.reporting.HistoryEntry; -import google.registry.testing.TaskQueueHelper.TaskMatcher; -import org.joda.time.Duration; import org.junit.Before; import org.junit.Test; @@ -76,18 +68,7 @@ public class ContactDeleteFlowTest runFlowAssertResponse(readFile("contact_delete_response.xml")); ContactResource deletedContact = reloadResourceByUniqueId(); assertAboutContacts().that(deletedContact).hasStatusValue(StatusValue.PENDING_DELETE); - assertTasksEnqueued(ASYNC_FLOW_QUEUE_NAME, new TaskMatcher() - .url(getPathForAction(DeleteContactResourceAction.class)) - .etaDelta(Duration.standardSeconds(75), Duration.standardSeconds(105)) // expected: 90 - .param( - DeleteEppResourceAction.PARAM_REQUESTING_CLIENT_ID, - "TheRegistrar") - .param( - DeleteEppResourceAction.PARAM_IS_SUPERUSER, - Boolean.toString(false)) - .param( - DeleteEppResourceAction.PARAM_RESOURCE_KEY, - Key.create(deletedContact).getString())); + assertAsyncDeletionTaskEnqueued(deletedContact, "TheRegistrar", false); assertAboutContacts().that(deletedContact) .hasOnlyOneHistoryEntryWhich() .hasType(HistoryEntry.Type.CONTACT_PENDING_DELETE); @@ -146,18 +127,7 @@ public class ContactDeleteFlowTest CommitMode.LIVE, UserPrivileges.SUPERUSER, readFile("contact_delete_response.xml")); ContactResource deletedContact = reloadResourceByUniqueId(); assertAboutContacts().that(deletedContact).hasStatusValue(StatusValue.PENDING_DELETE); - assertTasksEnqueued(ASYNC_FLOW_QUEUE_NAME, new TaskMatcher() - .url(getPathForAction(DeleteContactResourceAction.class)) - .etaDelta(Duration.standardSeconds(75), Duration.standardSeconds(105)) // expected: 90 - .param( - DeleteEppResourceAction.PARAM_REQUESTING_CLIENT_ID, - "NewRegistrar") - .param( - DeleteEppResourceAction.PARAM_IS_SUPERUSER, - Boolean.toString(true)) - .param( - DeleteEppResourceAction.PARAM_RESOURCE_KEY, - Key.create(deletedContact).getString())); + assertAsyncDeletionTaskEnqueued(deletedContact, "NewRegistrar", true); assertAboutContacts().that(deletedContact) .hasOnlyOneHistoryEntryWhich() .hasType(HistoryEntry.Type.CONTACT_PENDING_DELETE); diff --git a/javatests/google/registry/flows/host/HostDeleteFlowTest.java b/javatests/google/registry/flows/host/HostDeleteFlowTest.java index e0c8c9041..71f3aa86c 100644 --- a/javatests/google/registry/flows/host/HostDeleteFlowTest.java +++ b/javatests/google/registry/flows/host/HostDeleteFlowTest.java @@ -13,9 +13,6 @@ // limitations under the License. package google.registry.flows.host; - -import static google.registry.flows.async.AsyncFlowUtils.ASYNC_FLOW_QUEUE_NAME; -import static google.registry.request.Actions.getPathForAction; import static google.registry.testing.DatastoreHelper.assertNoBillingEvents; import static google.registry.testing.DatastoreHelper.createTld; import static google.registry.testing.DatastoreHelper.newDomainApplication; @@ -26,22 +23,17 @@ import static google.registry.testing.DatastoreHelper.persistDeletedHost; import static google.registry.testing.DatastoreHelper.persistResource; import static google.registry.testing.HostResourceSubject.assertAboutHosts; import static google.registry.testing.TaskQueueHelper.assertNoDnsTasksEnqueued; -import static google.registry.testing.TaskQueueHelper.assertTasksEnqueued; import com.google.common.collect.ImmutableSet; import com.googlecode.objectify.Key; import google.registry.flows.ResourceFlowTestCase; import google.registry.flows.ResourceFlowUtils.ResourceNotOwnedException; -import google.registry.flows.async.DeleteEppResourceAction; -import google.registry.flows.async.DeleteHostResourceAction; import google.registry.flows.exceptions.ResourceStatusProhibitsOperationException; import google.registry.flows.exceptions.ResourceToDeleteIsReferencedException; import google.registry.flows.exceptions.ResourceToMutateDoesNotExistException; import google.registry.model.eppcommon.StatusValue; import google.registry.model.host.HostResource; import google.registry.model.reporting.HistoryEntry; -import google.registry.testing.TaskQueueHelper.TaskMatcher; -import org.joda.time.Duration; import org.junit.Before; import org.junit.Test; @@ -77,18 +69,7 @@ public class HostDeleteFlowTest extends ResourceFlowTestCase