Implement a util class to manage push queues using Cloud Tasks API (#1290)

* Implement a util class to manage push queues using Cloud Tasks API

Push queues were part of App Engine when they debuted. As a result the
Task Queue API were part of the App Engine SDK and can only be used in
App Engine classic runtime. The new Cloud Tasks API can be used in any
runtime but it only supports push queues. In this PR we implement a util
class (CloudTasksUtils) like TaskQueueUtils to handle enqueuing tasks to
push queues using Cloud Tasks. One action (TldFanoutAction) was
converted to use the new API as a demo. Mass migration of other call sites of
the old API will follow in a separate PR.

TESTED=deployed to alpha and verified that tasks are corrected enqueued
and executed.
This commit is contained in:
Lai Jiang 2021-08-24 21:13:54 -04:00 committed by GitHub
parent d3b07f6ab0
commit bc62e13e41
113 changed files with 3822 additions and 1994 deletions

View file

@ -0,0 +1,144 @@
// Copyright 2021 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.util;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.tasks.v2.AppEngineHttpRequest;
import com.google.cloud.tasks.v2.AppEngineRouting;
import com.google.cloud.tasks.v2.CloudTasksClient;
import com.google.cloud.tasks.v2.HttpMethod;
import com.google.cloud.tasks.v2.QueueName;
import com.google.cloud.tasks.v2.Task;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Multimap;
import com.google.common.collect.Streams;
import com.google.common.escape.Escaper;
import com.google.common.flogger.FluentLogger;
import com.google.common.net.HttpHeaders;
import com.google.common.net.MediaType;
import com.google.common.net.UrlEscapers;
import com.google.protobuf.ByteString;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import javax.inject.Provider;
/** Utilities for dealing with Cloud Tasks. */
public class CloudTasksUtils implements Serializable {
private static final long serialVersionUID = -7605156291755534069L;
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private final Retrier retrier;
private final String projectId;
private final String locationId;
private final Provider<CloudTasksClient> clientProvider;
public CloudTasksUtils(
Retrier retrier,
String projectId,
String locationId,
Provider<CloudTasksClient> clientProvider) {
this.retrier = retrier;
this.projectId = projectId;
this.locationId = locationId;
this.clientProvider = clientProvider;
}
public Task enqueue(String queue, Task task) {
return retrier.callWithRetry(
() -> {
logger.atInfo().log(
"Enqueuing queue='%s' endpoint='%s' service='%s'",
queue,
task.getAppEngineHttpRequest().getRelativeUri(),
task.getAppEngineHttpRequest().getAppEngineRouting().getService());
try (CloudTasksClient client = clientProvider.get()) {
return client.createTask(QueueName.of(projectId, locationId, queue), task);
}
},
ApiException.class);
}
public ImmutableList<Task> enqueue(String queue, Iterable<Task> tasks) {
return Streams.stream(tasks).map(task -> enqueue(queue, task)).collect(toImmutableList());
}
public ImmutableList<Task> enqueue(String queue, Task... tasks) {
return enqueue(queue, Arrays.asList(tasks));
}
/**
* Create a {@link Task} to be enqueued.
*
* @param path the relative URI (staring with a slash and ending without one).
* @param method the HTTP method to be used for the request, only GET and POST are supported.
* @param service the App Engine service to route the request to. Note that with App Engine Task
* Queue API if no service is specified, the service which enqueues the task will be used to
* process the task. Cloud Tasks API does not support this feature so the service will always
* needs to be explicitly specified.
* @param params A multi-map of URL query parameters. Duplicate keys are saved as is, and it is up
* to the server to process the duplicate keys.
* @return the enqueued task.
* @see <a
* href=ttps://cloud.google.com/appengine/docs/standard/java/taskqueue/push/creating-tasks#target>Specifyinig
* the worker service</a>
*/
private static Task createTask(
String path, HttpMethod method, String service, Multimap<String, String> params) {
checkArgument(
path != null && !path.isEmpty() && path.charAt(0) == '/',
"The path must start with a '/'.");
AppEngineHttpRequest.Builder requestBuilder =
AppEngineHttpRequest.newBuilder()
.setHttpMethod(method)
.setAppEngineRouting(AppEngineRouting.newBuilder().setService(service).build());
Escaper escaper = UrlEscapers.urlPathSegmentEscaper();
String encodedParams =
Joiner.on("&")
.join(
params.entries().stream()
.map(
entry ->
String.format(
"%s=%s",
escaper.escape(entry.getKey()), escaper.escape(entry.getValue())))
.collect(toImmutableList()));
if (method == HttpMethod.GET) {
path = String.format("%s?%s", path, encodedParams);
} else if (method == HttpMethod.POST) {
requestBuilder
.putHeaders(HttpHeaders.CONTENT_TYPE, MediaType.FORM_DATA.toString())
.setBody(ByteString.copyFrom(encodedParams, StandardCharsets.UTF_8));
} else {
throw new IllegalArgumentException(
String.format("HTTP method %s is used. Only GET and POST are allowed.", method));
}
requestBuilder.setRelativeUri(path);
return Task.newBuilder().setAppEngineHttpRequest(requestBuilder.build()).build();
}
public static Task createPostTask(String path, String service, Multimap<String, String> params) {
return createTask(path, HttpMethod.POST, service, params);
}
public static Task createGetTask(String path, String service, Multimap<String, String> params) {
return createTask(path, HttpMethod.GET, service, params);
}
}

View file

@ -0,0 +1,117 @@
// Copyright 2021 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.util;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.google.cloud.tasks.v2.CloudTasksClient;
import com.google.cloud.tasks.v2.HttpMethod;
import com.google.cloud.tasks.v2.QueueName;
import com.google.cloud.tasks.v2.Task;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.LinkedListMultimap;
import google.registry.testing.FakeClock;
import google.registry.testing.FakeSleeper;
import java.nio.charset.StandardCharsets;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
/** Unit tests for {@link CloudTasksUtils}. */
public class CloudTasksUtilsTest {
// Use a LinkedListMultimap to preserve order of the inserted entries for assertion.
private final LinkedListMultimap<String, String> params = LinkedListMultimap.create();
private final CloudTasksClient mockClient = mock(CloudTasksClient.class);
private final CloudTasksUtils cloudTasksUtils =
new CloudTasksUtils(
new Retrier(new FakeSleeper(new FakeClock()), 1),
"project",
"location",
() -> mockClient);
@BeforeEach
void beforeEach() {
params.put("key1", "val1");
params.put("key2", "val2");
params.put("key1", "val3");
when(mockClient.createTask(any(QueueName.class), any(Task.class)))
.thenAnswer(invocation -> invocation.getArgument(1));
}
@Test
void testSuccess_createGetTasks() {
Task task = CloudTasksUtils.createGetTask("/the/path", "myservice", params);
assertThat(task.getAppEngineHttpRequest().getHttpMethod()).isEqualTo(HttpMethod.GET);
assertThat(task.getAppEngineHttpRequest().getRelativeUri())
.isEqualTo("/the/path?key1=val1&key2=val2&key1=val3");
assertThat(task.getAppEngineHttpRequest().getAppEngineRouting().getService())
.isEqualTo("myservice");
}
@Test
void testSuccess_createPostTasks() {
Task task = CloudTasksUtils.createPostTask("/the/path", "myservice", params);
assertThat(task.getAppEngineHttpRequest().getHttpMethod()).isEqualTo(HttpMethod.POST);
assertThat(task.getAppEngineHttpRequest().getRelativeUri()).isEqualTo("/the/path");
assertThat(task.getAppEngineHttpRequest().getAppEngineRouting().getService())
.isEqualTo("myservice");
assertThat(task.getAppEngineHttpRequest().getHeadersMap().get("Content-Type"))
.isEqualTo("application/x-www-form-urlencoded");
assertThat(task.getAppEngineHttpRequest().getBody().toString(StandardCharsets.UTF_8))
.isEqualTo("key1=val1&key2=val2&key1=val3");
}
@Test
void testFailure_illegalPath() {
assertThrows(
IllegalArgumentException.class,
() -> CloudTasksUtils.createPostTask("the/path", "myservice", params));
assertThrows(
IllegalArgumentException.class,
() -> CloudTasksUtils.createPostTask(null, "myservice", params));
assertThrows(
IllegalArgumentException.class,
() -> CloudTasksUtils.createPostTask("", "myservice", params));
}
@Test
void testSuccess_enqueueTask() {
Task task = CloudTasksUtils.createGetTask("/the/path", "myservice", params);
cloudTasksUtils.enqueue("test-queue", task);
verify(mockClient).createTask(QueueName.of("project", "location", "test-queue"), task);
}
@Test
void testSuccess_enqueueTasks_varargs() {
Task task1 = CloudTasksUtils.createGetTask("/the/path", "myservice", params);
Task task2 = CloudTasksUtils.createGetTask("/other/path", "yourservice", params);
cloudTasksUtils.enqueue("test-queue", task1, task2);
verify(mockClient).createTask(QueueName.of("project", "location", "test-queue"), task1);
verify(mockClient).createTask(QueueName.of("project", "location", "test-queue"), task2);
}
@Test
void testSuccess_enqueueTasks_iterable() {
Task task1 = CloudTasksUtils.createGetTask("/the/path", "myservice", params);
Task task2 = CloudTasksUtils.createGetTask("/other/path", "yourservice", params);
cloudTasksUtils.enqueue("test-queue", ImmutableList.of(task1, task2));
verify(mockClient).createTask(QueueName.of("project", "location", "test-queue"), task1);
verify(mockClient).createTask(QueueName.of("project", "location", "test-queue"), task2);
}
}