Clean up and consolidate some unused queues

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=136170276
This commit is contained in:
mcilwain 2016-10-14 10:24:16 -07:00 committed by Ben McIlwain
parent e6ba5687b1
commit 6636e02d57
10 changed files with 23 additions and 41 deletions

View file

@ -138,9 +138,6 @@ explicitly marked as otherwise.
* `export-commits` -- Queue for tasks to export commit log checkpoints. Tasks * `export-commits` -- Queue for tasks to export commit log checkpoints. Tasks
are enqueued by `CommitLogCheckpointAction` (which is run every minute by are enqueued by `CommitLogCheckpointAction` (which is run every minute by
cron) and executed by `ExportCommitLogDiffAction`. cron) and executed by `ExportCommitLogDiffAction`.
* `export-reserved-terms` -- Cron queue for tasks to export the list of
reserved terms for each TLD. The tasks are executed by
`ExportReservedTermsAction`.
* `export-snapshot` -- Cron and push queue for tasks to load a Datastore * `export-snapshot` -- Cron and push queue for tasks to load a Datastore
snapshot that was stored in Google Cloud Storage and export it to BigQuery. snapshot that was stored in Google Cloud Storage and export it to BigQuery.
Tasks are enqueued by both cron and `CheckSnapshotServlet` and are executed Tasks are enqueued by both cron and `CheckSnapshotServlet` and are executed
@ -179,6 +176,8 @@ explicitly marked as otherwise.
* `rde-upload` -- Cron queue for tasks to upload already-generated RDE files * `rde-upload` -- Cron queue for tasks to upload already-generated RDE files
from Cloud Storage to the escrow provider. Tasks are executed by from Cloud Storage to the escrow provider. Tasks are executed by
`RdeUploadAction`. `RdeUploadAction`.
* `retryable-cron-tasks` -- Catch-all cron queue for various cron tasks that
run infrequently, such as exporting reserved terms.
* `sheet` -- Queue for tasks to sync registrar updates to a Google Sheets * `sheet` -- Queue for tasks to sync registrar updates to a Google Sheets
spreadsheet. Tasks are enqueued by `RegistrarServlet` when changes are made spreadsheet. Tasks are enqueued by `RegistrarServlet` when changes are made
to registrar fields and are executed by `SyncRegistrarsSheetAction`. to registrar fields and are executed by `SyncRegistrarsSheetAction`.

View file

@ -96,7 +96,7 @@
<!-- TODO: Add borgmon job to check that these files are created and updated successfully. --> <!-- TODO: Add borgmon job to check that these files are created and updated successfully. -->
<cron> <cron>
<url><![CDATA[/_dr/cron/fanout?queue=export-reserved-terms&endpoint=/_dr/task/exportReservedTerms&forEachRealTld]]></url> <url><![CDATA[/_dr/cron/fanout?queue=retryable-cron-tasks&endpoint=/_dr/task/exportReservedTerms&forEachRealTld]]></url>
<description> <description>
Reserved terms export to Google Drive job for creating once-daily exports. Reserved terms export to Google Drive job for creating once-daily exports.
</description> </description>

View file

@ -1,12 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<queue-entries> <queue-entries>
<queue>
<name>default</name>
<rate>1/s</rate>
<bucket-size>5</bucket-size>
</queue>
<queue> <queue>
<name>dns-pull</name> <name>dns-pull</name>
<mode>pull</mode> <mode>pull</mode>
@ -51,16 +45,6 @@
</retry-parameters> </retry-parameters>
</queue> </queue>
<!-- Queue for jobs to export reserved terms to Google Drive for a TLD. -->
<queue>
<name>export-reserved-terms</name>
<rate>1/s</rate>
<bucket-size>100</bucket-size>
<retry-parameters>
<task-retry-limit>3</task-retry-limit>
</retry-parameters>
</queue>
<!-- Queue for polling export BigQuery jobs for completion. --> <!-- Queue for polling export BigQuery jobs for completion. -->
<queue> <queue>
<name>export-bigquery-poll</name> <name>export-bigquery-poll</name>
@ -213,12 +197,13 @@
<queue> <queue>
<name>retryable-cron-tasks</name> <name>retryable-cron-tasks</name>
<rate>1/s</rate> <rate>1/s</rate>
<bucket-size>100</bucket-size>
<retry-parameters> <retry-parameters>
<task-retry-limit>3</task-retry-limit> <task-retry-limit>3</task-retry-limit>
</retry-parameters> </retry-parameters>
</queue> </queue>
<!-- The load[0-9] queues are used for load-testing, and can be safely deleted
in any environment that doesn't require load-testing. -->
<queue> <queue>
<name>load0</name> <name>load0</name>
<rate>500/s</rate> <rate>500/s</rate>
@ -278,5 +263,4 @@
<rate>500/s</rate> <rate>500/s</rate>
<bucket-size>500</bucket-size> <bucket-size>500</bucket-size>
</queue> </queue>
</queue-entries> </queue-entries>

