google-nomulus/java/google/registry/export/BigqueryPollJobAction.java
guyben e224a67eda Change @Auth to an AutoValue, and create a set of predefined Auths
We want to be safer and more explicit about the authentication needed by the many actions that exist.

As such, we make the 'auth' parameter required in @Action (so it's always clear who can run a specific action) and we replace the @Auth with an enum so that only pre-approved configurations that are aptly named and documented can be used.

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=162210306
2017-08-01 16:33:10 -04:00

172 lines
6.8 KiB
Java

// Copyright 2017 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.export;
import static com.google.appengine.api.taskqueue.QueueFactory.getQueue;
import static com.google.appengine.api.taskqueue.TaskOptions.Builder.withUrl;
import static google.registry.bigquery.BigqueryUtils.toJobReferenceString;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobReference;
import com.google.appengine.api.taskqueue.Queue;
import com.google.appengine.api.taskqueue.TaskHandle;
import com.google.appengine.api.taskqueue.TaskOptions;
import com.google.appengine.api.taskqueue.TaskOptions.Method;
import dagger.Lazy;
import google.registry.request.Action;
import google.registry.request.Header;
import google.registry.request.HttpException.BadRequestException;
import google.registry.request.HttpException.NotModifiedException;
import google.registry.request.Payload;
import google.registry.request.auth.Auth;
import google.registry.util.FormattingLogger;
import google.registry.util.TaskEnqueuer;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import javax.inject.Inject;
import org.joda.time.Duration;
/**
* An action which polls the state of a bigquery job. If it is completed then it will log its
* completion state; otherwise it will return a failure code so that the task will be retried.
*/
@Action(
path = BigqueryPollJobAction.PATH,
method = {Action.Method.GET, Action.Method.POST},
automaticallyPrintOk = true,
auth = Auth.AUTH_INTERNAL_ONLY
)
public class BigqueryPollJobAction implements Runnable {
private static final FormattingLogger logger = FormattingLogger.getLoggerForCallerClass();
static final String QUEUE = "export-bigquery-poll"; // See queue.xml
static final String PATH = "/_dr/task/pollBigqueryJob"; // See web.xml
static final String CHAINED_TASK_QUEUE_HEADER = "X-DomainRegistry-ChainedTaskQueue";
static final String PROJECT_ID_HEADER = "X-DomainRegistry-ProjectId";
static final String JOB_ID_HEADER = "X-DomainRegistry-JobId";
static final Duration POLL_COUNTDOWN = Duration.standardSeconds(20);
@Inject Bigquery bigquery;
@Inject TaskEnqueuer enqueuer;
@Inject @Header(CHAINED_TASK_QUEUE_HEADER) Lazy<String> chainedQueueName;
@Inject @Header(PROJECT_ID_HEADER) String projectId;
@Inject @Header(JOB_ID_HEADER) String jobId;
@Inject @Payload byte[] payload;
@Inject BigqueryPollJobAction() {}
@Override
public void run() {
checkJobOutcome(); // Throws a NotModifiedException if the job hasn't completed.
if (payload == null || payload.length == 0) {
return;
}
// If there is a payload, it's a chained task, so enqueue it.
TaskOptions task;
try {
task = (TaskOptions) new ObjectInputStream(new ByteArrayInputStream(payload)).readObject();
} catch (ClassNotFoundException | IOException e) {
logger.severe(e, e.toString());
throw new BadRequestException("Cannot deserialize task from payload", e);
}
String taskName = enqueuer.enqueue(getQueue(chainedQueueName.get()), task).getName();
logger.infofmt(
"Added chained task %s for %s to queue %s: %s",
taskName,
task.getUrl(),
chainedQueueName.get(),
task.toString());
}
/**
* Returns true if the provided job succeeded, false if it failed, and throws an exception if it
* is still pending.
*/
private boolean checkJobOutcome() {
Job job = null;
String jobRefString =
toJobReferenceString(new JobReference().setProjectId(projectId).setJobId(jobId));
try {
job = bigquery.jobs().get(projectId, jobId).execute();
} catch (IOException e) {
// We will throw a new exception because done==false, but first log this exception.
logger.warning(e, e.getMessage());
}
// If job is not yet done, then throw an exception so that we'll return a failing HTTP status
// code and the task will be retried.
if (job == null || !job.getStatus().getState().equals("DONE")) {
throw new NotModifiedException(jobRefString);
}
// Check if the job ended with an error.
if (job.getStatus().getErrorResult() != null) {
logger.severefmt("Bigquery job failed - %s - %s", jobRefString, job);
return false;
}
logger.infofmt("Bigquery job succeeded - %s", jobRefString);
return true;
}
/** Helper class to enqueue a bigquery poll job. */
public static class BigqueryPollJobEnqueuer {
private final TaskEnqueuer enqueuer;
@Inject
BigqueryPollJobEnqueuer(TaskEnqueuer enqueuer) {
this.enqueuer = enqueuer;
}
/** Enqueue a task to poll for the success or failure of the referenced BigQuery job. */
public TaskHandle enqueuePollTask(JobReference jobRef) {
return enqueuer.enqueue(getQueue(QUEUE), createCommonPollTask(jobRef).method(Method.GET));
}
/**
* Enqueue a task to poll for the success or failure of the referenced BigQuery job and to
* launch the provided task in the specified queue if the job succeeds.
*/
public TaskHandle enqueuePollTask(
JobReference jobRef, TaskOptions chainedTask, Queue chainedTaskQueue) throws IOException {
// Serialize the chainedTask into a byte array to put in the task payload.
ByteArrayOutputStream taskBytes = new ByteArrayOutputStream();
new ObjectOutputStream(taskBytes).writeObject(chainedTask);
return enqueuer.enqueue(
getQueue(QUEUE),
createCommonPollTask(jobRef)
.method(Method.POST)
.header(CHAINED_TASK_QUEUE_HEADER, chainedTaskQueue.getQueueName())
.payload(taskBytes.toByteArray()));
}
/**
* Enqueue a task to poll for the success or failure of the referenced BigQuery job and to
* launch the provided task in the specified queue if the job succeeds.
*/
private static TaskOptions createCommonPollTask(JobReference jobRef) {
// Omit host header so that task will be run on the current backend/module.
return withUrl(PATH)
.countdownMillis(POLL_COUNTDOWN.getMillis())
.header(PROJECT_ID_HEADER, jobRef.getProjectId())
.header(JOB_ID_HEADER, jobRef.getJobId());
}
}
}