Make a few quality-of-life improvements in CloudTasksUtils (#1521)

* Make a few quality-of-life improvements in CloudTasksUtils

1. Update the method names. There are too many overloaded methods and it
   is hard to figure out which one does which without checking the
   javadoc.

2. Added a method in the task matcher to specify the delay time in
   DateTime, so the caller does not need to convert it to Timestamp.

3. Remove the expilict dependency on a clock when enqueueing a task with
   delay, the clock is now injected directly into the util instance
   itself.
This commit is contained in:
Lai Jiang 2022-02-18 20:21:56 -05:00 committed by GitHub
parent 88f274a601
commit d355da362f
31 changed files with 119 additions and 123 deletions

View file

@ -35,10 +35,9 @@ import com.google.common.net.HttpHeaders;
import com.google.common.net.MediaType;
import com.google.common.net.UrlEscapers;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import com.google.protobuf.util.Timestamps;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Arrays;
import java.util.Optional;
import java.util.Random;
@ -53,13 +52,19 @@ public class CloudTasksUtils implements Serializable {
private static final Random random = new Random();
private final Retrier retrier;
private final Clock clock;
private final String projectId;
private final String locationId;
private final SerializableCloudTasksClient client;
public CloudTasksUtils(
Retrier retrier, String projectId, String locationId, SerializableCloudTasksClient client) {
Retrier retrier,
Clock clock,
String projectId,
String locationId,
SerializableCloudTasksClient client) {
this.retrier = retrier;
this.clock = clock;
this.projectId = projectId;
this.locationId = locationId;
this.client = client;
@ -102,7 +107,7 @@ public class CloudTasksUtils implements Serializable {
* href=ttps://cloud.google.com/appengine/docs/standard/java/taskqueue/push/creating-tasks#target>Specifyinig
* the worker service</a>
*/
private static Task createTask(
private Task createTask(
String path, HttpMethod method, String service, Multimap<String, String> params) {
checkArgument(
path != null && !path.isEmpty() && path.charAt(0) == '/',
@ -151,29 +156,26 @@ public class CloudTasksUtils implements Serializable {
* needs to be explicitly specified.
* @param params a multi-map of URL query parameters. Duplicate keys are saved as is, and it is up
* to the server to process the duplicate keys.
* @param clock a source of time.
* @param jitterSeconds the number of seconds that a task is randomly delayed up to.
* @return the enqueued task.
* @see <a
* href=ttps://cloud.google.com/appengine/docs/standard/java/taskqueue/push/creating-tasks#target>Specifyinig
* the worker service</a>
*/
private static Task createTask(
private Task createTaskWithJitter(
String path,
HttpMethod method,
String service,
Multimap<String, String> params,
Clock clock,
Optional<Integer> jitterSeconds) {
if (!jitterSeconds.isPresent() || jitterSeconds.get() <= 0) {
return createTask(path, method, service, params);
}
return createTask(
return createTaskWithDelay(
path,
method,
service,
params,
clock,
Duration.millis(random.nextInt((int) SECONDS.toMillis(jitterSeconds.get()))));
}
@ -188,76 +190,67 @@ public class CloudTasksUtils implements Serializable {
* needs to be explicitly specified.
* @param params a multi-map of URL query parameters. Duplicate keys are saved as is, and it is up
* to the server to process the duplicate keys.
* @param clock a source of time.
* @param delay the amount of time that a task needs to delayed for.
* @return the enqueued task.
* @see <a
* href=ttps://cloud.google.com/appengine/docs/standard/java/taskqueue/push/creating-tasks#target>Specifyinig
* the worker service</a>
*/
private static Task createTask(
private Task createTaskWithDelay(
String path,
HttpMethod method,
String service,
Multimap<String, String> params,
Clock clock,
Duration delay) {
if (delay.isEqual(Duration.ZERO)) {
return createTask(path, method, service, params);
}
checkArgument(delay.isLongerThan(Duration.ZERO), "Negative duration is not supported.");
Instant scheduleTime = Instant.ofEpochMilli(clock.nowUtc().getMillis() + delay.getMillis());
return Task.newBuilder(createTask(path, method, service, params))
.setScheduleTime(
Timestamp.newBuilder()
.setSeconds(scheduleTime.getEpochSecond())
.setNanos(scheduleTime.getNano())
.build())
.setScheduleTime(Timestamps.fromMillis(clock.nowUtc().plus(delay).getMillis()))
.build();
}
public static Task createPostTask(String path, String service, Multimap<String, String> params) {
public Task createPostTask(String path, String service, Multimap<String, String> params) {
return createTask(path, HttpMethod.POST, service, params);
}
public static Task createGetTask(String path, String service, Multimap<String, String> params) {
public Task createGetTask(String path, String service, Multimap<String, String> params) {
return createTask(path, HttpMethod.GET, service, params);
}
/**
* Create a {@link Task} via HTTP.POST that will be randomly delayed up to {@code jitterSeconds}.
*/
public static Task createPostTask(
public Task createPostTaskWithJitter(
String path,
String service,
Multimap<String, String> params,
Clock clock,
Optional<Integer> jitterSeconds) {
return createTask(path, HttpMethod.POST, service, params, clock, jitterSeconds);
return createTaskWithJitter(path, HttpMethod.POST, service, params, jitterSeconds);
}
/**
* Create a {@link Task} via HTTP.GET that will be randomly delayed up to {@code jitterSeconds}.
*/
public static Task createGetTask(
public Task createGetTaskWithJitter(
String path,
String service,
Multimap<String, String> params,
Clock clock,
Optional<Integer> jitterSeconds) {
return createTask(path, HttpMethod.GET, service, params, clock, jitterSeconds);
return createTaskWithJitter(path, HttpMethod.GET, service, params, jitterSeconds);
}
/** Create a {@link Task} via HTTP.POST that will be delayed for {@code delay}. */
public static Task createPostTask(
String path, String service, Multimap<String, String> params, Clock clock, Duration delay) {
return createTask(path, HttpMethod.POST, service, params, clock, delay);
public Task createPostTaskWithDelay(
String path, String service, Multimap<String, String> params, Duration delay) {
return createTaskWithDelay(path, HttpMethod.POST, service, params, delay);
}
/** Create a {@link Task} via HTTP.GET that will be delayed for {@code delay}. */
public static Task createGetTask(
String path, String service, Multimap<String, String> params, Clock clock, Duration delay) {
return createTask(path, HttpMethod.GET, service, params, clock, delay);
public Task createGetTaskWithDelay(
String path, String service, Multimap<String, String> params, Duration delay) {
return createTaskWithDelay(path, HttpMethod.GET, service, params, delay);
}
public abstract static class SerializableCloudTasksClient implements Serializable {

View file

@ -43,10 +43,10 @@ public class CloudTasksUtilsTest {
// Use a LinkedListMultimap to preserve order of the inserted entries for assertion.
private final LinkedListMultimap<String, String> params = LinkedListMultimap.create();
private final SerializableCloudTasksClient mockClient = mock(SerializableCloudTasksClient.class);
private final FakeClock clock = new FakeClock(DateTime.parse("2021-11-08"));
private final CloudTasksUtils cloudTasksUtils =
new CloudTasksUtils(
new Retrier(new FakeSleeper(new FakeClock()), 1), "project", "location", mockClient);
private final Clock clock = new FakeClock(DateTime.parse("2021-11-08"));
new Retrier(new FakeSleeper(clock), 1), clock, "project", "location", mockClient);
@BeforeEach
void beforeEach() {
@ -59,7 +59,7 @@ public class CloudTasksUtilsTest {
@Test
void testSuccess_createGetTasks() {
Task task = CloudTasksUtils.createGetTask("/the/path", "myservice", params);
Task task = cloudTasksUtils.createGetTask("/the/path", "myservice", params);
assertThat(task.getAppEngineHttpRequest().getHttpMethod()).isEqualTo(HttpMethod.GET);
assertThat(task.getAppEngineHttpRequest().getRelativeUri())
.isEqualTo("/the/path?key1=val1&key2=val2&key1=val3");
@ -70,7 +70,7 @@ public class CloudTasksUtilsTest {
@Test
void testSuccess_createPostTasks() {
Task task = CloudTasksUtils.createPostTask("/the/path", "myservice", params);
Task task = cloudTasksUtils.createPostTask("/the/path", "myservice", params);
assertThat(task.getAppEngineHttpRequest().getHttpMethod()).isEqualTo(HttpMethod.POST);
assertThat(task.getAppEngineHttpRequest().getRelativeUri()).isEqualTo("/the/path");
assertThat(task.getAppEngineHttpRequest().getAppEngineRouting().getService())
@ -84,7 +84,7 @@ public class CloudTasksUtilsTest {
@Test
void testSuccess_createGetTasks_withNullParams() {
Task task = CloudTasksUtils.createGetTask("/the/path", "myservice", null);
Task task = cloudTasksUtils.createGetTask("/the/path", "myservice", null);
assertThat(task.getAppEngineHttpRequest().getHttpMethod()).isEqualTo(HttpMethod.GET);
assertThat(task.getAppEngineHttpRequest().getRelativeUri()).isEqualTo("/the/path");
assertThat(task.getAppEngineHttpRequest().getAppEngineRouting().getService())
@ -94,7 +94,7 @@ public class CloudTasksUtilsTest {
@Test
void testSuccess_createPostTasks_withNullParams() {
Task task = CloudTasksUtils.createPostTask("/the/path", "myservice", null);
Task task = cloudTasksUtils.createPostTask("/the/path", "myservice", null);
assertThat(task.getAppEngineHttpRequest().getHttpMethod()).isEqualTo(HttpMethod.POST);
assertThat(task.getAppEngineHttpRequest().getRelativeUri()).isEqualTo("/the/path");
assertThat(task.getAppEngineHttpRequest().getAppEngineRouting().getService())
@ -105,7 +105,7 @@ public class CloudTasksUtilsTest {
@Test
void testSuccess_createGetTasks_withEmptyParams() {
Task task = CloudTasksUtils.createGetTask("/the/path", "myservice", ImmutableMultimap.of());
Task task = cloudTasksUtils.createGetTask("/the/path", "myservice", ImmutableMultimap.of());
assertThat(task.getAppEngineHttpRequest().getHttpMethod()).isEqualTo(HttpMethod.GET);
assertThat(task.getAppEngineHttpRequest().getRelativeUri()).isEqualTo("/the/path");
assertThat(task.getAppEngineHttpRequest().getAppEngineRouting().getService())
@ -115,7 +115,7 @@ public class CloudTasksUtilsTest {
@Test
void testSuccess_createPostTasks_withEmptyParams() {
Task task = CloudTasksUtils.createPostTask("/the/path", "myservice", ImmutableMultimap.of());
Task task = cloudTasksUtils.createPostTask("/the/path", "myservice", ImmutableMultimap.of());
assertThat(task.getAppEngineHttpRequest().getHttpMethod()).isEqualTo(HttpMethod.POST);
assertThat(task.getAppEngineHttpRequest().getRelativeUri()).isEqualTo("/the/path");
assertThat(task.getAppEngineHttpRequest().getAppEngineRouting().getService())
@ -128,7 +128,7 @@ public class CloudTasksUtilsTest {
@Test
void testSuccess_createGetTasks_withJitterSeconds() {
Task task =
CloudTasksUtils.createGetTask("/the/path", "myservice", params, clock, Optional.of(100));
cloudTasksUtils.createGetTaskWithJitter("/the/path", "myservice", params, Optional.of(100));
assertThat(task.getAppEngineHttpRequest().getHttpMethod()).isEqualTo(HttpMethod.GET);
assertThat(task.getAppEngineHttpRequest().getRelativeUri())
.isEqualTo("/the/path?key1=val1&key2=val2&key1=val3");
@ -147,7 +147,7 @@ public class CloudTasksUtilsTest {
@Test
void testSuccess_createPostTasks_withJitterSeconds() {
Task task =
CloudTasksUtils.createPostTask("/the/path", "myservice", params, clock, Optional.of(1));
cloudTasksUtils.createPostTaskWithJitter("/the/path", "myservice", params, Optional.of(1));
assertThat(task.getAppEngineHttpRequest().getHttpMethod()).isEqualTo(HttpMethod.POST);
assertThat(task.getAppEngineHttpRequest().getRelativeUri()).isEqualTo("/the/path");
assertThat(task.getAppEngineHttpRequest().getAppEngineRouting().getService())
@ -169,7 +169,8 @@ public class CloudTasksUtilsTest {
@Test
void testSuccess_createPostTasks_withEmptyJitterSeconds() {
Task task =
CloudTasksUtils.createPostTask("/the/path", "myservice", params, clock, Optional.empty());
cloudTasksUtils.createPostTaskWithJitter(
"/the/path", "myservice", params, Optional.empty());
assertThat(task.getAppEngineHttpRequest().getHttpMethod()).isEqualTo(HttpMethod.POST);
assertThat(task.getAppEngineHttpRequest().getRelativeUri()).isEqualTo("/the/path");
assertThat(task.getAppEngineHttpRequest().getAppEngineRouting().getService())
@ -184,7 +185,7 @@ public class CloudTasksUtilsTest {
@Test
void testSuccess_createGetTasks_withEmptyJitterSeconds() {
Task task =
CloudTasksUtils.createGetTask("/the/path", "myservice", params, clock, Optional.empty());
cloudTasksUtils.createGetTaskWithJitter("/the/path", "myservice", params, Optional.empty());
assertThat(task.getAppEngineHttpRequest().getHttpMethod()).isEqualTo(HttpMethod.GET);
assertThat(task.getAppEngineHttpRequest().getRelativeUri())
.isEqualTo("/the/path?key1=val1&key2=val2&key1=val3");
@ -196,7 +197,7 @@ public class CloudTasksUtilsTest {
@Test
void testSuccess_createPostTasks_withZeroJitterSeconds() {
Task task =
CloudTasksUtils.createPostTask("/the/path", "myservice", params, clock, Optional.of(0));
cloudTasksUtils.createPostTaskWithJitter("/the/path", "myservice", params, Optional.of(0));
assertThat(task.getAppEngineHttpRequest().getHttpMethod()).isEqualTo(HttpMethod.POST);
assertThat(task.getAppEngineHttpRequest().getRelativeUri()).isEqualTo("/the/path");
assertThat(task.getAppEngineHttpRequest().getAppEngineRouting().getService())
@ -211,7 +212,7 @@ public class CloudTasksUtilsTest {
@Test
void testSuccess_createGetTasks_withZeroJitterSeconds() {
Task task =
CloudTasksUtils.createGetTask("/the/path", "myservice", params, clock, Optional.of(0));
cloudTasksUtils.createGetTaskWithJitter("/the/path", "myservice", params, Optional.of(0));
assertThat(task.getAppEngineHttpRequest().getHttpMethod()).isEqualTo(HttpMethod.GET);
assertThat(task.getAppEngineHttpRequest().getRelativeUri())
.isEqualTo("/the/path?key1=val1&key2=val2&key1=val3");
@ -223,8 +224,8 @@ public class CloudTasksUtilsTest {
@Test
void testSuccess_createGetTasks_withDelay() {
Task task =
CloudTasksUtils.createGetTask(
"/the/path", "myservice", params, clock, Duration.standardMinutes(10));
cloudTasksUtils.createGetTaskWithDelay(
"/the/path", "myservice", params, Duration.standardMinutes(10));
assertThat(task.getAppEngineHttpRequest().getHttpMethod()).isEqualTo(HttpMethod.GET);
assertThat(task.getAppEngineHttpRequest().getRelativeUri())
.isEqualTo("/the/path?key1=val1&key2=val2&key1=val3");
@ -237,8 +238,8 @@ public class CloudTasksUtilsTest {
@Test
void testSuccess_createPostTasks_withDelay() {
Task task =
CloudTasksUtils.createPostTask(
"/the/path", "myservice", params, clock, Duration.standardMinutes(10));
cloudTasksUtils.createPostTaskWithDelay(
"/the/path", "myservice", params, Duration.standardMinutes(10));
assertThat(task.getAppEngineHttpRequest().getHttpMethod()).isEqualTo(HttpMethod.POST);
assertThat(task.getAppEngineHttpRequest().getRelativeUri()).isEqualTo("/the/path");
assertThat(task.getAppEngineHttpRequest().getAppEngineRouting().getService())
@ -258,8 +259,8 @@ public class CloudTasksUtilsTest {
assertThrows(
IllegalArgumentException.class,
() ->
CloudTasksUtils.createGetTask(
"/the/path", "myservice", params, clock, Duration.standardMinutes(-10)));
cloudTasksUtils.createGetTaskWithDelay(
"/the/path", "myservice", params, Duration.standardMinutes(-10)));
assertThat(thrown).hasMessageThat().isEqualTo("Negative duration is not supported.");
}
@ -269,15 +270,15 @@ public class CloudTasksUtilsTest {
assertThrows(
IllegalArgumentException.class,
() ->
CloudTasksUtils.createGetTask(
"/the/path", "myservice", params, clock, Duration.standardMinutes(-10)));
cloudTasksUtils.createGetTaskWithDelay(
"/the/path", "myservice", params, Duration.standardMinutes(-10)));
assertThat(thrown).hasMessageThat().isEqualTo("Negative duration is not supported.");
}
@Test
void testSuccess_createPostTasks_withZeroDelay() {
Task task =
CloudTasksUtils.createPostTask("/the/path", "myservice", params, clock, Duration.ZERO);
cloudTasksUtils.createPostTaskWithDelay("/the/path", "myservice", params, Duration.ZERO);
assertThat(task.getAppEngineHttpRequest().getHttpMethod()).isEqualTo(HttpMethod.POST);
assertThat(task.getAppEngineHttpRequest().getRelativeUri()).isEqualTo("/the/path");
assertThat(task.getAppEngineHttpRequest().getAppEngineRouting().getService())
@ -292,7 +293,7 @@ public class CloudTasksUtilsTest {
@Test
void testSuccess_createGetTasks_withZeroDelay() {
Task task =
CloudTasksUtils.createGetTask("/the/path", "myservice", params, clock, Duration.ZERO);
cloudTasksUtils.createGetTaskWithDelay("/the/path", "myservice", params, Duration.ZERO);
assertThat(task.getAppEngineHttpRequest().getHttpMethod()).isEqualTo(HttpMethod.GET);
assertThat(task.getAppEngineHttpRequest().getRelativeUri())
.isEqualTo("/the/path?key1=val1&key2=val2&key1=val3");
@ -305,26 +306,26 @@ public class CloudTasksUtilsTest {
void testFailure_illegalPath() {
assertThrows(
IllegalArgumentException.class,
() -> CloudTasksUtils.createPostTask("the/path", "myservice", params));
() -> cloudTasksUtils.createPostTask("the/path", "myservice", params));
assertThrows(
IllegalArgumentException.class,
() -> CloudTasksUtils.createPostTask(null, "myservice", params));
() -> cloudTasksUtils.createPostTask(null, "myservice", params));
assertThrows(
IllegalArgumentException.class,
() -> CloudTasksUtils.createPostTask("", "myservice", params));
() -> cloudTasksUtils.createPostTask("", "myservice", params));
}
@Test
void testSuccess_enqueueTask() {
Task task = CloudTasksUtils.createGetTask("/the/path", "myservice", params);
Task task = cloudTasksUtils.createGetTask("/the/path", "myservice", params);
cloudTasksUtils.enqueue("test-queue", task);
verify(mockClient).enqueue("project", "location", "test-queue", task);
}
@Test
void testSuccess_enqueueTasks_varargs() {
Task task1 = CloudTasksUtils.createGetTask("/the/path", "myservice", params);
Task task2 = CloudTasksUtils.createGetTask("/other/path", "yourservice", params);
Task task1 = cloudTasksUtils.createGetTask("/the/path", "myservice", params);
Task task2 = cloudTasksUtils.createGetTask("/other/path", "yourservice", params);
cloudTasksUtils.enqueue("test-queue", task1, task2);
verify(mockClient).enqueue("project", "location", "test-queue", task1);
verify(mockClient).enqueue("project", "location", "test-queue", task2);
@ -332,8 +333,8 @@ public class CloudTasksUtilsTest {
@Test
void testSuccess_enqueueTasks_iterable() {
Task task1 = CloudTasksUtils.createGetTask("/the/path", "myservice", params);
Task task2 = CloudTasksUtils.createGetTask("/other/path", "yourservice", params);
Task task1 = cloudTasksUtils.createGetTask("/the/path", "myservice", params);
Task task2 = cloudTasksUtils.createGetTask("/other/path", "yourservice", params);
cloudTasksUtils.enqueue("test-queue", ImmutableList.of(task1, task2));
verify(mockClient).enqueue("project", "location", "test-queue", task1);
verify(mockClient).enqueue("project", "location", "test-queue", task2);