Change from TaskQueueUtils to CloudTasksUtils in RdeStaging (#1411)

* Change from TaskQueueUtils to CloudTaskUtils in RdeStaging
This commit is contained in:
Rachel Guan 2022-02-01 20:41:56 -05:00 committed by GitHub
parent 68b0dd2595
commit de258d4dfc
3 changed files with 41 additions and 39 deletions

View file

@ -14,8 +14,6 @@
package google.registry.rde; package google.registry.rde;
import static com.google.appengine.api.taskqueue.QueueFactory.getQueue;
import static com.google.appengine.api.taskqueue.TaskOptions.Builder.withUrl;
import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify; import static com.google.common.base.Verify.verify;
import static google.registry.model.common.Cursor.getCursorTimeOrStartOfTime; import static google.registry.model.common.Cursor.getCursorTimeOrStartOfTime;
@ -26,6 +24,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import com.google.appengine.tools.mapreduce.Reducer; import com.google.appengine.tools.mapreduce.Reducer;
import com.google.appengine.tools.mapreduce.ReducerInput; import com.google.appengine.tools.mapreduce.ReducerInput;
import com.google.cloud.storage.BlobId; import com.google.cloud.storage.BlobId;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.flogger.FluentLogger; import com.google.common.flogger.FluentLogger;
import google.registry.config.RegistryConfig.Config; import google.registry.config.RegistryConfig.Config;
import google.registry.gcs.GcsUtils; import google.registry.gcs.GcsUtils;
@ -36,10 +35,11 @@ import google.registry.model.rde.RdeMode;
import google.registry.model.rde.RdeNamingUtils; import google.registry.model.rde.RdeNamingUtils;
import google.registry.model.rde.RdeRevision; import google.registry.model.rde.RdeRevision;
import google.registry.model.tld.Registry; import google.registry.model.tld.Registry;
import google.registry.request.Action.Service;
import google.registry.request.RequestParameters; import google.registry.request.RequestParameters;
import google.registry.request.lock.LockHandler; import google.registry.request.lock.LockHandler;
import google.registry.tldconfig.idn.IdnTableEnum; import google.registry.tldconfig.idn.IdnTableEnum;
import google.registry.util.TaskQueueUtils; import google.registry.util.CloudTasksUtils;
import google.registry.xjc.rdeheader.XjcRdeHeader; import google.registry.xjc.rdeheader.XjcRdeHeader;
import google.registry.xjc.rdeheader.XjcRdeHeaderElement; import google.registry.xjc.rdeheader.XjcRdeHeaderElement;
import google.registry.xml.ValidationMode; import google.registry.xml.ValidationMode;
@ -65,7 +65,7 @@ public final class RdeStagingReducer extends Reducer<PendingDeposit, DepositFrag
private static final FluentLogger logger = FluentLogger.forEnclosingClass(); private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private final TaskQueueUtils taskQueueUtils; private final CloudTasksUtils cloudTasksUtils;
private final LockHandler lockHandler; private final LockHandler lockHandler;
private final String bucket; private final String bucket;
private final Duration lockTimeout; private final Duration lockTimeout;
@ -74,14 +74,14 @@ public final class RdeStagingReducer extends Reducer<PendingDeposit, DepositFrag
private final GcsUtils gcsUtils; private final GcsUtils gcsUtils;
RdeStagingReducer( RdeStagingReducer(
TaskQueueUtils taskQueueUtils, CloudTasksUtils cloudTasksUtils,
LockHandler lockHandler, LockHandler lockHandler,
String bucket, String bucket,
Duration lockTimeout, Duration lockTimeout,
byte[] stagingKeyBytes, byte[] stagingKeyBytes,
ValidationMode validationMode, ValidationMode validationMode,
GcsUtils gcsUtils) { GcsUtils gcsUtils) {
this.taskQueueUtils = taskQueueUtils; this.cloudTasksUtils = cloudTasksUtils;
this.lockHandler = lockHandler; this.lockHandler = lockHandler;
this.bucket = bucket; this.bucket = bucket;
this.lockTimeout = lockTimeout; this.lockTimeout = lockTimeout;
@ -227,22 +227,30 @@ public final class RdeStagingReducer extends Reducer<PendingDeposit, DepositFrag
"Rolled forward %s on %s cursor to %s.", key.cursor(), tld, newPosition); "Rolled forward %s on %s cursor to %s.", key.cursor(), tld, newPosition);
RdeRevision.saveRevision(tld, watermark, mode, revision); RdeRevision.saveRevision(tld, watermark, mode, revision);
if (mode == RdeMode.FULL) { if (mode == RdeMode.FULL) {
taskQueueUtils.enqueue( cloudTasksUtils.enqueue(
getQueue("rde-upload"), "rde-upload",
withUrl(RdeUploadAction.PATH).param(RequestParameters.PARAM_TLD, tld)); CloudTasksUtils.createPostTask(
RdeUploadAction.PATH,
Service.BACKEND.toString(),
ImmutableMultimap.of(RequestParameters.PARAM_TLD, tld)));
} else { } else {
taskQueueUtils.enqueue( cloudTasksUtils.enqueue(
getQueue("brda"), "brda",
withUrl(BrdaCopyAction.PATH) CloudTasksUtils.createPostTask(
.param(RequestParameters.PARAM_TLD, tld) BrdaCopyAction.PATH,
.param(RdeModule.PARAM_WATERMARK, watermark.toString())); Service.BACKEND.toString(),
ImmutableMultimap.of(
RequestParameters.PARAM_TLD,
tld,
RdeModule.PARAM_WATERMARK,
watermark.toString())));
} }
}); });
} }
/** Injectible factory for creating {@link RdeStagingReducer}. */ /** Injectible factory for creating {@link RdeStagingReducer}. */
static class Factory { static class Factory {
@Inject TaskQueueUtils taskQueueUtils; @Inject CloudTasksUtils cloudTasksUtils;
@Inject LockHandler lockHandler; @Inject LockHandler lockHandler;
@Inject @Config("rdeBucket") String bucket; @Inject @Config("rdeBucket") String bucket;
@Inject @Config("rdeStagingLockTimeout") Duration lockTimeout; @Inject @Config("rdeStagingLockTimeout") Duration lockTimeout;
@ -252,7 +260,7 @@ public final class RdeStagingReducer extends Reducer<PendingDeposit, DepositFrag
RdeStagingReducer create(ValidationMode validationMode, GcsUtils gcsUtils) { RdeStagingReducer create(ValidationMode validationMode, GcsUtils gcsUtils) {
return new RdeStagingReducer( return new RdeStagingReducer(
taskQueueUtils, cloudTasksUtils,
lockHandler, lockHandler,
bucket, bucket,
lockTimeout, lockTimeout,

View file

@ -27,7 +27,6 @@ import static google.registry.testing.DatabaseHelper.persistResource;
import static google.registry.testing.DatabaseHelper.persistResourceWithCommitLog; import static google.registry.testing.DatabaseHelper.persistResourceWithCommitLog;
import static google.registry.testing.TaskQueueHelper.assertAtLeastOneTaskIsEnqueued; import static google.registry.testing.TaskQueueHelper.assertAtLeastOneTaskIsEnqueued;
import static google.registry.testing.TaskQueueHelper.assertNoTasksEnqueued; import static google.registry.testing.TaskQueueHelper.assertNoTasksEnqueued;
import static google.registry.testing.TaskQueueHelper.assertTasksEnqueued;
import static google.registry.testing.TestDataHelper.loadFile; import static google.registry.testing.TestDataHelper.loadFile;
import static google.registry.tldconfig.idn.IdnTableEnum.EXTENDED_LATIN; import static google.registry.tldconfig.idn.IdnTableEnum.EXTENDED_LATIN;
import static java.nio.charset.StandardCharsets.UTF_8; import static java.nio.charset.StandardCharsets.UTF_8;
@ -50,17 +49,15 @@ import google.registry.model.ofy.Ofy;
import google.registry.model.tld.Registry; import google.registry.model.tld.Registry;
import google.registry.request.HttpException.BadRequestException; import google.registry.request.HttpException.BadRequestException;
import google.registry.request.RequestParameters; import google.registry.request.RequestParameters;
import google.registry.testing.CloudTasksHelper;
import google.registry.testing.CloudTasksHelper.TaskMatcher;
import google.registry.testing.FakeClock; import google.registry.testing.FakeClock;
import google.registry.testing.FakeKeyringModule; import google.registry.testing.FakeKeyringModule;
import google.registry.testing.FakeLockHandler; import google.registry.testing.FakeLockHandler;
import google.registry.testing.FakeResponse; import google.registry.testing.FakeResponse;
import google.registry.testing.InjectExtension; import google.registry.testing.InjectExtension;
import google.registry.testing.TaskQueueHelper.TaskMatcher;
import google.registry.testing.mapreduce.MapreduceTestCase; import google.registry.testing.mapreduce.MapreduceTestCase;
import google.registry.tldconfig.idn.IdnTableEnum; import google.registry.tldconfig.idn.IdnTableEnum;
import google.registry.util.Retrier;
import google.registry.util.SystemSleeper;
import google.registry.util.TaskQueueUtils;
import google.registry.xjc.XjcXmlTransformer; import google.registry.xjc.XjcXmlTransformer;
import google.registry.xjc.rde.XjcRdeContentType; import google.registry.xjc.rde.XjcRdeContentType;
import google.registry.xjc.rde.XjcRdeDeposit; import google.registry.xjc.rde.XjcRdeDeposit;
@ -104,6 +101,7 @@ public class RdeStagingActionDatastoreTest extends MapreduceTestCase<RdeStagingA
private final FakeResponse response = new FakeResponse(); private final FakeResponse response = new FakeResponse();
private final GcsUtils gcsUtils = new GcsUtils(LocalStorageHelper.getOptions()); private final GcsUtils gcsUtils = new GcsUtils(LocalStorageHelper.getOptions());
private final List<? super XjcRdeContentType> alreadyExtracted = new ArrayList<>(); private final List<? super XjcRdeContentType> alreadyExtracted = new ArrayList<>();
private final CloudTasksHelper cloudTasksHelper = new CloudTasksHelper();
private static PGPPublicKey encryptKey; private static PGPPublicKey encryptKey;
private static PGPPrivateKey decryptKey; private static PGPPrivateKey decryptKey;
@ -124,7 +122,7 @@ public class RdeStagingActionDatastoreTest extends MapreduceTestCase<RdeStagingA
action.mrRunner = makeDefaultRunner(); action.mrRunner = makeDefaultRunner();
action.lenient = false; action.lenient = false;
action.reducerFactory = new RdeStagingReducer.Factory(); action.reducerFactory = new RdeStagingReducer.Factory();
action.reducerFactory.taskQueueUtils = new TaskQueueUtils(new Retrier(new SystemSleeper(), 1)); action.reducerFactory.cloudTasksUtils = cloudTasksHelper.getTestCloudTasksUtils();
action.reducerFactory.lockHandler = new FakeLockHandler(true); action.reducerFactory.lockHandler = new FakeLockHandler(true);
action.reducerFactory.bucket = "rde-bucket"; action.reducerFactory.bucket = "rde-bucket";
action.reducerFactory.lockTimeout = Duration.standardHours(1); action.reducerFactory.lockTimeout = Duration.standardHours(1);
@ -467,11 +465,11 @@ public class RdeStagingActionDatastoreTest extends MapreduceTestCase<RdeStagingA
clock.setTo(DateTime.parse("2000-01-04TZ")); // Tuesday clock.setTo(DateTime.parse("2000-01-04TZ")); // Tuesday
action.run(); action.run();
executeTasksUntilEmpty("mapreduce", clock); executeTasksUntilEmpty("mapreduce", clock);
assertTasksEnqueued("rde-upload", cloudTasksHelper.assertTasksEnqueued(
new TaskMatcher() "rde-upload",
.url(RdeUploadAction.PATH) new TaskMatcher().url(RdeUploadAction.PATH).param(RequestParameters.PARAM_TLD, "lol"));
.param(RequestParameters.PARAM_TLD, "lol")); cloudTasksHelper.assertTasksEnqueued(
assertTasksEnqueued("brda", "brda",
new TaskMatcher() new TaskMatcher()
.url(BrdaCopyAction.PATH) .url(BrdaCopyAction.PATH)
.param(RequestParameters.PARAM_TLD, "lol") .param(RequestParameters.PARAM_TLD, "lol")

View file

@ -20,8 +20,6 @@ import static google.registry.model.rde.RdeMode.FULL;
import static google.registry.model.rde.RdeMode.THIN; import static google.registry.model.rde.RdeMode.THIN;
import static google.registry.persistence.transaction.TransactionManagerFactory.tm; import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
import static google.registry.testing.DatabaseHelper.createTld; import static google.registry.testing.DatabaseHelper.createTld;
import static google.registry.testing.TaskQueueHelper.assertNoTasksEnqueued;
import static google.registry.testing.TaskQueueHelper.assertTasksEnqueued;
import static google.registry.util.ResourceUtils.readResourceUtf8; import static google.registry.util.ResourceUtils.readResourceUtf8;
import static java.nio.charset.StandardCharsets.UTF_8; import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertThrows;
@ -41,13 +39,10 @@ import google.registry.model.rde.RdeRevision;
import google.registry.model.tld.Registry; import google.registry.model.tld.Registry;
import google.registry.request.RequestParameters; import google.registry.request.RequestParameters;
import google.registry.testing.AppEngineExtension; import google.registry.testing.AppEngineExtension;
import google.registry.testing.FakeClock; import google.registry.testing.CloudTasksHelper;
import google.registry.testing.CloudTasksHelper.TaskMatcher;
import google.registry.testing.FakeKeyringModule; import google.registry.testing.FakeKeyringModule;
import google.registry.testing.FakeLockHandler; import google.registry.testing.FakeLockHandler;
import google.registry.testing.FakeSleeper;
import google.registry.testing.TaskQueueHelper.TaskMatcher;
import google.registry.util.Retrier;
import google.registry.util.TaskQueueUtils;
import google.registry.xml.ValidationMode; import google.registry.xml.ValidationMode;
import java.io.IOException; import java.io.IOException;
import java.util.Iterator; import java.util.Iterator;
@ -74,6 +69,7 @@ class RdeStagingReducerTest {
private static final PGPPublicKey encryptionKey = private static final PGPPublicKey encryptionKey =
new FakeKeyringModule().get().getRdeStagingEncryptionKey(); new FakeKeyringModule().get().getRdeStagingEncryptionKey();
private static final DateTime now = DateTime.parse("2000-01-01TZ"); private static final DateTime now = DateTime.parse("2000-01-01TZ");
private final CloudTasksHelper cloudTasksHelper = new CloudTasksHelper();
private Fragments brdaFragments = private Fragments brdaFragments =
new Fragments( new Fragments(
@ -94,7 +90,7 @@ class RdeStagingReducerTest {
private RdeStagingReducer reducer = private RdeStagingReducer reducer =
new RdeStagingReducer( new RdeStagingReducer(
new TaskQueueUtils(new Retrier(new FakeSleeper(new FakeClock()), 1)), cloudTasksHelper.getTestCloudTasksUtils(),
new FakeLockHandler(true), new FakeLockHandler(true),
GCS_BUCKET, GCS_BUCKET,
Duration.ZERO, Duration.ZERO,
@ -133,7 +129,7 @@ class RdeStagingReducerTest {
assertThat(loadCursorTime(CursorType.BRDA)) assertThat(loadCursorTime(CursorType.BRDA))
.isEquivalentAccordingToCompareTo(now.plus(Duration.standardDays(1))); .isEquivalentAccordingToCompareTo(now.plus(Duration.standardDays(1)));
assertThat(loadRevision(THIN)).isEqualTo(1); assertThat(loadRevision(THIN)).isEqualTo(1);
assertTasksEnqueued( cloudTasksHelper.assertTasksEnqueued(
"brda", "brda",
new TaskMatcher() new TaskMatcher()
.url(BrdaCopyAction.PATH) .url(BrdaCopyAction.PATH)
@ -159,7 +155,7 @@ class RdeStagingReducerTest {
// No extra operations in manual mode. // No extra operations in manual mode.
assertThat(loadCursorTime(CursorType.BRDA)).isEquivalentAccordingToCompareTo(now); assertThat(loadCursorTime(CursorType.BRDA)).isEquivalentAccordingToCompareTo(now);
assertThat(loadRevision(THIN)).isEqualTo(0); assertThat(loadRevision(THIN)).isEqualTo(0);
assertNoTasksEnqueued("brda"); cloudTasksHelper.assertNoTasksEnqueued("brda");
} }
@Test @Test
@ -179,7 +175,7 @@ class RdeStagingReducerTest {
assertThat(loadCursorTime(CursorType.RDE_STAGING)) assertThat(loadCursorTime(CursorType.RDE_STAGING))
.isEquivalentAccordingToCompareTo(now.plus(Duration.standardDays(1))); .isEquivalentAccordingToCompareTo(now.plus(Duration.standardDays(1)));
assertThat(loadRevision(FULL)).isEqualTo(1); assertThat(loadRevision(FULL)).isEqualTo(1);
assertTasksEnqueued( cloudTasksHelper.assertTasksEnqueued(
"rde-upload", "rde-upload",
new TaskMatcher().url(RdeUploadAction.PATH).param(RequestParameters.PARAM_TLD, "soy")); new TaskMatcher().url(RdeUploadAction.PATH).param(RequestParameters.PARAM_TLD, "soy"));
} }
@ -200,7 +196,7 @@ class RdeStagingReducerTest {
// No extra operations in manual mode. // No extra operations in manual mode.
assertThat(loadCursorTime(CursorType.RDE_STAGING)).isEquivalentAccordingToCompareTo(now); assertThat(loadCursorTime(CursorType.RDE_STAGING)).isEquivalentAccordingToCompareTo(now);
assertThat(loadRevision(FULL)).isEqualTo(0); assertThat(loadRevision(FULL)).isEqualTo(0);
assertNoTasksEnqueued("rde-upload"); cloudTasksHelper.assertNoTasksEnqueued("rde-upload");
} }
private static void compareLength(String outputFile, String lengthFilename) throws IOException { private static void compareLength(String outputFile, String lengthFilename) throws IOException {