diff --git a/java/google/registry/bigquery/BigqueryConnection.java b/java/google/registry/bigquery/BigqueryConnection.java index d17534f1c..0ec4c558e 100644 --- a/java/google/registry/bigquery/BigqueryConnection.java +++ b/java/google/registry/bigquery/BigqueryConnection.java @@ -19,6 +19,9 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Strings.isNullOrEmpty; import static com.google.common.base.Verify.verify; +import static com.google.common.util.concurrent.Futures.transform; +import static com.google.common.util.concurrent.Futures.transformAsync; +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static google.registry.bigquery.BigqueryUtils.toJobReferenceString; import static google.registry.config.RegistryConfig.getProjectId; import static org.joda.time.DateTimeZone.UTC; @@ -404,7 +407,7 @@ public class BigqueryConnection implements AutoCloseable { .setSourceFormat(sourceFormat.toString()) .setSourceUris(ImmutableList.copyOf(sourceUris)) .setDestinationTable(dest.getTableReference()))); - return Futures.transform(runJobToCompletion(job, dest), new UpdateTableFunction()); + return transform(runJobToCompletion(job, dest), new UpdateTableFunction(), directExecutor()); } /** @@ -418,8 +421,10 @@ public class BigqueryConnection implements AutoCloseable { if (dest.type == TableType.VIEW) { // Use Futures.transform() rather than calling apply() directly so that any exceptions thrown // by calling UpdateTableFunction will be propagated on the get() call, not from here. - return Futures.transform( - Futures.immediateFuture(dest.withQuery(querySql)), new UpdateTableFunction()); + return transform( + Futures.immediateFuture(dest.withQuery(querySql)), + new UpdateTableFunction(), + directExecutor()); } else { Job job = new Job() .setConfiguration(new JobConfiguration() @@ -428,7 +433,7 @@ public class BigqueryConnection implements AutoCloseable { .setDefaultDataset(getDataset()) .setWriteDisposition(dest.getWriteDisposition().toString()) .setDestinationTable(dest.getTableReference()))); - return Futures.transform(runJobToCompletion(job, dest), new UpdateTableFunction()); + return transform(runJobToCompletion(job, dest), new UpdateTableFunction(), directExecutor()); } } @@ -447,13 +452,15 @@ public class BigqueryConnection implements AutoCloseable { .setQuery(new JobConfigurationQuery() .setQuery(querySql) .setDefaultDataset(getDataset()))); - return Futures.transform( + return transform( runJobToCompletion(job), new Function>() { @Override public ImmutableTable apply(Job job) { return getQueryResults(job); - }}); + } + }, + directExecutor()); } /** @@ -563,13 +570,15 @@ public class BigqueryConnection implements AutoCloseable { // we can't rely on that for running the extract job because it may not be fully replicated. // Tracking bug for query-to-GCS support is b/13777340. DestinationTable tempTable = buildTemporaryTable().build(); - return Futures.transformAsync( - query(querySql, tempTable), new AsyncFunction() { + return transformAsync( + query(querySql, tempTable), + new AsyncFunction() { @Override public ListenableFuture apply(DestinationTable tempTable) { return extractTable(tempTable, destinationUri, destinationFormat, printHeader); } - }); + }, + directExecutor()); } /** @see #runJob(Job, AbstractInputStreamContent) */ diff --git a/java/google/registry/tools/LoadSnapshotCommand.java b/java/google/registry/tools/LoadSnapshotCommand.java index 5516baf2c..a680a8de1 100644 --- a/java/google/registry/tools/LoadSnapshotCommand.java +++ b/java/google/registry/tools/LoadSnapshotCommand.java @@ -15,6 +15,8 @@ package google.registry.tools; import static com.google.common.base.Predicates.notNull; +import static com.google.common.util.concurrent.Futures.addCallback; +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import com.beust.jcommander.Parameter; import com.beust.jcommander.Parameters; @@ -97,22 +99,25 @@ final class LoadSnapshotCommand extends BigqueryCommand { // Add callbacks to each load job that print information on successful completion or failure. for (final String jobId : loadJobs.keySet()) { final String jobName = "load-" + jobId; - Futures.addCallback(loadJobs.get(jobId), new FutureCallback() { - private double elapsedSeconds() { - return (System.currentTimeMillis() - startTime) / 1000.0; - } + addCallback( + loadJobs.get(jobId), + new FutureCallback() { + private double elapsedSeconds() { + return (System.currentTimeMillis() - startTime) / 1000.0; + } - @Override - public void onSuccess(Object unused) { - System.err.printf("Job %s succeeded (%.3fs)\n", jobName, elapsedSeconds()); - } + @Override + public void onSuccess(Object unused) { + System.err.printf("Job %s succeeded (%.3fs)\n", jobName, elapsedSeconds()); + } - @Override - public void onFailure(Throwable error) { - System.err.printf( - "Job %s failed (%.3fs): %s\n", jobName, elapsedSeconds(), error.getMessage()); - } - }); + @Override + public void onFailure(Throwable error) { + System.err.printf( + "Job %s failed (%.3fs): %s\n", jobName, elapsedSeconds(), error.getMessage()); + } + }, + directExecutor()); } // Block on the completion of all the load jobs. List results = Futures.successfulAsList(loadJobs.values()).get();