Consolidate BigQuery handling into one place

I'm writing a follow-up CL that will send integrity checking data to
BigQuery, and that is made a lot easier by centralizing the BigQuery
connection logic.
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=119375766
This commit is contained in:
mcilwain 2016-04-08 08:44:04 -07:00 committed by Justine Tunney
parent c880a042a7
commit 755fce9e52
12 changed files with 329 additions and 347 deletions

View file

@ -14,19 +14,12 @@
package com.google.domain.registry.monitoring.whitebox;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.domain.registry.util.HttpServletUtils.getRequiredParameterValue;
import com.google.api.client.extensions.appengine.http.UrlFetchTransport;
import com.google.api.client.googleapis.extensions.appengine.auth.oauth2.AppIdentityCredential;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.BigqueryScopes;
import com.google.api.services.bigquery.model.TableDataInsertAllRequest;
import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
import com.google.api.services.bigquery.model.TableDataInsertAllResponse.InsertErrors;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableReference;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.collect.FluentIterable;
@ -35,13 +28,11 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.domain.registry.bigquery.BigqueryFactory;
import com.google.domain.registry.bigquery.BigqueryHelper;
import com.google.domain.registry.config.RegistryEnvironment;
import com.google.domain.registry.util.FormattingLogger;
import com.google.domain.registry.util.NonFinalForTesting;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import javax.servlet.http.HttpServlet;
@ -59,25 +50,11 @@ public class MetricsTaskServlet extends HttpServlet {
private static final Set<String> SPECIAL_PARAMS = ImmutableSet.of("tableId", "insertId");
// Add any concrete Metric classes to this map or doPost() will throw IllegalArgumentException.
private static final Map<String, ImmutableList<TableFieldSchema>> KNOWN_TABLE_SCHEMAS =
ImmutableMap.of(EppMetrics.TABLE_ID, EppMetrics.SCHEMA_FIELDS);
// servlet level cross-request caches to avoid unnecessary RPCs.
@NonFinalForTesting
private static Set<String> knownTables = Sets.newConcurrentHashSet();
@NonFinalForTesting
private static Set<String> datasets = Sets.newConcurrentHashSet();
@NonFinalForTesting
private static BigqueryFactory bigqueryFactory = new BigqueryFactory();
@NonFinalForTesting
private static BigqueryHelper bigqueryHelper = new BigqueryHelper();
/** Returns a filtered {@link ImmutableMap} from an {@link HttpServletRequest} */
private static ImmutableMap<String, Object> getFiteredMapFromRequest(
private static ImmutableMap<String, Object> getFilteredMapFromRequest(
HttpServletRequest req,
Set<String> filter) {
ImmutableMap.Builder<String, Object> b = new ImmutableMap.Builder<>();
@ -97,35 +74,8 @@ public class MetricsTaskServlet extends HttpServlet {
public void doPost(HttpServletRequest req, HttpServletResponse rsp) throws IOException {
try {
final String tableId = getRequiredParameterValue(req, "tableId");
ImmutableMap<String, Object> fields = getFiteredMapFromRequest(req, SPECIAL_PARAMS);
final Bigquery bigquery = bigqueryFactory.create(
getClass().getSimpleName(),
new UrlFetchTransport(),
new JacksonFactory(),
new AppIdentityCredential(BigqueryScopes.all()));
// Note: it's safe for multiple threads to call this as the dataset will
// only be created once.
if (!datasets.contains(DATASET_ID)) {
bigqueryHelper.ensureDataset(bigquery, PROJECT_ID, DATASET_ID);
datasets.add(DATASET_ID);
}
checkArgument(KNOWN_TABLE_SCHEMAS.containsKey(tableId), "Unknown table ID: %s", tableId);
if (!knownTables.contains(tableId)) {
bigqueryHelper.ensureTable(
bigquery,
new TableReference()
.setDatasetId(DATASET_ID)
.setProjectId(PROJECT_ID)
.setTableId(tableId),
KNOWN_TABLE_SCHEMAS.get(tableId));
knownTables.add(tableId);
}
ImmutableMap<String, Object> fields = getFilteredMapFromRequest(req, SPECIAL_PARAMS);
Bigquery bigquery = bigqueryFactory.create(PROJECT_ID, DATASET_ID, tableId);
TableDataInsertAllResponse response = bigquery.tabledata()
.insertAll(