diff --git a/core/src/main/java/google/registry/export/BackupDatastoreAction.java b/core/src/main/java/google/registry/export/BackupDatastoreAction.java index 93e8e4ab5..8d5f52d5f 100644 --- a/core/src/main/java/google/registry/export/BackupDatastoreAction.java +++ b/core/src/main/java/google/registry/export/BackupDatastoreAction.java @@ -14,18 +14,22 @@ package google.registry.export; -import static google.registry.export.CheckBackupAction.enqueuePollTask; import static google.registry.request.Action.Method.POST; +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableMultimap; import com.google.common.flogger.FluentLogger; import google.registry.config.RegistryConfig; import google.registry.export.datastore.DatastoreAdmin; import google.registry.export.datastore.Operation; import google.registry.model.annotations.DeleteAfterMigration; import google.registry.request.Action; +import google.registry.request.Action.Service; import google.registry.request.HttpException.InternalServerErrorException; import google.registry.request.Response; import google.registry.request.auth.Auth; +import google.registry.util.Clock; +import google.registry.util.CloudTasksUtils; import javax.inject.Inject; /** @@ -59,6 +63,8 @@ public class BackupDatastoreAction implements Runnable { @Inject DatastoreAdmin datastoreAdmin; @Inject Response response; + @Inject Clock clock; + @Inject CloudTasksUtils cloudTasksUtils; @Inject BackupDatastoreAction() {} @@ -73,8 +79,19 @@ public class BackupDatastoreAction implements Runnable { .execute(); String backupName = backup.getName(); - // Enqueue a poll task to monitor the backup and load REPORTING-related kinds into bigquery. - enqueuePollTask(backupName, AnnotatedEntities.getReportingKinds()); + // Enqueue a poll task to monitor the backup for completion and load reporting-related kinds + // into bigquery. + cloudTasksUtils.enqueue( + CheckBackupAction.QUEUE, + cloudTasksUtils.createPostTaskWithDelay( + CheckBackupAction.PATH, + Service.BACKEND.toString(), + ImmutableMultimap.of( + CheckBackupAction.CHECK_BACKUP_NAME_PARAM, + backupName, + CheckBackupAction.CHECK_BACKUP_KINDS_TO_LOAD_PARAM, + Joiner.on(',').join(AnnotatedEntities.getReportingKinds())), + CheckBackupAction.POLL_COUNTDOWN)); String message = String.format( "Datastore backup started with name: %s\nSaving to %s", diff --git a/core/src/main/java/google/registry/export/BigqueryPollJobAction.java b/core/src/main/java/google/registry/export/BigqueryPollJobAction.java index 21b789110..e97fc43c7 100644 --- a/core/src/main/java/google/registry/export/BigqueryPollJobAction.java +++ b/core/src/main/java/google/registry/export/BigqueryPollJobAction.java @@ -14,18 +14,14 @@ package google.registry.export; -import static com.google.appengine.api.taskqueue.QueueFactory.getQueue; -import static com.google.appengine.api.taskqueue.TaskOptions.Builder.withUrl; import static google.registry.bigquery.BigqueryUtils.toJobReferenceString; import com.google.api.services.bigquery.Bigquery; import com.google.api.services.bigquery.model.Job; import com.google.api.services.bigquery.model.JobReference; -import com.google.appengine.api.taskqueue.Queue; -import com.google.appengine.api.taskqueue.TaskHandle; -import com.google.appengine.api.taskqueue.TaskOptions; -import com.google.appengine.api.taskqueue.TaskOptions.Method; +import com.google.cloud.tasks.v2.Task; import com.google.common.flogger.FluentLogger; +import com.google.protobuf.ByteString; import dagger.Lazy; import google.registry.request.Action; import google.registry.request.Header; @@ -33,12 +29,10 @@ import google.registry.request.HttpException.BadRequestException; import google.registry.request.HttpException.NotModifiedException; import google.registry.request.Payload; import google.registry.request.auth.Auth; -import google.registry.util.TaskQueueUtils; +import google.registry.util.CloudTasksUtils; import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; import javax.inject.Inject; import org.joda.time.Duration; @@ -67,11 +61,14 @@ public class BigqueryPollJobAction implements Runnable { static final Duration POLL_COUNTDOWN = Duration.standardSeconds(20); @Inject Bigquery bigquery; - @Inject TaskQueueUtils taskQueueUtils; + @Inject CloudTasksUtils cloudTasksUtils; + @Inject @Header(CHAINED_TASK_QUEUE_HEADER) Lazy chainedQueueName; @Inject @Header(PROJECT_ID_HEADER) String projectId; @Inject @Header(JOB_ID_HEADER) String jobId; - @Inject @Payload byte[] payload; + + @Inject @Payload ByteString payload; + @Inject BigqueryPollJobAction() {} @Override @@ -79,20 +76,25 @@ public class BigqueryPollJobAction implements Runnable { boolean jobOutcome = checkJobOutcome(); // Throws a NotModifiedException if the job hasn't completed. // If the job failed, do not enqueue the next step. - if (!jobOutcome || payload == null || payload.length == 0) { + if (!jobOutcome || payload == null || payload.size() == 0) { return; } // If there is a payload, it's a chained task, so enqueue it. - TaskOptions task; + Task task; try { - task = (TaskOptions) new ObjectInputStream(new ByteArrayInputStream(payload)).readObject(); + task = + (Task) + new ObjectInputStream(new ByteArrayInputStream(payload.toByteArray())).readObject(); } catch (ClassNotFoundException | IOException e) { throw new BadRequestException("Cannot deserialize task from payload", e); } - String taskName = taskQueueUtils.enqueue(getQueue(chainedQueueName.get()), task).getName(); + Task enqueuedTask = cloudTasksUtils.enqueue(chainedQueueName.get(), task); logger.atInfo().log( "Added chained task %s for %s to queue %s: %s", - taskName, task.getUrl(), chainedQueueName.get(), task); + enqueuedTask.getName(), + enqueuedTask.getAppEngineHttpRequest().getRelativeUri(), + chainedQueueName.get(), + enqueuedTask); } /** @@ -124,51 +126,4 @@ public class BigqueryPollJobAction implements Runnable { logger.atInfo().log("Bigquery job succeeded - %s.", jobRefString); return true; } - - - /** Helper class to enqueue a bigquery poll job. */ - public static class BigqueryPollJobEnqueuer { - - private final TaskQueueUtils taskQueueUtils; - - @Inject - BigqueryPollJobEnqueuer(TaskQueueUtils taskQueueUtils) { - this.taskQueueUtils = taskQueueUtils; - } - - /** Enqueue a task to poll for the success or failure of the referenced BigQuery job. */ - public TaskHandle enqueuePollTask(JobReference jobRef) { - return taskQueueUtils.enqueue( - getQueue(QUEUE), createCommonPollTask(jobRef).method(Method.GET)); - } - - /** - * Enqueue a task to poll for the success or failure of the referenced BigQuery job and to - * launch the provided task in the specified queue if the job succeeds. - */ - public TaskHandle enqueuePollTask( - JobReference jobRef, TaskOptions chainedTask, Queue chainedTaskQueue) throws IOException { - // Serialize the chainedTask into a byte array to put in the task payload. - ByteArrayOutputStream taskBytes = new ByteArrayOutputStream(); - new ObjectOutputStream(taskBytes).writeObject(chainedTask); - return taskQueueUtils.enqueue( - getQueue(QUEUE), - createCommonPollTask(jobRef) - .method(Method.POST) - .header(CHAINED_TASK_QUEUE_HEADER, chainedTaskQueue.getQueueName()) - .payload(taskBytes.toByteArray())); - } - - /** - * Enqueue a task to poll for the success or failure of the referenced BigQuery job and to - * launch the provided task in the specified queue if the job succeeds. - */ - private static TaskOptions createCommonPollTask(JobReference jobRef) { - // Omit host header so that task will be run on the current backend/module. - return withUrl(PATH) - .countdownMillis(POLL_COUNTDOWN.getMillis()) - .header(PROJECT_ID_HEADER, jobRef.getProjectId()) - .header(JOB_ID_HEADER, jobRef.getJobId()); - } - } } diff --git a/core/src/main/java/google/registry/export/CheckBackupAction.java b/core/src/main/java/google/registry/export/CheckBackupAction.java index 73bced19f..4b94873a0 100644 --- a/core/src/main/java/google/registry/export/CheckBackupAction.java +++ b/core/src/main/java/google/registry/export/CheckBackupAction.java @@ -16,18 +16,14 @@ package google.registry.export; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.Sets.intersection; -import static google.registry.export.UploadDatastoreBackupAction.enqueueUploadBackupTask; import static google.registry.request.Action.Method.GET; import static google.registry.request.Action.Method.POST; import static javax.servlet.http.HttpServletResponse.SC_NOT_FOUND; import com.google.api.client.googleapis.json.GoogleJsonResponseException; -import com.google.appengine.api.taskqueue.QueueFactory; -import com.google.appengine.api.taskqueue.TaskHandle; -import com.google.appengine.api.taskqueue.TaskOptions; -import com.google.appengine.api.taskqueue.TaskOptions.Method; import com.google.common.base.Joiner; import com.google.common.base.Splitter; +import com.google.common.collect.ImmutableMultimap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; import com.google.common.flogger.FluentLogger; @@ -35,6 +31,7 @@ import google.registry.export.datastore.DatastoreAdmin; import google.registry.export.datastore.Operation; import google.registry.model.annotations.DeleteAfterMigration; import google.registry.request.Action; +import google.registry.request.Action.Service; import google.registry.request.HttpException; import google.registry.request.HttpException.BadRequestException; import google.registry.request.HttpException.InternalServerErrorException; @@ -45,6 +42,7 @@ import google.registry.request.RequestMethod; import google.registry.request.Response; import google.registry.request.auth.Auth; import google.registry.util.Clock; +import google.registry.util.CloudTasksUtils; import java.io.IOException; import java.util.Set; import javax.inject.Inject; @@ -83,6 +81,7 @@ public class CheckBackupAction implements Runnable { @Inject DatastoreAdmin datastoreAdmin; @Inject Clock clock; @Inject Response response; + @Inject CloudTasksUtils cloudTasksUtils; @Inject @RequestMethod Action.Method requestMethod; @Inject @@ -175,21 +174,22 @@ public class CheckBackupAction implements Runnable { if (exportedKindsToLoad.isEmpty()) { message += "no kinds to load into BigQuery."; } else { - enqueueUploadBackupTask(backupId, backup.getExportFolderUrl(), exportedKindsToLoad); + /** Enqueue a task for starting a backup load. */ + cloudTasksUtils.enqueue( + UploadDatastoreBackupAction.QUEUE, + cloudTasksUtils.createPostTask( + UploadDatastoreBackupAction.PATH, + Service.BACKEND.toString(), + ImmutableMultimap.of( + UploadDatastoreBackupAction.UPLOAD_BACKUP_ID_PARAM, + backupId, + UploadDatastoreBackupAction.UPLOAD_BACKUP_FOLDER_PARAM, + backup.getExportFolderUrl(), + UploadDatastoreBackupAction.UPLOAD_BACKUP_KINDS_PARAM, + Joiner.on(',').join(exportedKindsToLoad)))); message += "BigQuery load task enqueued."; } logger.atInfo().log(message); response.setPayload(message); } - - /** Enqueue a poll task to monitor the named backup for completion. */ - static TaskHandle enqueuePollTask(String backupId, ImmutableSet kindsToLoad) { - return QueueFactory.getQueue(QUEUE) - .add( - TaskOptions.Builder.withUrl(PATH) - .method(Method.POST) - .countdownMillis(POLL_COUNTDOWN.getMillis()) - .param(CHECK_BACKUP_NAME_PARAM, backupId) - .param(CHECK_BACKUP_KINDS_TO_LOAD_PARAM, Joiner.on(',').join(kindsToLoad))); - } } diff --git a/core/src/main/java/google/registry/export/UpdateSnapshotViewAction.java b/core/src/main/java/google/registry/export/UpdateSnapshotViewAction.java index b3296c7f8..333d52764 100644 --- a/core/src/main/java/google/registry/export/UpdateSnapshotViewAction.java +++ b/core/src/main/java/google/registry/export/UpdateSnapshotViewAction.java @@ -21,8 +21,6 @@ import com.google.api.services.bigquery.Bigquery; import com.google.api.services.bigquery.model.Table; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.ViewDefinition; -import com.google.appengine.api.taskqueue.TaskOptions; -import com.google.appengine.api.taskqueue.TaskOptions.Method; import com.google.common.flogger.FluentLogger; import google.registry.bigquery.CheckedBigquery; import google.registry.config.RegistryConfig.Config; @@ -85,17 +83,6 @@ public class UpdateSnapshotViewAction implements Runnable { @Inject UpdateSnapshotViewAction() {} - /** Create a task for updating a snapshot view. */ - static TaskOptions createViewUpdateTask( - String datasetId, String tableId, String kindName, String viewName) { - return TaskOptions.Builder.withUrl(PATH) - .method(Method.POST) - .param(UPDATE_SNAPSHOT_DATASET_ID_PARAM, datasetId) - .param(UPDATE_SNAPSHOT_TABLE_ID_PARAM, tableId) - .param(UPDATE_SNAPSHOT_KIND_PARAM, kindName) - .param(UPDATE_SNAPSHOT_VIEWNAME_PARAM, viewName); - } - @Override public void run() { try { diff --git a/core/src/main/java/google/registry/export/UploadDatastoreBackupAction.java b/core/src/main/java/google/registry/export/UploadDatastoreBackupAction.java index d57cb5528..c42ef3756 100644 --- a/core/src/main/java/google/registry/export/UploadDatastoreBackupAction.java +++ b/core/src/main/java/google/registry/export/UploadDatastoreBackupAction.java @@ -14,9 +14,7 @@ package google.registry.export; -import static com.google.appengine.api.taskqueue.QueueFactory.getQueue; import static com.google.common.base.MoreObjects.firstNonNull; -import static google.registry.export.UpdateSnapshotViewAction.createViewUpdateTask; import static google.registry.request.Action.Method.POST; import com.google.api.services.bigquery.Bigquery; @@ -25,27 +23,33 @@ import com.google.api.services.bigquery.model.JobConfiguration; import com.google.api.services.bigquery.model.JobConfigurationLoad; import com.google.api.services.bigquery.model.JobReference; import com.google.api.services.bigquery.model.TableReference; -import com.google.appengine.api.taskqueue.TaskHandle; -import com.google.appengine.api.taskqueue.TaskOptions; -import com.google.appengine.api.taskqueue.TaskOptions.Method; +import com.google.cloud.tasks.v2.Task; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; +import com.google.common.collect.ImmutableMultimap; import com.google.common.flogger.FluentLogger; +import com.google.common.net.HttpHeaders; +import com.google.common.net.MediaType; +import com.google.protobuf.ByteString; +import com.google.protobuf.util.Timestamps; import google.registry.bigquery.BigqueryUtils.SourceFormat; import google.registry.bigquery.BigqueryUtils.WriteDisposition; import google.registry.bigquery.CheckedBigquery; import google.registry.config.RegistryConfig.Config; -import google.registry.export.BigqueryPollJobAction.BigqueryPollJobEnqueuer; import google.registry.model.annotations.DeleteAfterMigration; import google.registry.request.Action; +import google.registry.request.Action.Service; import google.registry.request.HttpException.BadRequestException; import google.registry.request.HttpException.InternalServerErrorException; import google.registry.request.Parameter; import google.registry.request.auth.Auth; +import google.registry.util.Clock; +import google.registry.util.CloudTasksUtils; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.ObjectOutputStream; import javax.inject.Inject; /** Action to load a Datastore backup from Google Cloud Storage into BigQuery. */ @@ -75,7 +79,9 @@ public class UploadDatastoreBackupAction implements Runnable { private static final FluentLogger logger = FluentLogger.forEnclosingClass(); @Inject CheckedBigquery checkedBigquery; - @Inject BigqueryPollJobEnqueuer bigqueryPollEnqueuer; + @Inject CloudTasksUtils cloudTasksUtils; + @Inject Clock clock; + @Inject @Config("projectId") String projectId; @Inject @@ -93,18 +99,6 @@ public class UploadDatastoreBackupAction implements Runnable { @Inject UploadDatastoreBackupAction() {} - /** Enqueue a task for starting a backup load. */ - public static TaskHandle enqueueUploadBackupTask( - String backupId, String gcsFile, ImmutableSet kinds) { - return getQueue(QUEUE) - .add( - TaskOptions.Builder.withUrl(PATH) - .method(Method.POST) - .param(UPLOAD_BACKUP_ID_PARAM, backupId) - .param(UPLOAD_BACKUP_FOLDER_PARAM, gcsFile) - .param(UPLOAD_BACKUP_KINDS_PARAM, Joiner.on(',').join(kinds))); - } - @Override public void run() { try { @@ -142,12 +136,46 @@ public class UploadDatastoreBackupAction implements Runnable { Job job = makeLoadJob(jobRef, sourceUri, tableId); bigquery.jobs().insert(projectId, job).execute(); - // Enqueue a task to check on the load job's completion, and if it succeeds, to update a - // well-known view in BigQuery to point at the newly loaded backup table for this kind. - bigqueryPollEnqueuer.enqueuePollTask( - jobRef, - createViewUpdateTask(BACKUP_DATASET, tableId, kindName, LATEST_BACKUP_VIEW_NAME), - getQueue(UpdateSnapshotViewAction.QUEUE)); + // Serialize the chainedTask into a byte array to put in the task payload. + ByteArrayOutputStream taskBytes = new ByteArrayOutputStream(); + new ObjectOutputStream(taskBytes) + .writeObject( + cloudTasksUtils.createPostTask( + UpdateSnapshotViewAction.PATH, + Service.BACKEND.toString(), + ImmutableMultimap.of( + UpdateSnapshotViewAction.UPDATE_SNAPSHOT_DATASET_ID_PARAM, + BACKUP_DATASET, + UpdateSnapshotViewAction.UPDATE_SNAPSHOT_TABLE_ID_PARAM, + tableId, + UpdateSnapshotViewAction.UPDATE_SNAPSHOT_KIND_PARAM, + kindName, + UpdateSnapshotViewAction.UPDATE_SNAPSHOT_VIEWNAME_PARAM, + LATEST_BACKUP_VIEW_NAME))); + + // Enqueues a task to poll for the success or failure of the referenced BigQuery job and to + // launch the provided task in the specified queue if the job succeeds. + cloudTasksUtils.enqueue( + BigqueryPollJobAction.QUEUE, + Task.newBuilder() + .setAppEngineHttpRequest( + cloudTasksUtils + .createPostTask(BigqueryPollJobAction.PATH, Service.BACKEND.toString(), null) + .getAppEngineHttpRequest() + .toBuilder() + .putHeaders(BigqueryPollJobAction.PROJECT_ID_HEADER, jobRef.getProjectId()) + .putHeaders(BigqueryPollJobAction.JOB_ID_HEADER, jobRef.getJobId()) + .putHeaders( + BigqueryPollJobAction.CHAINED_TASK_QUEUE_HEADER, + UpdateSnapshotViewAction.QUEUE) + // need to include CONTENT_TYPE in header when body is not empty + .putHeaders(HttpHeaders.CONTENT_TYPE, MediaType.FORM_DATA.toString()) + .setBody(ByteString.copyFrom(taskBytes.toByteArray())) + .build()) + .setScheduleTime( + Timestamps.fromMillis( + clock.nowUtc().plus(BigqueryPollJobAction.POLL_COUNTDOWN).getMillis())) + .build()); builder.append(String.format(" - %s:%s\n", projectId, jobId)); logger.atInfo().log("Submitted load job %s:%s.", projectId, jobId); diff --git a/core/src/main/java/google/registry/request/RequestModule.java b/core/src/main/java/google/registry/request/RequestModule.java index 8d2efaad7..1209b6c1c 100644 --- a/core/src/main/java/google/registry/request/RequestModule.java +++ b/core/src/main/java/google/registry/request/RequestModule.java @@ -30,6 +30,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.io.ByteStreams; import com.google.common.io.CharStreams; import com.google.common.net.MediaType; +import com.google.protobuf.ByteString; import dagger.Module; import dagger.Provides; import google.registry.model.common.DatabaseMigrationStateSchedule.PrimaryDatabase; @@ -184,6 +185,16 @@ public final class RequestModule { } } + @Provides + @Payload + static ByteString providePayloadAsByteString(HttpServletRequest req) { + try { + return ByteString.copyFrom(ByteStreams.toByteArray(req.getInputStream())); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + @Provides static LockHandler provideLockHandler(LockHandlerImpl lockHandler) { return lockHandler; diff --git a/core/src/test/java/google/registry/export/BackupDatastoreActionTest.java b/core/src/test/java/google/registry/export/BackupDatastoreActionTest.java index 739b0b647..cc4ffce08 100644 --- a/core/src/test/java/google/registry/export/BackupDatastoreActionTest.java +++ b/core/src/test/java/google/registry/export/BackupDatastoreActionTest.java @@ -17,16 +17,19 @@ package google.registry.export; import static com.google.common.truth.Truth.assertThat; import static google.registry.export.CheckBackupAction.CHECK_BACKUP_KINDS_TO_LOAD_PARAM; import static google.registry.export.CheckBackupAction.CHECK_BACKUP_NAME_PARAM; -import static google.registry.testing.TaskQueueHelper.assertTasksEnqueued; import static org.mockito.Mockito.when; +import com.google.cloud.tasks.v2.HttpMethod; import com.google.common.base.Joiner; +import com.google.protobuf.util.Timestamps; import google.registry.export.datastore.DatastoreAdmin; import google.registry.export.datastore.DatastoreAdmin.Export; import google.registry.export.datastore.Operation; import google.registry.testing.AppEngineExtension; +import google.registry.testing.CloudTasksHelper; +import google.registry.testing.CloudTasksHelper.TaskMatcher; +import google.registry.testing.FakeClock; import google.registry.testing.FakeResponse; -import google.registry.testing.TaskQueueHelper.TaskMatcher; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -46,13 +49,15 @@ public class BackupDatastoreActionTest { @Mock private Operation backupOperation; private final FakeResponse response = new FakeResponse(); + private CloudTasksHelper cloudTasksHelper = new CloudTasksHelper(); private final BackupDatastoreAction action = new BackupDatastoreAction(); @BeforeEach void beforeEach() throws Exception { action.datastoreAdmin = datastoreAdmin; action.response = response; - + action.cloudTasksUtils = cloudTasksHelper.getTestCloudTasksUtils(); + action.clock = new FakeClock(); when(datastoreAdmin.export( "gs://registry-project-id-datastore-backups", AnnotatedEntities.getBackupKinds())) .thenReturn(exportRequest); @@ -66,7 +71,7 @@ public class BackupDatastoreActionTest { @Test void testBackup_enqueuesPollTask() { action.run(); - assertTasksEnqueued( + cloudTasksHelper.assertTasksEnqueued( CheckBackupAction.QUEUE, new TaskMatcher() .url(CheckBackupAction.PATH) @@ -74,7 +79,10 @@ public class BackupDatastoreActionTest { .param( CHECK_BACKUP_KINDS_TO_LOAD_PARAM, Joiner.on(",").join(AnnotatedEntities.getReportingKinds())) - .method("POST")); + .method(HttpMethod.POST) + .scheduleTime( + Timestamps.fromMillis( + action.clock.nowUtc().plus(CheckBackupAction.POLL_COUNTDOWN).getMillis()))); assertThat(response.getPayload()) .isEqualTo( "Datastore backup started with name: " diff --git a/core/src/test/java/google/registry/export/BigqueryPollJobActionTest.java b/core/src/test/java/google/registry/export/BigqueryPollJobActionTest.java index 1ba2a72d1..20a738438 100644 --- a/core/src/test/java/google/registry/export/BigqueryPollJobActionTest.java +++ b/core/src/test/java/google/registry/export/BigqueryPollJobActionTest.java @@ -14,11 +14,7 @@ package google.registry.export; -import static com.google.appengine.api.taskqueue.QueueFactory.getQueue; -import static com.google.common.collect.Iterables.getOnlyElement; import static com.google.common.truth.Truth.assertThat; -import static google.registry.testing.TaskQueueHelper.assertNoTasksEnqueued; -import static google.registry.testing.TaskQueueHelper.assertTasksEnqueued; import static google.registry.testing.TestLogHandlerUtils.assertLogMessage; import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.logging.Level.INFO; @@ -30,27 +26,22 @@ import static org.mockito.Mockito.when; import com.google.api.services.bigquery.Bigquery; import com.google.api.services.bigquery.model.ErrorProto; import com.google.api.services.bigquery.model.Job; -import com.google.api.services.bigquery.model.JobReference; import com.google.api.services.bigquery.model.JobStatus; -import com.google.appengine.api.taskqueue.TaskOptions; -import com.google.appengine.api.taskqueue.TaskOptions.Method; -import com.google.appengine.api.taskqueue.dev.QueueStateInfo.TaskStateInfo; -import google.registry.export.BigqueryPollJobAction.BigqueryPollJobEnqueuer; +import com.google.cloud.tasks.v2.AppEngineHttpRequest; +import com.google.cloud.tasks.v2.HttpMethod; +import com.google.cloud.tasks.v2.Task; +import com.google.common.net.HttpHeaders; +import com.google.common.net.MediaType; +import com.google.protobuf.ByteString; import google.registry.request.HttpException.BadRequestException; import google.registry.request.HttpException.NotModifiedException; import google.registry.testing.AppEngineExtension; -import google.registry.testing.FakeClock; -import google.registry.testing.FakeSleeper; -import google.registry.testing.TaskQueueHelper; -import google.registry.testing.TaskQueueHelper.TaskMatcher; +import google.registry.testing.CloudTasksHelper; +import google.registry.testing.CloudTasksHelper.TaskMatcher; import google.registry.util.CapturingLogHandler; import google.registry.util.JdkLoggerConfig; -import google.registry.util.Retrier; -import google.registry.util.TaskQueueUtils; -import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -66,8 +57,6 @@ public class BigqueryPollJobActionTest { private static final String PROJECT_ID = "project_id"; private static final String JOB_ID = "job_id"; private static final String CHAINED_QUEUE_NAME = UpdateSnapshotViewAction.QUEUE; - private static final TaskQueueUtils TASK_QUEUE_UTILS = - new TaskQueueUtils(new Retrier(new FakeSleeper(new FakeClock()), 1)); private final Bigquery bigquery = mock(Bigquery.class); private final Bigquery.Jobs bigqueryJobs = mock(Bigquery.Jobs.class); @@ -75,53 +64,20 @@ public class BigqueryPollJobActionTest { private final CapturingLogHandler logHandler = new CapturingLogHandler(); private BigqueryPollJobAction action = new BigqueryPollJobAction(); + private CloudTasksHelper cloudTasksHelper = new CloudTasksHelper(); @BeforeEach void beforeEach() throws Exception { action.bigquery = bigquery; when(bigquery.jobs()).thenReturn(bigqueryJobs); when(bigqueryJobs.get(PROJECT_ID, JOB_ID)).thenReturn(bigqueryJobsGet); - action.taskQueueUtils = TASK_QUEUE_UTILS; + action.cloudTasksUtils = cloudTasksHelper.getTestCloudTasksUtils(); action.projectId = PROJECT_ID; action.jobId = JOB_ID; action.chainedQueueName = () -> CHAINED_QUEUE_NAME; JdkLoggerConfig.getConfig(BigqueryPollJobAction.class).addHandler(logHandler); } - private static TaskMatcher newPollJobTaskMatcher(String method) { - return new TaskMatcher() - .method(method) - .url(BigqueryPollJobAction.PATH) - .header(BigqueryPollJobAction.PROJECT_ID_HEADER, PROJECT_ID) - .header(BigqueryPollJobAction.JOB_ID_HEADER, JOB_ID); - } - - @Test - void testSuccess_enqueuePollTask() { - new BigqueryPollJobEnqueuer(TASK_QUEUE_UTILS).enqueuePollTask( - new JobReference().setProjectId(PROJECT_ID).setJobId(JOB_ID)); - assertTasksEnqueued(BigqueryPollJobAction.QUEUE, newPollJobTaskMatcher("GET")); - } - - @Test - void testSuccess_enqueuePollTask_withChainedTask() throws Exception { - TaskOptions chainedTask = TaskOptions.Builder - .withUrl("/_dr/something") - .method(Method.POST) - .header("X-Testing", "foo") - .param("testing", "bar"); - new BigqueryPollJobEnqueuer(TASK_QUEUE_UTILS).enqueuePollTask( - new JobReference().setProjectId(PROJECT_ID).setJobId(JOB_ID), - chainedTask, - getQueue(CHAINED_QUEUE_NAME)); - assertTasksEnqueued(BigqueryPollJobAction.QUEUE, newPollJobTaskMatcher("POST")); - TaskStateInfo taskInfo = getOnlyElement( - TaskQueueHelper.getQueueInfo(BigqueryPollJobAction.QUEUE).getTaskInfo()); - ByteArrayInputStream taskBodyBytes = new ByteArrayInputStream(taskInfo.getBodyAsBytes()); - TaskOptions taskOptions = (TaskOptions) new ObjectInputStream(taskBodyBytes).readObject(); - assertThat(taskOptions).isEqualTo(chainedTask); - } - @Test void testSuccess_jobCompletedSuccessfully() throws Exception { when(bigqueryJobsGet.execute()).thenReturn( @@ -136,15 +92,20 @@ public class BigqueryPollJobActionTest { when(bigqueryJobsGet.execute()).thenReturn( new Job().setStatus(new JobStatus().setState("DONE"))); - TaskOptions chainedTask = - TaskOptions.Builder.withUrl("/_dr/something") - .method(Method.POST) - .header("X-Testing", "foo") - .param("testing", "bar") - .taskName("my_task_name"); + Task chainedTask = + Task.newBuilder() + .setName("my_task_name") + .setAppEngineHttpRequest( + AppEngineHttpRequest.newBuilder() + .setHttpMethod(HttpMethod.POST) + .setRelativeUri("/_dr/something") + .putHeaders("X-Test", "foo") + .putHeaders(HttpHeaders.CONTENT_TYPE, MediaType.FORM_DATA.toString()) + .setBody(ByteString.copyFromUtf8("testing=bar"))) + .build(); ByteArrayOutputStream bytes = new ByteArrayOutputStream(); new ObjectOutputStream(bytes).writeObject(chainedTask); - action.payload = bytes.toByteArray(); + action.payload = ByteString.copyFrom(bytes.toByteArray()); action.run(); assertLogMessage( @@ -153,14 +114,15 @@ public class BigqueryPollJobActionTest { logHandler, INFO, "Added chained task my_task_name for /_dr/something to queue " + CHAINED_QUEUE_NAME); - assertTasksEnqueued( + cloudTasksHelper.assertTasksEnqueued( CHAINED_QUEUE_NAME, new TaskMatcher() .url("/_dr/something") - .method("POST") - .header("X-Testing", "foo") + .header("X-Test", "foo") + .header(HttpHeaders.CONTENT_TYPE, MediaType.FORM_DATA.toString()) .param("testing", "bar") - .taskName("my_task_name")); + .taskName("my_task_name") + .method(HttpMethod.POST)); } @Test @@ -172,7 +134,7 @@ public class BigqueryPollJobActionTest { action.run(); assertLogMessage( logHandler, SEVERE, String.format("Bigquery job failed - %s:%s", PROJECT_ID, JOB_ID)); - assertNoTasksEnqueued(CHAINED_QUEUE_NAME); + cloudTasksHelper.assertNoTasksEnqueued(CHAINED_QUEUE_NAME); } @Test @@ -192,7 +154,7 @@ public class BigqueryPollJobActionTest { void testFailure_badChainedTaskPayload() throws Exception { when(bigqueryJobsGet.execute()).thenReturn( new Job().setStatus(new JobStatus().setState("DONE"))); - action.payload = "payload".getBytes(UTF_8); + action.payload = ByteString.copyFrom("payload".getBytes(UTF_8)); BadRequestException thrown = assertThrows(BadRequestException.class, action::run); assertThat(thrown).hasMessageThat().contains("Cannot deserialize task from payload"); } diff --git a/core/src/test/java/google/registry/export/CheckBackupActionTest.java b/core/src/test/java/google/registry/export/CheckBackupActionTest.java index 5f532eba0..0c133fc75 100644 --- a/core/src/test/java/google/registry/export/CheckBackupActionTest.java +++ b/core/src/test/java/google/registry/export/CheckBackupActionTest.java @@ -15,10 +15,6 @@ package google.registry.export; import static com.google.common.truth.Truth.assertThat; -import static google.registry.export.CheckBackupAction.CHECK_BACKUP_KINDS_TO_LOAD_PARAM; -import static google.registry.export.CheckBackupAction.CHECK_BACKUP_NAME_PARAM; -import static google.registry.testing.TaskQueueHelper.assertNoTasksEnqueued; -import static google.registry.testing.TaskQueueHelper.assertTasksEnqueued; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.when; @@ -27,7 +23,7 @@ import com.google.api.client.googleapis.json.GoogleJsonResponseException; import com.google.api.client.http.HttpHeaders; import com.google.api.client.json.JsonFactory; import com.google.api.client.json.jackson2.JacksonFactory; -import com.google.common.collect.ImmutableSet; +import com.google.cloud.tasks.v2.HttpMethod; import google.registry.export.datastore.DatastoreAdmin; import google.registry.export.datastore.DatastoreAdmin.Get; import google.registry.export.datastore.Operation; @@ -36,9 +32,10 @@ import google.registry.request.HttpException.BadRequestException; import google.registry.request.HttpException.NoContentException; import google.registry.request.HttpException.NotModifiedException; import google.registry.testing.AppEngineExtension; +import google.registry.testing.CloudTasksHelper; +import google.registry.testing.CloudTasksHelper.TaskMatcher; import google.registry.testing.FakeClock; import google.registry.testing.FakeResponse; -import google.registry.testing.TaskQueueHelper.TaskMatcher; import google.registry.testing.TestDataHelper; import org.joda.time.DateTime; import org.joda.time.Duration; @@ -57,7 +54,6 @@ public class CheckBackupActionTest { private static final DateTime START_TIME = DateTime.parse("2014-08-01T01:02:03Z"); private static final DateTime COMPLETE_TIME = START_TIME.plus(Duration.standardMinutes(30)); - private static final JsonFactory JSON_FACTORY = JacksonFactory.getDefaultInstance(); @RegisterExtension @@ -71,6 +67,7 @@ public class CheckBackupActionTest { private final FakeResponse response = new FakeResponse(); private final FakeClock clock = new FakeClock(COMPLETE_TIME.plusMillis(1000)); private final CheckBackupAction action = new CheckBackupAction(); + private final CloudTasksHelper cloudTasksHelper = new CloudTasksHelper(); @BeforeEach void beforeEach() throws Exception { @@ -80,6 +77,7 @@ public class CheckBackupActionTest { action.backupName = "some_backup"; action.kindsToLoadParam = "one,two"; action.response = response; + action.cloudTasksUtils = cloudTasksHelper.getTestCloudTasksUtils(); when(datastoreAdmin.get(anyString())).thenReturn(getBackupProgressRequest); when(getBackupProgressRequest.execute()).thenAnswer(arg -> backupOperation); @@ -110,30 +108,17 @@ public class CheckBackupActionTest { null)); } - private static void assertLoadTaskEnqueued(String id, String folder, String kinds) { - assertTasksEnqueued( + private void assertLoadTaskEnqueued(String id, String folder, String kinds) { + cloudTasksHelper.assertTasksEnqueued( "export-snapshot", new TaskMatcher() .url("/_dr/task/uploadDatastoreBackup") - .method("POST") + .method(HttpMethod.POST) .param("id", id) .param("folder", folder) .param("kinds", kinds)); } - @MockitoSettings(strictness = Strictness.LENIENT) - @Test - void testSuccess_enqueuePollTask() { - CheckBackupAction.enqueuePollTask("some_backup_name", ImmutableSet.of("one", "two", "three")); - assertTasksEnqueued( - CheckBackupAction.QUEUE, - new TaskMatcher() - .url(CheckBackupAction.PATH) - .param(CHECK_BACKUP_NAME_PARAM, "some_backup_name") - .param(CHECK_BACKUP_KINDS_TO_LOAD_PARAM, "one,two,three") - .method("POST")); - } - @Test void testPost_forPendingBackup_returnsNotModified() throws Exception { setPendingBackup(); @@ -189,7 +174,7 @@ public class CheckBackupActionTest { action.kindsToLoadParam = ""; action.run(); - assertNoTasksEnqueued("export-snapshot"); + cloudTasksHelper.assertNoTasksEnqueued("export-snapshot"); } @MockitoSettings(strictness = Strictness.LENIENT) diff --git a/core/src/test/java/google/registry/export/UpdateSnapshotViewActionTest.java b/core/src/test/java/google/registry/export/UpdateSnapshotViewActionTest.java index a52a9dd62..e435b92ad 100644 --- a/core/src/test/java/google/registry/export/UpdateSnapshotViewActionTest.java +++ b/core/src/test/java/google/registry/export/UpdateSnapshotViewActionTest.java @@ -14,15 +14,7 @@ package google.registry.export; -import static com.google.appengine.api.taskqueue.QueueFactory.getQueue; import static com.google.common.truth.Truth.assertThat; -import static google.registry.export.UpdateSnapshotViewAction.QUEUE; -import static google.registry.export.UpdateSnapshotViewAction.UPDATE_SNAPSHOT_DATASET_ID_PARAM; -import static google.registry.export.UpdateSnapshotViewAction.UPDATE_SNAPSHOT_KIND_PARAM; -import static google.registry.export.UpdateSnapshotViewAction.UPDATE_SNAPSHOT_TABLE_ID_PARAM; -import static google.registry.export.UpdateSnapshotViewAction.UPDATE_SNAPSHOT_VIEWNAME_PARAM; -import static google.registry.export.UpdateSnapshotViewAction.createViewUpdateTask; -import static google.registry.testing.TaskQueueHelper.assertTasksEnqueued; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; @@ -38,7 +30,6 @@ import com.google.common.collect.Iterables; import google.registry.bigquery.CheckedBigquery; import google.registry.request.HttpException.InternalServerErrorException; import google.registry.testing.AppEngineExtension; -import google.registry.testing.TaskQueueHelper.TaskMatcher; import java.io.IOException; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -81,23 +72,6 @@ public class UpdateSnapshotViewActionTest { action.tableId = "12345_fookind"; } - @Test - void testSuccess_createViewUpdateTask() { - getQueue(QUEUE) - .add( - createViewUpdateTask( - "some_dataset", "12345_fookind", "fookind", "latest_datastore_export")); - assertTasksEnqueued( - QUEUE, - new TaskMatcher() - .url(UpdateSnapshotViewAction.PATH) - .method("POST") - .param(UPDATE_SNAPSHOT_DATASET_ID_PARAM, "some_dataset") - .param(UPDATE_SNAPSHOT_TABLE_ID_PARAM, "12345_fookind") - .param(UPDATE_SNAPSHOT_KIND_PARAM, "fookind") - .param(UPDATE_SNAPSHOT_VIEWNAME_PARAM, "latest_datastore_export")); - } - @Test void testSuccess_doPost() throws Exception { action.run(); diff --git a/core/src/test/java/google/registry/export/UploadDatastoreBackupActionTest.java b/core/src/test/java/google/registry/export/UploadDatastoreBackupActionTest.java index d8c9fce4f..b6220100d 100644 --- a/core/src/test/java/google/registry/export/UploadDatastoreBackupActionTest.java +++ b/core/src/test/java/google/registry/export/UploadDatastoreBackupActionTest.java @@ -16,16 +16,16 @@ package google.registry.export; import static com.google.common.collect.Iterables.transform; import static com.google.common.truth.Truth.assertThat; +import static com.google.common.truth.Truth8.assertThat; +import static google.registry.export.BigqueryPollJobAction.CHAINED_TASK_QUEUE_HEADER; +import static google.registry.export.BigqueryPollJobAction.JOB_ID_HEADER; +import static google.registry.export.BigqueryPollJobAction.PROJECT_ID_HEADER; +import static google.registry.export.UpdateSnapshotViewAction.UPDATE_SNAPSHOT_DATASET_ID_PARAM; +import static google.registry.export.UpdateSnapshotViewAction.UPDATE_SNAPSHOT_KIND_PARAM; +import static google.registry.export.UpdateSnapshotViewAction.UPDATE_SNAPSHOT_TABLE_ID_PARAM; +import static google.registry.export.UpdateSnapshotViewAction.UPDATE_SNAPSHOT_VIEWNAME_PARAM; import static google.registry.export.UploadDatastoreBackupAction.BACKUP_DATASET; -import static google.registry.export.UploadDatastoreBackupAction.LATEST_BACKUP_VIEW_NAME; -import static google.registry.export.UploadDatastoreBackupAction.PATH; -import static google.registry.export.UploadDatastoreBackupAction.QUEUE; -import static google.registry.export.UploadDatastoreBackupAction.UPLOAD_BACKUP_FOLDER_PARAM; -import static google.registry.export.UploadDatastoreBackupAction.UPLOAD_BACKUP_ID_PARAM; -import static google.registry.export.UploadDatastoreBackupAction.UPLOAD_BACKUP_KINDS_PARAM; -import static google.registry.export.UploadDatastoreBackupAction.enqueueUploadBackupTask; import static google.registry.export.UploadDatastoreBackupAction.getBackupInfoFileForKind; -import static google.registry.testing.TaskQueueHelper.assertTasksEnqueued; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; @@ -38,16 +38,20 @@ import com.google.api.services.bigquery.Bigquery; import com.google.api.services.bigquery.model.Dataset; import com.google.api.services.bigquery.model.Job; import com.google.api.services.bigquery.model.JobConfigurationLoad; -import com.google.api.services.bigquery.model.JobReference; -import com.google.appengine.api.taskqueue.QueueFactory; -import com.google.common.collect.ImmutableSet; +import com.google.cloud.tasks.v2.HttpMethod; +import com.google.common.collect.ImmutableMultimap; import com.google.common.collect.Iterables; +import com.google.protobuf.util.Timestamps; import google.registry.bigquery.CheckedBigquery; -import google.registry.export.BigqueryPollJobAction.BigqueryPollJobEnqueuer; import google.registry.request.HttpException.InternalServerErrorException; import google.registry.testing.AppEngineExtension; -import google.registry.testing.TaskQueueHelper.TaskMatcher; +import google.registry.testing.CloudTasksHelper; +import google.registry.testing.CloudTasksHelper.TaskMatcher; +import google.registry.testing.FakeClock; +import google.registry.util.CloudTasksUtils; +import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.ObjectInputStream; import java.util.List; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -67,8 +71,9 @@ public class UploadDatastoreBackupActionTest { private final Bigquery.Datasets bigqueryDatasets = mock(Bigquery.Datasets.class); private final Bigquery.Datasets.Insert bigqueryDatasetsInsert = mock(Bigquery.Datasets.Insert.class); - private final BigqueryPollJobEnqueuer bigqueryPollEnqueuer = mock(BigqueryPollJobEnqueuer.class); private UploadDatastoreBackupAction action; + private CloudTasksHelper cloudTasksHelper = new CloudTasksHelper(); + private CloudTasksUtils cloudTasksUtils = cloudTasksHelper.getTestCloudTasksUtils(); @BeforeEach void beforeEach() throws Exception { @@ -80,25 +85,14 @@ public class UploadDatastoreBackupActionTest { .thenReturn(bigqueryDatasetsInsert); action = new UploadDatastoreBackupAction(); action.checkedBigquery = checkedBigquery; - action.bigqueryPollEnqueuer = bigqueryPollEnqueuer; action.projectId = "Project-Id"; action.backupFolderUrl = "gs://bucket/path"; action.backupId = "2018-12-05T17:46:39_92612"; action.backupKinds = "one,two,three"; + action.cloudTasksUtils = cloudTasksHelper.getTestCloudTasksUtils(); + action.clock = new FakeClock(); } - @Test - void testSuccess_enqueueLoadTask() { - enqueueUploadBackupTask("id12345", "gs://bucket/path", ImmutableSet.of("one", "two", "three")); - assertTasksEnqueued( - QUEUE, - new TaskMatcher() - .url(PATH) - .method("POST") - .param(UPLOAD_BACKUP_ID_PARAM, "id12345") - .param(UPLOAD_BACKUP_FOLDER_PARAM, "gs://bucket/path") - .param(UPLOAD_BACKUP_KINDS_PARAM, "one,two,three")); - } @Test void testSuccess_doPost() throws Exception { @@ -153,33 +147,91 @@ public class UploadDatastoreBackupActionTest { verify(bigqueryJobsInsert, times(3)).execute(); // Check that the poll tasks for each load job were enqueued. - verify(bigqueryPollEnqueuer) - .enqueuePollTask( - new JobReference() - .setProjectId("Project-Id") - .setJobId("load-backup-2018_12_05T17_46_39_92612-one"), - UpdateSnapshotViewAction.createViewUpdateTask( - BACKUP_DATASET, "2018_12_05T17_46_39_92612_one", "one", LATEST_BACKUP_VIEW_NAME), - QueueFactory.getQueue(UpdateSnapshotViewAction.QUEUE)); - verify(bigqueryPollEnqueuer) - .enqueuePollTask( - new JobReference() - .setProjectId("Project-Id") - .setJobId("load-backup-2018_12_05T17_46_39_92612-two"), - UpdateSnapshotViewAction.createViewUpdateTask( - BACKUP_DATASET, "2018_12_05T17_46_39_92612_two", "two", LATEST_BACKUP_VIEW_NAME), - QueueFactory.getQueue(UpdateSnapshotViewAction.QUEUE)); - verify(bigqueryPollEnqueuer) - .enqueuePollTask( - new JobReference() - .setProjectId("Project-Id") - .setJobId("load-backup-2018_12_05T17_46_39_92612-three"), - UpdateSnapshotViewAction.createViewUpdateTask( - BACKUP_DATASET, - "2018_12_05T17_46_39_92612_three", - "three", - LATEST_BACKUP_VIEW_NAME), - QueueFactory.getQueue(UpdateSnapshotViewAction.QUEUE)); + cloudTasksHelper.assertTasksEnqueued( + BigqueryPollJobAction.QUEUE, + new TaskMatcher() + .method(HttpMethod.POST) + .header(PROJECT_ID_HEADER, "Project-Id") + .header(JOB_ID_HEADER, "load-backup-2018_12_05T17_46_39_92612-one") + .header(CHAINED_TASK_QUEUE_HEADER, UpdateSnapshotViewAction.QUEUE) + .scheduleTime( + Timestamps.fromMillis( + action.clock.nowUtc().plus(BigqueryPollJobAction.POLL_COUNTDOWN).getMillis())), + new TaskMatcher() + .method(HttpMethod.POST) + .header(PROJECT_ID_HEADER, "Project-Id") + .header(JOB_ID_HEADER, "load-backup-2018_12_05T17_46_39_92612-two") + .header(CHAINED_TASK_QUEUE_HEADER, UpdateSnapshotViewAction.QUEUE) + .scheduleTime( + Timestamps.fromMillis( + action.clock.nowUtc().plus(BigqueryPollJobAction.POLL_COUNTDOWN).getMillis())), + new TaskMatcher() + .method(HttpMethod.POST) + .header(PROJECT_ID_HEADER, "Project-Id") + .header(JOB_ID_HEADER, "load-backup-2018_12_05T17_46_39_92612-three") + .header(CHAINED_TASK_QUEUE_HEADER, UpdateSnapshotViewAction.QUEUE) + .scheduleTime( + Timestamps.fromMillis( + action.clock.nowUtc().plus(BigqueryPollJobAction.POLL_COUNTDOWN).getMillis()))); + + // assert the chained task of each enqueud task is correct + assertThat( + cloudTasksHelper.getTestTasksFor(BigqueryPollJobAction.QUEUE).stream() + .map( + testTask -> { + try { + return new ObjectInputStream( + new ByteArrayInputStream( + testTask.getAppEngineHttpRequest().getBody().toByteArray())) + .readObject(); + } catch (ClassNotFoundException | IOException e) { + return null; + } + })) + .containsExactly( + cloudTasksHelper + .getTestCloudTasksUtils() + .createPostTask( + UpdateSnapshotViewAction.PATH, + "BACKEND", + ImmutableMultimap.of( + UPDATE_SNAPSHOT_DATASET_ID_PARAM, + "datastore_backups", + UPDATE_SNAPSHOT_TABLE_ID_PARAM, + "2018_12_05T17_46_39_92612_one", + UPDATE_SNAPSHOT_KIND_PARAM, + "one", + UPDATE_SNAPSHOT_VIEWNAME_PARAM, + "latest_datastore_export")), + cloudTasksHelper + .getTestCloudTasksUtils() + .createPostTask( + UpdateSnapshotViewAction.PATH, + "BACKEND", + ImmutableMultimap.of( + UPDATE_SNAPSHOT_DATASET_ID_PARAM, + "datastore_backups", + UPDATE_SNAPSHOT_TABLE_ID_PARAM, + "2018_12_05T17_46_39_92612_two", + UPDATE_SNAPSHOT_KIND_PARAM, + "two", + UPDATE_SNAPSHOT_VIEWNAME_PARAM, + "latest_datastore_export")), + cloudTasksHelper + .getTestCloudTasksUtils() + .createPostTask( + UpdateSnapshotViewAction.PATH, + "BACKEND", + ImmutableMultimap.of( + UPDATE_SNAPSHOT_DATASET_ID_PARAM, + "datastore_backups", + UPDATE_SNAPSHOT_TABLE_ID_PARAM, + "2018_12_05T17_46_39_92612_three", + UPDATE_SNAPSHOT_KIND_PARAM, + "three", + UPDATE_SNAPSHOT_VIEWNAME_PARAM, + "latest_datastore_export"))) + .inOrder(); } @Test