mirror of
https://github.com/google/nomulus.git
synced 2025-08-15 05:54:06 +02:00
Remove static methods in back up actions (#1481)
* Remove static methods in back up actions * Remove BigqueryPollJob helper class * Add schedule time in task comparison * Change payload type from byte[] to ByteString
This commit is contained in:
parent
c24e0053c8
commit
ea4d60c830
11 changed files with 277 additions and 298 deletions
|
@ -14,18 +14,22 @@
|
||||||
|
|
||||||
package google.registry.export;
|
package google.registry.export;
|
||||||
|
|
||||||
import static google.registry.export.CheckBackupAction.enqueuePollTask;
|
|
||||||
import static google.registry.request.Action.Method.POST;
|
import static google.registry.request.Action.Method.POST;
|
||||||
|
|
||||||
|
import com.google.common.base.Joiner;
|
||||||
|
import com.google.common.collect.ImmutableMultimap;
|
||||||
import com.google.common.flogger.FluentLogger;
|
import com.google.common.flogger.FluentLogger;
|
||||||
import google.registry.config.RegistryConfig;
|
import google.registry.config.RegistryConfig;
|
||||||
import google.registry.export.datastore.DatastoreAdmin;
|
import google.registry.export.datastore.DatastoreAdmin;
|
||||||
import google.registry.export.datastore.Operation;
|
import google.registry.export.datastore.Operation;
|
||||||
import google.registry.model.annotations.DeleteAfterMigration;
|
import google.registry.model.annotations.DeleteAfterMigration;
|
||||||
import google.registry.request.Action;
|
import google.registry.request.Action;
|
||||||
|
import google.registry.request.Action.Service;
|
||||||
import google.registry.request.HttpException.InternalServerErrorException;
|
import google.registry.request.HttpException.InternalServerErrorException;
|
||||||
import google.registry.request.Response;
|
import google.registry.request.Response;
|
||||||
import google.registry.request.auth.Auth;
|
import google.registry.request.auth.Auth;
|
||||||
|
import google.registry.util.Clock;
|
||||||
|
import google.registry.util.CloudTasksUtils;
|
||||||
import javax.inject.Inject;
|
import javax.inject.Inject;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -59,6 +63,8 @@ public class BackupDatastoreAction implements Runnable {
|
||||||
|
|
||||||
@Inject DatastoreAdmin datastoreAdmin;
|
@Inject DatastoreAdmin datastoreAdmin;
|
||||||
@Inject Response response;
|
@Inject Response response;
|
||||||
|
@Inject Clock clock;
|
||||||
|
@Inject CloudTasksUtils cloudTasksUtils;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
BackupDatastoreAction() {}
|
BackupDatastoreAction() {}
|
||||||
|
@ -73,8 +79,19 @@ public class BackupDatastoreAction implements Runnable {
|
||||||
.execute();
|
.execute();
|
||||||
|
|
||||||
String backupName = backup.getName();
|
String backupName = backup.getName();
|
||||||
// Enqueue a poll task to monitor the backup and load REPORTING-related kinds into bigquery.
|
// Enqueue a poll task to monitor the backup for completion and load reporting-related kinds
|
||||||
enqueuePollTask(backupName, AnnotatedEntities.getReportingKinds());
|
// into bigquery.
|
||||||
|
cloudTasksUtils.enqueue(
|
||||||
|
CheckBackupAction.QUEUE,
|
||||||
|
cloudTasksUtils.createPostTaskWithDelay(
|
||||||
|
CheckBackupAction.PATH,
|
||||||
|
Service.BACKEND.toString(),
|
||||||
|
ImmutableMultimap.of(
|
||||||
|
CheckBackupAction.CHECK_BACKUP_NAME_PARAM,
|
||||||
|
backupName,
|
||||||
|
CheckBackupAction.CHECK_BACKUP_KINDS_TO_LOAD_PARAM,
|
||||||
|
Joiner.on(',').join(AnnotatedEntities.getReportingKinds())),
|
||||||
|
CheckBackupAction.POLL_COUNTDOWN));
|
||||||
String message =
|
String message =
|
||||||
String.format(
|
String.format(
|
||||||
"Datastore backup started with name: %s\nSaving to %s",
|
"Datastore backup started with name: %s\nSaving to %s",
|
||||||
|
|
|
@ -14,18 +14,14 @@
|
||||||
|
|
||||||
package google.registry.export;
|
package google.registry.export;
|
||||||
|
|
||||||
import static com.google.appengine.api.taskqueue.QueueFactory.getQueue;
|
|
||||||
import static com.google.appengine.api.taskqueue.TaskOptions.Builder.withUrl;
|
|
||||||
import static google.registry.bigquery.BigqueryUtils.toJobReferenceString;
|
import static google.registry.bigquery.BigqueryUtils.toJobReferenceString;
|
||||||
|
|
||||||
import com.google.api.services.bigquery.Bigquery;
|
import com.google.api.services.bigquery.Bigquery;
|
||||||
import com.google.api.services.bigquery.model.Job;
|
import com.google.api.services.bigquery.model.Job;
|
||||||
import com.google.api.services.bigquery.model.JobReference;
|
import com.google.api.services.bigquery.model.JobReference;
|
||||||
import com.google.appengine.api.taskqueue.Queue;
|
import com.google.cloud.tasks.v2.Task;
|
||||||
import com.google.appengine.api.taskqueue.TaskHandle;
|
|
||||||
import com.google.appengine.api.taskqueue.TaskOptions;
|
|
||||||
import com.google.appengine.api.taskqueue.TaskOptions.Method;
|
|
||||||
import com.google.common.flogger.FluentLogger;
|
import com.google.common.flogger.FluentLogger;
|
||||||
|
import com.google.protobuf.ByteString;
|
||||||
import dagger.Lazy;
|
import dagger.Lazy;
|
||||||
import google.registry.request.Action;
|
import google.registry.request.Action;
|
||||||
import google.registry.request.Header;
|
import google.registry.request.Header;
|
||||||
|
@ -33,12 +29,10 @@ import google.registry.request.HttpException.BadRequestException;
|
||||||
import google.registry.request.HttpException.NotModifiedException;
|
import google.registry.request.HttpException.NotModifiedException;
|
||||||
import google.registry.request.Payload;
|
import google.registry.request.Payload;
|
||||||
import google.registry.request.auth.Auth;
|
import google.registry.request.auth.Auth;
|
||||||
import google.registry.util.TaskQueueUtils;
|
import google.registry.util.CloudTasksUtils;
|
||||||
import java.io.ByteArrayInputStream;
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.ByteArrayOutputStream;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.ObjectInputStream;
|
import java.io.ObjectInputStream;
|
||||||
import java.io.ObjectOutputStream;
|
|
||||||
import javax.inject.Inject;
|
import javax.inject.Inject;
|
||||||
import org.joda.time.Duration;
|
import org.joda.time.Duration;
|
||||||
|
|
||||||
|
@ -67,11 +61,14 @@ public class BigqueryPollJobAction implements Runnable {
|
||||||
static final Duration POLL_COUNTDOWN = Duration.standardSeconds(20);
|
static final Duration POLL_COUNTDOWN = Duration.standardSeconds(20);
|
||||||
|
|
||||||
@Inject Bigquery bigquery;
|
@Inject Bigquery bigquery;
|
||||||
@Inject TaskQueueUtils taskQueueUtils;
|
@Inject CloudTasksUtils cloudTasksUtils;
|
||||||
|
|
||||||
@Inject @Header(CHAINED_TASK_QUEUE_HEADER) Lazy<String> chainedQueueName;
|
@Inject @Header(CHAINED_TASK_QUEUE_HEADER) Lazy<String> chainedQueueName;
|
||||||
@Inject @Header(PROJECT_ID_HEADER) String projectId;
|
@Inject @Header(PROJECT_ID_HEADER) String projectId;
|
||||||
@Inject @Header(JOB_ID_HEADER) String jobId;
|
@Inject @Header(JOB_ID_HEADER) String jobId;
|
||||||
@Inject @Payload byte[] payload;
|
|
||||||
|
@Inject @Payload ByteString payload;
|
||||||
|
|
||||||
@Inject BigqueryPollJobAction() {}
|
@Inject BigqueryPollJobAction() {}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -79,20 +76,25 @@ public class BigqueryPollJobAction implements Runnable {
|
||||||
boolean jobOutcome =
|
boolean jobOutcome =
|
||||||
checkJobOutcome(); // Throws a NotModifiedException if the job hasn't completed.
|
checkJobOutcome(); // Throws a NotModifiedException if the job hasn't completed.
|
||||||
// If the job failed, do not enqueue the next step.
|
// If the job failed, do not enqueue the next step.
|
||||||
if (!jobOutcome || payload == null || payload.length == 0) {
|
if (!jobOutcome || payload == null || payload.size() == 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// If there is a payload, it's a chained task, so enqueue it.
|
// If there is a payload, it's a chained task, so enqueue it.
|
||||||
TaskOptions task;
|
Task task;
|
||||||
try {
|
try {
|
||||||
task = (TaskOptions) new ObjectInputStream(new ByteArrayInputStream(payload)).readObject();
|
task =
|
||||||
|
(Task)
|
||||||
|
new ObjectInputStream(new ByteArrayInputStream(payload.toByteArray())).readObject();
|
||||||
} catch (ClassNotFoundException | IOException e) {
|
} catch (ClassNotFoundException | IOException e) {
|
||||||
throw new BadRequestException("Cannot deserialize task from payload", e);
|
throw new BadRequestException("Cannot deserialize task from payload", e);
|
||||||
}
|
}
|
||||||
String taskName = taskQueueUtils.enqueue(getQueue(chainedQueueName.get()), task).getName();
|
Task enqueuedTask = cloudTasksUtils.enqueue(chainedQueueName.get(), task);
|
||||||
logger.atInfo().log(
|
logger.atInfo().log(
|
||||||
"Added chained task %s for %s to queue %s: %s",
|
"Added chained task %s for %s to queue %s: %s",
|
||||||
taskName, task.getUrl(), chainedQueueName.get(), task);
|
enqueuedTask.getName(),
|
||||||
|
enqueuedTask.getAppEngineHttpRequest().getRelativeUri(),
|
||||||
|
chainedQueueName.get(),
|
||||||
|
enqueuedTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -124,51 +126,4 @@ public class BigqueryPollJobAction implements Runnable {
|
||||||
logger.atInfo().log("Bigquery job succeeded - %s.", jobRefString);
|
logger.atInfo().log("Bigquery job succeeded - %s.", jobRefString);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/** Helper class to enqueue a bigquery poll job. */
|
|
||||||
public static class BigqueryPollJobEnqueuer {
|
|
||||||
|
|
||||||
private final TaskQueueUtils taskQueueUtils;
|
|
||||||
|
|
||||||
@Inject
|
|
||||||
BigqueryPollJobEnqueuer(TaskQueueUtils taskQueueUtils) {
|
|
||||||
this.taskQueueUtils = taskQueueUtils;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Enqueue a task to poll for the success or failure of the referenced BigQuery job. */
|
|
||||||
public TaskHandle enqueuePollTask(JobReference jobRef) {
|
|
||||||
return taskQueueUtils.enqueue(
|
|
||||||
getQueue(QUEUE), createCommonPollTask(jobRef).method(Method.GET));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Enqueue a task to poll for the success or failure of the referenced BigQuery job and to
|
|
||||||
* launch the provided task in the specified queue if the job succeeds.
|
|
||||||
*/
|
|
||||||
public TaskHandle enqueuePollTask(
|
|
||||||
JobReference jobRef, TaskOptions chainedTask, Queue chainedTaskQueue) throws IOException {
|
|
||||||
// Serialize the chainedTask into a byte array to put in the task payload.
|
|
||||||
ByteArrayOutputStream taskBytes = new ByteArrayOutputStream();
|
|
||||||
new ObjectOutputStream(taskBytes).writeObject(chainedTask);
|
|
||||||
return taskQueueUtils.enqueue(
|
|
||||||
getQueue(QUEUE),
|
|
||||||
createCommonPollTask(jobRef)
|
|
||||||
.method(Method.POST)
|
|
||||||
.header(CHAINED_TASK_QUEUE_HEADER, chainedTaskQueue.getQueueName())
|
|
||||||
.payload(taskBytes.toByteArray()));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Enqueue a task to poll for the success or failure of the referenced BigQuery job and to
|
|
||||||
* launch the provided task in the specified queue if the job succeeds.
|
|
||||||
*/
|
|
||||||
private static TaskOptions createCommonPollTask(JobReference jobRef) {
|
|
||||||
// Omit host header so that task will be run on the current backend/module.
|
|
||||||
return withUrl(PATH)
|
|
||||||
.countdownMillis(POLL_COUNTDOWN.getMillis())
|
|
||||||
.header(PROJECT_ID_HEADER, jobRef.getProjectId())
|
|
||||||
.header(JOB_ID_HEADER, jobRef.getJobId());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,18 +16,14 @@ package google.registry.export;
|
||||||
|
|
||||||
import static com.google.common.base.Preconditions.checkArgument;
|
import static com.google.common.base.Preconditions.checkArgument;
|
||||||
import static com.google.common.collect.Sets.intersection;
|
import static com.google.common.collect.Sets.intersection;
|
||||||
import static google.registry.export.UploadDatastoreBackupAction.enqueueUploadBackupTask;
|
|
||||||
import static google.registry.request.Action.Method.GET;
|
import static google.registry.request.Action.Method.GET;
|
||||||
import static google.registry.request.Action.Method.POST;
|
import static google.registry.request.Action.Method.POST;
|
||||||
import static javax.servlet.http.HttpServletResponse.SC_NOT_FOUND;
|
import static javax.servlet.http.HttpServletResponse.SC_NOT_FOUND;
|
||||||
|
|
||||||
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
|
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
|
||||||
import com.google.appengine.api.taskqueue.QueueFactory;
|
|
||||||
import com.google.appengine.api.taskqueue.TaskHandle;
|
|
||||||
import com.google.appengine.api.taskqueue.TaskOptions;
|
|
||||||
import com.google.appengine.api.taskqueue.TaskOptions.Method;
|
|
||||||
import com.google.common.base.Joiner;
|
import com.google.common.base.Joiner;
|
||||||
import com.google.common.base.Splitter;
|
import com.google.common.base.Splitter;
|
||||||
|
import com.google.common.collect.ImmutableMultimap;
|
||||||
import com.google.common.collect.ImmutableSet;
|
import com.google.common.collect.ImmutableSet;
|
||||||
import com.google.common.collect.Sets;
|
import com.google.common.collect.Sets;
|
||||||
import com.google.common.flogger.FluentLogger;
|
import com.google.common.flogger.FluentLogger;
|
||||||
|
@ -35,6 +31,7 @@ import google.registry.export.datastore.DatastoreAdmin;
|
||||||
import google.registry.export.datastore.Operation;
|
import google.registry.export.datastore.Operation;
|
||||||
import google.registry.model.annotations.DeleteAfterMigration;
|
import google.registry.model.annotations.DeleteAfterMigration;
|
||||||
import google.registry.request.Action;
|
import google.registry.request.Action;
|
||||||
|
import google.registry.request.Action.Service;
|
||||||
import google.registry.request.HttpException;
|
import google.registry.request.HttpException;
|
||||||
import google.registry.request.HttpException.BadRequestException;
|
import google.registry.request.HttpException.BadRequestException;
|
||||||
import google.registry.request.HttpException.InternalServerErrorException;
|
import google.registry.request.HttpException.InternalServerErrorException;
|
||||||
|
@ -45,6 +42,7 @@ import google.registry.request.RequestMethod;
|
||||||
import google.registry.request.Response;
|
import google.registry.request.Response;
|
||||||
import google.registry.request.auth.Auth;
|
import google.registry.request.auth.Auth;
|
||||||
import google.registry.util.Clock;
|
import google.registry.util.Clock;
|
||||||
|
import google.registry.util.CloudTasksUtils;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import javax.inject.Inject;
|
import javax.inject.Inject;
|
||||||
|
@ -83,6 +81,7 @@ public class CheckBackupAction implements Runnable {
|
||||||
@Inject DatastoreAdmin datastoreAdmin;
|
@Inject DatastoreAdmin datastoreAdmin;
|
||||||
@Inject Clock clock;
|
@Inject Clock clock;
|
||||||
@Inject Response response;
|
@Inject Response response;
|
||||||
|
@Inject CloudTasksUtils cloudTasksUtils;
|
||||||
@Inject @RequestMethod Action.Method requestMethod;
|
@Inject @RequestMethod Action.Method requestMethod;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
|
@ -175,21 +174,22 @@ public class CheckBackupAction implements Runnable {
|
||||||
if (exportedKindsToLoad.isEmpty()) {
|
if (exportedKindsToLoad.isEmpty()) {
|
||||||
message += "no kinds to load into BigQuery.";
|
message += "no kinds to load into BigQuery.";
|
||||||
} else {
|
} else {
|
||||||
enqueueUploadBackupTask(backupId, backup.getExportFolderUrl(), exportedKindsToLoad);
|
/** Enqueue a task for starting a backup load. */
|
||||||
|
cloudTasksUtils.enqueue(
|
||||||
|
UploadDatastoreBackupAction.QUEUE,
|
||||||
|
cloudTasksUtils.createPostTask(
|
||||||
|
UploadDatastoreBackupAction.PATH,
|
||||||
|
Service.BACKEND.toString(),
|
||||||
|
ImmutableMultimap.of(
|
||||||
|
UploadDatastoreBackupAction.UPLOAD_BACKUP_ID_PARAM,
|
||||||
|
backupId,
|
||||||
|
UploadDatastoreBackupAction.UPLOAD_BACKUP_FOLDER_PARAM,
|
||||||
|
backup.getExportFolderUrl(),
|
||||||
|
UploadDatastoreBackupAction.UPLOAD_BACKUP_KINDS_PARAM,
|
||||||
|
Joiner.on(',').join(exportedKindsToLoad))));
|
||||||
message += "BigQuery load task enqueued.";
|
message += "BigQuery load task enqueued.";
|
||||||
}
|
}
|
||||||
logger.atInfo().log(message);
|
logger.atInfo().log(message);
|
||||||
response.setPayload(message);
|
response.setPayload(message);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Enqueue a poll task to monitor the named backup for completion. */
|
|
||||||
static TaskHandle enqueuePollTask(String backupId, ImmutableSet<String> kindsToLoad) {
|
|
||||||
return QueueFactory.getQueue(QUEUE)
|
|
||||||
.add(
|
|
||||||
TaskOptions.Builder.withUrl(PATH)
|
|
||||||
.method(Method.POST)
|
|
||||||
.countdownMillis(POLL_COUNTDOWN.getMillis())
|
|
||||||
.param(CHECK_BACKUP_NAME_PARAM, backupId)
|
|
||||||
.param(CHECK_BACKUP_KINDS_TO_LOAD_PARAM, Joiner.on(',').join(kindsToLoad)));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,8 +21,6 @@ import com.google.api.services.bigquery.Bigquery;
|
||||||
import com.google.api.services.bigquery.model.Table;
|
import com.google.api.services.bigquery.model.Table;
|
||||||
import com.google.api.services.bigquery.model.TableReference;
|
import com.google.api.services.bigquery.model.TableReference;
|
||||||
import com.google.api.services.bigquery.model.ViewDefinition;
|
import com.google.api.services.bigquery.model.ViewDefinition;
|
||||||
import com.google.appengine.api.taskqueue.TaskOptions;
|
|
||||||
import com.google.appengine.api.taskqueue.TaskOptions.Method;
|
|
||||||
import com.google.common.flogger.FluentLogger;
|
import com.google.common.flogger.FluentLogger;
|
||||||
import google.registry.bigquery.CheckedBigquery;
|
import google.registry.bigquery.CheckedBigquery;
|
||||||
import google.registry.config.RegistryConfig.Config;
|
import google.registry.config.RegistryConfig.Config;
|
||||||
|
@ -85,17 +83,6 @@ public class UpdateSnapshotViewAction implements Runnable {
|
||||||
@Inject
|
@Inject
|
||||||
UpdateSnapshotViewAction() {}
|
UpdateSnapshotViewAction() {}
|
||||||
|
|
||||||
/** Create a task for updating a snapshot view. */
|
|
||||||
static TaskOptions createViewUpdateTask(
|
|
||||||
String datasetId, String tableId, String kindName, String viewName) {
|
|
||||||
return TaskOptions.Builder.withUrl(PATH)
|
|
||||||
.method(Method.POST)
|
|
||||||
.param(UPDATE_SNAPSHOT_DATASET_ID_PARAM, datasetId)
|
|
||||||
.param(UPDATE_SNAPSHOT_TABLE_ID_PARAM, tableId)
|
|
||||||
.param(UPDATE_SNAPSHOT_KIND_PARAM, kindName)
|
|
||||||
.param(UPDATE_SNAPSHOT_VIEWNAME_PARAM, viewName);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -14,9 +14,7 @@
|
||||||
|
|
||||||
package google.registry.export;
|
package google.registry.export;
|
||||||
|
|
||||||
import static com.google.appengine.api.taskqueue.QueueFactory.getQueue;
|
|
||||||
import static com.google.common.base.MoreObjects.firstNonNull;
|
import static com.google.common.base.MoreObjects.firstNonNull;
|
||||||
import static google.registry.export.UpdateSnapshotViewAction.createViewUpdateTask;
|
|
||||||
import static google.registry.request.Action.Method.POST;
|
import static google.registry.request.Action.Method.POST;
|
||||||
|
|
||||||
import com.google.api.services.bigquery.Bigquery;
|
import com.google.api.services.bigquery.Bigquery;
|
||||||
|
@ -25,27 +23,33 @@ import com.google.api.services.bigquery.model.JobConfiguration;
|
||||||
import com.google.api.services.bigquery.model.JobConfigurationLoad;
|
import com.google.api.services.bigquery.model.JobConfigurationLoad;
|
||||||
import com.google.api.services.bigquery.model.JobReference;
|
import com.google.api.services.bigquery.model.JobReference;
|
||||||
import com.google.api.services.bigquery.model.TableReference;
|
import com.google.api.services.bigquery.model.TableReference;
|
||||||
import com.google.appengine.api.taskqueue.TaskHandle;
|
import com.google.cloud.tasks.v2.Task;
|
||||||
import com.google.appengine.api.taskqueue.TaskOptions;
|
|
||||||
import com.google.appengine.api.taskqueue.TaskOptions.Method;
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Joiner;
|
import com.google.common.base.Joiner;
|
||||||
import com.google.common.base.Splitter;
|
import com.google.common.base.Splitter;
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
import com.google.common.collect.ImmutableSet;
|
import com.google.common.collect.ImmutableMultimap;
|
||||||
import com.google.common.flogger.FluentLogger;
|
import com.google.common.flogger.FluentLogger;
|
||||||
|
import com.google.common.net.HttpHeaders;
|
||||||
|
import com.google.common.net.MediaType;
|
||||||
|
import com.google.protobuf.ByteString;
|
||||||
|
import com.google.protobuf.util.Timestamps;
|
||||||
import google.registry.bigquery.BigqueryUtils.SourceFormat;
|
import google.registry.bigquery.BigqueryUtils.SourceFormat;
|
||||||
import google.registry.bigquery.BigqueryUtils.WriteDisposition;
|
import google.registry.bigquery.BigqueryUtils.WriteDisposition;
|
||||||
import google.registry.bigquery.CheckedBigquery;
|
import google.registry.bigquery.CheckedBigquery;
|
||||||
import google.registry.config.RegistryConfig.Config;
|
import google.registry.config.RegistryConfig.Config;
|
||||||
import google.registry.export.BigqueryPollJobAction.BigqueryPollJobEnqueuer;
|
|
||||||
import google.registry.model.annotations.DeleteAfterMigration;
|
import google.registry.model.annotations.DeleteAfterMigration;
|
||||||
import google.registry.request.Action;
|
import google.registry.request.Action;
|
||||||
|
import google.registry.request.Action.Service;
|
||||||
import google.registry.request.HttpException.BadRequestException;
|
import google.registry.request.HttpException.BadRequestException;
|
||||||
import google.registry.request.HttpException.InternalServerErrorException;
|
import google.registry.request.HttpException.InternalServerErrorException;
|
||||||
import google.registry.request.Parameter;
|
import google.registry.request.Parameter;
|
||||||
import google.registry.request.auth.Auth;
|
import google.registry.request.auth.Auth;
|
||||||
|
import google.registry.util.Clock;
|
||||||
|
import google.registry.util.CloudTasksUtils;
|
||||||
|
import java.io.ByteArrayOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.ObjectOutputStream;
|
||||||
import javax.inject.Inject;
|
import javax.inject.Inject;
|
||||||
|
|
||||||
/** Action to load a Datastore backup from Google Cloud Storage into BigQuery. */
|
/** Action to load a Datastore backup from Google Cloud Storage into BigQuery. */
|
||||||
|
@ -75,7 +79,9 @@ public class UploadDatastoreBackupAction implements Runnable {
|
||||||
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
|
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
|
||||||
|
|
||||||
@Inject CheckedBigquery checkedBigquery;
|
@Inject CheckedBigquery checkedBigquery;
|
||||||
@Inject BigqueryPollJobEnqueuer bigqueryPollEnqueuer;
|
@Inject CloudTasksUtils cloudTasksUtils;
|
||||||
|
@Inject Clock clock;
|
||||||
|
|
||||||
@Inject @Config("projectId") String projectId;
|
@Inject @Config("projectId") String projectId;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
|
@ -93,18 +99,6 @@ public class UploadDatastoreBackupAction implements Runnable {
|
||||||
@Inject
|
@Inject
|
||||||
UploadDatastoreBackupAction() {}
|
UploadDatastoreBackupAction() {}
|
||||||
|
|
||||||
/** Enqueue a task for starting a backup load. */
|
|
||||||
public static TaskHandle enqueueUploadBackupTask(
|
|
||||||
String backupId, String gcsFile, ImmutableSet<String> kinds) {
|
|
||||||
return getQueue(QUEUE)
|
|
||||||
.add(
|
|
||||||
TaskOptions.Builder.withUrl(PATH)
|
|
||||||
.method(Method.POST)
|
|
||||||
.param(UPLOAD_BACKUP_ID_PARAM, backupId)
|
|
||||||
.param(UPLOAD_BACKUP_FOLDER_PARAM, gcsFile)
|
|
||||||
.param(UPLOAD_BACKUP_KINDS_PARAM, Joiner.on(',').join(kinds)));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
|
@ -142,12 +136,46 @@ public class UploadDatastoreBackupAction implements Runnable {
|
||||||
Job job = makeLoadJob(jobRef, sourceUri, tableId);
|
Job job = makeLoadJob(jobRef, sourceUri, tableId);
|
||||||
bigquery.jobs().insert(projectId, job).execute();
|
bigquery.jobs().insert(projectId, job).execute();
|
||||||
|
|
||||||
// Enqueue a task to check on the load job's completion, and if it succeeds, to update a
|
// Serialize the chainedTask into a byte array to put in the task payload.
|
||||||
// well-known view in BigQuery to point at the newly loaded backup table for this kind.
|
ByteArrayOutputStream taskBytes = new ByteArrayOutputStream();
|
||||||
bigqueryPollEnqueuer.enqueuePollTask(
|
new ObjectOutputStream(taskBytes)
|
||||||
jobRef,
|
.writeObject(
|
||||||
createViewUpdateTask(BACKUP_DATASET, tableId, kindName, LATEST_BACKUP_VIEW_NAME),
|
cloudTasksUtils.createPostTask(
|
||||||
getQueue(UpdateSnapshotViewAction.QUEUE));
|
UpdateSnapshotViewAction.PATH,
|
||||||
|
Service.BACKEND.toString(),
|
||||||
|
ImmutableMultimap.of(
|
||||||
|
UpdateSnapshotViewAction.UPDATE_SNAPSHOT_DATASET_ID_PARAM,
|
||||||
|
BACKUP_DATASET,
|
||||||
|
UpdateSnapshotViewAction.UPDATE_SNAPSHOT_TABLE_ID_PARAM,
|
||||||
|
tableId,
|
||||||
|
UpdateSnapshotViewAction.UPDATE_SNAPSHOT_KIND_PARAM,
|
||||||
|
kindName,
|
||||||
|
UpdateSnapshotViewAction.UPDATE_SNAPSHOT_VIEWNAME_PARAM,
|
||||||
|
LATEST_BACKUP_VIEW_NAME)));
|
||||||
|
|
||||||
|
// Enqueues a task to poll for the success or failure of the referenced BigQuery job and to
|
||||||
|
// launch the provided task in the specified queue if the job succeeds.
|
||||||
|
cloudTasksUtils.enqueue(
|
||||||
|
BigqueryPollJobAction.QUEUE,
|
||||||
|
Task.newBuilder()
|
||||||
|
.setAppEngineHttpRequest(
|
||||||
|
cloudTasksUtils
|
||||||
|
.createPostTask(BigqueryPollJobAction.PATH, Service.BACKEND.toString(), null)
|
||||||
|
.getAppEngineHttpRequest()
|
||||||
|
.toBuilder()
|
||||||
|
.putHeaders(BigqueryPollJobAction.PROJECT_ID_HEADER, jobRef.getProjectId())
|
||||||
|
.putHeaders(BigqueryPollJobAction.JOB_ID_HEADER, jobRef.getJobId())
|
||||||
|
.putHeaders(
|
||||||
|
BigqueryPollJobAction.CHAINED_TASK_QUEUE_HEADER,
|
||||||
|
UpdateSnapshotViewAction.QUEUE)
|
||||||
|
// need to include CONTENT_TYPE in header when body is not empty
|
||||||
|
.putHeaders(HttpHeaders.CONTENT_TYPE, MediaType.FORM_DATA.toString())
|
||||||
|
.setBody(ByteString.copyFrom(taskBytes.toByteArray()))
|
||||||
|
.build())
|
||||||
|
.setScheduleTime(
|
||||||
|
Timestamps.fromMillis(
|
||||||
|
clock.nowUtc().plus(BigqueryPollJobAction.POLL_COUNTDOWN).getMillis()))
|
||||||
|
.build());
|
||||||
|
|
||||||
builder.append(String.format(" - %s:%s\n", projectId, jobId));
|
builder.append(String.format(" - %s:%s\n", projectId, jobId));
|
||||||
logger.atInfo().log("Submitted load job %s:%s.", projectId, jobId);
|
logger.atInfo().log("Submitted load job %s:%s.", projectId, jobId);
|
||||||
|
|
|
@ -30,6 +30,7 @@ import com.google.common.collect.ImmutableSet;
|
||||||
import com.google.common.io.ByteStreams;
|
import com.google.common.io.ByteStreams;
|
||||||
import com.google.common.io.CharStreams;
|
import com.google.common.io.CharStreams;
|
||||||
import com.google.common.net.MediaType;
|
import com.google.common.net.MediaType;
|
||||||
|
import com.google.protobuf.ByteString;
|
||||||
import dagger.Module;
|
import dagger.Module;
|
||||||
import dagger.Provides;
|
import dagger.Provides;
|
||||||
import google.registry.model.common.DatabaseMigrationStateSchedule.PrimaryDatabase;
|
import google.registry.model.common.DatabaseMigrationStateSchedule.PrimaryDatabase;
|
||||||
|
@ -184,6 +185,16 @@ public final class RequestModule {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Provides
|
||||||
|
@Payload
|
||||||
|
static ByteString providePayloadAsByteString(HttpServletRequest req) {
|
||||||
|
try {
|
||||||
|
return ByteString.copyFrom(ByteStreams.toByteArray(req.getInputStream()));
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Provides
|
@Provides
|
||||||
static LockHandler provideLockHandler(LockHandlerImpl lockHandler) {
|
static LockHandler provideLockHandler(LockHandlerImpl lockHandler) {
|
||||||
return lockHandler;
|
return lockHandler;
|
||||||
|
|
|
@ -17,16 +17,19 @@ package google.registry.export;
|
||||||
import static com.google.common.truth.Truth.assertThat;
|
import static com.google.common.truth.Truth.assertThat;
|
||||||
import static google.registry.export.CheckBackupAction.CHECK_BACKUP_KINDS_TO_LOAD_PARAM;
|
import static google.registry.export.CheckBackupAction.CHECK_BACKUP_KINDS_TO_LOAD_PARAM;
|
||||||
import static google.registry.export.CheckBackupAction.CHECK_BACKUP_NAME_PARAM;
|
import static google.registry.export.CheckBackupAction.CHECK_BACKUP_NAME_PARAM;
|
||||||
import static google.registry.testing.TaskQueueHelper.assertTasksEnqueued;
|
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
import com.google.cloud.tasks.v2.HttpMethod;
|
||||||
import com.google.common.base.Joiner;
|
import com.google.common.base.Joiner;
|
||||||
|
import com.google.protobuf.util.Timestamps;
|
||||||
import google.registry.export.datastore.DatastoreAdmin;
|
import google.registry.export.datastore.DatastoreAdmin;
|
||||||
import google.registry.export.datastore.DatastoreAdmin.Export;
|
import google.registry.export.datastore.DatastoreAdmin.Export;
|
||||||
import google.registry.export.datastore.Operation;
|
import google.registry.export.datastore.Operation;
|
||||||
import google.registry.testing.AppEngineExtension;
|
import google.registry.testing.AppEngineExtension;
|
||||||
|
import google.registry.testing.CloudTasksHelper;
|
||||||
|
import google.registry.testing.CloudTasksHelper.TaskMatcher;
|
||||||
|
import google.registry.testing.FakeClock;
|
||||||
import google.registry.testing.FakeResponse;
|
import google.registry.testing.FakeResponse;
|
||||||
import google.registry.testing.TaskQueueHelper.TaskMatcher;
|
|
||||||
import org.junit.jupiter.api.BeforeEach;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.junit.jupiter.api.extension.ExtendWith;
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
|
@ -46,13 +49,15 @@ public class BackupDatastoreActionTest {
|
||||||
@Mock private Operation backupOperation;
|
@Mock private Operation backupOperation;
|
||||||
|
|
||||||
private final FakeResponse response = new FakeResponse();
|
private final FakeResponse response = new FakeResponse();
|
||||||
|
private CloudTasksHelper cloudTasksHelper = new CloudTasksHelper();
|
||||||
private final BackupDatastoreAction action = new BackupDatastoreAction();
|
private final BackupDatastoreAction action = new BackupDatastoreAction();
|
||||||
|
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
void beforeEach() throws Exception {
|
void beforeEach() throws Exception {
|
||||||
action.datastoreAdmin = datastoreAdmin;
|
action.datastoreAdmin = datastoreAdmin;
|
||||||
action.response = response;
|
action.response = response;
|
||||||
|
action.cloudTasksUtils = cloudTasksHelper.getTestCloudTasksUtils();
|
||||||
|
action.clock = new FakeClock();
|
||||||
when(datastoreAdmin.export(
|
when(datastoreAdmin.export(
|
||||||
"gs://registry-project-id-datastore-backups", AnnotatedEntities.getBackupKinds()))
|
"gs://registry-project-id-datastore-backups", AnnotatedEntities.getBackupKinds()))
|
||||||
.thenReturn(exportRequest);
|
.thenReturn(exportRequest);
|
||||||
|
@ -66,7 +71,7 @@ public class BackupDatastoreActionTest {
|
||||||
@Test
|
@Test
|
||||||
void testBackup_enqueuesPollTask() {
|
void testBackup_enqueuesPollTask() {
|
||||||
action.run();
|
action.run();
|
||||||
assertTasksEnqueued(
|
cloudTasksHelper.assertTasksEnqueued(
|
||||||
CheckBackupAction.QUEUE,
|
CheckBackupAction.QUEUE,
|
||||||
new TaskMatcher()
|
new TaskMatcher()
|
||||||
.url(CheckBackupAction.PATH)
|
.url(CheckBackupAction.PATH)
|
||||||
|
@ -74,7 +79,10 @@ public class BackupDatastoreActionTest {
|
||||||
.param(
|
.param(
|
||||||
CHECK_BACKUP_KINDS_TO_LOAD_PARAM,
|
CHECK_BACKUP_KINDS_TO_LOAD_PARAM,
|
||||||
Joiner.on(",").join(AnnotatedEntities.getReportingKinds()))
|
Joiner.on(",").join(AnnotatedEntities.getReportingKinds()))
|
||||||
.method("POST"));
|
.method(HttpMethod.POST)
|
||||||
|
.scheduleTime(
|
||||||
|
Timestamps.fromMillis(
|
||||||
|
action.clock.nowUtc().plus(CheckBackupAction.POLL_COUNTDOWN).getMillis())));
|
||||||
assertThat(response.getPayload())
|
assertThat(response.getPayload())
|
||||||
.isEqualTo(
|
.isEqualTo(
|
||||||
"Datastore backup started with name: "
|
"Datastore backup started with name: "
|
||||||
|
|
|
@ -14,11 +14,7 @@
|
||||||
|
|
||||||
package google.registry.export;
|
package google.registry.export;
|
||||||
|
|
||||||
import static com.google.appengine.api.taskqueue.QueueFactory.getQueue;
|
|
||||||
import static com.google.common.collect.Iterables.getOnlyElement;
|
|
||||||
import static com.google.common.truth.Truth.assertThat;
|
import static com.google.common.truth.Truth.assertThat;
|
||||||
import static google.registry.testing.TaskQueueHelper.assertNoTasksEnqueued;
|
|
||||||
import static google.registry.testing.TaskQueueHelper.assertTasksEnqueued;
|
|
||||||
import static google.registry.testing.TestLogHandlerUtils.assertLogMessage;
|
import static google.registry.testing.TestLogHandlerUtils.assertLogMessage;
|
||||||
import static java.nio.charset.StandardCharsets.UTF_8;
|
import static java.nio.charset.StandardCharsets.UTF_8;
|
||||||
import static java.util.logging.Level.INFO;
|
import static java.util.logging.Level.INFO;
|
||||||
|
@ -30,27 +26,22 @@ import static org.mockito.Mockito.when;
|
||||||
import com.google.api.services.bigquery.Bigquery;
|
import com.google.api.services.bigquery.Bigquery;
|
||||||
import com.google.api.services.bigquery.model.ErrorProto;
|
import com.google.api.services.bigquery.model.ErrorProto;
|
||||||
import com.google.api.services.bigquery.model.Job;
|
import com.google.api.services.bigquery.model.Job;
|
||||||
import com.google.api.services.bigquery.model.JobReference;
|
|
||||||
import com.google.api.services.bigquery.model.JobStatus;
|
import com.google.api.services.bigquery.model.JobStatus;
|
||||||
import com.google.appengine.api.taskqueue.TaskOptions;
|
import com.google.cloud.tasks.v2.AppEngineHttpRequest;
|
||||||
import com.google.appengine.api.taskqueue.TaskOptions.Method;
|
import com.google.cloud.tasks.v2.HttpMethod;
|
||||||
import com.google.appengine.api.taskqueue.dev.QueueStateInfo.TaskStateInfo;
|
import com.google.cloud.tasks.v2.Task;
|
||||||
import google.registry.export.BigqueryPollJobAction.BigqueryPollJobEnqueuer;
|
import com.google.common.net.HttpHeaders;
|
||||||
|
import com.google.common.net.MediaType;
|
||||||
|
import com.google.protobuf.ByteString;
|
||||||
import google.registry.request.HttpException.BadRequestException;
|
import google.registry.request.HttpException.BadRequestException;
|
||||||
import google.registry.request.HttpException.NotModifiedException;
|
import google.registry.request.HttpException.NotModifiedException;
|
||||||
import google.registry.testing.AppEngineExtension;
|
import google.registry.testing.AppEngineExtension;
|
||||||
import google.registry.testing.FakeClock;
|
import google.registry.testing.CloudTasksHelper;
|
||||||
import google.registry.testing.FakeSleeper;
|
import google.registry.testing.CloudTasksHelper.TaskMatcher;
|
||||||
import google.registry.testing.TaskQueueHelper;
|
|
||||||
import google.registry.testing.TaskQueueHelper.TaskMatcher;
|
|
||||||
import google.registry.util.CapturingLogHandler;
|
import google.registry.util.CapturingLogHandler;
|
||||||
import google.registry.util.JdkLoggerConfig;
|
import google.registry.util.JdkLoggerConfig;
|
||||||
import google.registry.util.Retrier;
|
|
||||||
import google.registry.util.TaskQueueUtils;
|
|
||||||
import java.io.ByteArrayInputStream;
|
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.ObjectInputStream;
|
|
||||||
import java.io.ObjectOutputStream;
|
import java.io.ObjectOutputStream;
|
||||||
import org.junit.jupiter.api.BeforeEach;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
@ -66,8 +57,6 @@ public class BigqueryPollJobActionTest {
|
||||||
private static final String PROJECT_ID = "project_id";
|
private static final String PROJECT_ID = "project_id";
|
||||||
private static final String JOB_ID = "job_id";
|
private static final String JOB_ID = "job_id";
|
||||||
private static final String CHAINED_QUEUE_NAME = UpdateSnapshotViewAction.QUEUE;
|
private static final String CHAINED_QUEUE_NAME = UpdateSnapshotViewAction.QUEUE;
|
||||||
private static final TaskQueueUtils TASK_QUEUE_UTILS =
|
|
||||||
new TaskQueueUtils(new Retrier(new FakeSleeper(new FakeClock()), 1));
|
|
||||||
|
|
||||||
private final Bigquery bigquery = mock(Bigquery.class);
|
private final Bigquery bigquery = mock(Bigquery.class);
|
||||||
private final Bigquery.Jobs bigqueryJobs = mock(Bigquery.Jobs.class);
|
private final Bigquery.Jobs bigqueryJobs = mock(Bigquery.Jobs.class);
|
||||||
|
@ -75,53 +64,20 @@ public class BigqueryPollJobActionTest {
|
||||||
|
|
||||||
private final CapturingLogHandler logHandler = new CapturingLogHandler();
|
private final CapturingLogHandler logHandler = new CapturingLogHandler();
|
||||||
private BigqueryPollJobAction action = new BigqueryPollJobAction();
|
private BigqueryPollJobAction action = new BigqueryPollJobAction();
|
||||||
|
private CloudTasksHelper cloudTasksHelper = new CloudTasksHelper();
|
||||||
|
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
void beforeEach() throws Exception {
|
void beforeEach() throws Exception {
|
||||||
action.bigquery = bigquery;
|
action.bigquery = bigquery;
|
||||||
when(bigquery.jobs()).thenReturn(bigqueryJobs);
|
when(bigquery.jobs()).thenReturn(bigqueryJobs);
|
||||||
when(bigqueryJobs.get(PROJECT_ID, JOB_ID)).thenReturn(bigqueryJobsGet);
|
when(bigqueryJobs.get(PROJECT_ID, JOB_ID)).thenReturn(bigqueryJobsGet);
|
||||||
action.taskQueueUtils = TASK_QUEUE_UTILS;
|
action.cloudTasksUtils = cloudTasksHelper.getTestCloudTasksUtils();
|
||||||
action.projectId = PROJECT_ID;
|
action.projectId = PROJECT_ID;
|
||||||
action.jobId = JOB_ID;
|
action.jobId = JOB_ID;
|
||||||
action.chainedQueueName = () -> CHAINED_QUEUE_NAME;
|
action.chainedQueueName = () -> CHAINED_QUEUE_NAME;
|
||||||
JdkLoggerConfig.getConfig(BigqueryPollJobAction.class).addHandler(logHandler);
|
JdkLoggerConfig.getConfig(BigqueryPollJobAction.class).addHandler(logHandler);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static TaskMatcher newPollJobTaskMatcher(String method) {
|
|
||||||
return new TaskMatcher()
|
|
||||||
.method(method)
|
|
||||||
.url(BigqueryPollJobAction.PATH)
|
|
||||||
.header(BigqueryPollJobAction.PROJECT_ID_HEADER, PROJECT_ID)
|
|
||||||
.header(BigqueryPollJobAction.JOB_ID_HEADER, JOB_ID);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
void testSuccess_enqueuePollTask() {
|
|
||||||
new BigqueryPollJobEnqueuer(TASK_QUEUE_UTILS).enqueuePollTask(
|
|
||||||
new JobReference().setProjectId(PROJECT_ID).setJobId(JOB_ID));
|
|
||||||
assertTasksEnqueued(BigqueryPollJobAction.QUEUE, newPollJobTaskMatcher("GET"));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
void testSuccess_enqueuePollTask_withChainedTask() throws Exception {
|
|
||||||
TaskOptions chainedTask = TaskOptions.Builder
|
|
||||||
.withUrl("/_dr/something")
|
|
||||||
.method(Method.POST)
|
|
||||||
.header("X-Testing", "foo")
|
|
||||||
.param("testing", "bar");
|
|
||||||
new BigqueryPollJobEnqueuer(TASK_QUEUE_UTILS).enqueuePollTask(
|
|
||||||
new JobReference().setProjectId(PROJECT_ID).setJobId(JOB_ID),
|
|
||||||
chainedTask,
|
|
||||||
getQueue(CHAINED_QUEUE_NAME));
|
|
||||||
assertTasksEnqueued(BigqueryPollJobAction.QUEUE, newPollJobTaskMatcher("POST"));
|
|
||||||
TaskStateInfo taskInfo = getOnlyElement(
|
|
||||||
TaskQueueHelper.getQueueInfo(BigqueryPollJobAction.QUEUE).getTaskInfo());
|
|
||||||
ByteArrayInputStream taskBodyBytes = new ByteArrayInputStream(taskInfo.getBodyAsBytes());
|
|
||||||
TaskOptions taskOptions = (TaskOptions) new ObjectInputStream(taskBodyBytes).readObject();
|
|
||||||
assertThat(taskOptions).isEqualTo(chainedTask);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void testSuccess_jobCompletedSuccessfully() throws Exception {
|
void testSuccess_jobCompletedSuccessfully() throws Exception {
|
||||||
when(bigqueryJobsGet.execute()).thenReturn(
|
when(bigqueryJobsGet.execute()).thenReturn(
|
||||||
|
@ -136,15 +92,20 @@ public class BigqueryPollJobActionTest {
|
||||||
when(bigqueryJobsGet.execute()).thenReturn(
|
when(bigqueryJobsGet.execute()).thenReturn(
|
||||||
new Job().setStatus(new JobStatus().setState("DONE")));
|
new Job().setStatus(new JobStatus().setState("DONE")));
|
||||||
|
|
||||||
TaskOptions chainedTask =
|
Task chainedTask =
|
||||||
TaskOptions.Builder.withUrl("/_dr/something")
|
Task.newBuilder()
|
||||||
.method(Method.POST)
|
.setName("my_task_name")
|
||||||
.header("X-Testing", "foo")
|
.setAppEngineHttpRequest(
|
||||||
.param("testing", "bar")
|
AppEngineHttpRequest.newBuilder()
|
||||||
.taskName("my_task_name");
|
.setHttpMethod(HttpMethod.POST)
|
||||||
|
.setRelativeUri("/_dr/something")
|
||||||
|
.putHeaders("X-Test", "foo")
|
||||||
|
.putHeaders(HttpHeaders.CONTENT_TYPE, MediaType.FORM_DATA.toString())
|
||||||
|
.setBody(ByteString.copyFromUtf8("testing=bar")))
|
||||||
|
.build();
|
||||||
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
|
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
|
||||||
new ObjectOutputStream(bytes).writeObject(chainedTask);
|
new ObjectOutputStream(bytes).writeObject(chainedTask);
|
||||||
action.payload = bytes.toByteArray();
|
action.payload = ByteString.copyFrom(bytes.toByteArray());
|
||||||
|
|
||||||
action.run();
|
action.run();
|
||||||
assertLogMessage(
|
assertLogMessage(
|
||||||
|
@ -153,14 +114,15 @@ public class BigqueryPollJobActionTest {
|
||||||
logHandler,
|
logHandler,
|
||||||
INFO,
|
INFO,
|
||||||
"Added chained task my_task_name for /_dr/something to queue " + CHAINED_QUEUE_NAME);
|
"Added chained task my_task_name for /_dr/something to queue " + CHAINED_QUEUE_NAME);
|
||||||
assertTasksEnqueued(
|
cloudTasksHelper.assertTasksEnqueued(
|
||||||
CHAINED_QUEUE_NAME,
|
CHAINED_QUEUE_NAME,
|
||||||
new TaskMatcher()
|
new TaskMatcher()
|
||||||
.url("/_dr/something")
|
.url("/_dr/something")
|
||||||
.method("POST")
|
.header("X-Test", "foo")
|
||||||
.header("X-Testing", "foo")
|
.header(HttpHeaders.CONTENT_TYPE, MediaType.FORM_DATA.toString())
|
||||||
.param("testing", "bar")
|
.param("testing", "bar")
|
||||||
.taskName("my_task_name"));
|
.taskName("my_task_name")
|
||||||
|
.method(HttpMethod.POST));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -172,7 +134,7 @@ public class BigqueryPollJobActionTest {
|
||||||
action.run();
|
action.run();
|
||||||
assertLogMessage(
|
assertLogMessage(
|
||||||
logHandler, SEVERE, String.format("Bigquery job failed - %s:%s", PROJECT_ID, JOB_ID));
|
logHandler, SEVERE, String.format("Bigquery job failed - %s:%s", PROJECT_ID, JOB_ID));
|
||||||
assertNoTasksEnqueued(CHAINED_QUEUE_NAME);
|
cloudTasksHelper.assertNoTasksEnqueued(CHAINED_QUEUE_NAME);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -192,7 +154,7 @@ public class BigqueryPollJobActionTest {
|
||||||
void testFailure_badChainedTaskPayload() throws Exception {
|
void testFailure_badChainedTaskPayload() throws Exception {
|
||||||
when(bigqueryJobsGet.execute()).thenReturn(
|
when(bigqueryJobsGet.execute()).thenReturn(
|
||||||
new Job().setStatus(new JobStatus().setState("DONE")));
|
new Job().setStatus(new JobStatus().setState("DONE")));
|
||||||
action.payload = "payload".getBytes(UTF_8);
|
action.payload = ByteString.copyFrom("payload".getBytes(UTF_8));
|
||||||
BadRequestException thrown = assertThrows(BadRequestException.class, action::run);
|
BadRequestException thrown = assertThrows(BadRequestException.class, action::run);
|
||||||
assertThat(thrown).hasMessageThat().contains("Cannot deserialize task from payload");
|
assertThat(thrown).hasMessageThat().contains("Cannot deserialize task from payload");
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,10 +15,6 @@
|
||||||
package google.registry.export;
|
package google.registry.export;
|
||||||
|
|
||||||
import static com.google.common.truth.Truth.assertThat;
|
import static com.google.common.truth.Truth.assertThat;
|
||||||
import static google.registry.export.CheckBackupAction.CHECK_BACKUP_KINDS_TO_LOAD_PARAM;
|
|
||||||
import static google.registry.export.CheckBackupAction.CHECK_BACKUP_NAME_PARAM;
|
|
||||||
import static google.registry.testing.TaskQueueHelper.assertNoTasksEnqueued;
|
|
||||||
import static google.registry.testing.TaskQueueHelper.assertTasksEnqueued;
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||||
import static org.mockito.ArgumentMatchers.anyString;
|
import static org.mockito.ArgumentMatchers.anyString;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
@ -27,7 +23,7 @@ import com.google.api.client.googleapis.json.GoogleJsonResponseException;
|
||||||
import com.google.api.client.http.HttpHeaders;
|
import com.google.api.client.http.HttpHeaders;
|
||||||
import com.google.api.client.json.JsonFactory;
|
import com.google.api.client.json.JsonFactory;
|
||||||
import com.google.api.client.json.jackson2.JacksonFactory;
|
import com.google.api.client.json.jackson2.JacksonFactory;
|
||||||
import com.google.common.collect.ImmutableSet;
|
import com.google.cloud.tasks.v2.HttpMethod;
|
||||||
import google.registry.export.datastore.DatastoreAdmin;
|
import google.registry.export.datastore.DatastoreAdmin;
|
||||||
import google.registry.export.datastore.DatastoreAdmin.Get;
|
import google.registry.export.datastore.DatastoreAdmin.Get;
|
||||||
import google.registry.export.datastore.Operation;
|
import google.registry.export.datastore.Operation;
|
||||||
|
@ -36,9 +32,10 @@ import google.registry.request.HttpException.BadRequestException;
|
||||||
import google.registry.request.HttpException.NoContentException;
|
import google.registry.request.HttpException.NoContentException;
|
||||||
import google.registry.request.HttpException.NotModifiedException;
|
import google.registry.request.HttpException.NotModifiedException;
|
||||||
import google.registry.testing.AppEngineExtension;
|
import google.registry.testing.AppEngineExtension;
|
||||||
|
import google.registry.testing.CloudTasksHelper;
|
||||||
|
import google.registry.testing.CloudTasksHelper.TaskMatcher;
|
||||||
import google.registry.testing.FakeClock;
|
import google.registry.testing.FakeClock;
|
||||||
import google.registry.testing.FakeResponse;
|
import google.registry.testing.FakeResponse;
|
||||||
import google.registry.testing.TaskQueueHelper.TaskMatcher;
|
|
||||||
import google.registry.testing.TestDataHelper;
|
import google.registry.testing.TestDataHelper;
|
||||||
import org.joda.time.DateTime;
|
import org.joda.time.DateTime;
|
||||||
import org.joda.time.Duration;
|
import org.joda.time.Duration;
|
||||||
|
@ -57,7 +54,6 @@ public class CheckBackupActionTest {
|
||||||
|
|
||||||
private static final DateTime START_TIME = DateTime.parse("2014-08-01T01:02:03Z");
|
private static final DateTime START_TIME = DateTime.parse("2014-08-01T01:02:03Z");
|
||||||
private static final DateTime COMPLETE_TIME = START_TIME.plus(Duration.standardMinutes(30));
|
private static final DateTime COMPLETE_TIME = START_TIME.plus(Duration.standardMinutes(30));
|
||||||
|
|
||||||
private static final JsonFactory JSON_FACTORY = JacksonFactory.getDefaultInstance();
|
private static final JsonFactory JSON_FACTORY = JacksonFactory.getDefaultInstance();
|
||||||
|
|
||||||
@RegisterExtension
|
@RegisterExtension
|
||||||
|
@ -71,6 +67,7 @@ public class CheckBackupActionTest {
|
||||||
private final FakeResponse response = new FakeResponse();
|
private final FakeResponse response = new FakeResponse();
|
||||||
private final FakeClock clock = new FakeClock(COMPLETE_TIME.plusMillis(1000));
|
private final FakeClock clock = new FakeClock(COMPLETE_TIME.plusMillis(1000));
|
||||||
private final CheckBackupAction action = new CheckBackupAction();
|
private final CheckBackupAction action = new CheckBackupAction();
|
||||||
|
private final CloudTasksHelper cloudTasksHelper = new CloudTasksHelper();
|
||||||
|
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
void beforeEach() throws Exception {
|
void beforeEach() throws Exception {
|
||||||
|
@ -80,6 +77,7 @@ public class CheckBackupActionTest {
|
||||||
action.backupName = "some_backup";
|
action.backupName = "some_backup";
|
||||||
action.kindsToLoadParam = "one,two";
|
action.kindsToLoadParam = "one,two";
|
||||||
action.response = response;
|
action.response = response;
|
||||||
|
action.cloudTasksUtils = cloudTasksHelper.getTestCloudTasksUtils();
|
||||||
|
|
||||||
when(datastoreAdmin.get(anyString())).thenReturn(getBackupProgressRequest);
|
when(datastoreAdmin.get(anyString())).thenReturn(getBackupProgressRequest);
|
||||||
when(getBackupProgressRequest.execute()).thenAnswer(arg -> backupOperation);
|
when(getBackupProgressRequest.execute()).thenAnswer(arg -> backupOperation);
|
||||||
|
@ -110,30 +108,17 @@ public class CheckBackupActionTest {
|
||||||
null));
|
null));
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void assertLoadTaskEnqueued(String id, String folder, String kinds) {
|
private void assertLoadTaskEnqueued(String id, String folder, String kinds) {
|
||||||
assertTasksEnqueued(
|
cloudTasksHelper.assertTasksEnqueued(
|
||||||
"export-snapshot",
|
"export-snapshot",
|
||||||
new TaskMatcher()
|
new TaskMatcher()
|
||||||
.url("/_dr/task/uploadDatastoreBackup")
|
.url("/_dr/task/uploadDatastoreBackup")
|
||||||
.method("POST")
|
.method(HttpMethod.POST)
|
||||||
.param("id", id)
|
.param("id", id)
|
||||||
.param("folder", folder)
|
.param("folder", folder)
|
||||||
.param("kinds", kinds));
|
.param("kinds", kinds));
|
||||||
}
|
}
|
||||||
|
|
||||||
@MockitoSettings(strictness = Strictness.LENIENT)
|
|
||||||
@Test
|
|
||||||
void testSuccess_enqueuePollTask() {
|
|
||||||
CheckBackupAction.enqueuePollTask("some_backup_name", ImmutableSet.of("one", "two", "three"));
|
|
||||||
assertTasksEnqueued(
|
|
||||||
CheckBackupAction.QUEUE,
|
|
||||||
new TaskMatcher()
|
|
||||||
.url(CheckBackupAction.PATH)
|
|
||||||
.param(CHECK_BACKUP_NAME_PARAM, "some_backup_name")
|
|
||||||
.param(CHECK_BACKUP_KINDS_TO_LOAD_PARAM, "one,two,three")
|
|
||||||
.method("POST"));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void testPost_forPendingBackup_returnsNotModified() throws Exception {
|
void testPost_forPendingBackup_returnsNotModified() throws Exception {
|
||||||
setPendingBackup();
|
setPendingBackup();
|
||||||
|
@ -189,7 +174,7 @@ public class CheckBackupActionTest {
|
||||||
action.kindsToLoadParam = "";
|
action.kindsToLoadParam = "";
|
||||||
|
|
||||||
action.run();
|
action.run();
|
||||||
assertNoTasksEnqueued("export-snapshot");
|
cloudTasksHelper.assertNoTasksEnqueued("export-snapshot");
|
||||||
}
|
}
|
||||||
|
|
||||||
@MockitoSettings(strictness = Strictness.LENIENT)
|
@MockitoSettings(strictness = Strictness.LENIENT)
|
||||||
|
|
|
@ -14,15 +14,7 @@
|
||||||
|
|
||||||
package google.registry.export;
|
package google.registry.export;
|
||||||
|
|
||||||
import static com.google.appengine.api.taskqueue.QueueFactory.getQueue;
|
|
||||||
import static com.google.common.truth.Truth.assertThat;
|
import static com.google.common.truth.Truth.assertThat;
|
||||||
import static google.registry.export.UpdateSnapshotViewAction.QUEUE;
|
|
||||||
import static google.registry.export.UpdateSnapshotViewAction.UPDATE_SNAPSHOT_DATASET_ID_PARAM;
|
|
||||||
import static google.registry.export.UpdateSnapshotViewAction.UPDATE_SNAPSHOT_KIND_PARAM;
|
|
||||||
import static google.registry.export.UpdateSnapshotViewAction.UPDATE_SNAPSHOT_TABLE_ID_PARAM;
|
|
||||||
import static google.registry.export.UpdateSnapshotViewAction.UPDATE_SNAPSHOT_VIEWNAME_PARAM;
|
|
||||||
import static google.registry.export.UpdateSnapshotViewAction.createViewUpdateTask;
|
|
||||||
import static google.registry.testing.TaskQueueHelper.assertTasksEnqueued;
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||||
import static org.mockito.ArgumentMatchers.any;
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
import static org.mockito.ArgumentMatchers.anyString;
|
import static org.mockito.ArgumentMatchers.anyString;
|
||||||
|
@ -38,7 +30,6 @@ import com.google.common.collect.Iterables;
|
||||||
import google.registry.bigquery.CheckedBigquery;
|
import google.registry.bigquery.CheckedBigquery;
|
||||||
import google.registry.request.HttpException.InternalServerErrorException;
|
import google.registry.request.HttpException.InternalServerErrorException;
|
||||||
import google.registry.testing.AppEngineExtension;
|
import google.registry.testing.AppEngineExtension;
|
||||||
import google.registry.testing.TaskQueueHelper.TaskMatcher;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import org.junit.jupiter.api.BeforeEach;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
@ -81,23 +72,6 @@ public class UpdateSnapshotViewActionTest {
|
||||||
action.tableId = "12345_fookind";
|
action.tableId = "12345_fookind";
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
void testSuccess_createViewUpdateTask() {
|
|
||||||
getQueue(QUEUE)
|
|
||||||
.add(
|
|
||||||
createViewUpdateTask(
|
|
||||||
"some_dataset", "12345_fookind", "fookind", "latest_datastore_export"));
|
|
||||||
assertTasksEnqueued(
|
|
||||||
QUEUE,
|
|
||||||
new TaskMatcher()
|
|
||||||
.url(UpdateSnapshotViewAction.PATH)
|
|
||||||
.method("POST")
|
|
||||||
.param(UPDATE_SNAPSHOT_DATASET_ID_PARAM, "some_dataset")
|
|
||||||
.param(UPDATE_SNAPSHOT_TABLE_ID_PARAM, "12345_fookind")
|
|
||||||
.param(UPDATE_SNAPSHOT_KIND_PARAM, "fookind")
|
|
||||||
.param(UPDATE_SNAPSHOT_VIEWNAME_PARAM, "latest_datastore_export"));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void testSuccess_doPost() throws Exception {
|
void testSuccess_doPost() throws Exception {
|
||||||
action.run();
|
action.run();
|
||||||
|
|
|
@ -16,16 +16,16 @@ package google.registry.export;
|
||||||
|
|
||||||
import static com.google.common.collect.Iterables.transform;
|
import static com.google.common.collect.Iterables.transform;
|
||||||
import static com.google.common.truth.Truth.assertThat;
|
import static com.google.common.truth.Truth.assertThat;
|
||||||
|
import static com.google.common.truth.Truth8.assertThat;
|
||||||
|
import static google.registry.export.BigqueryPollJobAction.CHAINED_TASK_QUEUE_HEADER;
|
||||||
|
import static google.registry.export.BigqueryPollJobAction.JOB_ID_HEADER;
|
||||||
|
import static google.registry.export.BigqueryPollJobAction.PROJECT_ID_HEADER;
|
||||||
|
import static google.registry.export.UpdateSnapshotViewAction.UPDATE_SNAPSHOT_DATASET_ID_PARAM;
|
||||||
|
import static google.registry.export.UpdateSnapshotViewAction.UPDATE_SNAPSHOT_KIND_PARAM;
|
||||||
|
import static google.registry.export.UpdateSnapshotViewAction.UPDATE_SNAPSHOT_TABLE_ID_PARAM;
|
||||||
|
import static google.registry.export.UpdateSnapshotViewAction.UPDATE_SNAPSHOT_VIEWNAME_PARAM;
|
||||||
import static google.registry.export.UploadDatastoreBackupAction.BACKUP_DATASET;
|
import static google.registry.export.UploadDatastoreBackupAction.BACKUP_DATASET;
|
||||||
import static google.registry.export.UploadDatastoreBackupAction.LATEST_BACKUP_VIEW_NAME;
|
|
||||||
import static google.registry.export.UploadDatastoreBackupAction.PATH;
|
|
||||||
import static google.registry.export.UploadDatastoreBackupAction.QUEUE;
|
|
||||||
import static google.registry.export.UploadDatastoreBackupAction.UPLOAD_BACKUP_FOLDER_PARAM;
|
|
||||||
import static google.registry.export.UploadDatastoreBackupAction.UPLOAD_BACKUP_ID_PARAM;
|
|
||||||
import static google.registry.export.UploadDatastoreBackupAction.UPLOAD_BACKUP_KINDS_PARAM;
|
|
||||||
import static google.registry.export.UploadDatastoreBackupAction.enqueueUploadBackupTask;
|
|
||||||
import static google.registry.export.UploadDatastoreBackupAction.getBackupInfoFileForKind;
|
import static google.registry.export.UploadDatastoreBackupAction.getBackupInfoFileForKind;
|
||||||
import static google.registry.testing.TaskQueueHelper.assertTasksEnqueued;
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||||
import static org.mockito.ArgumentMatchers.any;
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
import static org.mockito.ArgumentMatchers.eq;
|
import static org.mockito.ArgumentMatchers.eq;
|
||||||
|
@ -38,16 +38,20 @@ import com.google.api.services.bigquery.Bigquery;
|
||||||
import com.google.api.services.bigquery.model.Dataset;
|
import com.google.api.services.bigquery.model.Dataset;
|
||||||
import com.google.api.services.bigquery.model.Job;
|
import com.google.api.services.bigquery.model.Job;
|
||||||
import com.google.api.services.bigquery.model.JobConfigurationLoad;
|
import com.google.api.services.bigquery.model.JobConfigurationLoad;
|
||||||
import com.google.api.services.bigquery.model.JobReference;
|
import com.google.cloud.tasks.v2.HttpMethod;
|
||||||
import com.google.appengine.api.taskqueue.QueueFactory;
|
import com.google.common.collect.ImmutableMultimap;
|
||||||
import com.google.common.collect.ImmutableSet;
|
|
||||||
import com.google.common.collect.Iterables;
|
import com.google.common.collect.Iterables;
|
||||||
|
import com.google.protobuf.util.Timestamps;
|
||||||
import google.registry.bigquery.CheckedBigquery;
|
import google.registry.bigquery.CheckedBigquery;
|
||||||
import google.registry.export.BigqueryPollJobAction.BigqueryPollJobEnqueuer;
|
|
||||||
import google.registry.request.HttpException.InternalServerErrorException;
|
import google.registry.request.HttpException.InternalServerErrorException;
|
||||||
import google.registry.testing.AppEngineExtension;
|
import google.registry.testing.AppEngineExtension;
|
||||||
import google.registry.testing.TaskQueueHelper.TaskMatcher;
|
import google.registry.testing.CloudTasksHelper;
|
||||||
|
import google.registry.testing.CloudTasksHelper.TaskMatcher;
|
||||||
|
import google.registry.testing.FakeClock;
|
||||||
|
import google.registry.util.CloudTasksUtils;
|
||||||
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.ObjectInputStream;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import org.junit.jupiter.api.BeforeEach;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
@ -67,8 +71,9 @@ public class UploadDatastoreBackupActionTest {
|
||||||
private final Bigquery.Datasets bigqueryDatasets = mock(Bigquery.Datasets.class);
|
private final Bigquery.Datasets bigqueryDatasets = mock(Bigquery.Datasets.class);
|
||||||
private final Bigquery.Datasets.Insert bigqueryDatasetsInsert =
|
private final Bigquery.Datasets.Insert bigqueryDatasetsInsert =
|
||||||
mock(Bigquery.Datasets.Insert.class);
|
mock(Bigquery.Datasets.Insert.class);
|
||||||
private final BigqueryPollJobEnqueuer bigqueryPollEnqueuer = mock(BigqueryPollJobEnqueuer.class);
|
|
||||||
private UploadDatastoreBackupAction action;
|
private UploadDatastoreBackupAction action;
|
||||||
|
private CloudTasksHelper cloudTasksHelper = new CloudTasksHelper();
|
||||||
|
private CloudTasksUtils cloudTasksUtils = cloudTasksHelper.getTestCloudTasksUtils();
|
||||||
|
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
void beforeEach() throws Exception {
|
void beforeEach() throws Exception {
|
||||||
|
@ -80,25 +85,14 @@ public class UploadDatastoreBackupActionTest {
|
||||||
.thenReturn(bigqueryDatasetsInsert);
|
.thenReturn(bigqueryDatasetsInsert);
|
||||||
action = new UploadDatastoreBackupAction();
|
action = new UploadDatastoreBackupAction();
|
||||||
action.checkedBigquery = checkedBigquery;
|
action.checkedBigquery = checkedBigquery;
|
||||||
action.bigqueryPollEnqueuer = bigqueryPollEnqueuer;
|
|
||||||
action.projectId = "Project-Id";
|
action.projectId = "Project-Id";
|
||||||
action.backupFolderUrl = "gs://bucket/path";
|
action.backupFolderUrl = "gs://bucket/path";
|
||||||
action.backupId = "2018-12-05T17:46:39_92612";
|
action.backupId = "2018-12-05T17:46:39_92612";
|
||||||
action.backupKinds = "one,two,three";
|
action.backupKinds = "one,two,three";
|
||||||
|
action.cloudTasksUtils = cloudTasksHelper.getTestCloudTasksUtils();
|
||||||
|
action.clock = new FakeClock();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
void testSuccess_enqueueLoadTask() {
|
|
||||||
enqueueUploadBackupTask("id12345", "gs://bucket/path", ImmutableSet.of("one", "two", "three"));
|
|
||||||
assertTasksEnqueued(
|
|
||||||
QUEUE,
|
|
||||||
new TaskMatcher()
|
|
||||||
.url(PATH)
|
|
||||||
.method("POST")
|
|
||||||
.param(UPLOAD_BACKUP_ID_PARAM, "id12345")
|
|
||||||
.param(UPLOAD_BACKUP_FOLDER_PARAM, "gs://bucket/path")
|
|
||||||
.param(UPLOAD_BACKUP_KINDS_PARAM, "one,two,three"));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void testSuccess_doPost() throws Exception {
|
void testSuccess_doPost() throws Exception {
|
||||||
|
@ -153,33 +147,91 @@ public class UploadDatastoreBackupActionTest {
|
||||||
verify(bigqueryJobsInsert, times(3)).execute();
|
verify(bigqueryJobsInsert, times(3)).execute();
|
||||||
|
|
||||||
// Check that the poll tasks for each load job were enqueued.
|
// Check that the poll tasks for each load job were enqueued.
|
||||||
verify(bigqueryPollEnqueuer)
|
cloudTasksHelper.assertTasksEnqueued(
|
||||||
.enqueuePollTask(
|
BigqueryPollJobAction.QUEUE,
|
||||||
new JobReference()
|
new TaskMatcher()
|
||||||
.setProjectId("Project-Id")
|
.method(HttpMethod.POST)
|
||||||
.setJobId("load-backup-2018_12_05T17_46_39_92612-one"),
|
.header(PROJECT_ID_HEADER, "Project-Id")
|
||||||
UpdateSnapshotViewAction.createViewUpdateTask(
|
.header(JOB_ID_HEADER, "load-backup-2018_12_05T17_46_39_92612-one")
|
||||||
BACKUP_DATASET, "2018_12_05T17_46_39_92612_one", "one", LATEST_BACKUP_VIEW_NAME),
|
.header(CHAINED_TASK_QUEUE_HEADER, UpdateSnapshotViewAction.QUEUE)
|
||||||
QueueFactory.getQueue(UpdateSnapshotViewAction.QUEUE));
|
.scheduleTime(
|
||||||
verify(bigqueryPollEnqueuer)
|
Timestamps.fromMillis(
|
||||||
.enqueuePollTask(
|
action.clock.nowUtc().plus(BigqueryPollJobAction.POLL_COUNTDOWN).getMillis())),
|
||||||
new JobReference()
|
new TaskMatcher()
|
||||||
.setProjectId("Project-Id")
|
.method(HttpMethod.POST)
|
||||||
.setJobId("load-backup-2018_12_05T17_46_39_92612-two"),
|
.header(PROJECT_ID_HEADER, "Project-Id")
|
||||||
UpdateSnapshotViewAction.createViewUpdateTask(
|
.header(JOB_ID_HEADER, "load-backup-2018_12_05T17_46_39_92612-two")
|
||||||
BACKUP_DATASET, "2018_12_05T17_46_39_92612_two", "two", LATEST_BACKUP_VIEW_NAME),
|
.header(CHAINED_TASK_QUEUE_HEADER, UpdateSnapshotViewAction.QUEUE)
|
||||||
QueueFactory.getQueue(UpdateSnapshotViewAction.QUEUE));
|
.scheduleTime(
|
||||||
verify(bigqueryPollEnqueuer)
|
Timestamps.fromMillis(
|
||||||
.enqueuePollTask(
|
action.clock.nowUtc().plus(BigqueryPollJobAction.POLL_COUNTDOWN).getMillis())),
|
||||||
new JobReference()
|
new TaskMatcher()
|
||||||
.setProjectId("Project-Id")
|
.method(HttpMethod.POST)
|
||||||
.setJobId("load-backup-2018_12_05T17_46_39_92612-three"),
|
.header(PROJECT_ID_HEADER, "Project-Id")
|
||||||
UpdateSnapshotViewAction.createViewUpdateTask(
|
.header(JOB_ID_HEADER, "load-backup-2018_12_05T17_46_39_92612-three")
|
||||||
BACKUP_DATASET,
|
.header(CHAINED_TASK_QUEUE_HEADER, UpdateSnapshotViewAction.QUEUE)
|
||||||
|
.scheduleTime(
|
||||||
|
Timestamps.fromMillis(
|
||||||
|
action.clock.nowUtc().plus(BigqueryPollJobAction.POLL_COUNTDOWN).getMillis())));
|
||||||
|
|
||||||
|
// assert the chained task of each enqueud task is correct
|
||||||
|
assertThat(
|
||||||
|
cloudTasksHelper.getTestTasksFor(BigqueryPollJobAction.QUEUE).stream()
|
||||||
|
.map(
|
||||||
|
testTask -> {
|
||||||
|
try {
|
||||||
|
return new ObjectInputStream(
|
||||||
|
new ByteArrayInputStream(
|
||||||
|
testTask.getAppEngineHttpRequest().getBody().toByteArray()))
|
||||||
|
.readObject();
|
||||||
|
} catch (ClassNotFoundException | IOException e) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
.containsExactly(
|
||||||
|
cloudTasksHelper
|
||||||
|
.getTestCloudTasksUtils()
|
||||||
|
.createPostTask(
|
||||||
|
UpdateSnapshotViewAction.PATH,
|
||||||
|
"BACKEND",
|
||||||
|
ImmutableMultimap.of(
|
||||||
|
UPDATE_SNAPSHOT_DATASET_ID_PARAM,
|
||||||
|
"datastore_backups",
|
||||||
|
UPDATE_SNAPSHOT_TABLE_ID_PARAM,
|
||||||
|
"2018_12_05T17_46_39_92612_one",
|
||||||
|
UPDATE_SNAPSHOT_KIND_PARAM,
|
||||||
|
"one",
|
||||||
|
UPDATE_SNAPSHOT_VIEWNAME_PARAM,
|
||||||
|
"latest_datastore_export")),
|
||||||
|
cloudTasksHelper
|
||||||
|
.getTestCloudTasksUtils()
|
||||||
|
.createPostTask(
|
||||||
|
UpdateSnapshotViewAction.PATH,
|
||||||
|
"BACKEND",
|
||||||
|
ImmutableMultimap.of(
|
||||||
|
UPDATE_SNAPSHOT_DATASET_ID_PARAM,
|
||||||
|
"datastore_backups",
|
||||||
|
UPDATE_SNAPSHOT_TABLE_ID_PARAM,
|
||||||
|
"2018_12_05T17_46_39_92612_two",
|
||||||
|
UPDATE_SNAPSHOT_KIND_PARAM,
|
||||||
|
"two",
|
||||||
|
UPDATE_SNAPSHOT_VIEWNAME_PARAM,
|
||||||
|
"latest_datastore_export")),
|
||||||
|
cloudTasksHelper
|
||||||
|
.getTestCloudTasksUtils()
|
||||||
|
.createPostTask(
|
||||||
|
UpdateSnapshotViewAction.PATH,
|
||||||
|
"BACKEND",
|
||||||
|
ImmutableMultimap.of(
|
||||||
|
UPDATE_SNAPSHOT_DATASET_ID_PARAM,
|
||||||
|
"datastore_backups",
|
||||||
|
UPDATE_SNAPSHOT_TABLE_ID_PARAM,
|
||||||
"2018_12_05T17_46_39_92612_three",
|
"2018_12_05T17_46_39_92612_three",
|
||||||
|
UPDATE_SNAPSHOT_KIND_PARAM,
|
||||||
"three",
|
"three",
|
||||||
LATEST_BACKUP_VIEW_NAME),
|
UPDATE_SNAPSHOT_VIEWNAME_PARAM,
|
||||||
QueueFactory.getQueue(UpdateSnapshotViewAction.QUEUE));
|
"latest_datastore_export")))
|
||||||
|
.inOrder();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue