Add support for delay of duration when scheduling a task (#1493)

* Add support for delay by duration when scheduling task

* Fix comments

* Add test for negative duration

* Change delay parameter type to duration
This commit is contained in:
Rachel Guan 2022-02-03 22:25:39 -05:00 committed by GitHub
parent 1688d27b4e
commit 98ba687005
3 changed files with 147 additions and 6 deletions

View file

@ -43,6 +43,7 @@ import java.util.Arrays;
import java.util.Optional;
import java.util.Random;
import java.util.function.Supplier;
import org.joda.time.Duration;
/** Utilities for dealing with Cloud Tasks. */
public class CloudTasksUtils implements Serializable {
@ -167,12 +168,45 @@ public class CloudTasksUtils implements Serializable {
if (!jitterSeconds.isPresent() || jitterSeconds.get() <= 0) {
return createTask(path, method, service, params);
}
Instant scheduleTime =
Instant.ofEpochMilli(
clock
.nowUtc()
.plusMillis(random.nextInt((int) SECONDS.toMillis(jitterSeconds.get())))
.getMillis());
return createTask(
path,
method,
service,
params,
clock,
Duration.millis(random.nextInt((int) SECONDS.toMillis(jitterSeconds.get()))));
}
/**
* Create a {@link Task} to be enqueued with delay of {@code duration}.
*
* @param path the relative URI (staring with a slash and ending without one).
* @param method the HTTP method to be used for the request, only GET and POST are supported.
* @param service the App Engine service to route the request to. Note that with App Engine Task
* Queue API if no service is specified, the service which enqueues the task will be used to
* process the task. Cloud Tasks API does not support this feature so the service will always
* needs to be explicitly specified.
* @param params a multi-map of URL query parameters. Duplicate keys are saved as is, and it is up
* to the server to process the duplicate keys.
* @param clock a source of time.
* @param delay the amount of time that a task needs to delayed for.
* @return the enqueued task.
* @see <a
* href=ttps://cloud.google.com/appengine/docs/standard/java/taskqueue/push/creating-tasks#target>Specifyinig
* the worker service</a>
*/
private static Task createTask(
String path,
HttpMethod method,
String service,
Multimap<String, String> params,
Clock clock,
Duration delay) {
if (delay.isEqual(Duration.ZERO)) {
return createTask(path, method, service, params);
}
checkArgument(delay.isLongerThan(Duration.ZERO), "Negative duration is not supported.");
Instant scheduleTime = Instant.ofEpochMilli(clock.nowUtc().getMillis() + delay.getMillis());
return Task.newBuilder(createTask(path, method, service, params))
.setScheduleTime(
Timestamp.newBuilder()
@ -214,6 +248,18 @@ public class CloudTasksUtils implements Serializable {
return createTask(path, HttpMethod.GET, service, params, clock, jitterSeconds);
}
/** Create a {@link Task} via HTTP.POST that will be delayed for {@code delay}. */
public static Task createPostTask(
String path, String service, Multimap<String, String> params, Clock clock, Duration delay) {
return createTask(path, HttpMethod.POST, service, params, clock, delay);
}
/** Create a {@link Task} via HTTP.GET that will be delayed for {@code delay}. */
public static Task createGetTask(
String path, String service, Multimap<String, String> params, Clock clock, Duration delay) {
return createTask(path, HttpMethod.GET, service, params, clock, delay);
}
public abstract static class SerializableCloudTasksClient implements Serializable {
public abstract Task enqueue(String projectId, String locationId, String queueName, Task task);
}

View file

@ -34,6 +34,7 @@ import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Optional;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -219,6 +220,87 @@ public class CloudTasksUtilsTest {
assertThat(task.getScheduleTime().getSeconds()).isEqualTo(0);
}
@Test
void testSuccess_createGetTasks_withDelay() {
Task task =
CloudTasksUtils.createGetTask(
"/the/path", "myservice", params, clock, Duration.standardMinutes(10));
assertThat(task.getAppEngineHttpRequest().getHttpMethod()).isEqualTo(HttpMethod.GET);
assertThat(task.getAppEngineHttpRequest().getRelativeUri())
.isEqualTo("/the/path?key1=val1&key2=val2&key1=val3");
assertThat(task.getAppEngineHttpRequest().getAppEngineRouting().getService())
.isEqualTo("myservice");
assertThat(Instant.ofEpochSecond(task.getScheduleTime().getSeconds()))
.isEqualTo(Instant.ofEpochMilli(clock.nowUtc().plusMinutes(10).getMillis()));
}
@Test
void testSuccess_createPostTasks_withDelay() {
Task task =
CloudTasksUtils.createPostTask(
"/the/path", "myservice", params, clock, Duration.standardMinutes(10));
assertThat(task.getAppEngineHttpRequest().getHttpMethod()).isEqualTo(HttpMethod.POST);
assertThat(task.getAppEngineHttpRequest().getRelativeUri()).isEqualTo("/the/path");
assertThat(task.getAppEngineHttpRequest().getAppEngineRouting().getService())
.isEqualTo("myservice");
assertThat(task.getAppEngineHttpRequest().getHeadersMap().get("Content-Type"))
.isEqualTo("application/x-www-form-urlencoded");
assertThat(task.getAppEngineHttpRequest().getBody().toString(StandardCharsets.UTF_8))
.isEqualTo("key1=val1&key2=val2&key1=val3");
assertThat(task.getScheduleTime().getSeconds()).isNotEqualTo(0);
assertThat(Instant.ofEpochSecond(task.getScheduleTime().getSeconds()))
.isEqualTo(Instant.ofEpochMilli(clock.nowUtc().plusMinutes(10).getMillis()));
}
@Test
void testFailure_createGetTasks_withNegativeDelay() {
IllegalArgumentException thrown =
assertThrows(
IllegalArgumentException.class,
() ->
CloudTasksUtils.createGetTask(
"/the/path", "myservice", params, clock, Duration.standardMinutes(-10)));
assertThat(thrown).hasMessageThat().isEqualTo("Negative duration is not supported.");
}
@Test
void testFailure_createPostTasks_withNegativeDelay() {
IllegalArgumentException thrown =
assertThrows(
IllegalArgumentException.class,
() ->
CloudTasksUtils.createGetTask(
"/the/path", "myservice", params, clock, Duration.standardMinutes(-10)));
assertThat(thrown).hasMessageThat().isEqualTo("Negative duration is not supported.");
}
@Test
void testSuccess_createPostTasks_withZeroDelay() {
Task task =
CloudTasksUtils.createPostTask("/the/path", "myservice", params, clock, Duration.ZERO);
assertThat(task.getAppEngineHttpRequest().getHttpMethod()).isEqualTo(HttpMethod.POST);
assertThat(task.getAppEngineHttpRequest().getRelativeUri()).isEqualTo("/the/path");
assertThat(task.getAppEngineHttpRequest().getAppEngineRouting().getService())
.isEqualTo("myservice");
assertThat(task.getAppEngineHttpRequest().getHeadersMap().get("Content-Type"))
.isEqualTo("application/x-www-form-urlencoded");
assertThat(task.getAppEngineHttpRequest().getBody().toString(StandardCharsets.UTF_8))
.isEqualTo("key1=val1&key2=val2&key1=val3");
assertThat(task.getScheduleTime().getSeconds()).isEqualTo(0);
}
@Test
void testSuccess_createGetTasks_withZeroDelay() {
Task task =
CloudTasksUtils.createGetTask("/the/path", "myservice", params, clock, Duration.ZERO);
assertThat(task.getAppEngineHttpRequest().getHttpMethod()).isEqualTo(HttpMethod.GET);
assertThat(task.getAppEngineHttpRequest().getRelativeUri())
.isEqualTo("/the/path?key1=val1&key2=val2&key1=val3");
assertThat(task.getAppEngineHttpRequest().getAppEngineRouting().getService())
.isEqualTo("myservice");
assertThat(task.getScheduleTime().getSeconds()).isEqualTo(0);
}
@Test
void testFailure_illegalPath() {
assertThrows(