mirror of
https://github.com/google/nomulus.git
synced 2025-04-30 20:17:51 +02:00
124 lines
5 KiB
Java
124 lines
5 KiB
Java
// Copyright 2016 The Domain Registry Authors. All Rights Reserved.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package google.registry.tools;
|
|
|
|
import static com.google.common.base.Predicates.notNull;
|
|
|
|
import com.beust.jcommander.Parameter;
|
|
import com.beust.jcommander.Parameters;
|
|
import com.google.common.collect.FluentIterable;
|
|
import com.google.common.collect.ImmutableList;
|
|
import com.google.common.collect.ImmutableMap;
|
|
import com.google.common.util.concurrent.FutureCallback;
|
|
import com.google.common.util.concurrent.Futures;
|
|
import com.google.common.util.concurrent.ListenableFuture;
|
|
import google.registry.bigquery.BigqueryUtils.SourceFormat;
|
|
import google.registry.export.ExportConstants;
|
|
import java.util.ArrayList;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
|
|
/** Command to load datastore snapshots into Bigquery. */
|
|
@Parameters(separators = " =", commandDescription = "Load datastore snapshot into Bigquery")
|
|
final class LoadSnapshotCommand extends BigqueryCommand {
|
|
|
|
@Parameter(
|
|
names = "--snapshot",
|
|
description = "Common filename prefix of the specific snapshot series to import.")
|
|
private String snapshotPrefix = null;
|
|
|
|
@Parameter(
|
|
names = "--gcs_bucket",
|
|
description = "Name of the GCS bucket from which to import datastore snapshots.")
|
|
private String snapshotGcsBucket = "domain-registry/snapshots/testing";
|
|
|
|
@Parameter(
|
|
names = "--kinds",
|
|
description = "List of datastore kinds for which to import snapshot data.")
|
|
private List<String> kindNames = new ArrayList<>(ExportConstants.getReportingKinds());
|
|
|
|
/** Runs the main snapshot import logic. */
|
|
@Override
|
|
public void runWithBigquery() throws Exception {
|
|
kindNames.removeAll(ImmutableList.of("")); // Filter out any empty kind names.
|
|
if (snapshotPrefix == null || kindNames.isEmpty()) {
|
|
System.err.println("Nothing to import; specify --snapshot and at least one kind.");
|
|
return;
|
|
}
|
|
Map<String, ListenableFuture<?>> loadJobs = loadSnapshotKinds(kindNames);
|
|
waitForLoadJobs(loadJobs);
|
|
}
|
|
|
|
/**
|
|
* Starts load jobs for the given snapshot kinds, and returns a map of kind name to
|
|
* ListenableFuture representing the result of the load job for that kind.
|
|
*/
|
|
private Map<String, ListenableFuture<?>> loadSnapshotKinds(List<String> kindNames)
|
|
throws Exception {
|
|
ImmutableMap.Builder<String, ListenableFuture<?>> builder = new ImmutableMap.Builder<>();
|
|
for (String kind : kindNames) {
|
|
String filename = String.format(
|
|
"gs://%s/%s.%s.backup_info", snapshotGcsBucket, snapshotPrefix, kind);
|
|
builder.put(kind, loadSnapshotFile(filename, kind));
|
|
System.err.println("Started load job for kind: " + kind);
|
|
}
|
|
return builder.build();
|
|
}
|
|
|
|
/** Starts a load job for the specified kind name, sourcing data from the given GCS file. */
|
|
private ListenableFuture<?> loadSnapshotFile(String filename, String kindName) throws Exception {
|
|
return bigquery().load(
|
|
bigquery().buildDestinationTable(kindName)
|
|
.description("Datastore snapshot import for " + kindName + ".")
|
|
.build(),
|
|
SourceFormat.DATASTORE_BACKUP,
|
|
ImmutableList.of(filename));
|
|
}
|
|
|
|
/**
|
|
* Block on the completion of the load jobs in the provided map, printing out information on
|
|
* each job's success or failure.
|
|
*/
|
|
private void waitForLoadJobs(Map<String, ListenableFuture<?>> loadJobs) throws Exception {
|
|
final long startTime = System.currentTimeMillis();
|
|
System.err.println("Waiting for load jobs...");
|
|
// Add callbacks to each load job that print information on successful completion or failure.
|
|
for (final String jobId : loadJobs.keySet()) {
|
|
final String jobName = "load-" + jobId;
|
|
Futures.addCallback(loadJobs.get(jobId), new FutureCallback<Object>() {
|
|
private double elapsedSeconds() {
|
|
return (System.currentTimeMillis() - startTime) / 1000.0;
|
|
}
|
|
|
|
@Override
|
|
public void onSuccess(Object unused) {
|
|
System.err.printf("Job %s succeeded (%.3fs)\n", jobName, elapsedSeconds());
|
|
}
|
|
|
|
@Override
|
|
public void onFailure(Throwable error) {
|
|
System.err.printf(
|
|
"Job %s failed (%.3fs): %s\n", jobName, elapsedSeconds(), error.getMessage());
|
|
}
|
|
});
|
|
}
|
|
// Block on the completion of all the load jobs.
|
|
List<?> results = Futures.successfulAsList(loadJobs.values()).get();
|
|
int numSucceeded = FluentIterable.from(results).filter(notNull()).size();
|
|
System.err.printf(
|
|
"All load jobs have terminated: %d/%d successful.\n",
|
|
numSucceeded, loadJobs.size());
|
|
}
|
|
}
|