Remove deprecated Datastore backup code

Removed three Action classes and the CheckSnapshot command.

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=230545631
This commit is contained in:
weiminyu 2019-01-23 09:39:26 -08:00 committed by jianglai
parent 701ebc6a28
commit acbd23fa64
20 changed files with 14 additions and 1527 deletions

View file

@ -28,9 +28,7 @@ import google.registry.request.auth.Auth;
import javax.inject.Inject;
/**
* Action to trigger a Datastore backup job that writes a snapshot to Google Cloud Storage. This
* class is introduced as an experimental feature, and will eventually replace {@link
* ExportSnapshotAction}.
* Action to trigger a Datastore backup job that writes a snapshot to Google Cloud Storage.
*
* <p>This is the first step of a four step workflow for exporting snapshots, with each step calling
* the next upon successful completion:

View file

@ -1,163 +0,0 @@
// Copyright 2017 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.export;
import static com.google.common.collect.Sets.intersection;
import static google.registry.export.LoadSnapshotAction.enqueueLoadSnapshotTask;
import static google.registry.request.Action.Method.GET;
import static google.registry.request.Action.Method.POST;
import com.google.appengine.api.taskqueue.QueueFactory;
import com.google.appengine.api.taskqueue.TaskHandle;
import com.google.appengine.api.taskqueue.TaskOptions;
import com.google.appengine.api.taskqueue.TaskOptions.Method;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.common.flogger.FluentLogger;
import google.registry.export.DatastoreBackupInfo.BackupStatus;
import google.registry.request.Action;
import google.registry.request.HttpException.BadRequestException;
import google.registry.request.HttpException.NoContentException;
import google.registry.request.HttpException.NotModifiedException;
import google.registry.request.Parameter;
import google.registry.request.RequestMethod;
import google.registry.request.Response;
import google.registry.request.auth.Auth;
import java.util.Set;
import javax.inject.Inject;
import org.joda.time.Duration;
import org.joda.time.PeriodType;
import org.joda.time.format.PeriodFormat;
/**
* Action that checks the status of a snapshot, and if complete, trigger loading it into BigQuery.
*/
@Action(
service = Action.Service.BACKEND,
path = CheckSnapshotAction.PATH,
method = {POST, GET},
automaticallyPrintOk = true,
auth = Auth.AUTH_INTERNAL_ONLY)
public class CheckSnapshotAction implements Runnable {
/** Parameter names for passing parameters into this action. */
static final String CHECK_SNAPSHOT_NAME_PARAM = "name";
static final String CHECK_SNAPSHOT_KINDS_TO_LOAD_PARAM = "kindsToLoad";
/** Action-specific details needed for enqueuing tasks against itself. */
static final String QUEUE = "export-snapshot-poll"; // See queue.xml.
static final String PATH = "/_dr/task/checkSnapshot"; // See web.xml.
static final Duration POLL_COUNTDOWN = Duration.standardMinutes(2);
/** The maximum amount of time we allow a backup to run before abandoning it. */
static final Duration MAXIMUM_BACKUP_RUNNING_TIME = Duration.standardHours(20);
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
@Inject Response response;
@Inject @RequestMethod Action.Method requestMethod;
@Inject DatastoreBackupService backupService;
@Inject @Parameter(CHECK_SNAPSHOT_NAME_PARAM) String snapshotName;
@Inject @Parameter(CHECK_SNAPSHOT_KINDS_TO_LOAD_PARAM) String kindsToLoadParam;
@Inject CheckSnapshotAction() {}
@Override
public void run() {
if (requestMethod == POST) {
checkAndLoadSnapshotIfComplete();
} else {
// This is a GET request.
response.setPayload(getBackup().getInformation());
}
}
private DatastoreBackupInfo getBackup() {
try {
return backupService.findByName(snapshotName);
} catch (IllegalArgumentException e) {
String message = String.format("Bad backup name %s: %s", snapshotName, e.getMessage());
// TODO(b/19081569): Ideally this would return a 2XX error so the task would not be
// retried but we might abandon backups that start late and haven't yet written to
// Datastore. We could fix that by replacing this with a two-phase polling strategy.
throw new BadRequestException(message, e);
}
}
private void checkAndLoadSnapshotIfComplete() {
Set<String> kindsToLoad = ImmutableSet.copyOf(Splitter.on(',').split(kindsToLoadParam));
DatastoreBackupInfo backup = getBackup();
// Stop now if the backup is not complete.
if (!backup.getStatus().equals(BackupStatus.COMPLETE)) {
Duration runningTime = backup.getRunningTime();
if (runningTime.isShorterThan(MAXIMUM_BACKUP_RUNNING_TIME)) {
// Backup might still be running, so send a 304 to have the task retry.
throw new NotModifiedException(
String.format("Datastore backup %s still pending", snapshotName));
} else {
// Declare the backup a lost cause, and send 204 No Content so the task will
// not be retried.
String message =
String.format(
"Datastore backup %s abandoned - not complete after %s",
snapshotName,
PeriodFormat.getDefault()
.print(
runningTime
.toPeriod()
.normalizedStandard(PeriodType.dayTime().withMillisRemoved())));
throw new NoContentException(message);
}
}
// Get a compact string to identify this snapshot in BigQuery by trying to parse the unique
// suffix out of the snapshot name and falling back to the start time as a string.
String snapshotId =
snapshotName.startsWith(ExportSnapshotAction.SNAPSHOT_PREFIX)
? snapshotName.substring(ExportSnapshotAction.SNAPSHOT_PREFIX.length())
: backup.getStartTime().toString("YYYYMMdd_HHmmss");
// Log a warning if kindsToLoad is not a subset of the exported snapshot kinds.
if (!backup.getKinds().containsAll(kindsToLoad)) {
logger.atWarning().log(
"Kinds to load included non-exported kinds: %s",
Sets.difference(kindsToLoad, backup.getKinds()));
}
// Load kinds from the snapshot, limited to those also in kindsToLoad (if it's present).
ImmutableSet<String> exportedKindsToLoad =
ImmutableSet.copyOf(intersection(backup.getKinds(), kindsToLoad));
String message = String.format("Datastore backup %s complete - ", snapshotName);
if (exportedKindsToLoad.isEmpty()) {
message += "no kinds to load into BigQuery";
} else {
enqueueLoadSnapshotTask(snapshotId, backup.getGcsFilename().get(), exportedKindsToLoad);
message += "BigQuery load task enqueued";
}
logger.atInfo().log(message);
response.setPayload(message);
}
/** Enqueue a poll task to monitor the named snapshot for completion. */
static TaskHandle enqueuePollTask(String snapshotName, ImmutableSet<String> kindsToLoad) {
return QueueFactory.getQueue(QUEUE)
.add(
TaskOptions.Builder.withUrl(PATH)
.method(Method.POST)
.countdownMillis(POLL_COUNTDOWN.getMillis())
.param(CHECK_SNAPSHOT_NAME_PARAM, snapshotName)
.param(CHECK_SNAPSHOT_KINDS_TO_LOAD_PARAM, Joiner.on(',').join(kindsToLoad)));
}
}

