// Copyright 2016 The Domain Registry Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package google.registry.bigquery;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.base.Verify.verify;
import static google.registry.bigquery.BigqueryUtils.toJobReferenceString;
import static org.joda.time.DateTimeZone.UTC;

import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.http.AbstractInputStreamContent;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.model.Dataset;
import com.google.api.services.bigquery.model.DatasetReference;
import com.google.api.services.bigquery.model.ErrorProto;
import com.google.api.services.bigquery.model.GetQueryResultsResponse;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobConfiguration;
import com.google.api.services.bigquery.model.JobConfigurationExtract;
import com.google.api.services.bigquery.model.JobConfigurationLoad;
import com.google.api.services.bigquery.model.JobConfigurationQuery;
import com.google.api.services.bigquery.model.JobReference;
import com.google.api.services.bigquery.model.JobStatistics;
import com.google.api.services.bigquery.model.JobStatus;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableCell;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.ViewDefinition;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableTable;
import com.google.common.io.BaseEncoding;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import google.registry.bigquery.BigqueryUtils.DestinationFormat;
import google.registry.bigquery.BigqueryUtils.SourceFormat;
import google.registry.bigquery.BigqueryUtils.TableType;
import google.registry.bigquery.BigqueryUtils.WriteDisposition;
import google.registry.config.RegistryEnvironment;
import google.registry.util.FormattingLogger;
import google.registry.util.NonFinalForTesting;
import google.registry.util.Sleeper;
import google.registry.util.SqlTemplate;
import google.registry.util.SystemSleeper;

import org.joda.time.DateTime;
import org.joda.time.Duration;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;

import javax.annotation.Nullable;

/** Class encapsulating parameters and state for accessing the Bigquery API. */
public class BigqueryConnection implements AutoCloseable {

  private static final FormattingLogger logger = FormattingLogger.getLoggerForCallerClass();

  private static final Duration MIN_POLL_INTERVAL = Duration.millis(500);

  @NonFinalForTesting
  private static Sleeper sleeper = new SystemSleeper();

  /** Name of the default dataset to use for requests to the API. */
  public static final String DEFAULT_DATASET_NAME = "testing";

  /** Default dataset to use for storing temporary tables. */
  private static final String TEMP_DATASET_NAME = "__temp__";

  /** Default time to live for temporary tables. */
  private static final Duration TEMP_TABLE_TTL = Duration.standardHours(24);

  /** Bigquery client instance wrapped by this class. */
  private Bigquery bigquery;

  /** Executor service for bigquery jobs. */
  private ListeningExecutorService service;

  /** Credential object to use for initializing HTTP requests to the bigquery API. */
  private HttpRequestInitializer credential;

  /** HTTP transport object to use for accessing bigquery API. */
  private HttpTransport httpTransport;

  /** JSON factory object to use for accessing bigquery API. */
  private JsonFactory jsonFactory;

  /** Pseudo-randomness source to use for creating random table names. */
  private Random random = new Random();

  /** Name of the default dataset to use for inserting tables. */
  private String datasetId = DEFAULT_DATASET_NAME;

  /** Whether to automatically overwrite existing tables and views. */
  private boolean overwrite = false;

  /** Duration to wait between polls for job status. */
  private Duration pollInterval = Duration.millis(1000);

  /** Builder for a {@link BigqueryConnection}, since the latter is immutable once created. */
  public static class Builder {
    private BigqueryConnection instance;

    public Builder() {
      instance = new BigqueryConnection();
    }

    /**
     * The BigqueryConnection takes ownership of this {@link ExecutorService} and will
     * shut it down when the BigqueryConnection is closed.
     */
    public Builder setExecutorService(ExecutorService executorService) {
      instance.service = MoreExecutors.listeningDecorator(executorService);
      return this;
    }

    public Builder setCredential(GoogleCredential credential) {
      instance.credential = checkNotNull(credential);
      instance.httpTransport = credential.getTransport();
      instance.jsonFactory = credential.getJsonFactory();
      return this;
    }

    public Builder setDatasetId(String datasetId) {
      instance.datasetId = checkNotNull(datasetId);
      return this;
    }

    public Builder setOverwrite(boolean overwrite) {
      instance.overwrite = overwrite;
      return this;
    }

    public Builder setPollInterval(Duration pollInterval) {
      checkArgument(
          !pollInterval.isShorterThan(MIN_POLL_INTERVAL),
          "poll interval must be at least %sms",
          MIN_POLL_INTERVAL.getMillis());
      instance.pollInterval = pollInterval;
      return this;
    }

    public BigqueryConnection build() {
      try {
        checkNotNull(instance.service, "Must provide executor service");
        return instance;
      } finally {
        // Clear the internal instance so you can't accidentally mutate it through this builder.
        instance = null;
      }
    }
  }
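
  /*
   * A minimal usage sketch (illustrative only: the credential and executor come
   * from the caller's own environment, and "reporting" is a hypothetical
   * dataset name):
   *
   *   BigqueryConnection connection =
   *       new BigqueryConnection.Builder()
   *           .setExecutorService(Executors.newFixedThreadPool(4))
   *           .setCredential(credential)  // a GoogleCredential with BigQuery scopes
   *           .setDatasetId("reporting")
   *           .build()
   *           .initialize();
   */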
  /**
   * Class that wraps a normal Bigquery API Table object to make it immutable from the client side
   * and give it additional semantics as a "destination" for load or query jobs, with an overwrite
   * flag set by the client upon creation.
   *
   * <p>Additionally provides encapsulation so that clients of BigqueryConnection don't need to
   * take any direct dependencies on Bigquery API classes and can instead use DestinationTable.
   */
  public static class DestinationTable {
    /** The wrapped Bigquery API Table object. */
    private final Table table;

    /** The type of this table. */
    private final TableType type;

    /** The write disposition for jobs writing to this destination table. */
    private final WriteDisposition writeDisposition;

    /**
     * A query to package with this table if the type is VIEW; not immutable but also not visible
     * to clients.
     */
    private String query;

    /** A builder for DestinationTable. */
    public static final class Builder {
      private final Table table = new Table();
      private final TableReference tableRef = new TableReference();
      private TableType type = TableType.TABLE;
      private WriteDisposition writeDisposition = WriteDisposition.WRITE_EMPTY;

      public Builder datasetId(String datasetId) {
        tableRef.setDatasetId(datasetId);
        return this;
      }

      public Builder name(String name) {
        tableRef.setTableId(name);
        return this;
      }

      public Builder description(String description) {
        table.setDescription(description);
        return this;
      }

      public Builder type(TableType type) {
        this.type = type;
        return this;
      }

      public Builder timeToLive(Duration duration) {
        this.table.setExpirationTime(new DateTime(UTC).plus(duration).getMillis());
        return this;
      }

      public Builder overwrite(boolean overwrite) {
        if (overwrite) {
          this.writeDisposition = WriteDisposition.WRITE_TRUNCATE;
        }
        return this;
      }

      public Builder append(boolean append) {
        if (append) {
          this.writeDisposition = WriteDisposition.WRITE_APPEND;
        }
        return this;
      }

      public DestinationTable build() {
        tableRef.setProjectId(getEnvironmentProjectId());
        table.setTableReference(tableRef);
        checkState(!isNullOrEmpty(table.getTableReference().getDatasetId()));
        checkState(!isNullOrEmpty(table.getTableReference().getTableId()));
        return new DestinationTable(this);
      }
    }

    /** Constructs a new DestinationTable from its Builder. */
    private DestinationTable(Builder b) {
      table = b.table.clone();
      type = b.type;
      writeDisposition = b.writeDisposition;
    }

    /**
     * Stores the provided query with this DestinationTable and returns it; used for packaging
     * a query along with the DestinationTable before sending it to the table update logic.
     */
    private DestinationTable withQuery(String query) {
      checkState(type == TableType.VIEW);
      this.query = query;
      return this;
    }

    /** Returns a new copy of the Bigquery API Table object wrapped by this DestinationTable. */
    private Table getTable() {
      Table tableCopy = table.clone();
      if (type == TableType.VIEW) {
        tableCopy.setView(new ViewDefinition().setQuery(query));
      }
      return tableCopy;
    }

    /** Returns the write disposition that should be used for jobs writing to this table. */
    private WriteDisposition getWriteDisposition() {
      return writeDisposition;
    }

    /** Returns a new copy of the TableReference for the Table wrapped by this DestinationTable. */
    private TableReference getTableReference() {
      return table.getTableReference().clone();
    }

    /** Returns a string representation of the TableReference for the wrapped table. */
    public String getStringReference() {
      return tableReferenceToString(table.getTableReference());
    }

    /** Returns a string representation of the given TableReference. */
    private static String tableReferenceToString(TableReference tableRef) {
      return String.format(
          "%s:%s.%s",
          tableRef.getProjectId(),
          tableRef.getDatasetId(),
          tableRef.getTableId());
    }
  }
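
  /*
   * Sketch of building a destination by hand (names are hypothetical; most
   * callers should prefer buildDestinationTable() or buildTemporaryTable()
   * below, which fill in the defaults):
   *
   *   DestinationTable dest =
   *       new DestinationTable.Builder()
   *           .datasetId("reporting")
   *           .name("activity_summary")
   *           .description("Daily activity rollup")
   *           .type(TableType.TABLE)
   *           .overwrite(true)  // WRITE_TRUNCATE instead of the default WRITE_EMPTY
   *           .build();
   */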
  /**
   * Initializes the BigqueryConnection object by setting up the API client and creating the
   * default dataset if it doesn't exist.
   */
  public BigqueryConnection initialize() throws Exception {
    bigquery = new Bigquery.Builder(httpTransport, jsonFactory, credential)
        .setApplicationName(getClass().getSimpleName())
        .build();
    createDatasetIfNeeded(datasetId);
    createDatasetIfNeeded(TEMP_DATASET_NAME);
    return this;
  }

  /**
   * Closes the BigqueryConnection object by shutting down the executor service. Clients
   * should only call this after all ListenableFutures obtained from BigqueryConnection methods
   * have resolved; this method does not block on their completion.
   */
  @Override
  public void close() {
    service.shutdown();
  }

  /** Returns a partially built DestinationTable with the default dataset and overwrite behavior. */
  public DestinationTable.Builder buildDestinationTable(String tableName) {
    return new DestinationTable.Builder()
        .datasetId(datasetId)
        .type(TableType.TABLE)
        .name(tableName)
        .overwrite(overwrite);
  }

  /**
   * Returns a partially built DestinationTable with a randomly generated name under the default
   * temporary table dataset, with the default TTL and overwrite behavior.
   */
  public DestinationTable.Builder buildTemporaryTable() {
    return new DestinationTable.Builder()
        .datasetId(TEMP_DATASET_NAME)
        .type(TableType.TABLE)
        .name(getRandomTableName())
        .timeToLive(TEMP_TABLE_TTL)
        .overwrite(overwrite);
  }

  /** Returns a random table name consisting only of the chars {@code [a-v0-9_]}. */
  private String getRandomTableName() {
    byte[] randBytes = new byte[8];  // 64 bits of randomness ought to be plenty.
    random.nextBytes(randBytes);
    return "_" + BaseEncoding.base32Hex().lowerCase().omitPadding().encode(randBytes);
  }
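
  /*
   * Sketch: stage intermediate query results in a throwaway table that expires
   * on its own after TEMP_TABLE_TTL (the SQL here is illustrative):
   *
   *   DestinationTable temp = connection.buildTemporaryTable().build();
   *   connection.query("SELECT * FROM [reporting.activity] LIMIT 1000", temp);
   */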
  /**
   * A function that updates the specified Bigquery table to reflect the metadata from the input
   * DestinationTable, passing the same DestinationTable through as the output. If the specified
   * table does not already exist, it will be inserted into the dataset.
   *
   * <p>Clients can call this function directly to update a table on demand, or can pass it to
   * Futures.transform() to update a table produced as the asynchronous result of a load or query
   * job (e.g. to add a description to it).
   */
  private class UpdateTableFunction implements Function<DestinationTable, DestinationTable> {
    @Override
    public DestinationTable apply(final DestinationTable destinationTable) {
      Table table = destinationTable.getTable();
      TableReference ref = table.getTableReference();
      try {
        if (checkTableExists(ref.getDatasetId(), ref.getTableId())) {
          bigquery.tables()
              .update(ref.getProjectId(), ref.getDatasetId(), ref.getTableId(), table)
              .execute();
        } else {
          bigquery.tables()
              .insert(ref.getProjectId(), ref.getDatasetId(), table)
              .execute();
        }
        return destinationTable;
      } catch (IOException e) {
        throw BigqueryJobFailureException.create(e);
      }
    }
  }

  /**
   * Starts an asynchronous load job to populate the specified destination table with the given
   * source URIs and source format. Returns a ListenableFuture that holds the same destination
   * table object on success.
   */
  public ListenableFuture<DestinationTable> load(
      DestinationTable dest,
      SourceFormat sourceFormat,
      Iterable<String> sourceUris) throws Exception {
    Job job = new Job()
        .setConfiguration(new JobConfiguration()
            .setLoad(new JobConfigurationLoad()
                .setWriteDisposition(dest.getWriteDisposition().toString())
                .setSourceFormat(sourceFormat.toString())
                .setSourceUris(ImmutableList.copyOf(sourceUris))
                .setDestinationTable(dest.getTableReference())));
    return Futures.transform(runJobToCompletion(job, dest), new UpdateTableFunction());
  }

  /**
   * Starts an asynchronous query job to populate the specified destination table with the results
   * of the specified query, or if the table is a view, to update the view to reflect that query.
   * Returns a ListenableFuture that holds the same destination table object on success.
   */
  public ListenableFuture<DestinationTable> query(
      String querySql,
      DestinationTable dest) {
    if (dest.type == TableType.VIEW) {
      // Use Futures.transform() rather than calling apply() directly so that any exceptions
      // thrown by calling UpdateTableFunction will be propagated on the get() call, not from
      // here.
      return Futures.transform(
          Futures.immediateFuture(dest.withQuery(querySql)), new UpdateTableFunction());
    } else {
      Job job = new Job()
          .setConfiguration(new JobConfiguration()
              .setQuery(new JobConfigurationQuery()
                  .setQuery(querySql)
                  .setDefaultDataset(getDataset())
                  .setWriteDisposition(dest.getWriteDisposition().toString())
                  .setDestinationTable(dest.getTableReference())));
      return Futures.transform(runJobToCompletion(job, dest), new UpdateTableFunction());
    }
  }
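
  /*
   * Sketch of a load-then-query flow (the bucket, file, and table names are
   * hypothetical):
   *
   *   DestinationTable staging = connection.buildDestinationTable("staging").build();
   *   ListenableFuture<DestinationTable> loaded = connection.load(
   *       staging, SourceFormat.CSV, ImmutableList.of("gs://my-bucket/data.csv"));
   *   loaded.get();  // block until the load completes
   *   DestinationTable summary = connection.buildDestinationTable("summary").build();
   *   connection.query("SELECT COUNT(*) FROM [testing.staging]", summary).get();
   */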
  /**
   * Starts an asynchronous query job to dump the results of the specified query into a local
   * ImmutableTable object, row-keyed by the row number (indexed from 1), column-keyed by the
   * TableFieldSchema for that column, and with the value object as the cell value. Note that
   * null values will not actually be null, but they can be checked for using Data.isNull().
   *
   * <p>Returns a ListenableFuture that holds the ImmutableTable on success.
   */
  public ListenableFuture<ImmutableTable<Integer, TableFieldSchema, Object>>
      queryToLocalTable(String querySql) throws Exception {
    Job job = new Job()
        .setConfiguration(new JobConfiguration()
            .setQuery(new JobConfigurationQuery()
                .setQuery(querySql)
                .setDefaultDataset(getDataset())));
    return Futures.transform(
        runJobToCompletion(job),
        new Function<Job, ImmutableTable<Integer, TableFieldSchema, Object>>() {
          @Override
          public ImmutableTable<Integer, TableFieldSchema, Object> apply(Job job) {
            return getQueryResults(job);
          }});
  }
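
  /*
   * Sketch of consuming the materialized results (Data.isNull() is from
   * com.google.api.client.util.Data; the query is illustrative):
   *
   *   ImmutableTable<Integer, TableFieldSchema, Object> result =
   *       connection.queryToLocalTable("SELECT name, count FROM [testing.summary]").get();
   *   for (Map.Entry<TableFieldSchema, Object> cell : result.row(1).entrySet()) {
   *     Object value = cell.getValue();
   *     System.out.println(
   *         cell.getKey().getName() + " = " + (Data.isNull(value) ? "NULL" : value));
   *   }
   */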
  /**
   * Returns the query results for the given job as an ImmutableTable, row-keyed by row number
   * (indexed from 1), column-keyed by the TableFieldSchema for that field, and with the value
   * object as the cell value. Note that null values will not actually be null (since we're using
   * ImmutableTable) but they can be checked for using Data.isNull().
   *
   * <p>This table is fully materialized in memory (not lazily loaded), so it should not be used
   * with queries expected to return large results.
   */
  private ImmutableTable<Integer, TableFieldSchema, Object> getQueryResults(Job job) {
    try {
      ImmutableTable.Builder<Integer, TableFieldSchema, Object> builder =
          new ImmutableTable.Builder<>();
      String pageToken = null;
      int rowNumber = 1;
      while (true) {
        GetQueryResultsResponse queryResults = bigquery.jobs()
            .getQueryResults(getProjectId(), job.getJobReference().getJobId())
            .setPageToken(pageToken)
            .execute();
        // If the job isn't complete yet, retry; getQueryResults() waits for up to 10 seconds on
        // each invocation so this will effectively poll for completion.
        if (queryResults.getJobComplete()) {
          List<TableFieldSchema> schemaFields = queryResults.getSchema().getFields();
          for (TableRow row : queryResults.getRows()) {
            Iterator<TableFieldSchema> fieldIterator = schemaFields.iterator();
            Iterator<TableCell> cellIterator = row.getF().iterator();
            while (fieldIterator.hasNext() && cellIterator.hasNext()) {
              builder.put(rowNumber, fieldIterator.next(), cellIterator.next().getV());
            }
            rowNumber++;
          }
          pageToken = queryResults.getPageToken();
          if (pageToken == null) {
            break;
          }
        }
      }
      return builder.build();
    } catch (IOException e) {
      throw BigqueryJobFailureException.create(e);
    }
  }

  /**
   * Starts an asynchronous job to extract the specified source table and output it to the
   * given GCS filepath in the specified destination format, optionally printing headers.
   * Returns a ListenableFuture that holds the destination GCS URI on success.
   */
  private ListenableFuture<String> extractTable(
      DestinationTable sourceTable,
      String destinationUri,
      DestinationFormat destinationFormat,
      boolean printHeader) {
    checkArgument(sourceTable.type == TableType.TABLE);
    Job job = new Job()
        .setConfiguration(new JobConfiguration()
            .setExtract(new JobConfigurationExtract()
                .setSourceTable(sourceTable.getTableReference())
                .setDestinationFormat(destinationFormat.toString())
                .setDestinationUris(ImmutableList.of(destinationUri))
                .setPrintHeader(printHeader)));
    return runJobToCompletion(job, destinationUri);
  }

  /**
   * Starts an asynchronous job to extract the specified source table or view and output it to the
   * given GCS filepath in the specified destination format, optionally printing headers.
   * Returns a ListenableFuture that holds the destination GCS URI on success.
   */
  public ListenableFuture<String> extract(
      DestinationTable sourceTable,
      String destinationUri,
      DestinationFormat destinationFormat,
      boolean printHeader) {
    if (sourceTable.type == TableType.TABLE) {
      return extractTable(sourceTable, destinationUri, destinationFormat, printHeader);
    } else {
      // We can't extract directly from a view, so instead extract from a query dumping that view.
      return extractQuery(
          SqlTemplate
              .create("SELECT * FROM [%DATASET%.%TABLE%]")
              .put("DATASET", sourceTable.getTableReference().getDatasetId())
              .put("TABLE", sourceTable.getTableReference().getTableId())
              .build(),
          destinationUri,
          destinationFormat,
          printHeader);
    }
  }
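
  /*
   * Sketch: dump an existing table's contents to GCS as headered CSV (the
   * destination URI and table name are hypothetical):
   *
   *   DestinationTable source = connection.buildDestinationTable("summary").build();
   *   String uri = connection.extract(
   *       source, "gs://my-bucket/summary.csv", DestinationFormat.CSV, true).get();
   */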
  /**
   * Starts an asynchronous job to run the provided query, store the results in a temporary table,
   * and then extract the contents of that table to the given GCS filepath in the specified
   * destination format, optionally printing headers.
   *
   * <p>Returns a ListenableFuture that holds the destination GCS URI on success.
   */
  public ListenableFuture<String> extractQuery(
      String querySql,
      final String destinationUri,
      final DestinationFormat destinationFormat,
      final boolean printHeader) {
    // Note: although BigQuery queries save their results to an auto-generated anonymous table,
    // we can't rely on that for running the extract job because it may not be fully replicated.
    // Tracking bug for query-to-GCS support is b/13777340.
    DestinationTable tempTable = buildTemporaryTable().build();
    return Futures.transformAsync(
        query(querySql, tempTable),
        new AsyncFunction<DestinationTable, String>() {
          @Override
          public ListenableFuture<String> apply(DestinationTable tempTable) {
            return extractTable(tempTable, destinationUri, destinationFormat, printHeader);
          }
        });
  }

  /** @see #runJob(Job, AbstractInputStreamContent) */
  public Job runJob(Job job) {
    return runJob(job, null);
  }

  /**
   * Launches a job, waits for it to complete, and checks the completed job for errors.
   *
   * @throws BigqueryJobFailureException
   */
  public Job runJob(Job job, @Nullable AbstractInputStreamContent data) {
    return checkJob(waitForJob(launchJob(job, data)));
  }

  /**
   * Launches a job, but does not wait for it to complete.
   *
   * @throws BigqueryJobFailureException
   */
  private Job launchJob(Job job, @Nullable AbstractInputStreamContent data) {
    verify(job.getStatus() == null);
    try {
      return data != null
          ? bigquery.jobs().insert(getProjectId(), job, data).execute()
          : bigquery.jobs().insert(getProjectId(), job).execute();
    } catch (IOException e) {
      throw BigqueryJobFailureException.create(e);
    }
  }

  /**
   * Synchronously waits for a job to complete that's already been launched.
   *
   * @throws BigqueryJobFailureException
   */
  private Job waitForJob(Job job) {
    verify(job.getStatus() != null);
    while (!job.getStatus().getState().equals("DONE")) {
      sleeper.sleepUninterruptibly(pollInterval);
      JobReference ref = job.getJobReference();
      try {
        job = bigquery.jobs().get(ref.getProjectId(), ref.getJobId()).execute();
      } catch (IOException e) {
        throw BigqueryJobFailureException.create(e);
      }
    }
    return job;
  }

  /**
   * Checks completed job for errors.
   *
   * @throws BigqueryJobFailureException
   */
  private static Job checkJob(Job job) {
    verify(job.getStatus() != null);
    JobStatus jobStatus = job.getStatus();
    if (jobStatus.getErrorResult() != null) {
      throw BigqueryJobFailureException.create(jobStatus);
    } else {
      logger.info(summarizeCompletedJob(job));
      if (jobStatus.getErrors() != null) {
        for (ErrorProto error : jobStatus.getErrors()) {
          logger.warning(String.format("%s: %s", error.getReason(), error.getMessage()));
        }
      }
      return job;
    }
  }

  /** Returns a summarization of a completed job's statistics for logging. */
  private static String summarizeCompletedJob(Job job) {
    JobStatistics stats = job.getStatistics();
    return String.format(
        "Job took %,.3f seconds after a %,.3f second delay and processed %,d bytes (%s)",
        (stats.getEndTime() - stats.getStartTime()) / 1000.0,
        (stats.getStartTime() - stats.getCreationTime()) / 1000.0,
        stats.getTotalBytesProcessed(),
        toJobReferenceString(job.getJobReference()));
  }
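
  /*
   * Sketch: synchronously run a load job with inline content instead of GCS
   * URIs (ByteArrayContent is from com.google.api.client.http; the CSV bytes
   * and table name are hypothetical):
   *
   *   Job job = new Job().setConfiguration(new JobConfiguration()
   *       .setLoad(new JobConfigurationLoad()
   *           .setSourceFormat("CSV")
   *           .setDestinationTable(connection.getTable("inline_upload"))));
   *   connection.runJob(job, new ByteArrayContent("text/csv", csvBytes));
   */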
  private <T> ListenableFuture<T> runJobToCompletion(Job job, T result) {
    return runJobToCompletion(job, result, null);
  }

  /** Runs job and returns a future that yields {@code result} when {@code job} is completed. */
  private <T> ListenableFuture<T> runJobToCompletion(
      final Job job,
      final T result,
      @Nullable final AbstractInputStreamContent data) {
    return service.submit(new Callable<T>() {
      @Override
      public T call() {
        runJob(job, data);
        return result;
      }});
  }

  private ListenableFuture<Job> runJobToCompletion(final Job job) {
    return service.submit(new Callable<Job>() {
      @Override
      public Job call() {
        return runJob(job, null);
      }});
  }

  /** Helper that returns true if a dataset with this name exists. */
  public boolean checkDatasetExists(String datasetName) throws IOException {
    try {
      bigquery.datasets().get(getProjectId(), datasetName).execute();
      return true;
    } catch (GoogleJsonResponseException e) {
      if (e.getDetails().getCode() == 404) {
        return false;
      }
      throw e;
    }
  }

  /** Helper that returns true if a table with this name and dataset name exists. */
  public boolean checkTableExists(String datasetName, String tableName) throws IOException {
    try {
      bigquery.tables().get(getProjectId(), datasetName, tableName).execute();
      return true;
    } catch (GoogleJsonResponseException e) {
      if (e.getDetails().getCode() == 404) {
        return false;
      }
      throw e;
    }
  }

  /** Returns the projectId set by the environment, or {@code null} if none is set. */
  public static String getEnvironmentProjectId() {
    return RegistryEnvironment.get().config().getProjectId();
  }

  /** Returns the projectId associated with this bigquery connection. */
  public String getProjectId() {
    return getEnvironmentProjectId();
  }

  /** Returns the dataset name that this bigquery connection uses by default. */
  public String getDatasetId() {
    return datasetId;
  }

  /** Returns dataset reference that can be used to avoid having to specify dataset in SQL code. */
  public DatasetReference getDataset() {
    return new DatasetReference()
        .setProjectId(getProjectId())
        .setDatasetId(getDatasetId());
  }

  /** Returns table reference with the projectId and datasetId filled out for you. */
  public TableReference getTable(String tableName) {
    return new TableReference()
        .setProjectId(getProjectId())
        .setDatasetId(getDatasetId())
        .setTableId(tableName);
  }

  /**
   * Helper that creates a dataset with this name if it doesn't already exist, and returns true
   * if creation took place.
   */
  public boolean createDatasetIfNeeded(String datasetName) throws IOException {
    if (!checkDatasetExists(datasetName)) {
      bigquery.datasets()
          .insert(getProjectId(), new Dataset().setDatasetReference(new DatasetReference()
              .setProjectId(getProjectId())
              .setDatasetId(datasetName)))
          .execute();
      System.err.printf("Created dataset: %s:%s\n", getProjectId(), datasetName);
      return true;
    }
    return false;
  }

  /** Creates a table from a SQL query if it doesn't already exist. */
  public TableReference ensureTable(TableReference table, String sqlQuery) {
    try {
      runJob(new Job()
          .setConfiguration(new JobConfiguration()
              .setQuery(new JobConfigurationQuery()
                  .setQuery(sqlQuery)
                  .setDefaultDataset(getDataset())
                  .setDestinationTable(table))));
    } catch (BigqueryJobFailureException e) {
      if (e.getReason().equals("duplicate")) {
        // Table already exists.
      } else {
        throw e;
      }
    }
    return table;
  }
}