View file

@ -85,7 +85,7 @@
<!-- TODO: Add borgmon job to check that these files are created and updated successfully. --> <!-- TODO: Add borgmon job to check that these files are created and updated successfully. -->
<cron> <cron>
<url><![CDATA[/_dr/cron/fanout?queue=export-reserved-terms&endpoint=/_dr/task/exportReservedTerms&forEachRealTld]]></url> <url><![CDATA[/_dr/cron/fanout?queue=retryable-cron-tasks&endpoint=/_dr/task/exportReservedTerms&forEachRealTld]]></url>
<description> <description>
Reserved terms export to Google Drive job for creating once-daily exports. Reserved terms export to Google Drive job for creating once-daily exports.
</description> </description>

View file

@ -202,7 +202,7 @@
</cron> </cron>
<cron> <cron>
<url><![CDATA[/_dr/cron/fanout?queue=export-reserved-terms&endpoint=/_dr/task/exportReservedTerms&forEachRealTld]]></url> <url><![CDATA[/_dr/cron/fanout?queue=retryable-cron-tasks&endpoint=/_dr/task/exportReservedTerms&forEachRealTld]]></url>
<description> <description>
Reserved terms export to Google Drive job for creating once-daily exports. Reserved terms export to Google Drive job for creating once-daily exports.
</description> </description>

View file

@ -75,7 +75,7 @@
</cron> </cron>
<cron> <cron>
<url><![CDATA[/_dr/cron/fanout?queue=export-reserved-terms&endpoint=/_dr/task/exportReservedTerms&forEachRealTld]]></url> <url><![CDATA[/_dr/cron/fanout?queue=retryable-cron-tasks&endpoint=/_dr/task/exportReservedTerms&forEachRealTld]]></url>
<description> <description>
Reserved terms export to Google Drive job for creating once-daily exports. Reserved terms export to Google Drive job for creating once-daily exports.
</description> </description>

View file