View file

@ -1,149 +0,0 @@
// Copyright 2017 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.export;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.joda.time.DateTimeZone.UTC;
import com.google.appengine.api.datastore.Entity;
import com.google.appengine.api.datastore.Text;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Ascii;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableSet;
import google.registry.util.Clock;
import google.registry.util.NonFinalForTesting;
import google.registry.util.SystemClock;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import org.joda.time.DateTime;
import org.joda.time.Duration;
/** Container for information about a Datastore backup. */
public class DatastoreBackupInfo {
@NonFinalForTesting
private static Clock clock = new SystemClock();
/** The possible status values for a Datastore backup. */
public enum BackupStatus { PENDING, COMPLETE }
/** The name of the Datastore backup. */
private final String backupName;
/** The entity kinds included in this Datastore backup. */
private final ImmutableSet<String> kinds;
/** The start time of the Datastore backup. */
private final DateTime startTime;
/** The completion time of the Datastore backup, present if it has completed. */
private final Optional<DateTime> completeTime;
/**
* The GCS filename to which the backup's top-level .backup_info manifest file has been written,
* present if the backup has completed.
*/
private final Optional<String> gcsFilename;
/** DatastoreBackupInfo instances should only be obtained via DatastoreBackupService. */
DatastoreBackupInfo(Entity backupEntity) {
backupName = (String) checkNotNull(backupEntity.getProperty("name"), "name");
@SuppressWarnings("unchecked")
List<String> rawKinds = (List<String>) checkNotNull(backupEntity.getProperty("kinds"), "kinds");
Date rawStartTime = (Date) checkNotNull(backupEntity.getProperty("start_time"), "start_time");
Date rawCompleteTime = (Date) backupEntity.getProperty("complete_time");
Text rawGcsFilename = (Text) backupEntity.getProperty("gs_handle");
kinds = ImmutableSet.copyOf(rawKinds);
startTime = new DateTime(rawStartTime).withZone(UTC);
completeTime = Optional.ofNullable(
rawCompleteTime == null ? null : new DateTime(rawCompleteTime).withZone(UTC));
gcsFilename = Optional.ofNullable(
rawGcsFilename == null ? null : gcsPathToUri(rawGcsFilename.getValue()));
}
/** This constructor is only exposed for test purposes. */
@VisibleForTesting
DatastoreBackupInfo(
String backupName,
DateTime startTime,
Optional<DateTime> completeTime,
ImmutableSet<String> kinds,
Optional<String> gcsFilename) {
this.backupName = backupName;
this.startTime = startTime;
this.completeTime = completeTime;
this.kinds = kinds;
this.gcsFilename = gcsFilename;
}
/**
* Rewrite a GCS path as stored by Datastore Admin (with a "/gs/" prefix) to the more standard
* URI format that uses a "gs://" scheme prefix.
*/
private static String gcsPathToUri(String backupGcsPath) {
checkArgument(backupGcsPath.startsWith("/gs/"), "GCS path not in expected format");
return backupGcsPath.replaceFirst("/gs/", "gs://");
}
public String getName() {
return backupName;
}
public ImmutableSet<String> getKinds() {
return kinds;
}
public BackupStatus getStatus() {
return completeTime.isPresent() ? BackupStatus.COMPLETE : BackupStatus.PENDING;
}
public DateTime getStartTime() {
return startTime;
}
public Optional<DateTime> getCompleteTime() {
return completeTime;
}
/**
* Returns the length of time the backup ran for (if completed) or the length of time since the
* backup started (if it has not completed).
*/
public Duration getRunningTime() {
return new Duration(startTime, completeTime.orElse(clock.nowUtc()));
}
public Optional<String> getGcsFilename() {
return gcsFilename;
}
/** Returns a string version of key information about the backup. */
public String getInformation() {
return Joiner.on('\n')
.join(
"Backup name: " + backupName,
"Status: " + getStatus(),
"Started: " + startTime,
"Ended: " + completeTime.orElse(null),
"Duration: " + Ascii.toLowerCase(getRunningTime().toPeriod().toString().substring(2)),
"GCS: " + gcsFilename.orElse(null),
"Kinds: " + kinds,
"");
}
}

