diff --git a/java/google/registry/batch/DeleteContactsAndHostsAction.java b/java/google/registry/batch/DeleteContactsAndHostsAction.java index f5f532698..2bbd1bb88 100644 --- a/java/google/registry/batch/DeleteContactsAndHostsAction.java +++ b/java/google/registry/batch/DeleteContactsAndHostsAction.java @@ -42,7 +42,10 @@ import static google.registry.model.reporting.HistoryEntry.Type.HOST_DELETE_FAIL import static google.registry.util.PipelineUtils.createJobPath; import static java.math.RoundingMode.CEILING; import static java.util.concurrent.TimeUnit.DAYS; -import static java.util.concurrent.TimeUnit.MINUTES; +import static java.util.concurrent.TimeUnit.SECONDS; +import static java.util.logging.Level.INFO; +import static java.util.logging.Level.SEVERE; +import static org.joda.time.Duration.standardHours; import com.google.appengine.api.taskqueue.LeaseOptions; import com.google.appengine.api.taskqueue.Queue; @@ -65,6 +68,7 @@ import google.registry.flows.async.AsyncFlowMetrics; import google.registry.flows.async.AsyncFlowMetrics.OperationResult; import google.registry.flows.async.AsyncFlowMetrics.OperationType; import google.registry.mapreduce.MapreduceRunner; +import google.registry.mapreduce.UnlockerOutput; import google.registry.mapreduce.inputs.EppResourceInputs; import google.registry.mapreduce.inputs.NullInput; import google.registry.model.EppResource; @@ -80,21 +84,26 @@ import google.registry.model.poll.PendingActionNotificationResponse.ContactPendi import google.registry.model.poll.PendingActionNotificationResponse.HostPendingActionNotificationResponse; import google.registry.model.poll.PollMessage; import google.registry.model.reporting.HistoryEntry; +import google.registry.model.server.Lock; import google.registry.model.transfer.TransferStatus; import google.registry.request.Action; import google.registry.request.Response; import google.registry.request.auth.Auth; import google.registry.util.Clock; import google.registry.util.NonFinalForTesting; +import google.registry.util.RequestStatusChecker; import google.registry.util.Retrier; import google.registry.util.SystemClock; import java.io.Serializable; import java.util.ArrayList; import java.util.List; +import java.util.Optional; +import java.util.logging.Level; import javax.annotation.Nullable; import javax.inject.Inject; import javax.inject.Named; import org.joda.time.DateTime; +import org.joda.time.Duration; /** * A mapreduce that processes batch asynchronous deletions of contact and host resources by mapping @@ -110,7 +119,7 @@ public class DeleteContactsAndHostsAction implements Runnable { static final String KIND_CONTACT = getKind(ContactResource.class); static final String KIND_HOST = getKind(HostResource.class); - private static final long LEASE_MINUTES = 120; + private static final Duration LEASE_LENGTH = standardHours(4); private static final FluentLogger logger = FluentLogger.forEnclosingClass(); private static final int MAX_REDUCE_SHARDS = 50; private static final int DELETES_PER_SHARD = 5; @@ -119,20 +128,40 @@ public class DeleteContactsAndHostsAction implements Runnable { @Inject Clock clock; @Inject MapreduceRunner mrRunner; @Inject @Named(QUEUE_ASYNC_DELETE) Queue queue; + @Inject RequestStatusChecker requestStatusChecker; @Inject Response response; @Inject Retrier retrier; @Inject DeleteContactsAndHostsAction() {} @Override public void run() { - LeaseOptions options = - LeaseOptions.Builder.withCountLimit(maxLeaseCount()).leasePeriod(LEASE_MINUTES, MINUTES); - List tasks = queue.leaseTasks(options); - asyncFlowMetrics.recordContactHostDeletionBatchSize(tasks.size()); - if (tasks.isEmpty()) { - response.setPayload("No contact/host deletion tasks in pull queue."); + // Check if the lock can be acquired, and if not, a previous run of this mapreduce is still + // executing, so return early. + Optional lock = + Lock.acquire( + DeleteContactsAndHostsAction.class.getSimpleName(), + null, + LEASE_LENGTH, + requestStatusChecker, + false); + if (!lock.isPresent()) { + logRespondAndUnlock(INFO, "Can't acquire lock; aborting.", lock); return; } + + // Lease the async tasks to process. + LeaseOptions options = + LeaseOptions.Builder.withCountLimit(maxLeaseCount()) + .leasePeriod(LEASE_LENGTH.getStandardSeconds(), SECONDS); + List tasks = queue.leaseTasks(options); + asyncFlowMetrics.recordContactHostDeletionBatchSize(tasks.size()); + + // Check if there are no tasks to process, and if so, return early. + if (tasks.isEmpty()) { + logRespondAndUnlock(INFO, "No contact/host deletion tasks in pull queue; finishing.", lock); + return; + } + Multiset kindCounts = HashMultiset.create(2); ImmutableList.Builder builder = new ImmutableList.Builder<>(); ImmutableList.Builder> resourceKeys = new ImmutableList.Builder<>(); @@ -158,13 +187,13 @@ public class DeleteContactsAndHostsAction implements Runnable { deleteStaleTasksWithRetry(requestsToDelete); ImmutableList deletionRequests = builder.build(); if (deletionRequests.isEmpty()) { - logger.atInfo().log("No asynchronous deletions to process because all were already handled."); - response.setPayload("All requested deletions of contacts/hosts have already occurred."); + logRespondAndUnlock( + INFO, "No async deletions to process because all were already handled.", lock); } else { logger.atInfo().log( "Processing asynchronous deletion of %d contacts and %d hosts: %s", kindCounts.count(KIND_CONTACT), kindCounts.count(KIND_HOST), resourceKeys.build()); - runMapreduce(deletionRequests); + runMapreduce(deletionRequests, lock); } } @@ -187,27 +216,35 @@ public class DeleteContactsAndHostsAction implements Runnable { deletionRequest.requestedTime())); } - private void runMapreduce(ImmutableList deletionRequests) { + private void runMapreduce(ImmutableList deletionRequests, Optional lock) { try { int numReducers = Math.min(MAX_REDUCE_SHARDS, divide(deletionRequests.size(), DELETES_PER_SHARD, CEILING)); - response.sendJavaScriptRedirect(createJobPath(mrRunner - .setJobName("Check for EPP resource references and then delete") - .setModuleName("backend") - .setDefaultReduceShards(numReducers) - .runMapreduce( - new DeleteContactsAndHostsMapper(deletionRequests), - new DeleteEppResourceReducer(), - ImmutableList.of( - // Add an extra shard that maps over a null domain. See the mapper code for why. - new NullInput<>(), - EppResourceInputs.createEntityInput(DomainBase.class))))); + response.sendJavaScriptRedirect( + createJobPath( + mrRunner + .setJobName("Check for EPP resource references and then delete") + .setModuleName("backend") + .setDefaultReduceShards(numReducers) + .runMapreduce( + new DeleteContactsAndHostsMapper(deletionRequests), + new DeleteEppResourceReducer(), + ImmutableList.of( + // Add an extra shard that maps over a null domain. See the mapper code + // for why. + new NullInput<>(), EppResourceInputs.createEntityInput(DomainBase.class)), + new UnlockerOutput(lock.get())))); } catch (Throwable t) { - logger.atSevere().withCause(t).log( - "Error while kicking off mapreduce to delete contacts/hosts"); + logRespondAndUnlock(SEVERE, "Error starting mapreduce to delete contacts/hosts.", lock); } } + private void logRespondAndUnlock(Level level, String message, Optional lock) { + logger.at(level).log(message); + response.setPayload(message); + lock.ifPresent(Lock::release); + } + /** * A mapper that iterates over all {@link DomainBase} entities. * diff --git a/java/google/registry/batch/RefreshDnsOnHostRenameAction.java b/java/google/registry/batch/RefreshDnsOnHostRenameAction.java index ffcbf0b6c..70a3b9998 100644 --- a/java/google/registry/batch/RefreshDnsOnHostRenameAction.java +++ b/java/google/registry/batch/RefreshDnsOnHostRenameAction.java @@ -29,7 +29,10 @@ import static google.registry.model.ofy.ObjectifyService.ofy; import static google.registry.util.DateTimeUtils.latestOf; import static google.registry.util.PipelineUtils.createJobPath; import static java.util.concurrent.TimeUnit.DAYS; -import static java.util.concurrent.TimeUnit.MINUTES; +import static java.util.concurrent.TimeUnit.SECONDS; +import static java.util.logging.Level.INFO; +import static java.util.logging.Level.SEVERE; +import static org.joda.time.Duration.standardHours; import com.google.appengine.api.taskqueue.LeaseOptions; import com.google.appengine.api.taskqueue.Queue; @@ -50,20 +53,25 @@ import google.registry.mapreduce.MapreduceRunner; import google.registry.mapreduce.inputs.NullInput; import google.registry.model.domain.DomainResource; import google.registry.model.host.HostResource; +import google.registry.model.server.Lock; import google.registry.request.Action; import google.registry.request.Response; import google.registry.request.auth.Auth; import google.registry.util.Clock; import google.registry.util.NonFinalForTesting; +import google.registry.util.RequestStatusChecker; import google.registry.util.Retrier; import google.registry.util.SystemClock; import java.io.Serializable; import java.util.ArrayList; import java.util.List; +import java.util.Optional; +import java.util.logging.Level; import javax.annotation.Nullable; import javax.inject.Inject; import javax.inject.Named; import org.joda.time.DateTime; +import org.joda.time.Duration; /** Performs batched DNS refreshes for applicable domains following a host rename. */ @Action( @@ -73,26 +81,48 @@ import org.joda.time.DateTime; public class RefreshDnsOnHostRenameAction implements Runnable { private static final FluentLogger logger = FluentLogger.forEnclosingClass(); - private static final long LEASE_MINUTES = 20; + private static final Duration LEASE_LENGTH = standardHours(4); @Inject AsyncFlowMetrics asyncFlowMetrics; @Inject Clock clock; @Inject MapreduceRunner mrRunner; @Inject @Named(QUEUE_ASYNC_HOST_RENAME) Queue pullQueue; + @Inject RequestStatusChecker requestStatusChecker; @Inject Response response; @Inject Retrier retrier; @Inject RefreshDnsOnHostRenameAction() {} @Override public void run() { - LeaseOptions options = - LeaseOptions.Builder.withCountLimit(maxLeaseCount()).leasePeriod(LEASE_MINUTES, MINUTES); - List tasks = pullQueue.leaseTasks(options); - asyncFlowMetrics.recordDnsRefreshBatchSize(tasks.size()); - if (tasks.isEmpty()) { - response.setPayload("No DNS refresh on host rename tasks to process in pull queue."); + // Check if the lock can be acquired, and if not, a previous run of this mapreduce is still + // executing, so return early. + Optional lock = + Lock.acquire( + RefreshDnsOnHostRenameAction.class.getSimpleName(), + null, + LEASE_LENGTH, + requestStatusChecker, + false); + + if (!lock.isPresent()) { + logRespondAndUnlock(INFO, "Can't acquire lock; aborting.", lock); return; } + + // Lease the async tasks to process. + LeaseOptions options = + LeaseOptions.Builder.withCountLimit(maxLeaseCount()) + .leasePeriod(LEASE_LENGTH.getStandardSeconds(), SECONDS); + List tasks = pullQueue.leaseTasks(options); + asyncFlowMetrics.recordDnsRefreshBatchSize(tasks.size()); + + // Check if there are no tasks to process, and if so, return early. + if (tasks.isEmpty()) { + logRespondAndUnlock( + INFO, "No DNS refresh on host rename tasks to process in pull queue; finishing.", lock); + return; + } + ImmutableList.Builder requestsBuilder = new ImmutableList.Builder<>(); ImmutableList.Builder> hostKeys = new ImmutableList.Builder<>(); final List requestsToDelete = new ArrayList<>(); @@ -119,34 +149,41 @@ public class RefreshDnsOnHostRenameAction implements Runnable { requestsToDelete, pullQueue, asyncFlowMetrics, retrier, OperationResult.STALE); ImmutableList refreshRequests = requestsBuilder.build(); if (refreshRequests.isEmpty()) { - logger.atInfo().log( - "No asynchronous DNS refreshes to process because all renamed hosts are deleted."); - response.setPayload("All requested DNS refreshes are on hosts that were since deleted."); + logRespondAndUnlock( + INFO, "No async DNS refreshes to process because all renamed hosts are deleted.", lock); } else { logger.atInfo().log( "Processing asynchronous DNS refresh for renamed hosts: %s", hostKeys.build()); - runMapreduce(refreshRequests); + runMapreduce(refreshRequests, lock); } } - private void runMapreduce(ImmutableList refreshRequests) { + private void runMapreduce(ImmutableList refreshRequests, Optional lock) { try { - response.sendJavaScriptRedirect(createJobPath(mrRunner - .setJobName("Enqueue DNS refreshes for domains referencing renamed hosts") - .setModuleName("backend") - .setDefaultReduceShards(1) - .runMapreduce( - new RefreshDnsOnHostRenameMapper(refreshRequests, retrier), - new RefreshDnsOnHostRenameReducer(refreshRequests, retrier), - // Add an extra NullInput so that the reducer always fires exactly once. - ImmutableList.of( - new NullInput<>(), createEntityInput(DomainResource.class))))); + response.sendJavaScriptRedirect( + createJobPath( + mrRunner + .setJobName("Enqueue DNS refreshes for domains referencing renamed hosts") + .setModuleName("backend") + .setDefaultReduceShards(1) + .runMapreduce( + new RefreshDnsOnHostRenameMapper(refreshRequests, retrier), + new RefreshDnsOnHostRenameReducer(refreshRequests, lock.get(), retrier), + // Add an extra NullInput so that the reducer always fires exactly once. + ImmutableList.of( + new NullInput<>(), createEntityInput(DomainResource.class))))); } catch (Throwable t) { - logger.atSevere().withCause(t).log( - "Error while kicking off mapreduce to refresh DNS for renamed hosts."); + logRespondAndUnlock( + SEVERE, "Error starting mapreduce to refresh DNS for renamed hosts.", lock); } } + private void logRespondAndUnlock(Level level, String message, Optional lock) { + logger.at(level).log(message); + response.setPayload(message); + lock.ifPresent(Lock::release); + } + /** Map over domains and refresh the DNS of those that reference the renamed hosts. */ public static class RefreshDnsOnHostRenameMapper extends Mapper { @@ -205,27 +242,34 @@ public class RefreshDnsOnHostRenameAction implements Runnable { */ public static class RefreshDnsOnHostRenameReducer extends Reducer { - private static final long serialVersionUID = -2850944843275790412L; + private static final long serialVersionUID = 9077366205249562118L; @NonFinalForTesting private static AsyncFlowMetrics asyncFlowMetrics = new AsyncFlowMetrics(new SystemClock()); + private final Lock lock; private final Retrier retrier; private final List refreshRequests; - RefreshDnsOnHostRenameReducer(List refreshRequests, Retrier retrier) { + RefreshDnsOnHostRenameReducer( + List refreshRequests, Lock lock, Retrier retrier) { this.refreshRequests = refreshRequests; + this.lock = lock; this.retrier = retrier; } @Override public void reduce(Boolean key, ReducerInput values) { + // The reduce() method is run precisely once, because the NullInput caused the mapper to emit + // a dummy value once. deleteTasksWithRetry( refreshRequests, getQueue(QUEUE_ASYNC_HOST_RENAME), asyncFlowMetrics, retrier, OperationResult.SUCCESS); + + lock.release(); } } diff --git a/java/google/registry/mapreduce/BUILD b/java/google/registry/mapreduce/BUILD index fba909685..591edeb4f 100644 --- a/java/google/registry/mapreduce/BUILD +++ b/java/google/registry/mapreduce/BUILD @@ -9,6 +9,7 @@ java_library( srcs = glob(["*.java"]), deps = [ "//java/google/registry/mapreduce/inputs", + "//java/google/registry/model", "//java/google/registry/request", "//java/google/registry/util", "//third_party/objectify:objectify-v4_1", diff --git a/java/google/registry/mapreduce/UnlockerOutput.java b/java/google/registry/mapreduce/UnlockerOutput.java new file mode 100644 index 000000000..c37d8e51d --- /dev/null +++ b/java/google/registry/mapreduce/UnlockerOutput.java @@ -0,0 +1,65 @@ +// Copyright 2018 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.mapreduce; + +import static com.google.common.collect.ImmutableList.toImmutableList; + +import com.google.appengine.tools.mapreduce.Output; +import com.google.appengine.tools.mapreduce.OutputWriter; +import com.google.common.flogger.FluentLogger; +import google.registry.model.server.Lock; +import java.util.Collection; +import java.util.List; +import java.util.stream.Stream; + +/** An App Engine MapReduce "Output" that releases the given {@link Lock}. */ +public class UnlockerOutput extends Output { + + private static final long serialVersionUID = 2884979908715512998L; + private static final FluentLogger logger = FluentLogger.forEnclosingClass(); + + private final Lock lock; + + public UnlockerOutput(Lock lock) { + this.lock = lock; + } + + private static class NoopWriter extends OutputWriter { + + private static final long serialVersionUID = -8327197554987150393L; + + @Override + public void write(O object) { + // Noop + } + + @Override + public boolean allowSliceRetry() { + return true; + } + } + + @Override + public List> createWriters(int numShards) { + return Stream.generate(NoopWriter::new).limit(numShards).collect(toImmutableList()); + } + + @Override + public Lock finish(Collection> writers) { + logger.atInfo().log("Mapreduce finished; releasing lock: %s", lock); + lock.release(); + return lock; + } +} diff --git a/java/google/registry/model/domain/DesignatedContact.java b/java/google/registry/model/domain/DesignatedContact.java index 62f13e1e0..b95e42401 100644 --- a/java/google/registry/model/domain/DesignatedContact.java +++ b/java/google/registry/model/domain/DesignatedContact.java @@ -14,6 +14,8 @@ package google.registry.model.domain; +import static google.registry.util.PreconditionsUtils.checkArgumentNotNull; + import com.google.common.annotations.VisibleForTesting; import com.googlecode.objectify.Key; import com.googlecode.objectify.annotation.Embed; @@ -59,7 +61,7 @@ public class DesignatedContact extends ImmutableObject { public static DesignatedContact create(Type type, Key contact) { DesignatedContact instance = new DesignatedContact(); instance.type = type; - instance.contact = contact; + instance.contact = checkArgumentNotNull(contact, "Must specify contact key"); return instance; } diff --git a/java/google/registry/model/domain/DomainBase.java b/java/google/registry/model/domain/DomainBase.java index 637855ea1..022a42cd5 100644 --- a/java/google/registry/model/domain/DomainBase.java +++ b/java/google/registry/model/domain/DomainBase.java @@ -45,6 +45,7 @@ import google.registry.model.domain.DesignatedContact.Type; import google.registry.model.domain.launch.LaunchNotice; import google.registry.model.domain.secdns.DelegationSignerData; import google.registry.model.host.HostResource; +import java.util.Objects; import java.util.Set; import java.util.function.Predicate; @@ -165,12 +166,11 @@ public abstract class DomainBase extends EppResource { /** Returns all referenced contacts from this domain or application. */ public ImmutableSet> getReferencedContacts() { - ImmutableSet.Builder> contactsBuilder = - new ImmutableSet.Builder<>(); - for (DesignatedContact designated : nullToEmptyImmutableCopy(allContacts)) { - contactsBuilder.add(designated.getContactKey()); - } - return contactsBuilder.build(); + return nullToEmptyImmutableCopy(allContacts) + .stream() + .map(DesignatedContact::getContactKey) + .filter(Objects::nonNull) + .collect(toImmutableSet()); } public String getTld() { diff --git a/java/google/registry/model/server/Lock.java b/java/google/registry/model/server/Lock.java index b39e2d31c..b897c76e0 100644 --- a/java/google/registry/model/server/Lock.java +++ b/java/google/registry/model/server/Lock.java @@ -29,6 +29,7 @@ import google.registry.model.annotations.NotBackedUp; import google.registry.model.annotations.NotBackedUp.Reason; import google.registry.util.RequestStatusChecker; import google.registry.util.RequestStatusCheckerImpl; +import java.io.Serializable; import java.util.Optional; import javax.annotation.Nullable; import org.joda.time.DateTime; @@ -45,8 +46,9 @@ import org.joda.time.Duration; */ @Entity @NotBackedUp(reason = Reason.TRANSIENT) -public class Lock extends ImmutableObject { +public class Lock extends ImmutableObject implements Serializable { + private static final long serialVersionUID = 756397280691684645L; private static final FluentLogger logger = FluentLogger.forEnclosingClass(); /** Disposition of locking, for monitoring. */ @@ -164,10 +166,11 @@ public class Lock extends ImmutableObject { /** Try to acquire a lock. Returns absent if it can't be acquired. */ public static Optional acquire( - final String resourceName, - @Nullable final String tld, - final Duration leaseLength, - final RequestStatusChecker requestStatusChecker) { + String resourceName, + @Nullable String tld, + Duration leaseLength, + RequestStatusChecker requestStatusChecker, + boolean checkThreadRunning) { String lockId = makeLockId(resourceName, tld); // It's important to use transactNew rather than transact, because a Lock can be used to control // access to resources like GCS that can't be transactionally rolled back. Therefore, the lock @@ -189,7 +192,8 @@ public class Lock extends ImmutableObject { lockState = LockState.FREE; } else if (isAtOrAfter(now, lock.expirationTime)) { lockState = LockState.TIMED_OUT; - } else if (!requestStatusChecker.isRunning(lock.requestLogId)) { + } else if (checkThreadRunning + && !requestStatusChecker.isRunning(lock.requestLogId)) { lockState = LockState.OWNER_DIED; } else { lockState = LockState.IN_USE; diff --git a/java/google/registry/request/lock/LockHandlerImpl.java b/java/google/registry/request/lock/LockHandlerImpl.java index 197c7e546..2d44dd152 100644 --- a/java/google/registry/request/lock/LockHandlerImpl.java +++ b/java/google/registry/request/lock/LockHandlerImpl.java @@ -105,7 +105,7 @@ public class LockHandlerImpl implements LockHandler { /** Allows injection of mock Lock in tests. */ @VisibleForTesting Optional acquire(String lockName, @Nullable String tld, Duration leaseLength) { - return Lock.acquire(lockName, tld, leaseLength, requestStatusChecker); + return Lock.acquire(lockName, tld, leaseLength, requestStatusChecker, true); } /** A {@link Callable} that acquires and releases a lock around a delegate {@link Callable}. */ diff --git a/javatests/google/registry/batch/DeleteContactsAndHostsActionTest.java b/javatests/google/registry/batch/DeleteContactsAndHostsActionTest.java index 5d1993227..9019d4288 100644 --- a/javatests/google/registry/batch/DeleteContactsAndHostsActionTest.java +++ b/javatests/google/registry/batch/DeleteContactsAndHostsActionTest.java @@ -52,11 +52,14 @@ import static google.registry.testing.TaskQueueHelper.assertTasksEnqueued; import static google.registry.util.DateTimeUtils.END_OF_TIME; import static org.joda.time.DateTimeZone.UTC; import static org.joda.time.Duration.millis; +import static org.joda.time.Duration.standardDays; import static org.joda.time.Duration.standardHours; import static org.joda.time.Duration.standardSeconds; +import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; import com.google.appengine.api.modules.ModulesService; import com.google.appengine.api.taskqueue.TaskOptions; @@ -88,6 +91,7 @@ import google.registry.model.poll.PollMessage; import google.registry.model.poll.PollMessage.OneTime; import google.registry.model.registry.Registry; import google.registry.model.reporting.HistoryEntry; +import google.registry.model.server.Lock; import google.registry.model.transfer.TransferData; import google.registry.model.transfer.TransferResponse; import google.registry.model.transfer.TransferStatus; @@ -95,8 +99,10 @@ import google.registry.testing.FakeClock; import google.registry.testing.FakeResponse; import google.registry.testing.FakeSleeper; import google.registry.testing.InjectRule; +import google.registry.testing.MockitoJUnitRule; import google.registry.testing.TaskQueueHelper.TaskMatcher; import google.registry.testing.mapreduce.MapreduceTestCase; +import google.registry.util.RequestStatusChecker; import google.registry.util.Retrier; import google.registry.util.Sleeper; import google.registry.util.SystemSleeper; @@ -108,17 +114,20 @@ import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.Mock; /** Unit tests for {@link DeleteContactsAndHostsAction}. */ @RunWith(JUnit4.class) public class DeleteContactsAndHostsActionTest extends MapreduceTestCase { - @Rule - public final InjectRule inject = new InjectRule(); + @Rule public final InjectRule inject = new InjectRule(); + @Rule public final MockitoJUnitRule mocks = MockitoJUnitRule.create(); - AsyncFlowEnqueuer enqueuer; - FakeClock clock = new FakeClock(DateTime.parse("2015-01-15T11:22:33Z")); + private AsyncFlowEnqueuer enqueuer; + private final FakeClock clock = new FakeClock(DateTime.parse("2015-01-15T11:22:33Z")); + private final FakeResponse fakeResponse = new FakeResponse(); + @Mock private RequestStatusChecker requestStatusChecker; private void runMapreduce() throws Exception { clock.advanceBy(standardSeconds(5)); @@ -134,8 +143,17 @@ public class DeleteContactsAndHostsActionTest ofy().clearSessionCache(); } + /** Kicks off, but does not run, the mapreduce tasks. Useful for testing validation/setup. */ + private void enqueueMapreduceOnly() { + clock.advanceBy(standardSeconds(5)); + action.run(); + clock.advanceBy(standardSeconds(5)); + ofy().clearSessionCache(); + } + @Before public void setup() { + inject.setStaticField(Ofy.class, "clock", clock); enqueuer = new AsyncFlowEnqueuer( getQueue(QUEUE_ASYNC_ACTIONS), @@ -150,10 +168,13 @@ public class DeleteContactsAndHostsActionTest inject.setStaticField(DeleteEppResourceReducer.class, "asyncFlowMetrics", asyncFlowMetricsMock); action.clock = clock; action.mrRunner = makeDefaultRunner(); - action.response = new FakeResponse(); + action.requestStatusChecker = requestStatusChecker; + action.response = fakeResponse; action.retrier = new Retrier(new FakeSleeper(clock), 1); action.queue = getQueue(QUEUE_ASYNC_DELETE); - inject.setStaticField(Ofy.class, "clock", clock); + when(requestStatusChecker.getLogId()).thenReturn("requestId"); + when(requestStatusChecker.isRunning(anyString())) + .thenThrow(new AssertionError("Should not be called")); createTld("tld"); clock.advanceOneMilli(); @@ -207,6 +228,35 @@ public class DeleteContactsAndHostsActionTest runSuccessfulContactDeletionTest(Optional.empty()); } + @Test + public void test_cannotAcquireLock() { + // Make lock acquisition fail. + acquireLock(); + enqueueMapreduceOnly(); + assertThat(fakeResponse.getPayload()).isEqualTo("Can't acquire lock; aborting."); + } + + @Test + public void test_mapreduceHasWorkToDo_lockIsAcquired() { + ContactResource contact = persistContactPendingDelete("blah8221"); + persistResource(newDomainResource("example.tld", contact)); + DateTime timeEnqueued = clock.nowUtc(); + enqueuer.enqueueAsyncDelete( + contact, + timeEnqueued, + "TheRegistrar", + Trid.create("fakeClientTrid", "fakeServerTrid"), + false); + enqueueMapreduceOnly(); + assertThat(acquireLock()).isEmpty(); + } + + @Test + public void test_noTasksToLease_releasesLockImmediately() { + enqueueMapreduceOnly(); + // If the Lock was correctly released, then we can acquire it now. + assertThat(acquireLock()).isPresent(); + } private void runSuccessfulContactDeletionTest(Optional clientTrid) throws Exception { ContactResource contact = persistContactWithPii("jim919"); @@ -454,7 +504,7 @@ public class DeleteContactsAndHostsActionTest "TheRegistrar", Trid.create("fakeClientTrid", "fakeServerTrid"), false); - runMapreduce(); + enqueueMapreduceOnly(); assertTasksEnqueued( QUEUE_ASYNC_DELETE, new TaskMatcher() @@ -473,6 +523,7 @@ public class DeleteContactsAndHostsActionTest .param("serverTransactionId", "fakeServerTrid") .param("isSuperuser", "false") .param("requestedTime", timeBeforeRun.toString())); + assertThat(acquireLock()).isPresent(); } @Test @@ -480,7 +531,7 @@ public class DeleteContactsAndHostsActionTest TaskOptions task = TaskOptions.Builder.withMethod(Method.PULL).param("gobbledygook", "kljhadfgsd9f7gsdfh"); getQueue(QUEUE_ASYNC_DELETE).add(task); - runMapreduce(); + enqueueMapreduceOnly(); assertTasksEnqueued( QUEUE_ASYNC_DELETE, new TaskMatcher() @@ -488,6 +539,7 @@ public class DeleteContactsAndHostsActionTest .etaDelta(standardHours(23), standardHours(25))); verify(action.asyncFlowMetrics).recordContactHostDeletionBatchSize(1L); verifyNoMoreInteractions(action.asyncFlowMetrics); + assertThat(acquireLock()).isPresent(); } @Test @@ -507,7 +559,7 @@ public class DeleteContactsAndHostsActionTest "TheRegistrar", Trid.create("fakeClientTrid", "fakeServerTrid"), false); - runMapreduce(); + enqueueMapreduceOnly(); assertThat(loadByForeignKey(ContactResource.class, "blah2222", clock.nowUtc())) .isEqualTo(contact); assertThat(loadByForeignKey(HostResource.class, "rustles.your.jimmies", clock.nowUtc())) @@ -519,6 +571,7 @@ public class DeleteContactsAndHostsActionTest verify(action.asyncFlowMetrics) .recordAsyncFlowResult(OperationType.HOST_DELETE, STALE, timeEnqueued); verifyNoMoreInteractions(action.asyncFlowMetrics); + assertThat(acquireLock()).isPresent(); } @Test @@ -537,10 +590,11 @@ public class DeleteContactsAndHostsActionTest "TheRegistrar", Trid.create("fakeClientTrid", "fakeServerTrid"), false); - runMapreduce(); + enqueueMapreduceOnly(); assertThat(ofy().load().entity(contactDeleted).now()).isEqualTo(contactDeleted); assertThat(ofy().load().entity(hostDeleted).now()).isEqualTo(hostDeleted); assertNoTasksEnqueued(QUEUE_ASYNC_DELETE); + assertThat(acquireLock()).isPresent(); } @Test @@ -894,4 +948,13 @@ public class DeleteContactsAndHostsActionTest .setNameservers(ImmutableSet.of(Key.create(host))) .build()); } + + private Optional acquireLock() { + return Lock.acquire( + DeleteContactsAndHostsAction.class.getSimpleName(), + null, + standardDays(30), + requestStatusChecker, + false); + } } diff --git a/javatests/google/registry/batch/RefreshDnsOnHostRenameActionTest.java b/javatests/google/registry/batch/RefreshDnsOnHostRenameActionTest.java index 11a8b60f6..f8170ec7b 100644 --- a/javatests/google/registry/batch/RefreshDnsOnHostRenameActionTest.java +++ b/javatests/google/registry/batch/RefreshDnsOnHostRenameActionTest.java @@ -15,6 +15,8 @@ package google.registry.batch; import static com.google.appengine.api.taskqueue.QueueFactory.getQueue; +import static com.google.common.truth.Truth.assertThat; +import static com.google.common.truth.Truth8.assertThat; import static google.registry.flows.async.AsyncFlowEnqueuer.QUEUE_ASYNC_ACTIONS; import static google.registry.flows.async.AsyncFlowEnqueuer.QUEUE_ASYNC_DELETE; import static google.registry.flows.async.AsyncFlowEnqueuer.QUEUE_ASYNC_HOST_RENAME; @@ -33,12 +35,15 @@ import static google.registry.testing.TaskQueueHelper.assertNoTasksEnqueued; import static google.registry.testing.TaskQueueHelper.assertTasksEnqueued; import static google.registry.util.DateTimeUtils.START_OF_TIME; import static org.joda.time.Duration.millis; +import static org.joda.time.Duration.standardDays; import static org.joda.time.Duration.standardHours; import static org.joda.time.Duration.standardSeconds; +import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; import com.google.appengine.api.modules.ModulesService; import com.google.common.collect.ImmutableSet; @@ -48,15 +53,19 @@ import google.registry.flows.async.AsyncFlowEnqueuer; import google.registry.flows.async.AsyncFlowMetrics; import google.registry.flows.async.AsyncFlowMetrics.OperationResult; import google.registry.model.host.HostResource; +import google.registry.model.server.Lock; import google.registry.testing.FakeClock; import google.registry.testing.FakeResponse; import google.registry.testing.FakeSleeper; import google.registry.testing.InjectRule; +import google.registry.testing.MockitoJUnitRule; import google.registry.testing.TaskQueueHelper.TaskMatcher; import google.registry.testing.mapreduce.MapreduceTestCase; +import google.registry.util.RequestStatusChecker; import google.registry.util.Retrier; import google.registry.util.Sleeper; import google.registry.util.SystemSleeper; +import java.util.Optional; import org.joda.time.DateTime; import org.joda.time.Duration; import org.junit.Before; @@ -64,17 +73,20 @@ import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.Mock; /** Unit tests for {@link RefreshDnsOnHostRenameAction}. */ @RunWith(JUnit4.class) public class RefreshDnsOnHostRenameActionTest extends MapreduceTestCase { - @Rule - public InjectRule inject = new InjectRule(); + @Rule public final InjectRule inject = new InjectRule(); + @Rule public final MockitoJUnitRule mocks = MockitoJUnitRule.create(); private AsyncFlowEnqueuer enqueuer; private final FakeClock clock = new FakeClock(DateTime.parse("2015-01-15T11:22:33Z")); + private final FakeResponse fakeResponse = new FakeResponse(); + @Mock private RequestStatusChecker requestStatusChecker; @Before public void setup() { @@ -95,8 +107,12 @@ public class RefreshDnsOnHostRenameActionTest action.clock = clock; action.mrRunner = makeDefaultRunner(); action.pullQueue = getQueue(QUEUE_ASYNC_HOST_RENAME); - action.response = new FakeResponse(); + action.requestStatusChecker = requestStatusChecker; + action.response = fakeResponse; action.retrier = new Retrier(new FakeSleeper(clock), 1); + when(requestStatusChecker.getLogId()).thenReturn("requestId"); + when(requestStatusChecker.isRunning(anyString())) + .thenThrow(new AssertionError("Should not be called")); } private void runMapreduce() throws Exception { @@ -112,6 +128,14 @@ public class RefreshDnsOnHostRenameActionTest ofy().clearSessionCache(); } + /** Kicks off, but does not run, the mapreduce tasks. Useful for testing validation/setup. */ + private void enqueueMapreduceOnly() { + clock.advanceOneMilli(); + action.run(); + clock.advanceBy(standardSeconds(5)); + ofy().clearSessionCache(); + } + @Test public void testSuccess_dnsUpdateEnqueued() throws Exception { HostResource host = persistActiveHost("ns1.example.tld"); @@ -191,12 +215,48 @@ public class RefreshDnsOnHostRenameActionTest public void testRun_hostDoesntExist_delaysTask() throws Exception { HostResource host = newHostResource("ns1.example.tld"); enqueuer.enqueueAsyncDnsRefresh(host, clock.nowUtc()); - runMapreduce(); + enqueueMapreduceOnly(); assertNoDnsTasksEnqueued(); assertTasksEnqueued( QUEUE_ASYNC_HOST_RENAME, new TaskMatcher() .etaDelta(standardHours(23), standardHours(25)) .param("hostKey", Key.create(host).getString())); + assertThat(acquireLock()).isPresent(); + } + + @Test + public void test_cannotAcquireLock() { + // Make lock acquisition fail. + acquireLock(); + enqueueMapreduceOnly(); + assertThat(fakeResponse.getPayload()).isEqualTo("Can't acquire lock; aborting."); + assertNoDnsTasksEnqueued(); + } + + @Test + public void test_mapreduceHasWorkToDo_lockIsAcquired() { + HostResource host = persistActiveHost("ns1.example.tld"); + enqueuer.enqueueAsyncDnsRefresh(host, clock.nowUtc()); + enqueueMapreduceOnly(); + assertThat(acquireLock()).isEmpty(); + } + + @Test + public void test_noTasksToLease_releasesLockImmediately() throws Exception { + enqueueMapreduceOnly(); + assertNoDnsTasksEnqueued(); + assertNoTasksEnqueued(QUEUE_ASYNC_HOST_RENAME); + // If the Lock was correctly released, then we can acquire it now. + assertThat(acquireLock()).isPresent(); + } + + private Optional acquireLock() { + return Lock.acquire( + RefreshDnsOnHostRenameAction.class.getSimpleName(), + null, + standardDays(30), + requestStatusChecker, + false); } } diff --git a/javatests/google/registry/model/server/LockTest.java b/javatests/google/registry/model/server/LockTest.java index ced8a37cf..33fa11e5c 100644 --- a/javatests/google/registry/model/server/LockTest.java +++ b/javatests/google/registry/model/server/LockTest.java @@ -50,16 +50,12 @@ public class LockTest { private static final RequestStatusChecker requestStatusChecker = mock(RequestStatusChecker.class); private static final FakeClock clock = new FakeClock(); - @Rule - public final AppEngineRule appEngine = AppEngineRule.builder() - .withDatastore() - .build(); + @Rule public final AppEngineRule appEngine = AppEngineRule.builder().withDatastore().build(); + @Rule public final InjectRule inject = new InjectRule(); - @Rule - public final InjectRule inject = new InjectRule(); private Optional acquire(String tld, Duration leaseLength, LockState expectedLockState) { Lock.lockMetrics = mock(LockMetrics.class); - Optional lock = Lock.acquire(RESOURCE_NAME, tld, leaseLength, requestStatusChecker); + Optional lock = Lock.acquire(RESOURCE_NAME, tld, leaseLength, requestStatusChecker, true); verify(Lock.lockMetrics).recordAcquire(RESOURCE_NAME, tld, expectedLockState); verifyNoMoreInteractions(Lock.lockMetrics); Lock.lockMetrics = null; @@ -75,7 +71,6 @@ public class LockTest { Lock.lockMetrics = null; } - @Before public void setUp() { inject.setStaticField(Ofy.class, "clock", clock); Lock.lockMetrics = null; @@ -138,7 +133,7 @@ public class LockTest { IllegalArgumentException thrown = assertThrows( IllegalArgumentException.class, - () -> Lock.acquire("", "", TWO_MILLIS, requestStatusChecker)); + () -> Lock.acquire("", "", TWO_MILLIS, requestStatusChecker, true)); assertThat(thrown).hasMessageThat().contains("resourceName cannot be null or empty"); } }