Change TaskQueueUtils to CloudTaskUtils in CommitLogFanoutAction (#1408)

* Change TaskOptions to Task in CommitLogFanoutAction

* Add a createTask method that takes clock and jitterSeconds

* Change CreateTask parameter type and improve test cases

* Improve comments and test casse

* Improve test cases that handel jitterSeconds
This commit is contained in:
Rachel Guan 2021-11-17 10:54:42 -05:00 committed by GitHub
parent 8915df8e87
commit 573f14514a
4 changed files with 199 additions and 24 deletions

View file

@ -16,6 +16,7 @@ package google.registry.util;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.concurrent.TimeUnit.SECONDS;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.tasks.v2.AppEngineHttpRequest;
@ -34,9 +35,13 @@ import com.google.common.net.HttpHeaders;
import com.google.common.net.MediaType;
import com.google.common.net.UrlEscapers;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Arrays;
import java.util.Optional;
import java.util.Random;
import java.util.function.Supplier;
/** Utilities for dealing with Cloud Tasks. */
@ -44,6 +49,7 @@ public class CloudTasksUtils implements Serializable {
private static final long serialVersionUID = -7605156291755534069L;
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private static final Random random = new Random();
private final Retrier retrier;
private final String projectId;
@ -88,7 +94,7 @@ public class CloudTasksUtils implements Serializable {
* Queue API if no service is specified, the service which enqueues the task will be used to
* process the task. Cloud Tasks API does not support this feature so the service will always
* needs to be explicitly specified.
* @param params A multi-map of URL query parameters. Duplicate keys are saved as is, and it is up
* @param params a multi-map of URL query parameters. Duplicate keys are saved as is, and it is up
* to the server to process the duplicate keys.
* @return the enqueued task.
* @see <a
@ -129,6 +135,49 @@ public class CloudTasksUtils implements Serializable {
return Task.newBuilder().setAppEngineHttpRequest(requestBuilder.build()).build();
}
/**
* Create a {@link Task} to be enqueued with a random delay up to {@code jitterSeconds}.
*
* @param path the relative URI (staring with a slash and ending without one).
* @param method the HTTP method to be used for the request, only GET and POST are supported.
* @param service the App Engine service to route the request to. Note that with App Engine Task
* Queue API if no service is specified, the service which enqueues the task will be used to
* process the task. Cloud Tasks API does not support this feature so the service will always
* needs to be explicitly specified.
* @param params a multi-map of URL query parameters. Duplicate keys are saved as is, and it is up
* to the server to process the duplicate keys.
* @param clock a source of time.
* @param jitterSeconds the number of seconds that a task is randomly delayed up to.
* @return the enqueued task.
* @see <a
* href=ttps://cloud.google.com/appengine/docs/standard/java/taskqueue/push/creating-tasks#target>Specifyinig
* the worker service</a>
*/
private static Task createTask(
String path,
HttpMethod method,
String service,
Multimap<String, String> params,
Clock clock,
Optional<Integer> jitterSeconds) {
if (jitterSeconds.isEmpty() || jitterSeconds.get() <= 0) {
return createTask(path, method, service, params);
}
Instant scheduleTime =
Instant.ofEpochMilli(
clock
.nowUtc()
.plusMillis(random.nextInt((int) SECONDS.toMillis(jitterSeconds.get())))
.getMillis());
return Task.newBuilder(createTask(path, method, service, params))
.setScheduleTime(
Timestamp.newBuilder()
.setSeconds(scheduleTime.getEpochSecond())
.setNanos(scheduleTime.getNano())
.build())
.build();
}
public static Task createPostTask(String path, String service, Multimap<String, String> params) {
return createTask(path, HttpMethod.POST, service, params);
}
@ -137,6 +186,30 @@ public class CloudTasksUtils implements Serializable {
return createTask(path, HttpMethod.GET, service, params);
}
/**
* Create a {@link Task} via HTTP.POST that will be randomly delayed up to {@code jitterSeconds}.
*/
public static Task createPostTask(
String path,
String service,
Multimap<String, String> params,
Clock clock,
Optional<Integer> jitterSeconds) {
return createTask(path, HttpMethod.POST, service, params, clock, jitterSeconds);
}
/**
* Create a {@link Task} via HTTP.GET that will be randomly delayed up to {@code jitterSeconds}.
*/
public static Task createGetTask(
String path,
String service,
Multimap<String, String> params,
Clock clock,
Optional<Integer> jitterSeconds) {
return createTask(path, HttpMethod.GET, service, params, clock, jitterSeconds);
}
public abstract static class SerializableCloudTasksClient implements Serializable {
public abstract Task enqueue(String projectId, String locationId, String queueName, Task task);
}

View file

@ -30,6 +30,9 @@ import google.registry.testing.FakeClock;
import google.registry.testing.FakeSleeper;
import google.registry.util.CloudTasksUtils.SerializableCloudTasksClient;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Optional;
import org.joda.time.DateTime;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -41,6 +44,7 @@ public class CloudTasksUtilsTest {
private final CloudTasksUtils cloudTasksUtils =
new CloudTasksUtils(
new Retrier(new FakeSleeper(new FakeClock()), 1), "project", "location", mockClient);
private final Clock clock = new FakeClock(DateTime.parse("2021-11-08"));
@BeforeEach
void beforeEach() {
@ -59,6 +63,7 @@ public class CloudTasksUtilsTest {
.isEqualTo("/the/path?key1=val1&key2=val2&key1=val3");
assertThat(task.getAppEngineHttpRequest().getAppEngineRouting().getService())
.isEqualTo("myservice");
assertThat(task.getScheduleTime().getSeconds()).isEqualTo(0);
}
@Test
@ -72,6 +77,103 @@ public class CloudTasksUtilsTest {
.isEqualTo("application/x-www-form-urlencoded");
assertThat(task.getAppEngineHttpRequest().getBody().toString(StandardCharsets.UTF_8))
.isEqualTo("key1=val1&key2=val2&key1=val3");
assertThat(task.getScheduleTime().getSeconds()).isEqualTo(0);
}
@SuppressWarnings("ProtoTimestampGetSecondsGetNano")
@Test
void testSuccess_createGetTasks_withJitterSeconds() {
Task task =
CloudTasksUtils.createGetTask("/the/path", "myservice", params, clock, Optional.of(100));
assertThat(task.getAppEngineHttpRequest().getHttpMethod()).isEqualTo(HttpMethod.GET);
assertThat(task.getAppEngineHttpRequest().getRelativeUri())
.isEqualTo("/the/path?key1=val1&key2=val2&key1=val3");
assertThat(task.getAppEngineHttpRequest().getAppEngineRouting().getService())
.isEqualTo("myservice");
Instant scheduleTime = Instant.ofEpochSecond(task.getScheduleTime().getSeconds());
Instant lowerBoundTime = Instant.ofEpochMilli(clock.nowUtc().getMillis());
Instant upperBound = Instant.ofEpochMilli(clock.nowUtc().plusSeconds(100).getMillis());
assertThat(scheduleTime.isBefore(lowerBoundTime)).isFalse();
assertThat(upperBound.isBefore(scheduleTime)).isFalse();
}
@SuppressWarnings("ProtoTimestampGetSecondsGetNano")
@Test
void testSuccess_createPostTasks_withJitterSeconds() {
Task task =
CloudTasksUtils.createPostTask("/the/path", "myservice", params, clock, Optional.of(1));
assertThat(task.getAppEngineHttpRequest().getHttpMethod()).isEqualTo(HttpMethod.POST);
assertThat(task.getAppEngineHttpRequest().getRelativeUri()).isEqualTo("/the/path");
assertThat(task.getAppEngineHttpRequest().getAppEngineRouting().getService())
.isEqualTo("myservice");
assertThat(task.getAppEngineHttpRequest().getHeadersMap().get("Content-Type"))
.isEqualTo("application/x-www-form-urlencoded");
assertThat(task.getAppEngineHttpRequest().getBody().toString(StandardCharsets.UTF_8))
.isEqualTo("key1=val1&key2=val2&key1=val3");
assertThat(task.getScheduleTime().getSeconds()).isNotEqualTo(0);
Instant scheduleTime = Instant.ofEpochSecond(task.getScheduleTime().getSeconds());
Instant lowerBoundTime = Instant.ofEpochMilli(clock.nowUtc().getMillis());
Instant upperBound = Instant.ofEpochMilli(clock.nowUtc().plusSeconds(1).getMillis());
assertThat(scheduleTime.isBefore(lowerBoundTime)).isFalse();
assertThat(upperBound.isBefore(scheduleTime)).isFalse();
}
@Test
void testSuccess_createPostTasks_withEmptyJitterSeconds() {
Task task =
CloudTasksUtils.createPostTask("/the/path", "myservice", params, clock, Optional.empty());
assertThat(task.getAppEngineHttpRequest().getHttpMethod()).isEqualTo(HttpMethod.POST);
assertThat(task.getAppEngineHttpRequest().getRelativeUri()).isEqualTo("/the/path");
assertThat(task.getAppEngineHttpRequest().getAppEngineRouting().getService())
.isEqualTo("myservice");
assertThat(task.getAppEngineHttpRequest().getHeadersMap().get("Content-Type"))
.isEqualTo("application/x-www-form-urlencoded");
assertThat(task.getAppEngineHttpRequest().getBody().toString(StandardCharsets.UTF_8))
.isEqualTo("key1=val1&key2=val2&key1=val3");
assertThat(task.getScheduleTime().getSeconds()).isEqualTo(0);
}
@Test
void testSuccess_createGetTasks_withEmptyJitterSeconds() {
Task task =
CloudTasksUtils.createGetTask("/the/path", "myservice", params, clock, Optional.empty());
assertThat(task.getAppEngineHttpRequest().getHttpMethod()).isEqualTo(HttpMethod.GET);
assertThat(task.getAppEngineHttpRequest().getRelativeUri())
.isEqualTo("/the/path?key1=val1&key2=val2&key1=val3");
assertThat(task.getAppEngineHttpRequest().getAppEngineRouting().getService())
.isEqualTo("myservice");
assertThat(task.getScheduleTime().getSeconds()).isEqualTo(0);
}
@Test
void testSuccess_createPostTasks_withZeroJitterSeconds() {
Task task =
CloudTasksUtils.createPostTask("/the/path", "myservice", params, clock, Optional.of(0));
assertThat(task.getAppEngineHttpRequest().getHttpMethod()).isEqualTo(HttpMethod.POST);
assertThat(task.getAppEngineHttpRequest().getRelativeUri()).isEqualTo("/the/path");
assertThat(task.getAppEngineHttpRequest().getAppEngineRouting().getService())
.isEqualTo("myservice");
assertThat(task.getAppEngineHttpRequest().getHeadersMap().get("Content-Type"))
.isEqualTo("application/x-www-form-urlencoded");
assertThat(task.getAppEngineHttpRequest().getBody().toString(StandardCharsets.UTF_8))
.isEqualTo("key1=val1&key2=val2&key1=val3");
assertThat(task.getScheduleTime().getSeconds()).isEqualTo(0);
}
@Test
void testSuccess_createGetTasks_withZeroJitterSeconds() {
Task task =
CloudTasksUtils.createGetTask("/the/path", "myservice", params, clock, Optional.of(0));
assertThat(task.getAppEngineHttpRequest().getHttpMethod()).isEqualTo(HttpMethod.GET);
assertThat(task.getAppEngineHttpRequest().getRelativeUri())
.isEqualTo("/the/path?key1=val1&key2=val2&key1=val3");
assertThat(task.getAppEngineHttpRequest().getAppEngineRouting().getService())
.isEqualTo("myservice");
assertThat(task.getScheduleTime().getSeconds()).isEqualTo(0);
}
@Test