View file

@ -1,103 +0,0 @@
// Copyright 2017 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.export;
import static com.google.appengine.api.datastore.DatastoreServiceFactory.getDatastoreService;
import static com.google.appengine.api.taskqueue.QueueFactory.getQueue;
import static com.google.common.base.Strings.nullToEmpty;
import static com.google.common.collect.ImmutableList.toImmutableList;
import com.google.appengine.api.datastore.Query;
import com.google.appengine.api.taskqueue.TaskHandle;
import com.google.appengine.api.taskqueue.TaskOptions;
import com.google.appengine.api.taskqueue.TaskOptions.Method;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Streams;
import google.registry.util.AppEngineServiceUtils;
import java.util.NoSuchElementException;
import javax.inject.Inject;
/** An object providing methods for starting and querying Datastore backups. */
public class DatastoreBackupService {
/** The internal kind name used for entities storing information about Datastore backups. */
static final String BACKUP_INFO_KIND = "_AE_Backup_Information";
/** The name of the app version used for hosting the Datastore Admin functionality. */
static final String DATASTORE_ADMIN_VERSION_NAME = "ah-builtin-python-bundle";
private final AppEngineServiceUtils appEngineServiceUtils;
@Inject
public DatastoreBackupService(AppEngineServiceUtils appEngineServiceUtils) {
this.appEngineServiceUtils = appEngineServiceUtils;
}
/**
* Generates the TaskOptions needed to trigger an AppEngine Datastore backup job.
*
* @see <a href="https://developers.google.com/appengine/articles/scheduled_backups">Scheduled Backups</a>
*/
private TaskOptions makeTaskOptions(
String queue, String name, String gcsBucket, ImmutableSet<String> kinds) {
String hostname =
appEngineServiceUtils.getVersionHostname("default", DATASTORE_ADMIN_VERSION_NAME);
TaskOptions options = TaskOptions.Builder.withUrl("/_ah/datastore_admin/backup.create")
.header("Host", hostname)
.method(Method.GET)
.param("name", name + "_") // Add underscore since the name will be used as a prefix.
.param("filesystem", "gs")
.param("gs_bucket_name", gcsBucket)
.param("queue", queue);
for (String kind : kinds) {
options.param("kind", kind);
}
return options;
}
/**
* Launches a new Datastore backup with the given name, GCS bucket, and set of kinds by
* submitting a task to the given task queue, and returns a handle to that task.
*/
public TaskHandle launchNewBackup(
String queue, String name, String gcsBucket, ImmutableSet<String> kinds) {
return getQueue(queue).add(makeTaskOptions(queue, name, gcsBucket, kinds));
}
/** Return an iterable of all Datastore backups whose names have the given string prefix. */
public Iterable<DatastoreBackupInfo> findAllByNamePrefix(final String namePrefix) {
// Need the raw DatastoreService to access the internal _AE_Backup_Information entities.
// TODO(b/19081037): make an Objectify entity class for these raw Datastore entities instead.
return Streams.stream(getDatastoreService().prepare(new Query(BACKUP_INFO_KIND)).asIterable())
.filter(entity -> nullToEmpty((String) entity.getProperty("name")).startsWith(namePrefix))
.map(DatastoreBackupInfo::new)
.collect(toImmutableList());
}
/**
* Return a single DatastoreBackup that uniquely matches this name prefix. Throws an IAE
* if no backups match or if more than one backup matches.
*/
public DatastoreBackupInfo findByName(final String namePrefix) {
try {
return Iterables.getOnlyElement(findAllByNamePrefix(namePrefix));
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("More than one backup with name prefix " + namePrefix, e);
} catch (NoSuchElementException e) {
throw new IllegalArgumentException("No backup found with name prefix " + namePrefix, e);
}
}
}

