Add standardSQL views to Bigquery Datastore snapshots

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=163124895
This commit is contained in:
larryruili 2017-07-25 14:38:47 -07:00 committed by Ben McIlwain
parent 8869814e96
commit d2cd576796
2 changed files with 94 additions and 41 deletions

View file

@ -35,36 +35,48 @@ import java.io.IOException;
import javax.inject.Inject;
/** Update a well-known view to point at a certain Datastore snapshot table in BigQuery. */
@Action(
path = UpdateSnapshotViewAction.PATH,
method = POST,
auth = Auth.AUTH_INTERNAL_ONLY
)
@Action(path = UpdateSnapshotViewAction.PATH, method = POST, auth = Auth.AUTH_INTERNAL_ONLY)
public class UpdateSnapshotViewAction implements Runnable {
/** Headers for passing parameters into the servlet. */
static final String UPDATE_SNAPSHOT_DATASET_ID_PARAM = "dataset";
static final String UPDATE_SNAPSHOT_TABLE_ID_PARAM = "table";
static final String UPDATE_SNAPSHOT_KIND_PARAM = "kind";
static final String LATEST_SNAPSHOT_DATASET = "latest_snapshot";
static final String LEGACY_LATEST_SNAPSHOT_DATASET = "latest_snapshot";
static final String STANDARD_LATEST_SNAPSHOT_DATASET = "latest_datastore_export";
/** Servlet-specific details needed for enqueuing tasks against itself. */
static final String QUEUE = "export-snapshot-update-view"; // See queue.xml.
static final String PATH = "/_dr/task/updateSnapshotView"; // See web.xml.
private static final FormattingLogger logger = FormattingLogger.getLoggerForCallerClass();
@Inject @Parameter(UPDATE_SNAPSHOT_DATASET_ID_PARAM) String datasetId;
@Inject @Parameter(UPDATE_SNAPSHOT_TABLE_ID_PARAM) String tableId;
@Inject @Parameter(UPDATE_SNAPSHOT_KIND_PARAM) String kindName;
@Inject @Config("projectId") String projectId;
@Inject
@Parameter(UPDATE_SNAPSHOT_DATASET_ID_PARAM)
String datasetId;
@Inject
@Parameter(UPDATE_SNAPSHOT_TABLE_ID_PARAM)
String tableId;
@Inject
@Parameter(UPDATE_SNAPSHOT_KIND_PARAM)
String kindName;
@Inject
@Config("projectId")
String projectId;
@Inject BigqueryFactory bigqueryFactory;
@Inject UpdateSnapshotViewAction() {}
@Inject
UpdateSnapshotViewAction() {}
/** Create a task for updating a snapshot view. */
public static TaskOptions createViewUpdateTask(
String datasetId, String tableId, String kindName) {
static TaskOptions createViewUpdateTask(String datasetId, String tableId, String kindName) {
return TaskOptions.Builder.withUrl(PATH)
.method(Method.POST)
.param(UPDATE_SNAPSHOT_DATASET_ID_PARAM, datasetId)
@ -75,39 +87,62 @@ public class UpdateSnapshotViewAction implements Runnable {
@Override
public void run() {
try {
updateSnapshotView(datasetId, tableId, kindName);
// TODO(b/32377148): Remove the legacySql view when migration complete.
SqlTemplate legacyTemplate =
SqlTemplate.create(
"#legacySQL\nSELECT * FROM [%PROJECT%:%SOURCE_DATASET%.%SOURCE_TABLE%]");
updateSnapshotView(
datasetId, tableId, kindName, LEGACY_LATEST_SNAPSHOT_DATASET, legacyTemplate);
SqlTemplate standardTemplate =
SqlTemplate.create(
"#standardSQL\nSELECT * FROM `%PROJECT%.%SOURCE_DATASET%.%SOURCE_TABLE%`");
updateSnapshotView(
datasetId, tableId, kindName, STANDARD_LATEST_SNAPSHOT_DATASET, standardTemplate);
} catch (Throwable e) {
logger.severefmt(e, "Could not update snapshot view for table %s", tableId);
throw new InternalServerErrorException("Error in update snapshot view action");
}
}
private void updateSnapshotView(String datasetId, String tableId, String kindName)
private void updateSnapshotView(
String sourceDatasetId,
String sourceTableId,
String kindName,
String viewDataset,
SqlTemplate viewQueryTemplate)
throws IOException {
Bigquery bigquery = bigqueryFactory.create(projectId, LATEST_SNAPSHOT_DATASET);
updateTable(bigquery, new Table()
.setTableReference(new TableReference()
Bigquery bigquery = bigqueryFactory.create(projectId, viewDataset);
updateTable(
bigquery,
new Table()
.setTableReference(
new TableReference()
.setProjectId(projectId)
.setDatasetId(LATEST_SNAPSHOT_DATASET)
.setDatasetId(viewDataset)
.setTableId(kindName))
.setView(new ViewDefinition().setQuery(
SqlTemplate.create("SELECT * FROM [%PROJECT%:%DATASET%.%TABLE%]")
.setView(
new ViewDefinition()
.setQuery(
viewQueryTemplate
.put("PROJECT", projectId)
.put("DATASET", datasetId)
.put("TABLE", tableId)
.put("SOURCE_DATASET", sourceDatasetId)
.put("SOURCE_TABLE", sourceTableId)
.build())));
logger.infofmt(
"Updated view %s to point at snapshot table %s.",
String.format("[%s:%s.%s]", projectId, LATEST_SNAPSHOT_DATASET, kindName),
String.format("[%s:%s.%s]", projectId, datasetId, tableId));
String.format("[%s:%s.%s]", projectId, viewDataset, kindName),
String.format("[%s:%s.%s]", projectId, sourceDatasetId, sourceTableId));
}
private static void updateTable(Bigquery bigquery, Table table) throws IOException {
TableReference ref = table.getTableReference();
try {
bigquery.tables()
bigquery
.tables()
.update(ref.getProjectId(), ref.getDatasetId(), ref.getTableId(), table)
.execute();
} catch (GoogleJsonResponseException e) {

View file

@ -25,13 +25,15 @@ import static google.registry.testing.TaskQueueHelper.assertTasksEnqueued;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.model.Dataset;
import com.google.api.services.bigquery.model.Table;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import google.registry.bigquery.BigqueryFactory;
import google.registry.request.HttpException.InternalServerErrorException;
import google.registry.testing.AppEngineRule;
@ -44,6 +46,7 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;
/** Unit tests for {@link UpdateSnapshotViewAction}. */
@RunWith(JUnit4.class)
@ -101,16 +104,31 @@ public class UpdateSnapshotViewActionTest {
public void testSuccess_doPost() throws Exception {
action.run();
InOrder factoryOrder = inOrder(bigqueryFactory);
// Check that the BigQuery factory was called in such a way that the dataset would be created
// if it didn't already exist.
verify(bigqueryFactory).create("myproject", "latest_snapshot");
factoryOrder.verify(bigqueryFactory).create("myproject", "latest_snapshot");
factoryOrder.verify(bigqueryFactory).create("myproject", "latest_datastore_export");
// Check that we updated the view.
// Check that we updated both views
InOrder tableOrder = inOrder(bigqueryTables);
ArgumentCaptor<Table> tableArg = ArgumentCaptor.forClass(Table.class);
verify(bigqueryTables).update(
eq("myproject"), eq("latest_snapshot"), eq("fookind"), tableArg.capture());
assertThat(tableArg.getValue().getView().getQuery())
.isEqualTo("SELECT * FROM [myproject:some_dataset.12345_fookind]");
tableOrder.verify(bigqueryTables)
.update(eq("myproject"), eq("latest_snapshot"), eq("fookind"), tableArg.capture());
tableOrder.verify(bigqueryTables)
.update(eq("myproject"), eq("latest_datastore_export"), eq("fookind"), tableArg.capture());
Iterable<String> actualQueries =
Iterables.transform(
tableArg.getAllValues(),
new Function<Table, String>() {
@Override
public String apply(Table table) {
return table.getView().getQuery();
}
});
assertThat(actualQueries).containsExactly(
"#legacySQL\nSELECT * FROM [myproject:some_dataset.12345_fookind]",
"#standardSQL\nSELECT * FROM `myproject.some_dataset.12345_fookind`");
}
@Test