diff --git a/java/google/registry/backup/CommitLogCheckpointAction.java b/java/google/registry/backup/CommitLogCheckpointAction.java index c353e5c37..73fbfb70f 100644 --- a/java/google/registry/backup/CommitLogCheckpointAction.java +++ b/java/google/registry/backup/CommitLogCheckpointAction.java @@ -28,7 +28,7 @@ import google.registry.request.Action; import google.registry.request.auth.Auth; import google.registry.util.Clock; import google.registry.util.FormattingLogger; -import google.registry.util.TaskEnqueuer; +import google.registry.util.TaskQueueUtils; import javax.inject.Inject; import org.joda.time.DateTime; @@ -56,7 +56,7 @@ public final class CommitLogCheckpointAction implements Runnable { @Inject Clock clock; @Inject CommitLogCheckpointStrategy strategy; - @Inject TaskEnqueuer taskEnqueuer; + @Inject TaskQueueUtils taskQueueUtils; @Inject CommitLogCheckpointAction() {} @Override @@ -76,7 +76,7 @@ public final class CommitLogCheckpointAction implements Runnable { .entities( checkpoint, CommitLogCheckpointRoot.create(checkpoint.getCheckpointTime())); // Enqueue a diff task between previous and current checkpoints. - taskEnqueuer.enqueue( + taskQueueUtils.enqueue( getQueue(QUEUE_NAME), withUrl(ExportCommitLogDiffAction.PATH) .param(LOWER_CHECKPOINT_TIME_PARAM, lastWrittenTime.toString()) diff --git a/java/google/registry/config/RegistryConfig.java b/java/google/registry/config/RegistryConfig.java index a3dc8355d..13684ac48 100644 --- a/java/google/registry/config/RegistryConfig.java +++ b/java/google/registry/config/RegistryConfig.java @@ -27,6 +27,7 @@ import com.google.common.net.HostAndPort; import dagger.Module; import dagger.Provides; import google.registry.config.RegistryConfigSettings.AppEngine.ToolsServiceUrl; +import google.registry.util.TaskQueueUtils; import java.lang.annotation.Documented; import java.lang.annotation.Retention; import java.net.URI; @@ -853,7 +854,7 @@ public final class RegistryConfig { *

Note that this uses {@code @Named} instead of {@code @Config} so that it can be used from * the low-level util package, which cannot have a dependency on the config package. * - * @see google.registry.util.TaskEnqueuer + * @see TaskQueueUtils */ @Provides @Named("transientFailureRetries") diff --git a/java/google/registry/cron/CommitLogFanoutAction.java b/java/google/registry/cron/CommitLogFanoutAction.java index 20188935b..6b18aa5f3 100644 --- a/java/google/registry/cron/CommitLogFanoutAction.java +++ b/java/google/registry/cron/CommitLogFanoutAction.java @@ -23,7 +23,7 @@ import google.registry.model.ofy.CommitLogBucket; import google.registry.request.Action; import google.registry.request.Parameter; import google.registry.request.auth.Auth; -import google.registry.util.TaskEnqueuer; +import google.registry.util.TaskQueueUtils; import java.util.Optional; import java.util.Random; import javax.inject.Inject; @@ -40,7 +40,7 @@ public final class CommitLogFanoutAction implements Runnable { private static final Random random = new Random(); - @Inject TaskEnqueuer taskEnqueuer; + @Inject TaskQueueUtils taskQueueUtils; @Inject @Parameter("endpoint") String endpoint; @Inject @Parameter("queue") String queue; @Inject @Parameter("jitterSeconds") Optional jitterSeconds; @@ -55,7 +55,7 @@ public final class CommitLogFanoutAction implements Runnable { .countdownMillis(jitterSeconds.isPresent() ? random.nextInt((int) SECONDS.toMillis(jitterSeconds.get())) : 0); - taskEnqueuer.enqueue(taskQueue, taskOptions); + taskQueueUtils.enqueue(taskQueue, taskOptions); } } } diff --git a/java/google/registry/cron/TldFanoutAction.java b/java/google/registry/cron/TldFanoutAction.java index c8aba6f16..c6ff7c5b8 100644 --- a/java/google/registry/cron/TldFanoutAction.java +++ b/java/google/registry/cron/TldFanoutAction.java @@ -43,7 +43,7 @@ import google.registry.request.RequestParameters; import google.registry.request.Response; import google.registry.request.auth.Auth; import google.registry.util.FormattingLogger; -import google.registry.util.TaskEnqueuer; +import google.registry.util.TaskQueueUtils; import java.util.Optional; import java.util.Random; import java.util.stream.Stream; @@ -103,7 +103,7 @@ public final class TldFanoutAction implements Runnable { private static final FormattingLogger logger = FormattingLogger.getLoggerForCallerClass(); - @Inject TaskEnqueuer taskEnqueuer; + @Inject TaskQueueUtils taskQueueUtils; @Inject Response response; @Inject @Parameter(ENDPOINT_PARAM) String endpoint; @Inject @Parameter(QUEUE_PARAM) String queue; @@ -144,7 +144,7 @@ public final class TldFanoutAction implements Runnable { } for (String tld : tlds) { TaskOptions taskOptions = createTaskOptions(tld, flowThruParams); - TaskHandle taskHandle = taskEnqueuer.enqueue(taskQueue, taskOptions); + TaskHandle taskHandle = taskQueueUtils.enqueue(taskQueue, taskOptions); outputPayload.append( String.format( "- Task: '%s', tld: '%s', endpoint: '%s'\n", diff --git a/java/google/registry/dns/ReadDnsQueueAction.java b/java/google/registry/dns/ReadDnsQueueAction.java index f1a96c1f9..1ea056e96 100644 --- a/java/google/registry/dns/ReadDnsQueueAction.java +++ b/java/google/registry/dns/ReadDnsQueueAction.java @@ -47,7 +47,7 @@ import google.registry.request.RequestParameters; import google.registry.request.auth.Auth; import google.registry.util.Clock; import google.registry.util.FormattingLogger; -import google.registry.util.TaskEnqueuer; +import google.registry.util.TaskQueueUtils; import java.io.UnsupportedEncodingException; import java.util.Collection; import java.util.Comparator; @@ -101,7 +101,7 @@ public final class ReadDnsQueueAction implements Runnable { @Inject Clock clock; @Inject DnsQueue dnsQueue; @Inject HashFunction hashFunction; - @Inject TaskEnqueuer taskEnqueuer; + @Inject TaskQueueUtils taskQueueUtils; @Inject ReadDnsQueueAction() {} /** Container for items we pull out of the DNS pull queue and process for fanout. */ @@ -374,7 +374,7 @@ public final class ReadDnsQueueAction implements Runnable { : PublishDnsUpdatesAction.PARAM_DOMAINS, refreshItem.name()); } - taskEnqueuer.enqueue(dnsPublishPushQueue, options); + taskQueueUtils.enqueue(dnsPublishPushQueue, options); } } } diff --git a/java/google/registry/export/BigqueryPollJobAction.java b/java/google/registry/export/BigqueryPollJobAction.java index 7c78daf5e..1174c91d2 100644 --- a/java/google/registry/export/BigqueryPollJobAction.java +++ b/java/google/registry/export/BigqueryPollJobAction.java @@ -33,7 +33,7 @@ import google.registry.request.HttpException.NotModifiedException; import google.registry.request.Payload; import google.registry.request.auth.Auth; import google.registry.util.FormattingLogger; -import google.registry.util.TaskEnqueuer; +import google.registry.util.TaskQueueUtils; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -64,7 +64,7 @@ public class BigqueryPollJobAction implements Runnable { static final Duration POLL_COUNTDOWN = Duration.standardSeconds(20); @Inject Bigquery bigquery; - @Inject TaskEnqueuer enqueuer; + @Inject TaskQueueUtils taskQueueUtils; @Inject @Header(CHAINED_TASK_QUEUE_HEADER) Lazy chainedQueueName; @Inject @Header(PROJECT_ID_HEADER) String projectId; @Inject @Header(JOB_ID_HEADER) String jobId; @@ -84,7 +84,7 @@ public class BigqueryPollJobAction implements Runnable { } catch (ClassNotFoundException | IOException e) { throw new BadRequestException("Cannot deserialize task from payload", e); } - String taskName = enqueuer.enqueue(getQueue(chainedQueueName.get()), task).getName(); + String taskName = taskQueueUtils.enqueue(getQueue(chainedQueueName.get()), task).getName(); logger.infofmt( "Added chained task %s for %s to queue %s: %s", taskName, @@ -127,16 +127,17 @@ public class BigqueryPollJobAction implements Runnable { /** Helper class to enqueue a bigquery poll job. */ public static class BigqueryPollJobEnqueuer { - private final TaskEnqueuer enqueuer; + private final TaskQueueUtils taskQueueUtils; @Inject - BigqueryPollJobEnqueuer(TaskEnqueuer enqueuer) { - this.enqueuer = enqueuer; + BigqueryPollJobEnqueuer(TaskQueueUtils taskQueueUtils) { + this.taskQueueUtils = taskQueueUtils; } /** Enqueue a task to poll for the success or failure of the referenced BigQuery job. */ public TaskHandle enqueuePollTask(JobReference jobRef) { - return enqueuer.enqueue(getQueue(QUEUE), createCommonPollTask(jobRef).method(Method.GET)); + return taskQueueUtils.enqueue( + getQueue(QUEUE), createCommonPollTask(jobRef).method(Method.GET)); } /** @@ -148,7 +149,7 @@ public class BigqueryPollJobAction implements Runnable { // Serialize the chainedTask into a byte array to put in the task payload. ByteArrayOutputStream taskBytes = new ByteArrayOutputStream(); new ObjectOutputStream(taskBytes).writeObject(chainedTask); - return enqueuer.enqueue( + return taskQueueUtils.enqueue( getQueue(QUEUE), createCommonPollTask(jobRef) .method(Method.POST) diff --git a/java/google/registry/loadtest/LoadTestAction.java b/java/google/registry/loadtest/LoadTestAction.java index 1d245e644..4161ee76c 100644 --- a/java/google/registry/loadtest/LoadTestAction.java +++ b/java/google/registry/loadtest/LoadTestAction.java @@ -34,7 +34,7 @@ import google.registry.request.Parameter; import google.registry.request.auth.Auth; import google.registry.security.XsrfTokenManager; import google.registry.util.FormattingLogger; -import google.registry.util.TaskEnqueuer; +import google.registry.util.TaskQueueUtils; import java.util.Arrays; import java.util.Iterator; import java.util.List; @@ -147,8 +147,7 @@ public class LoadTestAction implements Runnable { @Parameter("hostInfos") int hostInfosPerSecond; - @Inject - TaskEnqueuer taskEnqueuer; + @Inject TaskQueueUtils taskQueueUtils; private final String xmlContactCreateTmpl; private final String xmlContactCreateFail; @@ -344,7 +343,7 @@ public class LoadTestAction implements Runnable { List> chunks = partition(tasks, maxTasksPerAdd()); // Farm out tasks to multiple queues to work around queue qps quotas. for (int i = 0; i < chunks.size(); i++) { - taskEnqueuer.enqueue(getQueue("load" + (i % NUM_QUEUES)), chunks.get(i)); + taskQueueUtils.enqueue(getQueue("load" + (i % NUM_QUEUES)), chunks.get(i)); } } } diff --git a/java/google/registry/rde/RdeStagingReducer.java b/java/google/registry/rde/RdeStagingReducer.java index 10da93650..1a99bb962 100644 --- a/java/google/registry/rde/RdeStagingReducer.java +++ b/java/google/registry/rde/RdeStagingReducer.java @@ -44,7 +44,7 @@ import google.registry.request.RequestParameters; import google.registry.request.lock.LockHandler; import google.registry.tldconfig.idn.IdnTableEnum; import google.registry.util.FormattingLogger; -import google.registry.util.TaskEnqueuer; +import google.registry.util.TaskQueueUtils; import google.registry.xjc.rdeheader.XjcRdeHeader; import google.registry.xjc.rdeheader.XjcRdeHeaderElement; import google.registry.xml.XmlException; @@ -70,7 +70,7 @@ public final class RdeStagingReducer extends Reducer tasks = queue.leaseTasks(LeaseOptions.Builder .withTag(tld) .leasePeriod(LEASE_PERIOD.getMillis(), TimeUnit.MILLISECONDS) - .countLimit(BATCH_SIZE)); + .countLimit(TaskQueueUtils.getBatchSize())); allTasks.addAll(tasks); if (tasks.isEmpty()) { return allTasks.build(); diff --git a/java/google/registry/tmch/NordnUploadAction.java b/java/google/registry/tmch/NordnUploadAction.java index db05991df..d1442cc95 100644 --- a/java/google/registry/tmch/NordnUploadAction.java +++ b/java/google/registry/tmch/NordnUploadAction.java @@ -41,6 +41,7 @@ import google.registry.request.RequestParameters; import google.registry.request.auth.Auth; import google.registry.util.Clock; import google.registry.util.FormattingLogger; +import google.registry.util.TaskQueueUtils; import google.registry.util.UrlFetchException; import java.io.IOException; import java.net.URL; @@ -84,6 +85,7 @@ public final class NordnUploadAction implements Runnable { @Inject @Config("tmchMarksdbUrl") String tmchMarksdbUrl; @Inject @Parameter(LORDN_PHASE_PARAM) String phase; @Inject @Parameter(RequestParameters.PARAM_TLD) String tld; + @Inject TaskQueueUtils taskQueueUtils; @Inject NordnUploadAction() {} /** @@ -117,7 +119,7 @@ public final class NordnUploadAction implements Runnable { if (!tasks.isEmpty()) { String csvData = convertTasksToCsv(tasks, now, columns); uploadCsvToLordn(String.format("/LORDN/%s/%s", tld, phase), csvData); - queue.deleteTask(tasks); + taskQueueUtils.deleteTasks(queue, tasks); } } diff --git a/java/google/registry/util/TaskEnqueuer.java b/java/google/registry/util/TaskQueueUtils.java similarity index 74% rename from java/google/registry/util/TaskEnqueuer.java rename to java/google/registry/util/TaskQueueUtils.java index 47f5b2466..c70becfc7 100644 --- a/java/google/registry/util/TaskEnqueuer.java +++ b/java/google/registry/util/TaskQueueUtils.java @@ -1,4 +1,4 @@ -// Copyright 2017 The Nomulus Authors. All Rights Reserved. +// Copyright 2018 The Nomulus Authors. All Rights Reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -20,12 +20,14 @@ import com.google.appengine.api.taskqueue.Queue; import com.google.appengine.api.taskqueue.TaskHandle; import com.google.appengine.api.taskqueue.TaskOptions; import com.google.appengine.api.taskqueue.TransientFailureException; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; import java.io.Serializable; import java.util.List; import javax.inject.Inject; /** Utilities for dealing with App Engine task queues. */ -public class TaskEnqueuer implements Serializable { +public class TaskQueueUtils implements Serializable { private static final long serialVersionUID = 7893211200220508362L; @@ -34,10 +36,23 @@ public class TaskEnqueuer implements Serializable { private final Retrier retrier; @Inject - public TaskEnqueuer(Retrier retrier) { + public TaskQueueUtils(Retrier retrier) { this.retrier = retrier; } + @NonFinalForTesting + @VisibleForTesting + static int BATCH_SIZE = 1000; + + /** + * The batch size to use for App Engine task queue operations. + * + *

Note that 1,000 is currently the maximum allowable batch size in App Engine. + */ + public static int getBatchSize() { + return BATCH_SIZE; + } + /** * Adds a task to a App Engine task queue in a reliable manner. * @@ -73,4 +88,14 @@ public class TaskEnqueuer implements Serializable { }, TransientFailureException.class); } + + /** Deletes the specified tasks from the queue in batches, with retrying. */ + public void deleteTasks(Queue queue, List tasks) { + Lists.partition(tasks, BATCH_SIZE) + .stream() + .forEach( + batch -> + retrier.callWithRetry( + () -> queue.deleteTask(batch), TransientFailureException.class)); + } } diff --git a/javatests/google/registry/backup/CommitLogCheckpointActionTest.java b/javatests/google/registry/backup/CommitLogCheckpointActionTest.java index 736192ffb..ad7331b6c 100644 --- a/javatests/google/registry/backup/CommitLogCheckpointActionTest.java +++ b/javatests/google/registry/backup/CommitLogCheckpointActionTest.java @@ -31,7 +31,7 @@ import google.registry.testing.AppEngineRule; import google.registry.testing.FakeClock; import google.registry.testing.TaskQueueHelper.TaskMatcher; import google.registry.util.Retrier; -import google.registry.util.TaskEnqueuer; +import google.registry.util.TaskQueueUtils; import org.joda.time.DateTime; import org.junit.Before; import org.junit.Rule; @@ -60,7 +60,7 @@ public class CommitLogCheckpointActionTest { public void before() throws Exception { task.clock = new FakeClock(now); task.strategy = strategy; - task.taskEnqueuer = new TaskEnqueuer(new Retrier(null, 1)); + task.taskQueueUtils = new TaskQueueUtils(new Retrier(null, 1)); when(strategy.computeCheckpoint()) .thenReturn( CommitLogCheckpoint.create( diff --git a/javatests/google/registry/cron/CommitLogFanoutActionTest.java b/javatests/google/registry/cron/CommitLogFanoutActionTest.java index 1fda93201..3c4d85fe1 100644 --- a/javatests/google/registry/cron/CommitLogFanoutActionTest.java +++ b/javatests/google/registry/cron/CommitLogFanoutActionTest.java @@ -22,7 +22,7 @@ import google.registry.model.ofy.CommitLogBucket; import google.registry.testing.AppEngineRule; import google.registry.testing.TaskQueueHelper.TaskMatcher; import google.registry.util.Retrier; -import google.registry.util.TaskEnqueuer; +import google.registry.util.TaskQueueUtils; import java.util.ArrayList; import java.util.List; import java.util.Optional; @@ -54,7 +54,7 @@ public class CommitLogFanoutActionTest { @Test public void testSuccess() throws Exception { CommitLogFanoutAction action = new CommitLogFanoutAction(); - action.taskEnqueuer = new TaskEnqueuer(new Retrier(null, 1)); + action.taskQueueUtils = new TaskQueueUtils(new Retrier(null, 1)); action.endpoint = ENDPOINT; action.queue = QUEUE; action.jitterSeconds = Optional.empty(); diff --git a/javatests/google/registry/cron/TldFanoutActionTest.java b/javatests/google/registry/cron/TldFanoutActionTest.java index 265456cc4..e747a95ec 100644 --- a/javatests/google/registry/cron/TldFanoutActionTest.java +++ b/javatests/google/registry/cron/TldFanoutActionTest.java @@ -35,7 +35,7 @@ import google.registry.testing.AppEngineRule; import google.registry.testing.FakeResponse; import google.registry.testing.TaskQueueHelper.TaskMatcher; import google.registry.util.Retrier; -import google.registry.util.TaskEnqueuer; +import google.registry.util.TaskQueueUtils; import java.util.List; import java.util.Optional; import java.util.stream.Stream; @@ -84,7 +84,7 @@ public class TldFanoutActionTest { action.excludes = params.containsKey("exclude") ? ImmutableSet.copyOf(Splitter.on(',').split(params.get("exclude").get(0))) : ImmutableSet.of(); - action.taskEnqueuer = new TaskEnqueuer(new Retrier(null, 1)); + action.taskQueueUtils = new TaskQueueUtils(new Retrier(null, 1)); action.response = response; action.runInEmpty = params.containsKey("runInEmpty"); action.forEachRealTld = params.containsKey("forEachRealTld"); diff --git a/javatests/google/registry/dns/ReadDnsQueueActionTest.java b/javatests/google/registry/dns/ReadDnsQueueActionTest.java index 7cbcd12db..44a6f4b0d 100644 --- a/javatests/google/registry/dns/ReadDnsQueueActionTest.java +++ b/javatests/google/registry/dns/ReadDnsQueueActionTest.java @@ -47,7 +47,7 @@ import google.registry.testing.AppEngineRule; import google.registry.testing.FakeClock; import google.registry.testing.TaskQueueHelper.TaskMatcher; import google.registry.util.Retrier; -import google.registry.util.TaskEnqueuer; +import google.registry.util.TaskQueueUtils; import java.util.ArrayList; import java.util.List; import java.util.Map.Entry; @@ -122,7 +122,7 @@ public class ReadDnsQueueActionTest { action.dnsQueue = dnsQueue; action.dnsPublishPushQueue = QueueFactory.getQueue(DNS_PUBLISH_PUSH_QUEUE_NAME); action.hashFunction = Hashing.murmur3_32(); - action.taskEnqueuer = new TaskEnqueuer(new Retrier(null, 1)); + action.taskQueueUtils = new TaskQueueUtils(new Retrier(null, 1)); action.jitterSeconds = Optional.empty(); // Advance the time a little, to ensure that leaseTasks() returns all tasks. clock.advanceBy(Duration.standardHours(1)); diff --git a/javatests/google/registry/export/BigqueryPollJobActionTest.java b/javatests/google/registry/export/BigqueryPollJobActionTest.java index 6d7cf01da..4fd643d3a 100644 --- a/javatests/google/registry/export/BigqueryPollJobActionTest.java +++ b/javatests/google/registry/export/BigqueryPollJobActionTest.java @@ -44,7 +44,7 @@ import google.registry.testing.TaskQueueHelper; import google.registry.testing.TaskQueueHelper.TaskMatcher; import google.registry.util.CapturingLogHandler; import google.registry.util.Retrier; -import google.registry.util.TaskEnqueuer; +import google.registry.util.TaskQueueUtils; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -71,8 +71,8 @@ public class BigqueryPollJobActionTest { private static final String PROJECT_ID = "project_id"; private static final String JOB_ID = "job_id"; private static final String CHAINED_QUEUE_NAME = UpdateSnapshotViewAction.QUEUE; - private static final TaskEnqueuer ENQUEUER = - new TaskEnqueuer(new Retrier(new FakeSleeper(new FakeClock()), 1)); + private static final TaskQueueUtils TASK_QUEUE_UTILS = + new TaskQueueUtils(new Retrier(new FakeSleeper(new FakeClock()), 1)); private final Bigquery bigquery = mock(Bigquery.class); private final Bigquery.Jobs bigqueryJobs = mock(Bigquery.Jobs.class); @@ -86,7 +86,7 @@ public class BigqueryPollJobActionTest { action.bigquery = bigquery; when(bigquery.jobs()).thenReturn(bigqueryJobs); when(bigqueryJobs.get(PROJECT_ID, JOB_ID)).thenReturn(bigqueryJobsGet); - action.enqueuer = ENQUEUER; + action.taskQueueUtils = TASK_QUEUE_UTILS; action.projectId = PROJECT_ID; action.jobId = JOB_ID; action.chainedQueueName = () -> CHAINED_QUEUE_NAME; @@ -103,7 +103,7 @@ public class BigqueryPollJobActionTest { @Test public void testSuccess_enqueuePollTask() throws Exception { - new BigqueryPollJobEnqueuer(ENQUEUER).enqueuePollTask( + new BigqueryPollJobEnqueuer(TASK_QUEUE_UTILS).enqueuePollTask( new JobReference().setProjectId(PROJECT_ID).setJobId(JOB_ID)); assertTasksEnqueued(BigqueryPollJobAction.QUEUE, newPollJobTaskMatcher("GET")); } @@ -115,7 +115,7 @@ public class BigqueryPollJobActionTest { .method(Method.POST) .header("X-Testing", "foo") .param("testing", "bar"); - new BigqueryPollJobEnqueuer(ENQUEUER).enqueuePollTask( + new BigqueryPollJobEnqueuer(TASK_QUEUE_UTILS).enqueuePollTask( new JobReference().setProjectId(PROJECT_ID).setJobId(JOB_ID), chainedTask, getQueue(CHAINED_QUEUE_NAME)); diff --git a/javatests/google/registry/rde/RdeStagingActionTest.java b/javatests/google/registry/rde/RdeStagingActionTest.java index b947eaf18..fe581ba94 100644 --- a/javatests/google/registry/rde/RdeStagingActionTest.java +++ b/javatests/google/registry/rde/RdeStagingActionTest.java @@ -64,7 +64,7 @@ import google.registry.testing.mapreduce.MapreduceTestCase; import google.registry.tldconfig.idn.IdnTableEnum; import google.registry.util.Retrier; import google.registry.util.SystemSleeper; -import google.registry.util.TaskEnqueuer; +import google.registry.util.TaskQueueUtils; import google.registry.xjc.XjcXmlTransformer; import google.registry.xjc.rde.XjcRdeContentType; import google.registry.xjc.rde.XjcRdeDeposit; @@ -132,7 +132,7 @@ public class RdeStagingActionTest extends MapreduceTestCase { action.mrRunner = makeDefaultRunner(); action.lenient = false; action.reducer = new RdeStagingReducer( - new TaskEnqueuer(new Retrier(new SystemSleeper(), 1)), // taskEnqueuer + new TaskQueueUtils(new Retrier(new SystemSleeper(), 1)), // taskQueueUtils new FakeLockHandler(true), 0, // gcsBufferSize "rde-bucket", // bucket diff --git a/javatests/google/registry/rde/RdeUploadActionTest.java b/javatests/google/registry/rde/RdeUploadActionTest.java index d85ac7400..ab4c84d7e 100644 --- a/javatests/google/registry/rde/RdeUploadActionTest.java +++ b/javatests/google/registry/rde/RdeUploadActionTest.java @@ -72,7 +72,7 @@ import google.registry.testing.Lazies; import google.registry.testing.TaskQueueHelper.TaskMatcher; import google.registry.testing.sftp.SftpServerRule; import google.registry.util.Retrier; -import google.registry.util.TaskEnqueuer; +import google.registry.util.TaskQueueUtils; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; @@ -208,7 +208,7 @@ public class RdeUploadActionTest { action.stagingDecryptionKey = keyring.getRdeStagingDecryptionKey(); action.reportQueue = QueueFactory.getQueue("rde-report"); action.runner = runner; - action.taskEnqueuer = new TaskEnqueuer(new Retrier(null, 1)); + action.taskQueueUtils = new TaskQueueUtils(new Retrier(null, 1)); action.retrier = new Retrier(new FakeSleeper(clock), 3); return action; } diff --git a/javatests/google/registry/tmch/NordnUploadActionTest.java b/javatests/google/registry/tmch/NordnUploadActionTest.java index 6124afdad..0a704686a 100644 --- a/javatests/google/registry/tmch/NordnUploadActionTest.java +++ b/javatests/google/registry/tmch/NordnUploadActionTest.java @@ -47,9 +47,12 @@ import google.registry.model.ofy.Ofy; import google.registry.model.registry.Registry; import google.registry.testing.AppEngineRule; import google.registry.testing.FakeClock; +import google.registry.testing.FakeSleeper; import google.registry.testing.InjectRule; import google.registry.testing.MockitoJUnitRule; import google.registry.testing.TaskQueueHelper.TaskMatcher; +import google.registry.util.Retrier; +import google.registry.util.TaskQueueUtils; import google.registry.util.UrlFetchException; import java.net.URL; import java.util.Optional; @@ -109,6 +112,7 @@ public class NordnUploadActionTest { action.fetchService = fetchService; action.lordnRequestInitializer = lordnRequestInitializer; action.phase = "claims"; + action.taskQueueUtils = new TaskQueueUtils(new Retrier(new FakeSleeper(clock), 3)); action.tld = "tld"; action.tmchMarksdbUrl = "http://127.0.0.1"; } diff --git a/javatests/google/registry/util/TaskEnqueuerTest.java b/javatests/google/registry/util/TaskQueueUtilsTest.java similarity index 70% rename from javatests/google/registry/util/TaskEnqueuerTest.java rename to javatests/google/registry/util/TaskQueueUtilsTest.java index 91bcc9cb9..68bbd7a16 100644 --- a/javatests/google/registry/util/TaskEnqueuerTest.java +++ b/javatests/google/registry/util/TaskQueueUtilsTest.java @@ -1,4 +1,4 @@ -// Copyright 2017 The Nomulus Authors. All Rights Reserved. +// Copyright 2018 The Nomulus Authors. All Rights Reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -17,12 +17,14 @@ package google.registry.util; import static com.google.appengine.api.taskqueue.TaskOptions.Builder.withUrl; import static com.google.common.truth.Truth.assertThat; import static google.registry.testing.JUnitBackports.assertThrows; +import static google.registry.testing.TaskQueueHelper.getQueueInfo; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import com.google.appengine.api.taskqueue.Queue; +import com.google.appengine.api.taskqueue.QueueFactory; import com.google.appengine.api.taskqueue.TaskHandle; import com.google.appengine.api.taskqueue.TaskOptions; import com.google.appengine.api.taskqueue.TransientFailureException; @@ -31,33 +33,39 @@ import google.registry.testing.AppEngineRule; import google.registry.testing.FakeClock; import google.registry.testing.FakeSleeper; import org.joda.time.DateTime; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -/** Unit tests for {@link TaskEnqueuer}. */ +/** Unit tests for {@link TaskQueueUtils}. */ @RunWith(JUnit4.class) -public final class TaskEnqueuerTest { +public final class TaskQueueUtilsTest { private static final int MAX_RETRIES = 3; + @Rule - public final AppEngineRule appEngine = AppEngineRule.builder() - .withDatastore() - .build(); + public final AppEngineRule appEngine = + AppEngineRule.builder().withDatastore().withTaskQueue().build(); private final FakeClock clock = new FakeClock(DateTime.parse("2000-01-01TZ")); private final FakeSleeper sleeper = new FakeSleeper(clock); - private final TaskEnqueuer taskEnqueuer = - new TaskEnqueuer(new Retrier(sleeper, MAX_RETRIES)); + private final TaskQueueUtils taskQueueUtils = + new TaskQueueUtils(new Retrier(sleeper, MAX_RETRIES)); private final Queue queue = mock(Queue.class); private final TaskOptions task = withUrl("url").taskName("name"); private final TaskHandle handle = new TaskHandle(task, "handle"); + @Before + public void before() { + TaskQueueUtils.BATCH_SIZE = 2; + } + @Test public void testEnqueue_worksOnFirstTry_doesntSleep() throws Exception { when(queue.add(ImmutableList.of(task))).thenReturn(ImmutableList.of(handle)); - assertThat(taskEnqueuer.enqueue(queue, task)).isSameAs(handle); + assertThat(taskQueueUtils.enqueue(queue, task)).isSameAs(handle); verify(queue).add(ImmutableList.of(task)); assertThat(clock.nowUtc()).isEqualTo(DateTime.parse("2000-01-01TZ")); } @@ -68,7 +76,7 @@ public final class TaskEnqueuerTest { .thenThrow(new TransientFailureException("")) .thenThrow(new TransientFailureException("")) .thenReturn(ImmutableList.of(handle)); - assertThat(taskEnqueuer.enqueue(queue, task)).isSameAs(handle); + assertThat(taskQueueUtils.enqueue(queue, task)).isSameAs(handle); verify(queue, times(3)).add(ImmutableList.of(task)); assertThat(clock.nowUtc()).isEqualTo(DateTime.parse("2000-01-01T00:00:00.6Z")); // 200 + 400ms } @@ -80,7 +88,7 @@ public final class TaskEnqueuerTest { ImmutableList handles = ImmutableList.of(new TaskHandle(taskA, "a"), new TaskHandle(taskB, "b")); when(queue.add(ImmutableList.of(taskA, taskB))).thenReturn(handles); - assertThat(taskEnqueuer.enqueue(queue, ImmutableList.of(taskA, taskB))).isSameAs(handles); + assertThat(taskQueueUtils.enqueue(queue, ImmutableList.of(taskA, taskB))).isSameAs(handles); assertThat(clock.nowUtc()).isEqualTo(DateTime.parse("2000-01-01TZ")); } @@ -92,7 +100,7 @@ public final class TaskEnqueuerTest { .thenThrow(new TransientFailureException("three")) .thenThrow(new TransientFailureException("four")); TransientFailureException thrown = - assertThrows(TransientFailureException.class, () -> taskEnqueuer.enqueue(queue, task)); + assertThrows(TransientFailureException.class, () -> taskQueueUtils.enqueue(queue, task)); assertThat(thrown).hasMessageThat().contains("three"); } @@ -101,9 +109,27 @@ public final class TaskEnqueuerTest { when(queue.add(ImmutableList.of(task))).thenThrow(new TransientFailureException("")); try { Thread.currentThread().interrupt(); - assertThrows(TransientFailureException.class, () -> taskEnqueuer.enqueue(queue, task)); + assertThrows(TransientFailureException.class, () -> taskQueueUtils.enqueue(queue, task)); } finally { Thread.interrupted(); // Clear interrupt state so it doesn't pwn other tests. } } + + @Test + public void testDeleteTasks_usesMultipleBatches() { + Queue defaultQ = QueueFactory.getQueue("default"); + TaskOptions taskOptA = withUrl("/a").taskName("a"); + TaskOptions taskOptB = withUrl("/b").taskName("b"); + TaskOptions taskOptC = withUrl("/c").taskName("c"); + taskQueueUtils.enqueue(defaultQ, ImmutableList.of(taskOptA, taskOptB, taskOptC)); + assertThat(getQueueInfo("default").getTaskInfo()).hasSize(3); + + taskQueueUtils.deleteTasks( + defaultQ, + ImmutableList.of( + new TaskHandle(taskOptA, "default"), + new TaskHandle(taskOptB, "default"), + new TaskHandle(taskOptC, "default"))); + assertThat(getQueueInfo("default").getTaskInfo()).hasSize(0); + } }