Migrate away from deprecated Executor method

Among the Futures methods that run user callbacks, those that don't take
an Executor will be deleted.  This CL migrates them to the counterparts
that take MoreExecutors.directExecutor() as such Executor in the
parameter list, exactly the way that the old method works.

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=145358533
This commit is contained in:
Ben McIlwain 2017-01-23 17:31:34 -08:00
parent be4c62ccf1
commit d16971c34f
2 changed files with 37 additions and 23 deletions

View file

@ -19,6 +19,9 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Strings.isNullOrEmpty; import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.base.Verify.verify; import static com.google.common.base.Verify.verify;
import static com.google.common.util.concurrent.Futures.transform;
import static com.google.common.util.concurrent.Futures.transformAsync;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static google.registry.bigquery.BigqueryUtils.toJobReferenceString; import static google.registry.bigquery.BigqueryUtils.toJobReferenceString;
import static google.registry.config.RegistryConfig.getProjectId; import static google.registry.config.RegistryConfig.getProjectId;
import static org.joda.time.DateTimeZone.UTC; import static org.joda.time.DateTimeZone.UTC;
@ -404,7 +407,7 @@ public class BigqueryConnection implements AutoCloseable {
.setSourceFormat(sourceFormat.toString()) .setSourceFormat(sourceFormat.toString())
.setSourceUris(ImmutableList.copyOf(sourceUris)) .setSourceUris(ImmutableList.copyOf(sourceUris))
.setDestinationTable(dest.getTableReference()))); .setDestinationTable(dest.getTableReference())));
return Futures.transform(runJobToCompletion(job, dest), new UpdateTableFunction()); return transform(runJobToCompletion(job, dest), new UpdateTableFunction(), directExecutor());
} }
/** /**
@ -418,8 +421,10 @@ public class BigqueryConnection implements AutoCloseable {
if (dest.type == TableType.VIEW) { if (dest.type == TableType.VIEW) {
// Use Futures.transform() rather than calling apply() directly so that any exceptions thrown // Use Futures.transform() rather than calling apply() directly so that any exceptions thrown
// by calling UpdateTableFunction will be propagated on the get() call, not from here. // by calling UpdateTableFunction will be propagated on the get() call, not from here.
return Futures.transform( return transform(
Futures.immediateFuture(dest.withQuery(querySql)), new UpdateTableFunction()); Futures.immediateFuture(dest.withQuery(querySql)),
new UpdateTableFunction(),
directExecutor());
} else { } else {
Job job = new Job() Job job = new Job()
.setConfiguration(new JobConfiguration() .setConfiguration(new JobConfiguration()
@ -428,7 +433,7 @@ public class BigqueryConnection implements AutoCloseable {
.setDefaultDataset(getDataset()) .setDefaultDataset(getDataset())
.setWriteDisposition(dest.getWriteDisposition().toString()) .setWriteDisposition(dest.getWriteDisposition().toString())
.setDestinationTable(dest.getTableReference()))); .setDestinationTable(dest.getTableReference())));
return Futures.transform(runJobToCompletion(job, dest), new UpdateTableFunction()); return transform(runJobToCompletion(job, dest), new UpdateTableFunction(), directExecutor());
} }
} }
@ -447,13 +452,15 @@ public class BigqueryConnection implements AutoCloseable {
.setQuery(new JobConfigurationQuery() .setQuery(new JobConfigurationQuery()
.setQuery(querySql) .setQuery(querySql)
.setDefaultDataset(getDataset()))); .setDefaultDataset(getDataset())));
return Futures.transform( return transform(
runJobToCompletion(job), runJobToCompletion(job),
new Function<Job, ImmutableTable<Integer, TableFieldSchema, Object>>() { new Function<Job, ImmutableTable<Integer, TableFieldSchema, Object>>() {
@Override @Override
public ImmutableTable<Integer, TableFieldSchema, Object> apply(Job job) { public ImmutableTable<Integer, TableFieldSchema, Object> apply(Job job) {
return getQueryResults(job); return getQueryResults(job);
}}); }
},
directExecutor());
} }
/** /**
@ -563,13 +570,15 @@ public class BigqueryConnection implements AutoCloseable {
// we can't rely on that for running the extract job because it may not be fully replicated. // we can't rely on that for running the extract job because it may not be fully replicated.
// Tracking bug for query-to-GCS support is b/13777340. // Tracking bug for query-to-GCS support is b/13777340.
DestinationTable tempTable = buildTemporaryTable().build(); DestinationTable tempTable = buildTemporaryTable().build();
return Futures.transformAsync( return transformAsync(
query(querySql, tempTable), new AsyncFunction<DestinationTable, String>() { query(querySql, tempTable),
new AsyncFunction<DestinationTable, String>() {
@Override @Override
public ListenableFuture<String> apply(DestinationTable tempTable) { public ListenableFuture<String> apply(DestinationTable tempTable) {
return extractTable(tempTable, destinationUri, destinationFormat, printHeader); return extractTable(tempTable, destinationUri, destinationFormat, printHeader);
} }
}); },
directExecutor());
} }
/** @see #runJob(Job, AbstractInputStreamContent) */ /** @see #runJob(Job, AbstractInputStreamContent) */

View file

@ -15,6 +15,8 @@
package google.registry.tools; package google.registry.tools;
import static com.google.common.base.Predicates.notNull; import static com.google.common.base.Predicates.notNull;
import static com.google.common.util.concurrent.Futures.addCallback;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import com.beust.jcommander.Parameter; import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters; import com.beust.jcommander.Parameters;
@ -97,22 +99,25 @@ final class LoadSnapshotCommand extends BigqueryCommand {
// Add callbacks to each load job that print information on successful completion or failure. // Add callbacks to each load job that print information on successful completion or failure.
for (final String jobId : loadJobs.keySet()) { for (final String jobId : loadJobs.keySet()) {
final String jobName = "load-" + jobId; final String jobName = "load-" + jobId;
Futures.addCallback(loadJobs.get(jobId), new FutureCallback<Object>() { addCallback(
private double elapsedSeconds() { loadJobs.get(jobId),
return (System.currentTimeMillis() - startTime) / 1000.0; new FutureCallback<Object>() {
} private double elapsedSeconds() {
return (System.currentTimeMillis() - startTime) / 1000.0;
}
@Override @Override
public void onSuccess(Object unused) { public void onSuccess(Object unused) {
System.err.printf("Job %s succeeded (%.3fs)\n", jobName, elapsedSeconds()); System.err.printf("Job %s succeeded (%.3fs)\n", jobName, elapsedSeconds());
} }
@Override @Override
public void onFailure(Throwable error) { public void onFailure(Throwable error) {
System.err.printf( System.err.printf(
"Job %s failed (%.3fs): %s\n", jobName, elapsedSeconds(), error.getMessage()); "Job %s failed (%.3fs): %s\n", jobName, elapsedSeconds(), error.getMessage());
} }
}); },
directExecutor());
} }
// Block on the completion of all the load jobs. // Block on the completion of all the load jobs.
List<?> results = Futures.successfulAsList(loadJobs.values()).get(); List<?> results = Futures.successfulAsList(loadJobs.values()).get();