View file

@ -17,16 +17,15 @@ package google.registry.export;
import static google.registry.export.BigqueryPollJobAction.CHAINED_TASK_QUEUE_HEADER;
import static google.registry.export.BigqueryPollJobAction.JOB_ID_HEADER;
import static google.registry.export.BigqueryPollJobAction.PROJECT_ID_HEADER;
import static google.registry.export.CheckSnapshotAction.CHECK_SNAPSHOT_KINDS_TO_LOAD_PARAM;
import static google.registry.export.CheckSnapshotAction.CHECK_SNAPSHOT_NAME_PARAM;
import static google.registry.export.LoadSnapshotAction.LOAD_SNAPSHOT_FILE_PARAM;
import static google.registry.export.LoadSnapshotAction.LOAD_SNAPSHOT_ID_PARAM;
import static google.registry.export.LoadSnapshotAction.LOAD_SNAPSHOT_KINDS_PARAM;
import static google.registry.export.CheckBackupAction.CHECK_BACKUP_KINDS_TO_LOAD_PARAM;
import static google.registry.export.CheckBackupAction.CHECK_BACKUP_NAME_PARAM;
import static google.registry.export.UpdateSnapshotViewAction.UPDATE_SNAPSHOT_DATASET_ID_PARAM;
import static google.registry.export.UpdateSnapshotViewAction.UPDATE_SNAPSHOT_KIND_PARAM;
import static google.registry.export.UpdateSnapshotViewAction.UPDATE_SNAPSHOT_TABLE_ID_PARAM;
import static google.registry.export.UpdateSnapshotViewAction.UPDATE_SNAPSHOT_VIEWNAME_PARAM;
import static google.registry.export.UploadDatastoreBackupAction.UPLOAD_BACKUP_FOLDER_PARAM;
import static google.registry.export.UploadDatastoreBackupAction.UPLOAD_BACKUP_ID_PARAM;
import static google.registry.export.UploadDatastoreBackupAction.UPLOAD_BACKUP_KINDS_PARAM;
import static google.registry.request.RequestParameters.extractRequiredHeader;
import static google.registry.request.RequestParameters.extractRequiredParameter;
@ -64,12 +63,6 @@ public final class ExportRequestModule {
return extractRequiredParameter(req, UPDATE_SNAPSHOT_VIEWNAME_PARAM);
}
@Provides
@Parameter(LOAD_SNAPSHOT_FILE_PARAM)
static String provideLoadSnapshotFile(HttpServletRequest req) {
return extractRequiredParameter(req, LOAD_SNAPSHOT_FILE_PARAM);
}
@Provides
@Parameter(UPLOAD_BACKUP_FOLDER_PARAM)
static String provideSnapshotUrlPrefix(HttpServletRequest req) {
@ -77,27 +70,27 @@ public final class ExportRequestModule {
}
@Provides
@Parameter(LOAD_SNAPSHOT_ID_PARAM)
@Parameter(UPLOAD_BACKUP_ID_PARAM)
static String provideLoadSnapshotId(HttpServletRequest req) {
return extractRequiredParameter(req, LOAD_SNAPSHOT_ID_PARAM);
return extractRequiredParameter(req, UPLOAD_BACKUP_ID_PARAM);
}
@Provides
@Parameter(LOAD_SNAPSHOT_KINDS_PARAM)
@Parameter(UPLOAD_BACKUP_KINDS_PARAM)
static String provideLoadSnapshotKinds(HttpServletRequest req) {
return extractRequiredParameter(req, LOAD_SNAPSHOT_KINDS_PARAM);
return extractRequiredParameter(req, UPLOAD_BACKUP_KINDS_PARAM);
}
@Provides
@Parameter(CHECK_SNAPSHOT_NAME_PARAM)
@Parameter(CHECK_BACKUP_NAME_PARAM)
static String provideCheckSnapshotName(HttpServletRequest req) {
return extractRequiredParameter(req, CHECK_SNAPSHOT_NAME_PARAM);
return extractRequiredParameter(req, CHECK_BACKUP_NAME_PARAM);
}
@Provides
@Parameter(CHECK_SNAPSHOT_KINDS_TO_LOAD_PARAM)
@Parameter(CHECK_BACKUP_KINDS_TO_LOAD_PARAM)
static String provideCheckSnapshotKindsToLoad(HttpServletRequest req) {
return extractRequiredParameter(req, CHECK_SNAPSHOT_KINDS_TO_LOAD_PARAM);
return extractRequiredParameter(req, CHECK_BACKUP_KINDS_TO_LOAD_PARAM);
}
@Provides

View file

@ -1,78 +0,0 @@
// Copyright 2017 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.export;
import static google.registry.export.CheckSnapshotAction.enqueuePollTask;
import static google.registry.request.Action.Method.POST;
import com.google.common.flogger.FluentLogger;
import google.registry.config.RegistryConfig;
import google.registry.request.Action;
import google.registry.request.Response;
import google.registry.request.auth.Auth;
import google.registry.util.Clock;
import javax.inject.Inject;
/**
* Action to trigger a Datastore backup job that writes a snapshot to Google Cloud Storage.
*
* <p>This is the first step of a four step workflow for exporting snapshots, with each step calling
* the next upon successful completion:
*
* <ol>
* <li>The snapshot is exported to Google Cloud Storage (this action).
* <li>The {@link CheckSnapshotAction} polls until the export is completed.
* <li>The {@link LoadSnapshotAction} imports the data from GCS to BigQuery.
* <li>The {@link UpdateSnapshotViewAction} updates the view in latest_datastore_export.
* </ol>
*/
@Action(
service = Action.Service.BACKEND,
path = ExportSnapshotAction.PATH,
method = POST,
automaticallyPrintOk = true,
auth = Auth.AUTH_INTERNAL_ONLY)
public class ExportSnapshotAction implements Runnable {
/** Queue to use for enqueuing the task that will actually launch the backup. */
static final String QUEUE = "export-snapshot"; // See queue.xml.
static final String PATH = "/_dr/task/exportSnapshot"; // See web.xml.
/** Prefix to use for naming all snapshots that are started by this servlet. */
static final String SNAPSHOT_PREFIX = "auto_snapshot_";
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
@Inject Clock clock;
@Inject DatastoreBackupService backupService;
@Inject Response response;
@Inject
ExportSnapshotAction() {}
@Override
public void run() {
// Use a unique name for the snapshot so we can explicitly check its completion later.
String snapshotName = SNAPSHOT_PREFIX + clock.nowUtc().toString("YYYYMMdd_HHmmss");
backupService.launchNewBackup(
QUEUE, snapshotName, RegistryConfig.getSnapshotsBucket(), ExportConstants.getBackupKinds());
// Enqueue a poll task to monitor the backup and load reporting-related kinds into bigquery.
enqueuePollTask(snapshotName, ExportConstants.getReportingKinds());
String message = "Datastore backup started with name: " + snapshotName;
logger.atInfo().log(message);
response.setPayload(message);
}
}

View file

@ -1,166 +0,0 @@
// Copyright 2017 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.export;
import static com.google.appengine.api.taskqueue.QueueFactory.getQueue;
import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Preconditions.checkArgument;
import static google.registry.export.UpdateSnapshotViewAction.createViewUpdateTask;
import static google.registry.request.Action.Method.POST;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobConfiguration;
import com.google.api.services.bigquery.model.JobConfigurationLoad;
import com.google.api.services.bigquery.model.JobReference;
import com.google.api.services.bigquery.model.TableReference;
import com.google.appengine.api.taskqueue.TaskHandle;
import com.google.appengine.api.taskqueue.TaskOptions;
import com.google.appengine.api.taskqueue.TaskOptions.Method;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.flogger.FluentLogger;
import google.registry.bigquery.BigqueryUtils.SourceFormat;
import google.registry.bigquery.BigqueryUtils.WriteDisposition;
import google.registry.bigquery.CheckedBigquery;
import google.registry.config.RegistryConfig.Config;
import google.registry.export.BigqueryPollJobAction.BigqueryPollJobEnqueuer;
import google.registry.request.Action;
import google.registry.request.HttpException.BadRequestException;
import google.registry.request.HttpException.InternalServerErrorException;
import google.registry.request.Parameter;
import google.registry.request.auth.Auth;
import google.registry.util.Clock;
import java.io.IOException;
import javax.inject.Inject;
import org.joda.time.DateTime;
/** Action to load a Datastore snapshot from Google Cloud Storage into BigQuery. */
@Action(
service = Action.Service.BACKEND,
path = LoadSnapshotAction.PATH,
method = POST,
auth = Auth.AUTH_INTERNAL_ONLY)
public class LoadSnapshotAction implements Runnable {
/** Parameter names for passing parameters into the servlet. */
static final String LOAD_SNAPSHOT_ID_PARAM = "id";
static final String LOAD_SNAPSHOT_FILE_PARAM = "file";
static final String LOAD_SNAPSHOT_KINDS_PARAM = "kinds";
static final String SNAPSHOTS_DATASET = "snapshots";
static final String LATEST_SNAPSHOT_VIEW_NAME = "latest_datastore_export";
/** Servlet-specific details needed for enqueuing tasks against itself. */
static final String QUEUE = "export-snapshot"; // See queue.xml.
static final String PATH = "/_dr/task/loadSnapshot"; // See web.xml.
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
@Inject CheckedBigquery checkedBigquery;
@Inject BigqueryPollJobEnqueuer bigqueryPollEnqueuer;
@Inject Clock clock;
@Inject @Config("projectId") String projectId;
@Inject @Parameter(LOAD_SNAPSHOT_FILE_PARAM) String snapshotFile;
@Inject @Parameter(LOAD_SNAPSHOT_ID_PARAM) String snapshotId;
@Inject @Parameter(LOAD_SNAPSHOT_KINDS_PARAM) String snapshotKinds;
@Inject LoadSnapshotAction() {}
/** Enqueue a task for starting a backup load. */
public static TaskHandle enqueueLoadSnapshotTask(
String snapshotId, String gcsFile, ImmutableSet<String> kinds) {
return getQueue(QUEUE).add(
TaskOptions.Builder.withUrl(PATH)
.method(Method.POST)
.param(LOAD_SNAPSHOT_ID_PARAM, snapshotId)
.param(LOAD_SNAPSHOT_FILE_PARAM, gcsFile)
.param(LOAD_SNAPSHOT_KINDS_PARAM, Joiner.on(',').join(kinds)));
}
@Override
public void run() {
try {
String message =
loadSnapshot(snapshotId, snapshotFile, Splitter.on(',').split(snapshotKinds));
logger.atInfo().log("Loaded snapshot successfully: %s", message);
} catch (Throwable e) {
logger.atSevere().withCause(e).log("Error loading snapshot");
if (e instanceof IllegalArgumentException) {
throw new BadRequestException("Error calling load snapshot: " + e.getMessage(), e);
} else {
throw new InternalServerErrorException(
"Error loading snapshot: " + firstNonNull(e.getMessage(), e.toString()));
}
}
}
private String loadSnapshot(String snapshotId, String gcsFilename, Iterable<String> kinds)
throws IOException {
Bigquery bigquery = checkedBigquery.ensureDataSetExists(projectId, SNAPSHOTS_DATASET);
DateTime now = clock.nowUtc();
String loadMessage =
String.format("Loading Datastore snapshot %s from %s...", snapshotId, gcsFilename);
logger.atInfo().log(loadMessage);
StringBuilder builder = new StringBuilder(loadMessage + "\n");
builder.append("Load jobs:\n");
for (String kindName : kinds) {
String jobId = String.format("load-snapshot-%s-%s-%d", snapshotId, kindName, now.getMillis());
JobReference jobRef = new JobReference().setProjectId(projectId).setJobId(jobId);
String sourceUri = getBackupInfoFileForKind(gcsFilename, kindName);
String tableId = String.format("%s_%s", snapshotId, kindName);
// Launch the load job.
Job job = makeLoadJob(jobRef, sourceUri, tableId);
bigquery.jobs().insert(projectId, job).execute();
// Enqueue a task to check on the load job's completion, and if it succeeds, to update a
// well-known view in BigQuery to point at the newly loaded snapshot table for this kind.
bigqueryPollEnqueuer.enqueuePollTask(
jobRef,
createViewUpdateTask(SNAPSHOTS_DATASET, tableId, kindName, LATEST_SNAPSHOT_VIEW_NAME),
getQueue(UpdateSnapshotViewAction.QUEUE));
builder.append(String.format(" - %s:%s\n", projectId, jobId));
logger.atInfo().log("Submitted load job %s:%s", projectId, jobId);
}
return builder.toString();
}
private static String getBackupInfoFileForKind(String backupInfoFile, String kindName) {
String extension = ".backup_info";
checkArgument(backupInfoFile.endsWith(extension), "backup info file extension missing");
String prefix = backupInfoFile.substring(0, backupInfoFile.length() - extension.length());
return Joiner.on('.').join(prefix, kindName, extension.substring(1));
}
private Job makeLoadJob(JobReference jobRef, String sourceUri, String tableId) {
TableReference tableReference = new TableReference()
.setProjectId(jobRef.getProjectId())
.setDatasetId(SNAPSHOTS_DATASET)
.setTableId(tableId);
return new Job()
.setJobReference(jobRef)
.setConfiguration(new JobConfiguration()
.setLoad(new JobConfigurationLoad()
.setWriteDisposition(WriteDisposition.WRITE_EMPTY.toString())
.setSourceFormat(SourceFormat.DATASTORE_BACKUP.toString())
.setSourceUris(ImmutableList.of(sourceUri))
.setDestinationTable(tableReference)));
}
}

View file

@ -50,8 +50,7 @@ public class UpdateSnapshotViewAction implements Runnable {
static final String UPDATE_SNAPSHOT_VIEWNAME_PARAM = "viewname";
/** Servlet-specific details needed for enqueuing tasks against itself. */
// For now this queue is shared by the backup workflows started by both ExportSnapshotAction
// and BackupDatastoreAction.
// For now this queue is shared by the backup workflows started by BackupDatastoreAction.
// TODO(weiminyu): update queue name (snapshot->backup) after ExportSnapshot flow is removed.
static final String QUEUE = "export-snapshot-update-view"; // See queue.xml.