@ -14,10 +14,12 @@
package google.registry.export; package google.registry.export;
import static com.google.appengine.api.taskqueue.QueueFactory.getQueue;
import static com.google.common.collect.Iterables.getOnlyElement; import static com.google.common.collect.Iterables.getOnlyElement;
import static com.google.common.truth.Truth.assertThat; import static com.google.common.truth.Truth.assertThat;
import static com.google.common.truth.Truth.assert_; import static com.google.common.truth.Truth.assert_;
import static google.registry.testing.TaskQueueHelper.assertTasksEnqueued; import static google.registry.testing.TaskQueueHelper.assertTasksEnqueued;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.logging.Level.INFO; import static java.util.logging.Level.INFO;
import static java.util.logging.Level.SEVERE; import static java.util.logging.Level.SEVERE;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -27,7 +29,6 @@ import com.google.api.services.bigquery.model.ErrorProto;
import com.google.api.services.bigquery.model.Job; import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobReference; import com.google.api.services.bigquery.model.JobReference;
import com.google.api.services.bigquery.model.JobStatus; import com.google.api.services.bigquery.model.JobStatus;
import com.google.appengine.api.taskqueue.QueueFactory;
import com.google.appengine.api.taskqueue.TaskOptions; import com.google.appengine.api.taskqueue.TaskOptions;
import com.google.appengine.api.taskqueue.TaskOptions.Method; import com.google.appengine.api.taskqueue.TaskOptions.Method;
import com.google.appengine.api.taskqueue.dev.QueueStateInfo.TaskStateInfo; import com.google.appengine.api.taskqueue.dev.QueueStateInfo.TaskStateInfo;
@ -78,7 +79,7 @@ public class BigqueryPollJobActionTest {
static final String PROJECT_ID = "project_id"; static final String PROJECT_ID = "project_id";
static final String JOB_ID = "job_id"; static final String JOB_ID = "job_id";
static final String CHAINED_QUEUE_NAME = "default"; static final String CHAINED_QUEUE_NAME = UpdateSnapshotViewAction.QUEUE;
static final TaskEnqueuer ENQUEUER = static final TaskEnqueuer ENQUEUER =
new TaskEnqueuer(new Retrier(new FakeSleeper(new FakeClock()), 1)); new TaskEnqueuer(new Retrier(new FakeSleeper(new FakeClock()), 1));
@ -127,7 +128,7 @@ public class BigqueryPollJobActionTest {
new BigqueryPollJobEnqueuer(ENQUEUER).enqueuePollTask( new BigqueryPollJobEnqueuer(ENQUEUER).enqueuePollTask(
new JobReference().setProjectId(PROJECT_ID).setJobId(JOB_ID), new JobReference().setProjectId(PROJECT_ID).setJobId(JOB_ID),
chainedTask, chainedTask,
QueueFactory.getQueue(CHAINED_QUEUE_NAME)); getQueue(CHAINED_QUEUE_NAME));
assertTasksEnqueued(BigqueryPollJobAction.QUEUE, newPollJobTaskMatcher("POST")); assertTasksEnqueued(BigqueryPollJobAction.QUEUE, newPollJobTaskMatcher("POST"));
TaskStateInfo taskInfo = getOnlyElement( TaskStateInfo taskInfo = getOnlyElement(
TaskQueueHelper.getQueueInfo(BigqueryPollJobAction.QUEUE).getTaskInfo()); TaskQueueHelper.getQueueInfo(BigqueryPollJobAction.QUEUE).getTaskInfo());
@ -174,7 +175,7 @@ public class BigqueryPollJobActionTest {
String.format("Bigquery job succeeded - %s:%s", PROJECT_ID, JOB_ID)); String.format("Bigquery job succeeded - %s:%s", PROJECT_ID, JOB_ID));
assertLogMessage( assertLogMessage(
INFO, INFO,
"Added chained task my_task_name for /_dr/something to queue default"); "Added chained task my_task_name for /_dr/something to queue " + CHAINED_QUEUE_NAME);
assertTasksEnqueued(CHAINED_QUEUE_NAME, new TaskMatcher() assertTasksEnqueued(CHAINED_QUEUE_NAME, new TaskMatcher()
.url("/_dr/something") .url("/_dr/something")
.method("POST") .method("POST")
@ -213,7 +214,7 @@ public class BigqueryPollJobActionTest {
public void testFailure_badChainedTaskPayload() throws Exception { public void testFailure_badChainedTaskPayload() throws Exception {
when(bigqueryJobsGet.execute()).thenReturn( when(bigqueryJobsGet.execute()).thenReturn(
new Job().setStatus(new JobStatus().setState("DONE"))); new Job().setStatus(new JobStatus().setState("DONE")));
action.payload = "payload".getBytes(); action.payload = "payload".getBytes(UTF_8);
thrown.expect(BadRequestException.class, "Cannot deserialize task from payload"); thrown.expect(BadRequestException.class, "Cannot deserialize task from payload");
action.run(); action.run();
} }

View file

@ -51,6 +51,7 @@ public class DatastoreBackupServiceTest {
@Rule @Rule
public final AppEngineRule appEngine = AppEngineRule.builder() public final AppEngineRule appEngine = AppEngineRule.builder()
.withDatastore() .withDatastore()
.withTaskQueue()
.build(); .build();
@Mock @Mock
@ -84,8 +85,8 @@ public class DatastoreBackupServiceTest {
@Test @Test
public void testSuccess_launchBackup() throws Exception { public void testSuccess_launchBackup() throws Exception {
backupService.launchNewBackup( backupService.launchNewBackup(
"default", "backup1", "somebucket", ImmutableSet.of("foo", "bar")); "export-snapshot", "backup1", "somebucket", ImmutableSet.of("foo", "bar"));
assertTasksEnqueued("default", assertTasksEnqueued("export-snapshot",
new TaskMatcher() new TaskMatcher()
.url("/_ah/datastore_admin/backup.create") .url("/_ah/datastore_admin/backup.create")
.header("Host", "ah-builtin-python-bundle.default.localhost") .header("Host", "ah-builtin-python-bundle.default.localhost")
@ -93,7 +94,7 @@ public class DatastoreBackupServiceTest {
.param("name", "backup1_") .param("name", "backup1_")
.param("filesystem", "gs") .param("filesystem", "gs")
.param("gs_bucket_name", "somebucket") .param("gs_bucket_name", "somebucket")
.param("queue", "default") .param("queue", "export-snapshot")
.param("kind", "foo") .param("kind", "foo")
.param("kind", "bar")); .param("kind", "bar"));
} }

View file

@ -14,10 +14,13 @@
package google.registry.export; package google.registry.export;
import static com.google.appengine.api.taskqueue.QueueFactory.getQueue;
import static com.google.common.truth.Truth.assertThat; import static com.google.common.truth.Truth.assertThat;
import static google.registry.export.UpdateSnapshotViewAction.QUEUE;
import static google.registry.export.UpdateSnapshotViewAction.UPDATE_SNAPSHOT_DATASET_ID_PARAM; import static google.registry.export.UpdateSnapshotViewAction.UPDATE_SNAPSHOT_DATASET_ID_PARAM;
import static google.registry.export.UpdateSnapshotViewAction.UPDATE_SNAPSHOT_KIND_PARAM; import static google.registry.export.UpdateSnapshotViewAction.UPDATE_SNAPSHOT_KIND_PARAM;
import static google.registry.export.UpdateSnapshotViewAction.UPDATE_SNAPSHOT_TABLE_ID_PARAM; import static google.registry.export.UpdateSnapshotViewAction.UPDATE_SNAPSHOT_TABLE_ID_PARAM;
import static google.registry.export.UpdateSnapshotViewAction.createViewUpdateTask;
import static google.registry.testing.TaskQueueHelper.assertTasksEnqueued; import static google.registry.testing.TaskQueueHelper.assertTasksEnqueued;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.anyString;
@ -28,7 +31,6 @@ import static org.mockito.Mockito.when;
import com.google.api.services.bigquery.Bigquery; import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.model.Dataset; import com.google.api.services.bigquery.model.Dataset;
import com.google.api.services.bigquery.model.Table; import com.google.api.services.bigquery.model.Table;
import com.google.appengine.api.taskqueue.QueueFactory;
import google.registry.bigquery.BigqueryFactory; import google.registry.bigquery.BigqueryFactory;
import google.registry.request.HttpException.InternalServerErrorException; import google.registry.request.HttpException.InternalServerErrorException;
import google.registry.testing.AppEngineRule; import google.registry.testing.AppEngineRule;
@ -95,9 +97,8 @@ public class UpdateSnapshotViewActionTest {
@Test @Test
public void testSuccess_createViewUpdateTask() throws Exception { public void testSuccess_createViewUpdateTask() throws Exception {
QueueFactory.getDefaultQueue().add( getQueue(QUEUE).add(createViewUpdateTask("some_dataset", "12345_fookind", "fookind"));
UpdateSnapshotViewAction.createViewUpdateTask("some_dataset", "12345_fookind", "fookind")); assertTasksEnqueued(QUEUE,
assertTasksEnqueued("default",
new TaskMatcher() new TaskMatcher()
.url(UpdateSnapshotViewAction.PATH) .url(UpdateSnapshotViewAction.PATH)
.method("POST") .method("POST")

View file

@ -158,10 +158,6 @@ public abstract class MapreduceTestCase<T> extends ShardableTestCase {
} }
} }
protected void executeTasksUntilEmpty() throws Exception {
executeTasksUntilEmpty("default");
}
protected void executeTasksUntilEmpty(String queueName) throws Exception { protected void executeTasksUntilEmpty(String queueName) throws Exception {
executeTasksUntilEmpty(queueName, null); executeTasksUntilEmpty(queueName, null);
} }