// Copyright 2016 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package google.registry.export;

import static com.google.appengine.api.taskqueue.QueueFactory.getQueue;
import static com.google.appengine.api.taskqueue.TaskOptions.Builder.withUrl;
import static google.registry.bigquery.BigqueryUtils.toJobReferenceString;

import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobReference;
import com.google.appengine.api.taskqueue.Queue;
import com.google.appengine.api.taskqueue.TaskHandle;
import com.google.appengine.api.taskqueue.TaskOptions;
import com.google.appengine.api.taskqueue.TaskOptions.Method;
import dagger.Lazy;
import google.registry.request.Action;
import google.registry.request.Header;
import google.registry.request.HttpException.BadRequestException;
import google.registry.request.HttpException.NotModifiedException;
import google.registry.request.Payload;
import google.registry.util.FormattingLogger;
import google.registry.util.TaskEnqueuer;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import javax.inject.Inject;
import org.joda.time.Duration;

/**
 * An action which polls the state of a bigquery job.
 *
 * <p>If the job has completed, its completion state is logged and, when the job succeeded and the
 * request carries a serialized chained task as its payload, that task is enqueued. If the job has
 * not completed, a {@link NotModifiedException} is thrown so that a failing HTTP status code is
 * returned and the task will be retried.
 */
@Action(
  path = BigqueryPollJobAction.PATH,
  method = {Action.Method.GET, Action.Method.POST},
  automaticallyPrintOk = true)
public class BigqueryPollJobAction implements Runnable {

  private static final FormattingLogger logger = FormattingLogger.getLoggerForCallerClass();

  static final String QUEUE = "export-bigquery-poll";  // See queue.xml
  static final String PATH = "/_dr/task/pollBigqueryJob";  // See web.xml

  static final String CHAINED_TASK_QUEUE_HEADER = "X-DomainRegistry-ChainedTaskQueue";
  static final String PROJECT_ID_HEADER = "X-DomainRegistry-ProjectId";
  static final String JOB_ID_HEADER = "X-DomainRegistry-JobId";

  /** Countdown applied to every poll task created by {@link BigqueryPollJobEnqueuer}. */
  static final Duration POLL_COUNTDOWN = Duration.standardSeconds(20);

  @Inject Bigquery bigquery;
  @Inject TaskEnqueuer enqueuer;
  // Lazy because the header is only set (and only read) for poll tasks that carry a chained task.
  @Inject @Header(CHAINED_TASK_QUEUE_HEADER) Lazy<String> chainedQueueName;
  @Inject @Header(PROJECT_ID_HEADER) String projectId;
  @Inject @Header(JOB_ID_HEADER) String jobId;
  @Inject @Payload byte[] payload;
  @Inject BigqueryPollJobAction() {}

  /**
   * Checks the job and, if it succeeded and a chained task was supplied, enqueues that task.
   *
   * @throws NotModifiedException if the job has not completed yet
   * @throws BadRequestException if the payload cannot be deserialized into a task
   */
  @Override
  public void run() {
    // Throws a NotModifiedException if the job hasn't completed.
    boolean jobSucceeded = checkJobOutcome();
    // The chained task is only meant to run after a successful job, and a poll task without a
    // payload has nothing chained to it.
    if (!jobSucceeded || payload == null || payload.length == 0) {
      return;
    }
    // There is a payload, so it's a chained task: enqueue it.
    TaskOptions task = deserializeChainedTask();
    String queueName = chainedQueueName.get();
    String taskName = enqueuer.enqueue(getQueue(queueName), task).getName();
    logger.infofmt(
        "Added chained task %s for %s to queue %s: %s",
        taskName,
        task.getUrl(),
        queueName,
        task.toString());
  }

  /**
   * Reconstructs the chained task from the Java-serialized request payload.
   *
   * @throws BadRequestException if the payload cannot be read back as an object
   */
  private TaskOptions deserializeChainedTask() {
    // NOTE(review): this uses native Java deserialization on the request body. That is only safe
    // as long as this endpoint is reachable solely by trusted task-queue requests - confirm the
    // access restrictions on PATH before relaxing them.
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(payload))) {
      return (TaskOptions) in.readObject();
    } catch (ClassNotFoundException | IOException e) {
      logger.severe(e, e.toString());
      throw new BadRequestException("Cannot deserialize task from payload", e);
    }
  }

  /**
   * Returns true if the provided job succeeded, false if it failed, and throws an exception if it
   * is still pending.
   *
   * @throws NotModifiedException if the job could not be fetched or is not in state DONE
   */
  private boolean checkJobOutcome() {
    Job job = null;
    String jobRefString =
        toJobReferenceString(new JobReference().setProjectId(projectId).setJobId(jobId));

    try {
      job = bigquery.jobs().get(projectId, jobId).execute();
    } catch (IOException e) {
      // job stays null, so a NotModifiedException is thrown below; log the root cause first.
      logger.warning(e, e.toString());
    }
    // If job is not yet done, then throw an exception so that we'll return a failing HTTP status
    // code and the task will be retried. A missing status is treated as "not done yet" rather
    // than being allowed to surface as a NullPointerException.
    if (job == null
        || job.getStatus() == null
        || !"DONE".equals(job.getStatus().getState())) {
      throw new NotModifiedException(jobRefString);
    }

    // Check if the job ended with an error.
    if (job.getStatus().getErrorResult() != null) {
      logger.severefmt("Bigquery job failed - %s - %s", jobRefString, job);
      return false;
    }
    logger.infofmt("Bigquery job succeeded - %s", jobRefString);
    return true;
  }

  /** Helper class to enqueue a bigquery poll job. */
  public static class BigqueryPollJobEnqueuer {

    private final TaskEnqueuer enqueuer;

    @Inject
    BigqueryPollJobEnqueuer(TaskEnqueuer enqueuer) {
      this.enqueuer = enqueuer;
    }

    /** Enqueue a task to poll for the success or failure of the referenced BigQuery job. */
    public TaskHandle enqueuePollTask(JobReference jobRef) {
      return enqueuer.enqueue(getQueue(QUEUE), createCommonPollTask(jobRef).method(Method.GET));
    }

    /**
     * Enqueue a task to poll for the success or failure of the referenced BigQuery job and to
     * launch the provided task in the specified queue if the job succeeds.
     *
     * @throws IOException if the chained task cannot be serialized
     */
    public TaskHandle enqueuePollTask(
        JobReference jobRef, TaskOptions chainedTask, Queue chainedTaskQueue) throws IOException {
      // Serialize the chainedTask into a byte array to put in the task payload.
      ByteArrayOutputStream taskBytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(taskBytes)) {
        out.writeObject(chainedTask);
      }
      return enqueuer.enqueue(
          getQueue(QUEUE),
          createCommonPollTask(jobRef)
              .method(Method.POST)
              .header(CHAINED_TASK_QUEUE_HEADER, chainedTaskQueue.getQueueName())
              .payload(taskBytes.toByteArray()));
    }

    /**
     * Builds the part of a poll task shared by both {@code enqueuePollTask} overloads: the poll
     * URL, the poll countdown, and the headers identifying the job to poll.
     */
    private static TaskOptions createCommonPollTask(JobReference jobRef) {
      // Omit host header so that task will be run on the current backend/module.
      return withUrl(PATH)
          .countdownMillis(POLL_COUNTDOWN.getMillis())
          .header(PROJECT_ID_HEADER, jobRef.getProjectId())
          .header(JOB_ID_HEADER, jobRef.getJobId());
    }
  }
}