mirror of
https://github.com/google/nomulus.git
synced 2025-07-21 10:16:07 +02:00
Refactor LordnTask to LordnTaskUtils
Made it clear that it is a util class and moved some of the functions only used in NordnUploadAction (to NordnUploadAction). Also used Retrier to handle retries when leasing tasks. These changes allow us to no longer use InjectRule in related unit tests. ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=217761117
This commit is contained in:
parent
0f1f418034
commit
b254269d2f
10 changed files with 170 additions and 176 deletions
|
@ -19,21 +19,27 @@ import static com.google.appengine.api.taskqueue.TaskOptions.Builder.withUrl;
|
|||
import static com.google.appengine.api.urlfetch.FetchOptions.Builder.validateCertificate;
|
||||
import static com.google.appengine.api.urlfetch.HTTPMethod.POST;
|
||||
import static com.google.common.base.Preconditions.checkArgument;
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
import static com.google.common.net.HttpHeaders.LOCATION;
|
||||
import static com.google.common.net.MediaType.CSV_UTF_8;
|
||||
import static google.registry.tmch.LordnTask.COLUMNS_CLAIMS;
|
||||
import static google.registry.tmch.LordnTask.COLUMNS_SUNRISE;
|
||||
import static google.registry.tmch.LordnTask.convertTasksToCsv;
|
||||
import static google.registry.tmch.LordnTaskUtils.COLUMNS_CLAIMS;
|
||||
import static google.registry.tmch.LordnTaskUtils.COLUMNS_SUNRISE;
|
||||
import static google.registry.util.UrlFetchUtils.getHeaderFirst;
|
||||
import static google.registry.util.UrlFetchUtils.setPayloadMultipart;
|
||||
import static java.nio.charset.StandardCharsets.UTF_8;
|
||||
import static javax.servlet.http.HttpServletResponse.SC_ACCEPTED;
|
||||
|
||||
import com.google.appengine.api.taskqueue.LeaseOptions;
|
||||
import com.google.appengine.api.taskqueue.Queue;
|
||||
import com.google.appengine.api.taskqueue.TaskHandle;
|
||||
import com.google.appengine.api.taskqueue.TaskOptions;
|
||||
import com.google.appengine.api.taskqueue.TransientFailureException;
|
||||
import com.google.appengine.api.urlfetch.HTTPRequest;
|
||||
import com.google.appengine.api.urlfetch.HTTPResponse;
|
||||
import com.google.appengine.api.urlfetch.URLFetchService;
|
||||
import com.google.apphosting.api.DeadlineExceededException;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.flogger.FluentLogger;
|
||||
import google.registry.config.RegistryConfig.Config;
|
||||
import google.registry.request.Action;
|
||||
|
@ -41,6 +47,7 @@ import google.registry.request.Parameter;
|
|||
import google.registry.request.RequestParameters;
|
||||
import google.registry.request.auth.Auth;
|
||||
import google.registry.util.Clock;
|
||||
import google.registry.util.Retrier;
|
||||
import google.registry.util.TaskQueueUtils;
|
||||
import google.registry.util.UrlFetchException;
|
||||
import java.io.IOException;
|
||||
|
@ -48,29 +55,30 @@ import java.net.URL;
|
|||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import javax.inject.Inject;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Duration;
|
||||
|
||||
/**
|
||||
* Action that reads the NORDN pull queues, uploads claims and sunrise marks data to TMCH, and
|
||||
* enqueues subsequent upload verification tasks. A unique actionLogId is generated and passed
|
||||
* along to the verify action so that connected verify tasks can be identified by looking at logs.
|
||||
* enqueues subsequent upload verification tasks. A unique actionLogId is generated and passed along
|
||||
* to the verify action so that connected verify tasks can be identified by looking at logs.
|
||||
*
|
||||
* @see NordnVerifyAction
|
||||
*/
|
||||
@Action(
|
||||
path = NordnUploadAction.PATH,
|
||||
method = Action.Method.POST,
|
||||
automaticallyPrintOk = true,
|
||||
auth = Auth.AUTH_INTERNAL_ONLY
|
||||
)
|
||||
path = NordnUploadAction.PATH,
|
||||
method = Action.Method.POST,
|
||||
automaticallyPrintOk = true,
|
||||
auth = Auth.AUTH_INTERNAL_ONLY)
|
||||
public final class NordnUploadAction implements Runnable {
|
||||
|
||||
static final String PATH = "/_dr/task/nordnUpload";
|
||||
static final String LORDN_PHASE_PARAM = "lordn-phase";
|
||||
|
||||
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
|
||||
private static final Duration LEASE_PERIOD = Duration.standardHours(1);
|
||||
|
||||
/**
|
||||
* A unique (enough) id that is outputted in log lines to make it clear which log lines are
|
||||
|
@ -80,6 +88,7 @@ public final class NordnUploadAction implements Runnable {
|
|||
private final String actionLogId = String.valueOf(1000000000 + new Random().nextInt(1000000000));
|
||||
|
||||
@Inject Clock clock;
|
||||
@Inject Retrier retrier;
|
||||
@Inject Random random;
|
||||
@Inject LordnRequestInitializer lordnRequestInitializer;
|
||||
@Inject URLFetchService fetchService;
|
||||
|
@ -108,15 +117,54 @@ public final class NordnUploadAction implements Runnable {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts a list of queue tasks, each containing a row of CSV data, into a single newline-
|
||||
* delimited String.
|
||||
*/
|
||||
static String convertTasksToCsv(List<TaskHandle> tasks, DateTime now, String columns) {
|
||||
String header = String.format("1,%s,%d\n%s\n", now, tasks.size(), columns);
|
||||
StringBuilder csv = new StringBuilder(header);
|
||||
for (TaskHandle task : checkNotNull(tasks)) {
|
||||
String payload = new String(task.getPayload(), UTF_8);
|
||||
if (!Strings.isNullOrEmpty(payload)) {
|
||||
csv.append(payload).append("\n");
|
||||
}
|
||||
}
|
||||
return csv.toString();
|
||||
}
|
||||
|
||||
/** Leases and returns all tasks from the queue with the specified tag tld, in batches. */
|
||||
List<TaskHandle> loadAllTasks(Queue queue, String tld) {
|
||||
ImmutableList.Builder<TaskHandle> allTasks = new ImmutableList.Builder<>();
|
||||
while (true) {
|
||||
List<TaskHandle> tasks =
|
||||
retrier.callWithRetry(
|
||||
() ->
|
||||
queue.leaseTasks(
|
||||
LeaseOptions.Builder.withTag(tld)
|
||||
.leasePeriod(LEASE_PERIOD.getMillis(), TimeUnit.MILLISECONDS)
|
||||
.countLimit(TaskQueueUtils.getBatchSize())),
|
||||
TransientFailureException.class,
|
||||
DeadlineExceededException.class);
|
||||
if (tasks.isEmpty()) {
|
||||
return allTasks.build();
|
||||
}
|
||||
allTasks.addAll(tasks);
|
||||
}
|
||||
}
|
||||
|
||||
private void processLordnTasks() throws IOException {
|
||||
checkArgument(phase.equals(PARAM_LORDN_PHASE_SUNRISE)
|
||||
|| phase.equals(PARAM_LORDN_PHASE_CLAIMS),
|
||||
"Invalid phase specified to Nordn servlet: %s.", phase);
|
||||
DateTime now = clock.nowUtc();
|
||||
Queue queue = getQueue(
|
||||
phase.equals(PARAM_LORDN_PHASE_SUNRISE) ? LordnTask.QUEUE_SUNRISE : LordnTask.QUEUE_CLAIMS);
|
||||
Queue queue =
|
||||
getQueue(
|
||||
phase.equals(PARAM_LORDN_PHASE_SUNRISE)
|
||||
? LordnTaskUtils.QUEUE_SUNRISE
|
||||
: LordnTaskUtils.QUEUE_CLAIMS);
|
||||
String columns = phase.equals(PARAM_LORDN_PHASE_SUNRISE) ? COLUMNS_SUNRISE : COLUMNS_CLAIMS;
|
||||
List<TaskHandle> tasks = LordnTask.loadAllTasks(queue, tld);
|
||||
List<TaskHandle> tasks = loadAllTasks(queue, tld);
|
||||
if (!tasks.isEmpty()) {
|
||||
String csvData = convertTasksToCsv(tasks, now, columns);
|
||||
uploadCsvToLordn(String.format("/LORDN/%s/%s", tld, phase), csvData);
|
||||
|
@ -146,16 +194,19 @@ public final class NordnUploadAction implements Runnable {
|
|||
actionLogId, rsp.getResponseCode(), rsp.getContent());
|
||||
if (rsp.getResponseCode() != SC_ACCEPTED) {
|
||||
throw new UrlFetchException(
|
||||
String.format("LORDN upload task %s error: Failed to upload LORDN claims to MarksDB",
|
||||
actionLogId),
|
||||
req, rsp);
|
||||
String.format(
|
||||
"LORDN upload task %s error: Failed to upload LORDN claims to MarksDB", actionLogId),
|
||||
req,
|
||||
rsp);
|
||||
}
|
||||
Optional<String> location = getHeaderFirst(rsp, LOCATION);
|
||||
if (!location.isPresent()) {
|
||||
throw new UrlFetchException(
|
||||
String.format("LORDN upload task %s error: MarksDB failed to provide a Location header",
|
||||
String.format(
|
||||
"LORDN upload task %s error: MarksDB failed to provide a Location header",
|
||||
actionLogId),
|
||||
req, rsp);
|
||||
req,
|
||||
rsp);
|
||||
}
|
||||
getQueue(NordnVerifyAction.QUEUE).add(makeVerifyTask(new URL(location.get())));
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue