diff --git a/core/build.gradle b/core/build.gradle index 0f41f3434..769bc511f 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -783,27 +783,27 @@ createUberJar( // User should install gcloud and login to GCP before invoking this tasks. if (environment == 'alpha') { def pipelines = [ - InitSql : + initSql : [ mainClass: 'google.registry.beam.initsql.InitSqlPipeline', metaData : 'google/registry/beam/init_sql_pipeline_metadata.json' ], - BulkDeleteDatastore: + bulkDeleteDatastore: [ mainClass: 'google.registry.beam.datastore.BulkDeleteDatastorePipeline', metaData : 'google/registry/beam/bulk_delete_datastore_pipeline_metadata.json' ], - Spec11 : + spec11 : [ mainClass: 'google.registry.beam.spec11.Spec11Pipeline', metaData : 'google/registry/beam/spec11_pipeline_metadata.json' ], - Invoicing : + invoicing : [ mainClass: 'google.registry.beam.invoicing.InvoicingPipeline', metaData : 'google/registry/beam/invoicing_pipeline_metadata.json' ], - Rde : + rde : [ mainClass: 'google.registry.beam.rde.RdePipeline', metaData : 'google/registry/beam/rde_pipeline_metadata.json' diff --git a/core/src/main/java/google/registry/beam/rde/RdeIO.java b/core/src/main/java/google/registry/beam/rde/RdeIO.java index f7ef89d01..d340d86c0 100644 --- a/core/src/main/java/google/registry/beam/rde/RdeIO.java +++ b/core/src/main/java/google/registry/beam/rde/RdeIO.java @@ -19,10 +19,13 @@ import static com.google.common.base.Verify.verify; import static google.registry.model.common.Cursor.getCursorTimeOrStartOfTime; import static google.registry.persistence.transaction.TransactionManagerFactory.tm; import static google.registry.persistence.transaction.TransactionManagerUtil.transactIfJpaTm; +import static google.registry.rde.RdeModule.BRDA_QUEUE; +import static google.registry.rde.RdeModule.RDE_UPLOAD_QUEUE; import static java.nio.charset.StandardCharsets.UTF_8; import com.google.auto.value.AutoValue; import com.google.cloud.storage.BlobId; +import com.google.common.collect.ImmutableMultimap; import com.google.common.flogger.FluentLogger; import google.registry.gcs.GcsUtils; import google.registry.keyring.api.PgpHelper; @@ -31,14 +34,20 @@ import google.registry.model.rde.RdeMode; import google.registry.model.rde.RdeNamingUtils; import google.registry.model.rde.RdeRevision; import google.registry.model.tld.Registry; +import google.registry.rde.BrdaCopyAction; import google.registry.rde.DepositFragment; import google.registry.rde.Ghostryde; import google.registry.rde.PendingDeposit; import google.registry.rde.RdeCounter; import google.registry.rde.RdeMarshaller; +import google.registry.rde.RdeModule; import google.registry.rde.RdeResourceType; +import google.registry.rde.RdeUploadAction; import google.registry.rde.RdeUtil; +import google.registry.request.Action.Service; +import google.registry.request.RequestParameters; import google.registry.tldconfig.idn.IdnTableEnum; +import google.registry.util.CloudTasksUtils; import google.registry.xjc.rdeheader.XjcRdeHeader; import google.registry.xjc.rdeheader.XjcRdeHeaderElement; import google.registry.xml.ValidationMode; @@ -68,6 +77,8 @@ public class RdeIO { abstract GcsUtils gcsUtils(); + abstract CloudTasksUtils cloudTasksUtils(); + abstract String rdeBucket(); // It's OK to return a primitive array because we are only using it to construct the @@ -83,7 +94,9 @@ public class RdeIO { @AutoValue.Builder abstract static class Builder { - abstract Builder setGcsUtils(GcsUtils gcsUtils); + abstract Builder setGcsUtils(GcsUtils value); + + abstract Builder setCloudTasksUtils(CloudTasksUtils value); abstract Builder setRdeBucket(String value); @@ -100,7 +113,8 @@ public class RdeIO { .apply( "Write to GCS", ParDo.of(new RdeWriter(gcsUtils(), rdeBucket(), stagingKeyBytes(), validationMode()))) - .apply("Update cursors", ParDo.of(new CursorUpdater())); + .apply("Update cursors", ParDo.of(new CursorUpdater())) + .apply("Enqueue upload action", ParDo.of(new UploadEnqueuer(cloudTasksUtils()))); return PDone.in(input.getPipeline()); } } @@ -236,11 +250,12 @@ public class RdeIO { } } - private static class CursorUpdater extends DoFn, Void> { + private static class CursorUpdater extends DoFn, PendingDeposit> { private static final FluentLogger logger = FluentLogger.forEnclosingClass(); @ProcessElement - public void processElement(@Element KV input) { + public void processElement( + @Element KV input, OutputReceiver outputReceiver) { tm().transact( () -> { PendingDeposit key = input.getKey(); @@ -268,6 +283,45 @@ public class RdeIO { "Rolled forward %s on %s cursor to %s.", key.cursor(), key.tld(), newPosition); RdeRevision.saveRevision(key.tld(), key.watermark(), key.mode(), revision); }); + outputReceiver.output(input.getKey()); + } + } + + private static class UploadEnqueuer extends DoFn { + + private final CloudTasksUtils cloudTasksUtils; + + private UploadEnqueuer(CloudTasksUtils cloudTasksUtils) { + this.cloudTasksUtils = cloudTasksUtils; + } + + @ProcessElement + public void processElement(@Element PendingDeposit input, PipelineOptions options) { + if (input.mode() == RdeMode.FULL) { + cloudTasksUtils.enqueue( + RDE_UPLOAD_QUEUE, + CloudTasksUtils.createPostTask( + RdeUploadAction.PATH, + Service.BACKEND.getServiceId(), + ImmutableMultimap.of( + RequestParameters.PARAM_TLD, + input.tld(), + RdeModule.PARAM_PREFIX, + options.getJobName() + '/'))); + } else { + cloudTasksUtils.enqueue( + BRDA_QUEUE, + CloudTasksUtils.createPostTask( + BrdaCopyAction.PATH, + Service.BACKEND.getServiceId(), + ImmutableMultimap.of( + RequestParameters.PARAM_TLD, + input.tld(), + RdeModule.PARAM_WATERMARK, + input.watermark().toString(), + RdeModule.PARAM_PREFIX, + options.getJobName() + '/'))); + } } } } diff --git a/core/src/main/java/google/registry/beam/rde/RdePipeline.java b/core/src/main/java/google/registry/beam/rde/RdePipeline.java index 3c70a0071..f0b337e9e 100644 --- a/core/src/main/java/google/registry/beam/rde/RdePipeline.java +++ b/core/src/main/java/google/registry/beam/rde/RdePipeline.java @@ -27,6 +27,7 @@ import com.google.common.io.BaseEncoding; import dagger.BindsInstance; import dagger.Component; import google.registry.beam.common.RegistryJpaIO; +import google.registry.config.CloudTasksUtilsModule; import google.registry.config.CredentialModule; import google.registry.config.RegistryConfig.ConfigModule; import google.registry.gcs.GcsUtils; @@ -44,6 +45,8 @@ import google.registry.rde.PendingDeposit; import google.registry.rde.PendingDeposit.PendingDepositCoder; import google.registry.rde.RdeFragmenter; import google.registry.rde.RdeMarshaller; +import google.registry.util.CloudTasksUtils; +import google.registry.util.UtilsModule; import google.registry.xml.ValidationMode; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -66,7 +69,6 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.FlatMapElements; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.Reshuffle; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; @@ -93,6 +95,7 @@ public class RdePipeline implements Serializable { private final String rdeBucket; private final byte[] stagingKeyBytes; private final GcsUtils gcsUtils; + private final CloudTasksUtils cloudTasksUtils; // Registrars to be excluded from data escrow. Not including the sandbox-only OTE type so that // if sneaks into production we would get an extra signal. @@ -111,13 +114,14 @@ public class RdePipeline implements Serializable { } @Inject - RdePipeline(RdePipelineOptions options, GcsUtils gcsUtils) { + RdePipeline(RdePipelineOptions options, GcsUtils gcsUtils, CloudTasksUtils cloudTasksUtils) { this.options = options; this.mode = ValidationMode.valueOf(options.getValidationMode()); this.pendings = decodePendings(options.getPendings()); - this.rdeBucket = options.getGcsBucket(); + this.rdeBucket = options.getRdeStagingBucket(); this.stagingKeyBytes = BaseEncoding.base64Url().decode(options.getStagingKey()); this.gcsUtils = gcsUtils; + this.cloudTasksUtils = cloudTasksUtils; } PipelineResult run() { @@ -140,10 +144,11 @@ public class RdePipeline implements Serializable { void persistData(PCollection>> input) { input.apply( - "Write to GCS and update cursors", + "Write to GCS, update cursors, and enqueue upload tasks", RdeIO.Write.builder() .setRdeBucket(rdeBucket) .setGcsUtils(gcsUtils) + .setCloudTasksUtils(cloudTasksUtils) .setValidationMode(mode) .setStagingKeyBytes(stagingKeyBytes) .build()); @@ -177,18 +182,13 @@ public class RdePipeline implements Serializable { })); } - @SuppressWarnings("deprecation") // Reshuffle is still recommended by Dataflow. PCollection> processNonRegistrarEntities( Pipeline pipeline, Class clazz) { return createInputs(pipeline, clazz) .apply("Marshal " + clazz.getSimpleName() + " into DepositFragment", mapToFragments(clazz)) - .setCoder(KvCoder.of(PendingDepositCoder.of(), SerializableCoder.of(DepositFragment.class))) - .apply( - "Reshuffle KV of " - + clazz.getSimpleName() - + " to prevent fusion", - Reshuffle.of()); + .setCoder( + KvCoder.of(PendingDepositCoder.of(), SerializableCoder.of(DepositFragment.class))); } PCollection> createInputs(Pipeline pipeline, Class clazz) { @@ -202,7 +202,7 @@ public class RdePipeline implements Serializable { String.class, // TODO: consider adding coders for entities and pass them directly instead of using // VKeys. - x -> VKey.create(clazz, x))); + x -> VKey.createSql(clazz, x))); } @@ -270,7 +270,7 @@ public class RdePipeline implements Serializable { * Encodes the TLD to pending deposit map in an URL safe string that is sent to the pipeline * worker by the pipeline launcher as a pipeline option. */ - static String encodePendings(ImmutableSetMultimap pendings) + public static String encodePendings(ImmutableSetMultimap pendings) throws IOException { try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { ObjectOutputStream oos = new ObjectOutputStream(baos); @@ -282,13 +282,24 @@ public class RdePipeline implements Serializable { public static void main(String[] args) throws IOException, ClassNotFoundException { PipelineOptionsFactory.register(RdePipelineOptions.class); - RdePipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(RdePipelineOptions.class); + RdePipelineOptions options = + PipelineOptionsFactory.fromArgs(args).withValidation().as(RdePipelineOptions.class); + // RegistryPipelineWorkerInitializer only initializes before pipeline executions, after the + // main() function constructed the graph. We need the registry environment set up so that we + // can create a CloudTasksUtils which uses the environment-dependent config file. + options.getRegistryEnvironment().setup(); options.setIsolationOverride(TransactionIsolationLevel.TRANSACTION_READ_COMMITTED); DaggerRdePipeline_RdePipelineComponent.builder().options(options).build().rdePipeline().run(); } @Singleton - @Component(modules = {CredentialModule.class, ConfigModule.class}) + @Component( + modules = { + CredentialModule.class, + ConfigModule.class, + CloudTasksUtilsModule.class, + UtilsModule.class + }) interface RdePipelineComponent { RdePipeline rdePipeline(); diff --git a/core/src/main/java/google/registry/beam/rde/RdePipelineOptions.java b/core/src/main/java/google/registry/beam/rde/RdePipelineOptions.java index 333491ff0..c365494b6 100644 --- a/core/src/main/java/google/registry/beam/rde/RdePipelineOptions.java +++ b/core/src/main/java/google/registry/beam/rde/RdePipelineOptions.java @@ -31,9 +31,9 @@ public interface RdePipelineOptions extends RegistryPipelineOptions { void setValidationMode(String value); @Description("The GCS bucket where the encrypted RDE deposits will be uploaded to.") - String getGcsBucket(); + String getRdeStagingBucket(); - void setGcsBucket(String value); + void setRdeStagingBucket(String value); @Description("The Base64-encoded PGP public key to encrypt the deposits.") String getStagingKey(); diff --git a/core/src/main/java/google/registry/config/CloudTasksUtilsModule.java b/core/src/main/java/google/registry/config/CloudTasksUtilsModule.java index 6900cce3c..6e134292b 100644 --- a/core/src/main/java/google/registry/config/CloudTasksUtilsModule.java +++ b/core/src/main/java/google/registry/config/CloudTasksUtilsModule.java @@ -22,10 +22,13 @@ import dagger.Provides; import google.registry.config.CredentialModule.DefaultCredential; import google.registry.config.RegistryConfig.Config; import google.registry.util.CloudTasksUtils; +import google.registry.util.CloudTasksUtils.GcpCloudTasksClient; +import google.registry.util.CloudTasksUtils.SerializableCloudTasksClient; import google.registry.util.GoogleCredentialsBundle; import google.registry.util.Retrier; import java.io.IOException; -import javax.inject.Provider; +import java.io.Serializable; +import java.util.function.Supplier; import javax.inject.Singleton; /** @@ -42,24 +45,35 @@ public abstract class CloudTasksUtilsModule { public static CloudTasksUtils provideCloudTasksUtils( @Config("projectId") String projectId, @Config("locationId") String locationId, - // Use a provider so that we can use try-with-resources with the client, which implements - // Autocloseable. - Provider clientProvider, + SerializableCloudTasksClient client, Retrier retrier) { - return new CloudTasksUtils(retrier, projectId, locationId, clientProvider); + return new CloudTasksUtils(retrier, projectId, locationId, client); + } + + // Provides a supplier instead of using a Dagger @Provider because the latter is not serializable. + @Provides + public static Supplier provideCloudTasksClientSupplier( + @DefaultCredential GoogleCredentialsBundle credentials) { + return (Supplier & Serializable) + () -> { + CloudTasksClient client; + try { + client = + CloudTasksClient.create( + CloudTasksSettings.newBuilder() + .setCredentialsProvider( + FixedCredentialsProvider.create(credentials.getGoogleCredentials())) + .build()); + } catch (IOException e) { + throw new RuntimeException(e); + } + return client; + }; } @Provides - public static CloudTasksClient provideCloudTasksClient( - @DefaultCredential GoogleCredentialsBundle credentials) { - try { - return CloudTasksClient.create( - CloudTasksSettings.newBuilder() - .setCredentialsProvider( - FixedCredentialsProvider.create(credentials.getGoogleCredentials())) - .build()); - } catch (IOException e) { - throw new RuntimeException(e); - } + public static SerializableCloudTasksClient provideSerializableCloudTasksClient( + final Supplier clientSupplier) { + return new GcpCloudTasksClient(clientSupplier); } } diff --git a/core/src/main/java/google/registry/rde/BrdaCopyAction.java b/core/src/main/java/google/registry/rde/BrdaCopyAction.java index a1e96588e..e77f65c3a 100644 --- a/core/src/main/java/google/registry/rde/BrdaCopyAction.java +++ b/core/src/main/java/google/registry/rde/BrdaCopyAction.java @@ -31,6 +31,7 @@ import google.registry.request.auth.Auth; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.Optional; import javax.inject.Inject; import org.bouncycastle.openpgp.PGPKeyPair; import org.bouncycastle.openpgp.PGPPrivateKey; @@ -60,7 +61,7 @@ import org.joda.time.DateTime; auth = Auth.AUTH_INTERNAL_OR_ADMIN) public final class BrdaCopyAction implements Runnable { - static final String PATH = "/_dr/task/brdaCopy"; + public static final String PATH = "/_dr/task/brdaCopy"; private static final FluentLogger logger = FluentLogger.forEnclosingClass(); @@ -69,6 +70,7 @@ public final class BrdaCopyAction implements Runnable { @Inject @Config("rdeBucket") String stagingBucket; @Inject @Parameter(RequestParameters.PARAM_TLD) String tld; @Inject @Parameter(RdeModule.PARAM_WATERMARK) DateTime watermark; + @Inject @Parameter(RdeModule.PARAM_PREFIX) Optional prefix; @Inject @Key("brdaReceiverKey") PGPPublicKey receiverKey; @Inject @Key("brdaSigningKey") PGPKeyPair signingKey; @Inject @Key("rdeStagingDecryptionKey") PGPPrivateKey stagingDecryptionKey; @@ -84,11 +86,12 @@ public final class BrdaCopyAction implements Runnable { } private void copyAsRyde() throws IOException { - String prefix = RdeNamingUtils.makeRydeFilename(tld, watermark, THIN, 1, 0); - BlobId xmlFilename = BlobId.of(stagingBucket, prefix + ".xml.ghostryde"); - BlobId xmlLengthFilename = BlobId.of(stagingBucket, prefix + ".xml.length"); - BlobId rydeFile = BlobId.of(brdaBucket, prefix + ".ryde"); - BlobId sigFile = BlobId.of(brdaBucket, prefix + ".sig"); + String nameWithoutPrefix = RdeNamingUtils.makeRydeFilename(tld, watermark, THIN, 1, 0); + String name = prefix.orElse("") + nameWithoutPrefix; + BlobId xmlFilename = BlobId.of(stagingBucket, name + ".xml.ghostryde"); + BlobId xmlLengthFilename = BlobId.of(stagingBucket, name + ".xml.length"); + BlobId rydeFile = BlobId.of(brdaBucket, nameWithoutPrefix + ".ryde"); + BlobId sigFile = BlobId.of(brdaBucket, nameWithoutPrefix + ".sig"); long xmlLength = readXmlLength(xmlLengthFilename); @@ -97,11 +100,12 @@ public final class BrdaCopyAction implements Runnable { InputStream ghostrydeDecoder = Ghostryde.decoder(gcsInput, stagingDecryptionKey); OutputStream rydeOut = gcsUtils.openOutputStream(rydeFile); OutputStream sigOut = gcsUtils.openOutputStream(sigFile); - RydeEncoder rydeEncoder = new RydeEncoder.Builder() - .setRydeOutput(rydeOut, receiverKey) - .setSignatureOutput(sigOut, signingKey) - .setFileMetadata(prefix, xmlLength, watermark) - .build()) { + RydeEncoder rydeEncoder = + new RydeEncoder.Builder() + .setRydeOutput(rydeOut, receiverKey) + .setSignatureOutput(sigOut, signingKey) + .setFileMetadata(nameWithoutPrefix, xmlLength, watermark) + .build()) { ByteStreams.copy(ghostrydeDecoder, rydeEncoder); } } diff --git a/core/src/main/java/google/registry/rde/RdeModule.java b/core/src/main/java/google/registry/rde/RdeModule.java index d0a4ec5a3..ee3a3abee 100644 --- a/core/src/main/java/google/registry/rde/RdeModule.java +++ b/core/src/main/java/google/registry/rde/RdeModule.java @@ -49,6 +49,10 @@ public abstract class RdeModule { public static final String PARAM_MODE = "mode"; public static final String PARAM_REVISION = "revision"; public static final String PARAM_LENIENT = "lenient"; + public static final String PARAM_PREFIX = "prefix"; + public static final String RDE_UPLOAD_QUEUE = "rde-upload"; + public static final String RDE_REPORT_QUEUE = "rde-report"; + public static final String BRDA_QUEUE = "brda"; @Provides @Parameter(PARAM_WATERMARK) @@ -92,10 +96,11 @@ public abstract class RdeModule { return extractBooleanParameter(req, PARAM_LENIENT); } + // TODO (jianglai): Make it a required parameter once we migrate to Cloud SQL. @Provides - @Named("brda") - static Queue provideQueueBrda() { - return getQueue("brda"); + @Parameter(PARAM_PREFIX) + static Optional providePrefix(HttpServletRequest req) { + return extractOptionalParameter(req, PARAM_PREFIX); } @Provides diff --git a/core/src/main/java/google/registry/rde/RdeReportAction.java b/core/src/main/java/google/registry/rde/RdeReportAction.java index d6096eacb..c906b3503 100644 --- a/core/src/main/java/google/registry/rde/RdeReportAction.java +++ b/core/src/main/java/google/registry/rde/RdeReportAction.java @@ -68,6 +68,7 @@ public final class RdeReportAction implements Runnable, EscrowTask { @Inject Response response; @Inject RdeReporter reporter; @Inject @Parameter(RequestParameters.PARAM_TLD) String tld; + @Inject @Parameter(RdeModule.PARAM_PREFIX) Optional prefix; @Inject @Config("rdeBucket") String bucket; @Inject @Config("rdeInterval") Duration interval; @Inject @Config("rdeReportLockTimeout") Duration timeout; @@ -96,8 +97,9 @@ public final class RdeReportAction implements Runnable, EscrowTask { RdeRevision.getCurrentRevision(tld, watermark, FULL) .orElseThrow( () -> new IllegalStateException("RdeRevision was not set on generated deposit")); - String prefix = RdeNamingUtils.makeRydeFilename(tld, watermark, FULL, 1, revision); - BlobId reportFilename = BlobId.of(bucket, prefix + "-report.xml.ghostryde"); + String name = + prefix.orElse("") + RdeNamingUtils.makeRydeFilename(tld, watermark, FULL, 1, revision); + BlobId reportFilename = BlobId.of(bucket, name + "-report.xml.ghostryde"); verify(gcsUtils.existsAndNotEmpty(reportFilename), "Missing file: %s", reportFilename); reporter.send(readReportFromGcs(reportFilename)); response.setContentType(PLAIN_TEXT_UTF_8); diff --git a/core/src/main/java/google/registry/rde/RdeStagingAction.java b/core/src/main/java/google/registry/rde/RdeStagingAction.java index aa0827648..d1c3fc04f 100644 --- a/core/src/main/java/google/registry/rde/RdeStagingAction.java +++ b/core/src/main/java/google/registry/rde/RdeStagingAction.java @@ -14,20 +14,33 @@ package google.registry.rde; +import static google.registry.beam.BeamUtils.createJobName; +import static google.registry.persistence.transaction.TransactionManagerFactory.tm; import static google.registry.request.Action.Method.GET; import static google.registry.request.Action.Method.POST; import static google.registry.xml.ValidationMode.LENIENT; import static google.registry.xml.ValidationMode.STRICT; +import static javax.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR; import static javax.servlet.http.HttpServletResponse.SC_NO_CONTENT; +import static javax.servlet.http.HttpServletResponse.SC_OK; +import com.google.api.services.dataflow.Dataflow; +import com.google.api.services.dataflow.model.LaunchFlexTemplateParameter; +import com.google.api.services.dataflow.model.LaunchFlexTemplateRequest; +import com.google.api.services.dataflow.model.LaunchFlexTemplateResponse; import com.google.common.base.Ascii; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSetMultimap; import com.google.common.collect.Multimaps; import com.google.common.flogger.FluentLogger; +import com.google.common.io.BaseEncoding; +import google.registry.beam.rde.RdePipeline; import google.registry.config.RegistryConfig.Config; +import google.registry.config.RegistryEnvironment; import google.registry.gcs.GcsUtils; +import google.registry.keyring.api.KeyModule.Key; import google.registry.mapreduce.MapreduceRunner; import google.registry.mapreduce.inputs.EppResourceInputs; import google.registry.mapreduce.inputs.NullInput; @@ -47,6 +60,7 @@ import google.registry.request.Response; import google.registry.request.auth.Auth; import google.registry.util.Clock; import google.registry.xml.ValidationMode; +import java.io.IOException; import java.util.Optional; import javax.inject.Inject; import org.joda.time.DateTime; @@ -201,6 +215,7 @@ public final class RdeStagingAction implements Runnable { public static final String PATH = "/_dr/task/rdeStaging"; + private static final String PIPELINE_NAME = "rde_pipeline"; private static final FluentLogger logger = FluentLogger.forEnclosingClass(); @Inject Clock clock; @@ -209,7 +224,11 @@ public final class RdeStagingAction implements Runnable { @Inject Response response; @Inject GcsUtils gcsUtils; @Inject MapreduceRunner mrRunner; + @Inject @Config("projectId") String projectId; + @Inject @Config("defaultJobRegion") String jobRegion; @Inject @Config("transactionCooldown") Duration transactionCooldown; + @Inject @Config("beamStagingBucketUrl") String stagingBucketUrl; + @Inject @Config("rdeBucket") String rdeBucket; @Inject @Parameter(RdeModule.PARAM_MANUAL) boolean manual; @Inject @Parameter(RdeModule.PARAM_DIRECTORY) Optional directory; @Inject @Parameter(RdeModule.PARAM_MODE) ImmutableSet modeStrings; @@ -217,7 +236,8 @@ public final class RdeStagingAction implements Runnable { @Inject @Parameter(RdeModule.PARAM_WATERMARKS) ImmutableSet watermarks; @Inject @Parameter(RdeModule.PARAM_REVISION) Optional revision; @Inject @Parameter(RdeModule.PARAM_LENIENT) boolean lenient; - + @Inject @Key("rdeStagingEncryptionKey") byte[] stagingKeyBytes; + @Inject Dataflow dataflow; @Inject RdeStagingAction() {} @Override @@ -228,27 +248,66 @@ public final class RdeStagingAction implements Runnable { String message = "Nothing needs to be deposited."; logger.atInfo().log(message); response.setStatus(SC_NO_CONTENT); - response.setPayload(message); + // No need to set payload as HTTP 204 response status code does not allow a payload. return; } for (PendingDeposit pending : pendings.values()) { logger.atInfo().log("Pending deposit: %s", pending); } ValidationMode validationMode = lenient ? LENIENT : STRICT; - RdeStagingMapper mapper = new RdeStagingMapper(validationMode, pendings); - RdeStagingReducer reducer = reducerFactory.create(validationMode, gcsUtils); - - mrRunner - .setJobName("Stage escrow deposits for all TLDs") - .setModuleName("backend") - .setDefaultReduceShards(pendings.size()) - .runMapreduce( - mapper, - reducer, - ImmutableList.of( - // Add an extra shard that maps over a null resource. See the mapper code for why. - new NullInput<>(), EppResourceInputs.createEntityInput(EppResource.class))) - .sendLinkToMapreduceConsole(response); + if (tm().isOfy()) { + RdeStagingMapper mapper = new RdeStagingMapper(validationMode, pendings); + RdeStagingReducer reducer = reducerFactory.create(validationMode, gcsUtils); + mrRunner + .setJobName("Stage escrow deposits for all TLDs") + .setModuleName("backend") + .setDefaultReduceShards(pendings.size()) + .runMapreduce( + mapper, + reducer, + ImmutableList.of( + // Add an extra shard that maps over a null resource. See the mapper code for why. + new NullInput<>(), EppResourceInputs.createEntityInput(EppResource.class))) + .sendLinkToMapreduceConsole(response); + } else { + try { + LaunchFlexTemplateParameter parameter = + new LaunchFlexTemplateParameter() + .setJobName(createJobName("rde", clock)) + .setContainerSpecGcsPath( + String.format("%s/%s_metadata.json", stagingBucketUrl, PIPELINE_NAME)) + .setParameters( + ImmutableMap.of( + "pendings", + RdePipeline.encodePendings(pendings), + "validationMode", + validationMode.name(), + "rdeStagingBucket", + rdeBucket, + "stagingKey", + BaseEncoding.base64Url().omitPadding().encode(stagingKeyBytes), + "registryEnvironment", + RegistryEnvironment.get().name())); + LaunchFlexTemplateResponse launchResponse = + dataflow + .projects() + .locations() + .flexTemplates() + .launch( + projectId, + jobRegion, + new LaunchFlexTemplateRequest().setLaunchParameter(parameter)) + .execute(); + logger.atInfo().log("Got response: %s", launchResponse.getJob().toPrettyString()); + response.setStatus(SC_OK); + response.setPayload( + String.format("Launched RDE pipeline: %s", launchResponse.getJob().getId())); + } catch (IOException e) { + logger.atWarning().withCause(e).log("Pipeline Launch failed"); + response.setStatus(SC_INTERNAL_SERVER_ERROR); + response.setPayload(String.format("Pipeline launch failed: %s", e.getMessage())); + } + } } private ImmutableSetMultimap getStandardPendingDeposits() { diff --git a/core/src/main/java/google/registry/rde/RdeUploadAction.java b/core/src/main/java/google/registry/rde/RdeUploadAction.java index 5d1278ebe..9b80f7eec 100644 --- a/core/src/main/java/google/registry/rde/RdeUploadAction.java +++ b/core/src/main/java/google/registry/rde/RdeUploadAction.java @@ -14,7 +14,6 @@ package google.registry.rde; -import static com.google.appengine.api.taskqueue.TaskOptions.Builder.withUrl; import static com.google.common.base.Verify.verify; import static com.google.common.net.MediaType.PLAIN_TEXT_UTF_8; import static com.jcraft.jsch.ChannelSftp.OVERWRITE; @@ -24,14 +23,15 @@ import static google.registry.model.common.Cursor.getCursorTimeOrStartOfTime; import static google.registry.model.rde.RdeMode.FULL; import static google.registry.persistence.transaction.TransactionManagerFactory.tm; import static google.registry.persistence.transaction.TransactionManagerUtil.transactIfJpaTm; +import static google.registry.rde.RdeModule.RDE_REPORT_QUEUE; import static google.registry.request.Action.Method.POST; import static google.registry.util.DateTimeUtils.START_OF_TIME; import static google.registry.util.DateTimeUtils.isBeforeOrAt; import static java.util.Arrays.asList; -import com.google.appengine.api.taskqueue.Queue; import com.google.cloud.storage.BlobId; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.HashMultimap; import com.google.common.flogger.FluentLogger; import com.google.common.io.ByteStreams; import com.jcraft.jsch.JSch; @@ -49,14 +49,15 @@ import google.registry.model.tld.Registry; import google.registry.rde.EscrowTaskRunner.EscrowTask; import google.registry.rde.JSchSshSession.JSchSshSessionFactory; import google.registry.request.Action; +import google.registry.request.Action.Service; import google.registry.request.HttpException.NoContentException; import google.registry.request.Parameter; import google.registry.request.RequestParameters; import google.registry.request.Response; import google.registry.request.auth.Auth; import google.registry.util.Clock; +import google.registry.util.CloudTasksUtils; import google.registry.util.Retrier; -import google.registry.util.TaskQueueUtils; import google.registry.util.TeeOutputStream; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -66,7 +67,6 @@ import java.io.OutputStream; import java.net.URI; import java.util.Optional; import javax.inject.Inject; -import javax.inject.Named; import org.bouncycastle.openpgp.PGPKeyPair; import org.bouncycastle.openpgp.PGPPrivateKey; import org.bouncycastle.openpgp.PGPPublicKey; @@ -90,7 +90,7 @@ import org.joda.time.Duration; auth = Auth.AUTH_INTERNAL_OR_ADMIN) public final class RdeUploadAction implements Runnable, EscrowTask { - static final String PATH = "/_dr/task/rdeUpload"; + public static final String PATH = "/_dr/task/rdeUpload"; private static final FluentLogger logger = FluentLogger.forEnclosingClass(); @@ -109,9 +109,10 @@ public final class RdeUploadAction implements Runnable, EscrowTask { @Inject JSchSshSessionFactory jschSshSessionFactory; @Inject Response response; @Inject SftpProgressMonitor sftpProgressMonitor; - @Inject TaskQueueUtils taskQueueUtils; + @Inject CloudTasksUtils cloudTasksUtils; @Inject Retrier retrier; @Inject @Parameter(RequestParameters.PARAM_TLD) String tld; + @Inject @Parameter(RdeModule.PARAM_PREFIX) Optional prefix; @Inject @Config("rdeBucket") String bucket; @Inject @Config("rdeInterval") Duration interval; @Inject @Config("rdeUploadLockTimeout") Duration timeout; @@ -120,15 +121,21 @@ public final class RdeUploadAction implements Runnable, EscrowTask { @Inject @Key("rdeReceiverKey") PGPPublicKey receiverKey; @Inject @Key("rdeSigningKey") PGPKeyPair signingKey; @Inject @Key("rdeStagingDecryptionKey") PGPPrivateKey stagingDecryptionKey; - @Inject @Named("rde-report") Queue reportQueue; @Inject RdeUploadAction() {} @Override public void run() { logger.atInfo().log("Attempting to acquire RDE upload lock for TLD '%s'.", tld); runner.lockRunAndRollForward(this, Registry.get(tld), timeout, CursorType.RDE_UPLOAD, interval); - taskQueueUtils.enqueue( - reportQueue, withUrl(RdeReportAction.PATH).param(RequestParameters.PARAM_TLD, tld)); + HashMultimap params = HashMultimap.create(); + params.put(RequestParameters.PARAM_TLD, tld); + if (prefix.isPresent()) { + params.put(RdeModule.PARAM_PREFIX, prefix.get()); + } + cloudTasksUtils.enqueue( + RDE_REPORT_QUEUE, + CloudTasksUtils.createPostTask( + RdeReportAction.PATH, Service.BACKEND.getServiceId(), params)); } @Override @@ -164,7 +171,9 @@ public final class RdeUploadAction implements Runnable, EscrowTask { RdeRevision.getCurrentRevision(tld, watermark, FULL) .orElseThrow( () -> new IllegalStateException("RdeRevision was not set on generated deposit")); - final String name = RdeNamingUtils.makeRydeFilename(tld, watermark, FULL, 1, revision); + final String nameWithoutPrefix = + RdeNamingUtils.makeRydeFilename(tld, watermark, FULL, 1, revision); + final String name = prefix.orElse("") + nameWithoutPrefix; final BlobId xmlFilename = BlobId.of(bucket, name + ".xml.ghostryde"); final BlobId xmlLengthFilename = BlobId.of(bucket, name + ".xml.length"); BlobId reportFilename = BlobId.of(bucket, name + "-report.xml.ghostryde"); @@ -174,7 +183,8 @@ public final class RdeUploadAction implements Runnable, EscrowTask { logger.atInfo().log("Commencing RDE upload for TLD '%s' to '%s'.", tld, uploadUrl); final long xmlLength = readXmlLength(xmlLengthFilename); retrier.callWithRetry( - () -> upload(xmlFilename, xmlLength, watermark, name), JSchException.class); + () -> upload(xmlFilename, xmlLength, watermark, name, nameWithoutPrefix), + JSchException.class); logger.atInfo().log( "Updating RDE cursor '%s' for TLD '%s' following successful upload.", RDE_UPLOAD_SFTP, tld); tm().transact( @@ -210,7 +220,8 @@ public final class RdeUploadAction implements Runnable, EscrowTask { * } */ @VisibleForTesting - protected void upload(BlobId xmlFile, long xmlLength, DateTime watermark, String name) + protected void upload( + BlobId xmlFile, long xmlLength, DateTime watermark, String name, String nameWithoutPrefix) throws Exception { logger.atInfo().log("Uploading XML file '%s' to remote path '%s'.", xmlFile, uploadUrl); try (InputStream gcsInput = gcsUtils.openInputStream(xmlFile); @@ -218,8 +229,8 @@ public final class RdeUploadAction implements Runnable, EscrowTask { try (JSchSshSession session = jschSshSessionFactory.create(lazyJsch.get(), uploadUrl); JSchSftpChannel ftpChan = session.openSftpChannel()) { ByteArrayOutputStream sigOut = new ByteArrayOutputStream(); - String rydeFilename = name + ".ryde"; - BlobId rydeGcsFilename = BlobId.of(bucket, rydeFilename); + String rydeFilename = nameWithoutPrefix + ".ryde"; + BlobId rydeGcsFilename = BlobId.of(bucket, name + ".ryde"); try (OutputStream ftpOutput = ftpChan.get().put(rydeFilename, sftpProgressMonitor, OVERWRITE); OutputStream gcsOutput = gcsUtils.openOutputStream(rydeGcsFilename); @@ -228,14 +239,15 @@ public final class RdeUploadAction implements Runnable, EscrowTask { new RydeEncoder.Builder() .setRydeOutput(teeOutput, receiverKey) .setSignatureOutput(sigOut, signingKey) - .setFileMetadata(name, xmlLength, watermark) + .setFileMetadata(nameWithoutPrefix, xmlLength, watermark) .build()) { long bytesCopied = ByteStreams.copy(ghostrydeDecoder, rydeEncoder); logger.atInfo().log("Uploaded %,d bytes to path '%s'.", bytesCopied, rydeFilename); } - String sigFilename = name + ".sig"; + String sigFilename = nameWithoutPrefix + ".sig"; + BlobId sigGcsFilename = BlobId.of(bucket, name + ".sig"); byte[] signature = sigOut.toByteArray(); - gcsUtils.createFromBytes(BlobId.of(bucket, sigFilename), signature); + gcsUtils.createFromBytes(sigGcsFilename, signature); ftpChan.get().put(new ByteArrayInputStream(signature), sigFilename); logger.atInfo().log("Uploaded %,d bytes to path '%s'.", signature.length, sigFilename); } diff --git a/core/src/main/java/google/registry/reporting/billing/GenerateInvoicesAction.java b/core/src/main/java/google/registry/reporting/billing/GenerateInvoicesAction.java index 969233812..a180e9d32 100644 --- a/core/src/main/java/google/registry/reporting/billing/GenerateInvoicesAction.java +++ b/core/src/main/java/google/registry/reporting/billing/GenerateInvoicesAction.java @@ -30,6 +30,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.flogger.FluentLogger; import com.google.common.net.MediaType; import google.registry.config.RegistryConfig.Config; +import google.registry.config.RegistryEnvironment; import google.registry.model.common.DatabaseMigrationStateSchedule.PrimaryDatabase; import google.registry.reporting.ReportingModule; import google.registry.request.Action; @@ -127,7 +128,9 @@ public class GenerateInvoicesAction implements Runnable { "database", database.name(), "billingBucketUrl", - billingBucketUrl)); + billingBucketUrl, + "registryEnvironment", + RegistryEnvironment.get().name())); LaunchFlexTemplateResponse launchResponse = dataflow .projects() diff --git a/core/src/main/resources/google/registry/beam/invoicing_pipeline_metadata.json b/core/src/main/resources/google/registry/beam/invoicing_pipeline_metadata.json index 971e8112f..d531e6bba 100644 --- a/core/src/main/resources/google/registry/beam/invoicing_pipeline_metadata.json +++ b/core/src/main/resources/google/registry/beam/invoicing_pipeline_metadata.json @@ -6,9 +6,9 @@ "name": "registryEnvironment", "label": "The Registry environment.", "helpText": "The Registry environment, required if environment-specific initialization (such as JPA) is needed on worker VMs.", - "is_optional": true, + "is_optional": false, "regexes": [ - "^[0-9A-Z_]+$" + "^PRODUCTION|SANDBOX|CRASH|QA|ALPHA$" ] }, { diff --git a/core/src/main/resources/google/registry/beam/rde_pipeline_metadata.json b/core/src/main/resources/google/registry/beam/rde_pipeline_metadata.json index 5911623f0..6128150a0 100644 --- a/core/src/main/resources/google/registry/beam/rde_pipeline_metadata.json +++ b/core/src/main/resources/google/registry/beam/rde_pipeline_metadata.json @@ -2,12 +2,21 @@ "name": "RDE/BRDA Deposit Generation", "description": "An Apache Beam pipeline generates RDE or BRDA deposits and deposits them to GCS with GhostRyde encryption.", "parameters": [ + { + "name": "registryEnvironment", + "label": "The Registry environment.", + "helpText": "The Registry environment, required if environment-specific initialization (such as JPA) is needed on worker VMs.", + "is_optional": false, + "regexes": [ + "^PRODUCTION|SANDBOX|CRASH|QA|ALPHA$" + ] + }, { "name": "pendings", "label": "The pendings deposits to generate.", "helpText": "A TLD to PendingDeposit map that is serialized and Base64 URL-safe encoded.", "regexes": [ - "A-Za-z0-9\\-_" + "[A-Za-z0-9\\-_]+" ] }, { @@ -19,12 +28,12 @@ ] }, { - "name": "gcsBucket", - "label": "The GCS bucket that where the resulting files will be stored.", + "name": "rdeStagingBucket", + "label": "The GCS bucket that where the resulting files will be stored.", "helpText": "Only the bucket name itself, without the leading \"gs://\".", "is_optional": false, "regexes": [ - "^[a-zA-Z0-9_\\-]+$" + "[a-zA-Z0-9_\\-]+$" ] }, { @@ -32,7 +41,7 @@ "label": "The PGP public key used to encrypt the RDE/BRDA deposit files.", "helpText": "The key is Base64 URL-safe encoded.", "regexes": [ - "A-Za-z0-9\\-_" + "[A-Za-z0-9\\-_]+" ] } ] diff --git a/core/src/main/resources/google/registry/beam/spec11_pipeline_metadata.json b/core/src/main/resources/google/registry/beam/spec11_pipeline_metadata.json index 869b0268f..4e48a5eb9 100644 --- a/core/src/main/resources/google/registry/beam/spec11_pipeline_metadata.json +++ b/core/src/main/resources/google/registry/beam/spec11_pipeline_metadata.json @@ -6,9 +6,9 @@ "name": "registryEnvironment", "label": "The Registry environment.", "helpText": "The Registry environment, required if environment-specific initialization (such as JPA) is needed on worker VMs.", - "is_optional": true, + "is_optional": false, "regexes": [ - "^[0-9A-Z_]+$" + "^PRODUCTION|SANDBOX|CRASH|QA|ALPHA$" ] }, { diff --git a/core/src/test/java/google/registry/beam/BeamActionTestBase.java b/core/src/test/java/google/registry/beam/BeamActionTestBase.java index 473b871bc..065a3e9ff 100644 --- a/core/src/test/java/google/registry/beam/BeamActionTestBase.java +++ b/core/src/test/java/google/registry/beam/BeamActionTestBase.java @@ -40,13 +40,13 @@ public abstract class BeamActionTestBase { protected Dataflow dataflow = mock(Dataflow.class); private Projects projects = mock(Projects.class); private Locations locations = mock(Locations.class); - private FlexTemplates templates = mock(FlexTemplates.class); + protected FlexTemplates templates = mock(FlexTemplates.class); protected Launch launch = mock(Launch.class); private LaunchFlexTemplateResponse launchResponse = new LaunchFlexTemplateResponse().setJob(new Job().setId("jobid")); @BeforeEach - void beforeEach() throws Exception { + protected void beforeEach() throws Exception { when(dataflow.projects()).thenReturn(projects); when(projects.locations()).thenReturn(locations); when(locations.flexTemplates()).thenReturn(templates); diff --git a/core/src/test/java/google/registry/beam/rde/RdePipelineTest.java b/core/src/test/java/google/registry/beam/rde/RdePipelineTest.java index 35bd50785..d9abb8925 100644 --- a/core/src/test/java/google/registry/beam/rde/RdePipelineTest.java +++ b/core/src/test/java/google/registry/beam/rde/RdePipelineTest.java @@ -69,6 +69,8 @@ import google.registry.rde.DepositFragment; import google.registry.rde.Ghostryde; import google.registry.rde.PendingDeposit; import google.registry.rde.RdeResourceType; +import google.registry.testing.CloudTasksHelper; +import google.registry.testing.CloudTasksHelper.TaskMatcher; import google.registry.testing.DatastoreEntityExtension; import google.registry.testing.FakeClock; import google.registry.testing.FakeKeyringModule; @@ -134,6 +136,8 @@ public class RdePipelineTest { private final GcsUtils gcsUtils = new GcsUtils(LocalStorageHelper.getOptions()); + private final CloudTasksHelper cloudTasksHelper = new CloudTasksHelper(); + private final PGPPublicKey encryptionKey = new FakeKeyringModule().get().getRdeStagingEncryptionKey(); @@ -212,9 +216,9 @@ public class RdePipelineTest { options.setValidationMode("LENIENT"); options.setStagingKey( BaseEncoding.base64Url().encode(PgpHelper.convertPublicKeyToBytes(encryptionKey))); - options.setGcsBucket("gcs-bucket"); + options.setRdeStagingBucket("gcs-bucket"); options.setJobName("rde-job"); - rdePipeline = new RdePipeline(options, gcsUtils); + rdePipeline = new RdePipeline(options, gcsUtils, cloudTasksHelper.getTestCloudTasksUtils()); } @AfterEach @@ -314,6 +318,21 @@ public class RdePipelineTest { assertThat(loadCursorTime(CursorType.RDE_STAGING)) .isEquivalentAccordingToCompareTo(now.plus(Duration.standardDays(1))); assertThat(loadRevision(now, FULL)).isEqualTo(1); + cloudTasksHelper.assertTasksEnqueued( + "brda", + new TaskMatcher() + .url("/_dr/task/brdaCopy") + .service("backend") + .param("tld", "soy") + .param("watermark", now.toString()) + .param("prefix", "rde-job/")); + cloudTasksHelper.assertTasksEnqueued( + "rde-upload", + new TaskMatcher() + .url("/_dr/task/rdeUpload") + .service("backend") + .param("tld", "soy") + .param("prefix", "rde-job/")); } // The GCS folder listing can be a bit flaky, so retry if necessary @@ -337,6 +356,7 @@ public class RdePipelineTest { assertThat(loadCursorTime(CursorType.RDE_STAGING)).isEquivalentAccordingToCompareTo(now); assertThat(loadRevision(now, FULL)).isEqualTo(0); + cloudTasksHelper.assertNoTasksEnqueued("brda", "rde-upload"); } private void verifyFiles( diff --git a/core/src/test/java/google/registry/rde/BrdaCopyActionTest.java b/core/src/test/java/google/registry/rde/BrdaCopyActionTest.java index 81898ae17..1cee71b42 100644 --- a/core/src/test/java/google/registry/rde/BrdaCopyActionTest.java +++ b/core/src/test/java/google/registry/rde/BrdaCopyActionTest.java @@ -35,26 +35,28 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; +import java.util.Optional; import org.bouncycastle.openpgp.PGPKeyPair; import org.bouncycastle.openpgp.PGPPrivateKey; import org.bouncycastle.openpgp.PGPPublicKey; import org.joda.time.DateTime; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; /** Unit tests for {@link BrdaCopyAction}. */ public class BrdaCopyActionTest { private static final ByteSource DEPOSIT_XML = RdeTestData.loadBytes("deposit_full.xml"); + private static final String STAGE_FILENAME = "lol_2010-10-17_thin_S1_R0"; + private static final BlobId RYDE_FILE = + BlobId.of("tub", String.format("%s.ryde", STAGE_FILENAME)); + private static final BlobId SIG_FILE = BlobId.of("tub", String.format("%s.sig", STAGE_FILENAME)); - private static final BlobId STAGE_FILE = - BlobId.of("keg", "lol_2010-10-17_thin_S1_R0.xml.ghostryde"); - private static final BlobId STAGE_LENGTH_FILE = - BlobId.of("keg", "lol_2010-10-17_thin_S1_R0.xml.length"); - private static final BlobId RYDE_FILE = BlobId.of("tub", "lol_2010-10-17_thin_S1_R0.ryde"); - private static final BlobId SIG_FILE = BlobId.of("tub", "lol_2010-10-17_thin_S1_R0.sig"); + private BlobId stageFile; + private BlobId stageLengthFile; @RegisterExtension public final BouncyCastleProviderExtension bouncy = new BouncyCastleProviderExtension(); @@ -87,6 +89,16 @@ public class BrdaCopyActionTest { private final GcsUtils gcsUtils = new GcsUtils(LocalStorageHelper.getOptions()); private final BrdaCopyAction action = new BrdaCopyAction(); + private void runAction(String prefix) throws IOException { + stageFile = BlobId.of("keg", String.format("%s%s.xml.ghostryde", prefix, STAGE_FILENAME)); + stageLengthFile = BlobId.of("keg", String.format("%s%s.xml.length", prefix, STAGE_FILENAME)); + byte[] xml = DEPOSIT_XML.read(); + gcsUtils.createFromBytes(stageFile, Ghostryde.encode(xml, encryptKey)); + gcsUtils.createFromBytes(stageLengthFile, Long.toString(xml.length).getBytes(UTF_8)); + action.prefix = prefix.isEmpty() ? Optional.empty() : Optional.of(prefix); + action.run(); + } + @BeforeEach void beforeEach() throws Exception { action.gcsUtils = gcsUtils; @@ -97,24 +109,22 @@ public class BrdaCopyActionTest { action.receiverKey = receiverKey; action.signingKey = signingKey; action.stagingDecryptionKey = decryptKey; - - byte[] xml = DEPOSIT_XML.read(); - gcsUtils.createFromBytes(STAGE_FILE, Ghostryde.encode(xml, encryptKey)); - gcsUtils.createFromBytes(STAGE_LENGTH_FILE, Long.toString(xml.length).getBytes(UTF_8)); } - @Test - void testRun() { - action.run(); - assertThat(gcsUtils.existsAndNotEmpty(STAGE_FILE)).isTrue(); - assertThat(gcsUtils.existsAndNotEmpty(RYDE_FILE)).isTrue(); + @ParameterizedTest + @ValueSource(strings = {"", "job-name/"}) + void testRun(String prefix) throws Exception { + runAction(prefix); + assertThat(gcsUtils.existsAndNotEmpty(stageFile)).isTrue(); + assertThat(gcsUtils.existsAndNotEmpty(stageLengthFile)).isTrue(); assertThat(gcsUtils.existsAndNotEmpty(SIG_FILE)).isTrue(); } - @Test - void testRun_rydeFormat() throws Exception { + @ParameterizedTest + @ValueSource(strings = {"", "job-name/"}) + void testRun_rydeFormat(String prefix) throws Exception { assumeTrue(hasCommand("gpg --version")); - action.run(); + runAction(prefix); File rydeTmp = new File(gpg.getCwd(), "ryde"); Files.write(gcsUtils.readBytesFrom(RYDE_FILE), rydeTmp); @@ -158,10 +168,11 @@ public class BrdaCopyActionTest { .contains("ID 7F9084EE54E1EB0F"); } - @Test - void testRun_rydeSignature() throws Exception { + @ParameterizedTest + @ValueSource(strings = {"", "job-name/"}) + void testRun_rydeSignature(String prefix) throws Exception { assumeTrue(hasCommand("gpg --version")); - action.run(); + runAction(prefix); File rydeTmp = new File(gpg.getCwd(), "ryde"); File sigTmp = new File(gpg.getCwd(), "ryde.sig"); diff --git a/core/src/test/java/google/registry/rde/RdeReportActionTest.java b/core/src/test/java/google/registry/rde/RdeReportActionTest.java index 297e73a41..6890caae0 100644 --- a/core/src/test/java/google/registry/rde/RdeReportActionTest.java +++ b/core/src/test/java/google/registry/rde/RdeReportActionTest.java @@ -65,6 +65,7 @@ import google.registry.xml.XmlException; import java.io.ByteArrayInputStream; import java.net.SocketTimeoutException; import java.util.Map; +import java.util.Optional; import org.bouncycastle.openpgp.PGPPublicKey; import org.joda.time.DateTime; import org.junit.jupiter.api.BeforeEach; @@ -91,7 +92,8 @@ public class RdeReportActionTest { private final URLFetchService urlFetchService = mock(URLFetchService.class); private final ArgumentCaptor request = ArgumentCaptor.forClass(HTTPRequest.class); private final HTTPResponse httpResponse = mock(HTTPResponse.class); - + private final PGPPublicKey encryptKey = + new FakeKeyringModule().get().getRdeStagingEncryptionKey(); private final GcsUtils gcsUtils = new GcsUtils(LocalStorageHelper.getOptions()); private final BlobId reportFile = BlobId.of("tub", "test_2006-06-06_full_S1_R0-report.xml.ghostryde"); @@ -112,12 +114,12 @@ public class RdeReportActionTest { action.timeout = standardSeconds(30); action.stagingDecryptionKey = new FakeKeyringModule().get().getRdeStagingDecryptionKey(); action.runner = runner; + action.prefix = Optional.empty(); return action; } @BeforeEach void beforeEach() throws Exception { - PGPPublicKey encryptKey = new FakeKeyringModule().get().getRdeStagingEncryptionKey(); createTld("test"); persistResource( Cursor.create(RDE_REPORT, DateTime.parse("2006-06-06TZ"), Registry.get("test"))); @@ -148,6 +150,37 @@ public class RdeReportActionTest { assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8); assertThat(response.getPayload()).isEqualTo("OK test 2006-06-06T00:00:00.000Z\n"); + // Verify the HTTP request was correct. + assertThat(request.getValue().getMethod()).isSameInstanceAs(PUT); + assertThat(request.getValue().getURL().getProtocol()).isEqualTo("https"); + assertThat(request.getValue().getURL().getPath()).endsWith("/test/20101017001"); + Map headers = mapifyHeaders(request.getValue().getHeaders()); + assertThat(headers).containsEntry("CONTENT_TYPE", "text/xml"); + assertThat(headers).containsEntry("AUTHORIZATION", "Basic dGVzdF9yeTpmb28="); + + // Verify the payload XML was the same as what's in testdata/report.xml. + XjcRdeReportReport report = parseReport(request.getValue().getPayload()); + assertThat(report.getId()).isEqualTo("20101017001"); + assertThat(report.getCrDate()).isEqualTo(DateTime.parse("2010-10-17T00:15:00.0Z")); + assertThat(report.getWatermark()).isEqualTo(DateTime.parse("2010-10-17T00:00:00Z")); + } + + @TestOfyAndSql + void testRunWithLock_withPrefix() throws Exception { + when(httpResponse.getResponseCode()).thenReturn(SC_OK); + when(httpResponse.getContent()).thenReturn(IIRDEA_GOOD_XML.read()); + when(urlFetchService.fetch(request.capture())).thenReturn(httpResponse); + RdeReportAction action = createAction(); + action.prefix = Optional.of("job-name/"); + gcsUtils.delete(reportFile); + gcsUtils.createFromBytes( + BlobId.of("tub", "job-name/test_2006-06-06_full_S1_R0-report.xml.ghostryde"), + Ghostryde.encode(REPORT_XML.read(), encryptKey)); + action.runWithLock(loadRdeReportCursor()); + assertThat(response.getStatus()).isEqualTo(200); + assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8); + assertThat(response.getPayload()).isEqualTo("OK test 2006-06-06T00:00:00.000Z\n"); + // Verify the HTTP request was correct. assertThat(request.getValue().getMethod()).isSameInstanceAs(PUT); assertThat(request.getValue().getURL().getProtocol()).isEqualTo("https"); diff --git a/core/src/test/java/google/registry/rde/RdeStagingActionCloudSqlTest.java b/core/src/test/java/google/registry/rde/RdeStagingActionCloudSqlTest.java new file mode 100644 index 000000000..27b61d232 --- /dev/null +++ b/core/src/test/java/google/registry/rde/RdeStagingActionCloudSqlTest.java @@ -0,0 +1,263 @@ +// Copyright 2017 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.rde; + +import static com.google.common.truth.Truth.assertThat; +import static google.registry.testing.DatabaseHelper.createTld; +import static google.registry.testing.DatabaseHelper.persistResource; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; + +import com.google.api.services.dataflow.model.LaunchFlexTemplateRequest; +import com.google.cloud.storage.contrib.nio.testing.LocalStorageHelper; +import com.google.common.collect.ImmutableSet; +import google.registry.beam.BeamActionTestBase; +import google.registry.gcs.GcsUtils; +import google.registry.model.tld.Registry; +import google.registry.request.HttpException.BadRequestException; +import google.registry.testing.AppEngineExtension; +import google.registry.testing.DualDatabaseTest; +import google.registry.testing.FakeClock; +import google.registry.testing.TestSqlOnly; +import java.nio.charset.StandardCharsets; +import java.util.Optional; +import org.joda.time.DateTime; +import org.joda.time.DateTimeConstants; +import org.joda.time.Duration; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.RegisterExtension; + +/** Unit tests for {@link RdeStagingAction} in Cloud SQL. */ +@DualDatabaseTest +public class RdeStagingActionCloudSqlTest extends BeamActionTestBase { + + private final FakeClock clock = new FakeClock(); + private final GcsUtils gcsUtils = new GcsUtils(LocalStorageHelper.getOptions()); + private final RdeStagingAction action = new RdeStagingAction(); + + @RegisterExtension + public final AppEngineExtension extension = + AppEngineExtension.builder().withClock(clock).withDatastoreAndCloudSql().build(); + + @BeforeEach + @Override + public void beforeEach() throws Exception { + super.beforeEach(); + action.clock = clock; + action.lenient = false; + action.projectId = "projectId"; + action.jobRegion = "jobRegion"; + action.rdeBucket = "rde-bucket"; + action.pendingDepositChecker = new PendingDepositChecker(); + action.pendingDepositChecker.brdaDayOfWeek = DateTimeConstants.TUESDAY; + action.pendingDepositChecker.brdaInterval = Duration.standardDays(7); + action.pendingDepositChecker.clock = clock; + action.pendingDepositChecker.rdeInterval = Duration.standardDays(1); + action.gcsUtils = gcsUtils; + action.response = response; + action.transactionCooldown = Duration.ZERO; + action.stagingKeyBytes = "ABCE".getBytes(StandardCharsets.UTF_8); + action.directory = Optional.empty(); + action.modeStrings = ImmutableSet.of(); + action.tlds = ImmutableSet.of(); + action.watermarks = ImmutableSet.of(); + action.revision = Optional.empty(); + action.dataflow = dataflow; + } + + @TestSqlOnly + void testRun_modeInNonManualMode_throwsException() { + createTldWithEscrowEnabled("lol"); + clock.setTo(DateTime.parse("2000-01-01TZ")); + action.modeStrings = ImmutableSet.of("full"); + assertThrows(BadRequestException.class, action::run); + verifyNoMoreInteractions(dataflow); + } + + @TestSqlOnly + void testRun_tldInNonManualMode_throwsException() { + createTldWithEscrowEnabled("lol"); + clock.setTo(DateTime.parse("2000-01-01TZ")); + action.tlds = ImmutableSet.of("tld"); + assertThrows(BadRequestException.class, action::run); + verifyNoMoreInteractions(dataflow); + } + + @TestSqlOnly + void testRun_watermarkInNonManualMode_throwsException() { + createTldWithEscrowEnabled("lol"); + clock.setTo(DateTime.parse("2000-01-01TZ")); + action.watermarks = ImmutableSet.of(clock.nowUtc()); + assertThrows(BadRequestException.class, action::run); + verifyNoMoreInteractions(dataflow); + } + + @TestSqlOnly + void testRun_revisionInNonManualMode_throwsException() { + createTldWithEscrowEnabled("lol"); + clock.setTo(DateTime.parse("2000-01-01TZ")); + action.revision = Optional.of(42); + assertThrows(BadRequestException.class, action::run); + verifyNoMoreInteractions(dataflow); + } + + @TestSqlOnly + void testRun_noTlds_returns204() { + action.run(); + assertThat(response.getStatus()).isEqualTo(204); + verifyNoMoreInteractions(dataflow); + } + + @TestSqlOnly + void testRun_tldWithoutEscrowEnabled_returns204() { + createTld("lol"); + persistResource(Registry.get("lol").asBuilder().setEscrowEnabled(false).build()); + clock.setTo(DateTime.parse("2000-01-01TZ")); + action.run(); + assertThat(response.getStatus()).isEqualTo(204); + verifyNoMoreInteractions(dataflow); + } + + @TestSqlOnly + void testRun_tldWithEscrowEnabled_launchesPipeline() throws Exception { + createTldWithEscrowEnabled("lol"); + clock.setTo(DateTime.parse("2000-01-01TZ")); + action.run(); + assertThat(response.getStatus()).isEqualTo(200); + assertThat(response.getPayload()).contains("Launched RDE pipeline: jobid"); + verify(templates, times(1)) + .launch(eq("projectId"), eq("jobRegion"), any(LaunchFlexTemplateRequest.class)); + } + + @TestSqlOnly + void testRun_withinTransactionCooldown_getsExcludedAndReturns204() { + createTldWithEscrowEnabled("lol"); + clock.setTo(DateTime.parse("2000-01-01T00:04:59Z")); + action.transactionCooldown = Duration.standardMinutes(5); + action.run(); + assertThat(response.getStatus()).isEqualTo(204); + verifyNoMoreInteractions(dataflow); + } + + @TestSqlOnly + void testRun_afterTransactionCooldown_runsMapReduce() throws Exception { + createTldWithEscrowEnabled("lol"); + clock.setTo(DateTime.parse("2000-01-01T00:05:00Z")); + action.transactionCooldown = Duration.standardMinutes(5); + action.run(); + assertThat(response.getStatus()).isEqualTo(200); + assertThat(response.getPayload()).contains("Launched RDE pipeline: jobid"); + verify(templates, times(1)) + .launch(eq("projectId"), eq("jobRegion"), any(LaunchFlexTemplateRequest.class)); + } + + @TestSqlOnly + void testManualRun_emptyMode_throwsException() { + createTldWithEscrowEnabled("lol"); + clock.setTo(DateTime.parse("2000-01-01TZ")); + action.manual = true; + action.directory = Optional.of("test/"); + action.modeStrings = ImmutableSet.of(); + action.tlds = ImmutableSet.of("lol"); + action.watermarks = ImmutableSet.of(clock.nowUtc()); + assertThrows(BadRequestException.class, action::run); + } + + @TestSqlOnly + void testManualRun_invalidMode_throwsException() { + createTldWithEscrowEnabled("lol"); + clock.setTo(DateTime.parse("2000-01-01TZ")); + action.manual = true; + action.directory = Optional.of("test/"); + action.modeStrings = ImmutableSet.of("full", "thing"); + action.tlds = ImmutableSet.of("lol"); + action.watermarks = ImmutableSet.of(clock.nowUtc()); + assertThrows(BadRequestException.class, action::run); + } + + @TestSqlOnly + void testManualRun_emptyTld_throwsException() { + createTldWithEscrowEnabled("lol"); + clock.setTo(DateTime.parse("2000-01-01TZ")); + action.manual = true; + action.directory = Optional.of("test/"); + action.modeStrings = ImmutableSet.of("full"); + action.tlds = ImmutableSet.of(); + action.watermarks = ImmutableSet.of(clock.nowUtc()); + assertThrows(BadRequestException.class, action::run); + } + + @TestSqlOnly + void testManualRun_emptyWatermark_throwsException() { + createTldWithEscrowEnabled("lol"); + clock.setTo(DateTime.parse("2000-01-01TZ")); + action.manual = true; + action.directory = Optional.of("test/"); + action.modeStrings = ImmutableSet.of("full"); + action.tlds = ImmutableSet.of("lol"); + action.watermarks = ImmutableSet.of(); + assertThrows(BadRequestException.class, action::run); + } + + @TestSqlOnly + void testManualRun_nonDayStartWatermark_throwsException() { + createTldWithEscrowEnabled("lol"); + clock.setTo(DateTime.parse("2000-01-01TZ")); + action.manual = true; + action.directory = Optional.of("test/"); + action.modeStrings = ImmutableSet.of("full"); + action.tlds = ImmutableSet.of("lol"); + action.watermarks = ImmutableSet.of(DateTime.parse("2001-01-01T01:36:45Z")); + assertThrows(BadRequestException.class, action::run); + } + + @TestSqlOnly + void testManualRun_invalidRevision_throwsException() { + createTldWithEscrowEnabled("lol"); + clock.setTo(DateTime.parse("2000-01-01TZ")); + action.manual = true; + action.directory = Optional.of("test/"); + action.modeStrings = ImmutableSet.of("full"); + action.tlds = ImmutableSet.of("lol"); + action.watermarks = ImmutableSet.of(DateTime.parse("2001-01-01T00:00:00Z")); + action.revision = Optional.of(-1); + assertThrows(BadRequestException.class, action::run); + } + + @TestSqlOnly + void testManualRun_validParameters_runsMapReduce() throws Exception { + createTldWithEscrowEnabled("lol"); + clock.setTo(DateTime.parse("2000-01-01TZ")); + action.manual = true; + action.directory = Optional.of("test/"); + action.modeStrings = ImmutableSet.of("full"); + action.tlds = ImmutableSet.of("lol"); + action.watermarks = ImmutableSet.of(DateTime.parse("2001-01-01TZ")); + action.run(); + assertThat(response.getStatus()).isEqualTo(200); + assertThat(response.getPayload()).contains("Launched RDE pipeline: jobid"); + verify(templates, times(1)) + .launch(eq("projectId"), eq("jobRegion"), any(LaunchFlexTemplateRequest.class)); + } + + private static void createTldWithEscrowEnabled(final String tld) { + createTld(tld); + persistResource(Registry.get(tld).asBuilder().setEscrowEnabled(true).build()); + } +} diff --git a/core/src/test/java/google/registry/rde/RdeStagingActionTest.java b/core/src/test/java/google/registry/rde/RdeStagingActionDatastoreTest.java similarity index 99% rename from core/src/test/java/google/registry/rde/RdeStagingActionTest.java rename to core/src/test/java/google/registry/rde/RdeStagingActionDatastoreTest.java index de2e04404..5458b6016 100644 --- a/core/src/test/java/google/registry/rde/RdeStagingActionTest.java +++ b/core/src/test/java/google/registry/rde/RdeStagingActionDatastoreTest.java @@ -90,8 +90,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -/** Unit tests for {@link RdeStagingAction}. */ -public class RdeStagingActionTest extends MapreduceTestCase { +/** Unit tests for {@link RdeStagingAction} in Datastore. */ +public class RdeStagingActionDatastoreTest extends MapreduceTestCase { private static final BlobId XML_FILE = BlobId.of("rde-bucket", "lol_2000-01-01_full_S1_R0.xml.ghostryde"); diff --git a/core/src/test/java/google/registry/rde/RdeTestSuite.java b/core/src/test/java/google/registry/rde/RdeTestSuite.java index f0d0d7282..eaad3eb6a 100644 --- a/core/src/test/java/google/registry/rde/RdeTestSuite.java +++ b/core/src/test/java/google/registry/rde/RdeTestSuite.java @@ -27,7 +27,8 @@ import org.junit.runner.RunWith; GhostrydeGpgIntegrationTest.class, GhostrydeTest.class, HostResourceToXjcConverterTest.class, - RdeStagingActionTest.class, + RdeStagingActionDatastoreTest.class, + RdeStagingActionCloudSqlTest.class, RdeUploadActionTest.class, RdeReportActionTest.class, RegistrarToXjcConverterTest.class, diff --git a/core/src/test/java/google/registry/rde/RdeUploadActionTest.java b/core/src/test/java/google/registry/rde/RdeUploadActionTest.java index 5ab4e5f4c..bcb40a0c9 100644 --- a/core/src/test/java/google/registry/rde/RdeUploadActionTest.java +++ b/core/src/test/java/google/registry/rde/RdeUploadActionTest.java @@ -25,8 +25,6 @@ import static google.registry.testing.DatabaseHelper.createTld; import static google.registry.testing.DatabaseHelper.persistResource; import static google.registry.testing.DatabaseHelper.persistSimpleResource; import static google.registry.testing.SystemInfo.hasCommand; -import static google.registry.testing.TaskQueueHelper.assertNoTasksEnqueued; -import static google.registry.testing.TaskQueueHelper.assertTasksEnqueued; import static java.nio.charset.StandardCharsets.UTF_8; import static org.joda.time.Duration.standardDays; import static org.joda.time.Duration.standardHours; @@ -41,7 +39,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -import com.google.appengine.api.taskqueue.QueueFactory; import com.google.appengine.api.utils.SystemProperty; import com.google.cloud.storage.BlobId; import com.google.cloud.storage.contrib.nio.testing.LocalStorageHelper; @@ -62,6 +59,8 @@ import google.registry.request.HttpException.NoContentException; import google.registry.request.RequestParameters; import google.registry.testing.AppEngineExtension; import google.registry.testing.BouncyCastleProviderExtension; +import google.registry.testing.CloudTasksHelper; +import google.registry.testing.CloudTasksHelper.TaskMatcher; import google.registry.testing.DualDatabaseTest; import google.registry.testing.FakeClock; import google.registry.testing.FakeKeyringModule; @@ -69,17 +68,16 @@ import google.registry.testing.FakeResponse; import google.registry.testing.FakeSleeper; import google.registry.testing.GpgSystemCommandExtension; import google.registry.testing.Lazies; -import google.registry.testing.TaskQueueHelper.TaskMatcher; import google.registry.testing.TestOfyAndSql; import google.registry.testing.sftp.SftpServerExtension; import google.registry.util.Retrier; -import google.registry.util.TaskQueueUtils; import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.net.Socket; import java.net.URI; +import java.util.Optional; import org.bouncycastle.openpgp.PGPPublicKey; import org.joda.time.DateTime; import org.junit.jupiter.api.BeforeEach; @@ -100,6 +98,12 @@ public class RdeUploadActionTest { BlobId.of("bucket", "tld_2010-10-17_full_S1_R0.xml.length"); private static final BlobId REPORT_FILE = BlobId.of("bucket", "tld_2010-10-17_full_S1_R0-report.xml.ghostryde"); + private static final BlobId GHOSTRYDE_FILE_WITH_PREFIX = + BlobId.of("bucket", "job-name/tld_2010-10-17_full_S1_R0.xml.ghostryde"); + private static final BlobId LENGTH_FILE_WITH_PREFIX = + BlobId.of("bucket", "job-name/tld_2010-10-17_full_S1_R0.xml.length"); + private static final BlobId REPORT_FILE_WITH_PREFIX = + BlobId.of("bucket", "job-name/tld_2010-10-17_full_S1_R0-report.xml.ghostryde"); private static final BlobId GHOSTRYDE_R1_FILE = BlobId.of("bucket", "tld_2010-10-17_full_S1_R1.xml.ghostryde"); @@ -109,6 +113,7 @@ public class RdeUploadActionTest { BlobId.of("bucket", "tld_2010-10-17_full_S1_R1-report.xml.ghostryde"); private final GcsUtils gcsUtils = new GcsUtils(LocalStorageHelper.getOptions()); + private final CloudTasksHelper cloudTasksHelper = new CloudTasksHelper(); @RegisterExtension final SftpServerExtension sftpd = new SftpServerExtension(); @@ -129,6 +134,8 @@ public class RdeUploadActionTest { public final AppEngineExtension appEngine = AppEngineExtension.builder().withDatastoreAndCloudSql().withTaskQueue().build(); + private final PGPPublicKey encryptKey = + new FakeKeyringModule().get().getRdeStagingEncryptionKey(); private final FakeResponse response = new FakeResponse(); private final EscrowTaskRunner runner = mock(EscrowTaskRunner.class); private final FakeClock clock = new FakeClock(DateTime.parse("2010-10-17TZ")); @@ -155,10 +162,10 @@ public class RdeUploadActionTest { action.receiverKey = keyring.getRdeReceiverKey(); action.signingKey = keyring.getRdeSigningKey(); action.stagingDecryptionKey = keyring.getRdeStagingDecryptionKey(); - action.reportQueue = QueueFactory.getQueue("rde-report"); action.runner = runner; - action.taskQueueUtils = new TaskQueueUtils(new Retrier(null, 1)); + action.cloudTasksUtils = cloudTasksHelper.getTestCloudTasksUtils(); action.retrier = new Retrier(new FakeSleeper(clock), 3); + action.prefix = Optional.empty(); return action; } } @@ -181,15 +188,13 @@ public class RdeUploadActionTest { SystemProperty.environment.set(SystemProperty.Environment.Value.Development); createTld("tld"); - PGPPublicKey encryptKey = new FakeKeyringModule().get().getRdeStagingEncryptionKey(); gcsUtils.createFromBytes(GHOSTRYDE_FILE, Ghostryde.encode(DEPOSIT_XML.read(), encryptKey)); gcsUtils.createFromBytes(GHOSTRYDE_R1_FILE, Ghostryde.encode(DEPOSIT_XML.read(), encryptKey)); gcsUtils.createFromBytes(LENGTH_FILE, Long.toString(DEPOSIT_XML.size()).getBytes(UTF_8)); gcsUtils.createFromBytes(LENGTH_R1_FILE, Long.toString(DEPOSIT_XML.size()).getBytes(UTF_8)); gcsUtils.createFromBytes(REPORT_FILE, Ghostryde.encode(REPORT_XML.read(), encryptKey)); gcsUtils.createFromBytes(REPORT_R1_FILE, Ghostryde.encode(REPORT_XML.read(), encryptKey)); - tm() - .transact( + tm().transact( () -> { RdeRevision.saveRevision("lol", DateTime.parse("2010-10-17TZ"), FULL, 0); RdeRevision.saveRevision("tld", DateTime.parse("2010-10-17TZ"), FULL, 0); @@ -210,11 +215,48 @@ public class RdeUploadActionTest { RdeUploadAction action = createAction(null); action.tld = "lol"; action.run(); - verify(runner).lockRunAndRollForward( - action, Registry.get("lol"), standardSeconds(23), CursorType.RDE_UPLOAD, standardDays(1)); - assertTasksEnqueued("rde-report", new TaskMatcher() - .url(RdeReportAction.PATH) - .param(RequestParameters.PARAM_TLD, "lol")); + verify(runner) + .lockRunAndRollForward( + action, + Registry.get("lol"), + standardSeconds(23), + CursorType.RDE_UPLOAD, + standardDays(1)); + cloudTasksHelper.assertTasksEnqueued( + "rde-report", + new TaskMatcher().url(RdeReportAction.PATH).param(RequestParameters.PARAM_TLD, "lol")); + verifyNoMoreInteractions(runner); + } + + @TestOfyAndSql + void testRun_withPrefix() throws Exception { + createTld("lol"); + RdeUploadAction action = createAction(null); + action.prefix = Optional.of("job-name/"); + action.tld = "lol"; + gcsUtils.delete(GHOSTRYDE_FILE); + gcsUtils.createFromBytes( + GHOSTRYDE_FILE_WITH_PREFIX, Ghostryde.encode(DEPOSIT_XML.read(), encryptKey)); + gcsUtils.delete(LENGTH_FILE); + gcsUtils.createFromBytes( + LENGTH_FILE_WITH_PREFIX, Long.toString(DEPOSIT_XML.size()).getBytes(UTF_8)); + gcsUtils.delete(REPORT_FILE); + gcsUtils.createFromBytes( + REPORT_FILE_WITH_PREFIX, Ghostryde.encode(REPORT_XML.read(), encryptKey)); + action.run(); + verify(runner) + .lockRunAndRollForward( + action, + Registry.get("lol"), + standardSeconds(23), + CursorType.RDE_UPLOAD, + standardDays(1)); + cloudTasksHelper.assertTasksEnqueued( + "rde-report", + new TaskMatcher() + .url(RdeReportAction.PATH) + .param(RequestParameters.PARAM_TLD, "lol") + .param(RdeModule.PARAM_PREFIX, "job-name/")); verifyNoMoreInteractions(runner); } @@ -231,7 +273,7 @@ public class RdeUploadActionTest { assertThat(response.getStatus()).isEqualTo(200); assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8); assertThat(response.getPayload()).isEqualTo("OK tld 2010-10-17T00:00:00.000Z\n"); - assertNoTasksEnqueued("rde-upload"); + cloudTasksHelper.assertNoTasksEnqueued("rde-upload"); assertThat(folder.list()) .asList() .containsExactly("tld_2010-10-17_full_S1_R0.ryde", "tld_2010-10-17_full_S1_R0.sig"); @@ -262,7 +304,7 @@ public class RdeUploadActionTest { assertThat(response.getStatus()).isEqualTo(200); assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8); assertThat(response.getPayload()).isEqualTo("OK tld 2010-10-17T00:00:00.000Z\n"); - assertNoTasksEnqueued("rde-upload"); + cloudTasksHelper.assertNoTasksEnqueued("rde-upload"); // Assert that both files are written to SFTP and GCS, and that the contents are identical. String rydeFilename = "tld_2010-10-17_full_S1_R0.ryde"; String sigFilename = "tld_2010-10-17_full_S1_R0.sig"; @@ -273,6 +315,41 @@ public class RdeUploadActionTest { .isEqualTo(Files.toByteArray(new File(folder, sigFilename))); } + @TestOfyAndSql + void testRunWithLock_copiesOnGcs_withPrefix() throws Exception { + int port = sftpd.serve("user", "password", folder); + URI uploadUrl = URI.create(String.format("sftp://user:password@localhost:%d/", port)); + DateTime stagingCursor = DateTime.parse("2010-10-18TZ"); + DateTime uploadCursor = DateTime.parse("2010-10-17TZ"); + persistResource(Cursor.create(RDE_STAGING, stagingCursor, Registry.get("tld"))); + RdeUploadAction action = createAction(uploadUrl); + action.prefix = Optional.of("job-name/"); + gcsUtils.delete(GHOSTRYDE_FILE); + gcsUtils.createFromBytes( + GHOSTRYDE_FILE_WITH_PREFIX, Ghostryde.encode(DEPOSIT_XML.read(), encryptKey)); + gcsUtils.delete(LENGTH_FILE); + gcsUtils.createFromBytes( + LENGTH_FILE_WITH_PREFIX, Long.toString(DEPOSIT_XML.size()).getBytes(UTF_8)); + gcsUtils.delete(REPORT_FILE); + gcsUtils.createFromBytes( + REPORT_FILE_WITH_PREFIX, Ghostryde.encode(REPORT_XML.read(), encryptKey)); + action.runWithLock(uploadCursor); + assertThat(response.getStatus()).isEqualTo(200); + assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8); + assertThat(response.getPayload()).isEqualTo("OK tld 2010-10-17T00:00:00.000Z\n"); + cloudTasksHelper.assertNoTasksEnqueued("rde-upload"); + // Assert that both files are written to SFTP and GCS, and that the contents are identical. + String rydeFilename = "tld_2010-10-17_full_S1_R0.ryde"; + String rydeGcsFilename = "job-name/tld_2010-10-17_full_S1_R0.ryde"; + String sigFilename = "tld_2010-10-17_full_S1_R0.sig"; + String sigGcsFilename = "job-name/tld_2010-10-17_full_S1_R0.sig"; + assertThat(folder.list()).asList().containsExactly(rydeFilename, sigFilename); + assertThat(gcsUtils.readBytesFrom(BlobId.of("bucket", rydeGcsFilename))) + .isEqualTo(Files.toByteArray(new File(folder, rydeFilename))); + assertThat(gcsUtils.readBytesFrom(BlobId.of("bucket", sigGcsFilename))) + .isEqualTo(Files.toByteArray(new File(folder, sigFilename))); + } + @TestOfyAndSql void testRunWithLock_resend() throws Exception { tm().transact(() -> RdeRevision.saveRevision("tld", DateTime.parse("2010-10-17TZ"), FULL, 1)); @@ -285,7 +362,7 @@ public class RdeUploadActionTest { assertThat(response.getStatus()).isEqualTo(200); assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8); assertThat(response.getPayload()).isEqualTo("OK tld 2010-10-17T00:00:00.000Z\n"); - assertNoTasksEnqueued("rde-upload"); + cloudTasksHelper.assertNoTasksEnqueued("rde-upload"); assertThat(folder.list()) .asList() .containsExactly("tld_2010-10-17_full_S1_R1.ryde", "tld_2010-10-17_full_S1_R1.sig"); @@ -327,7 +404,7 @@ public class RdeUploadActionTest { .isEqualTo( "Waiting on RdeStagingAction for TLD tld to send 2010-10-17T00:00:00.000Z upload; last" + " RDE staging completion was at 1970-01-01T00:00:00.000Z"); - assertNoTasksEnqueued("rde-upload"); + cloudTasksHelper.assertNoTasksEnqueued("rde-upload"); assertThat(folder.list()).isEmpty(); } diff --git a/core/src/test/java/google/registry/testing/CloudTasksHelper.java b/core/src/test/java/google/registry/testing/CloudTasksHelper.java index 342dc2cd6..382968742 100644 --- a/core/src/test/java/google/registry/testing/CloudTasksHelper.java +++ b/core/src/test/java/google/registry/testing/CloudTasksHelper.java @@ -23,13 +23,8 @@ import static com.google.common.truth.Truth.assertWithMessage; import static google.registry.util.DiffUtils.prettyPrintEntityDeepDiff; import static java.util.Arrays.asList; import static java.util.stream.Collectors.joining; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -import com.google.cloud.tasks.v2.CloudTasksClient; import com.google.cloud.tasks.v2.HttpMethod; -import com.google.cloud.tasks.v2.QueueName; import com.google.cloud.tasks.v2.Task; import com.google.common.base.Ascii; import com.google.common.base.Joiner; @@ -37,16 +32,20 @@ import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ImmutableMultimap; import com.google.common.collect.ImmutableMultiset; import com.google.common.collect.LinkedListMultimap; +import com.google.common.collect.ListMultimap; import com.google.common.collect.Maps; import com.google.common.collect.Multimap; +import com.google.common.collect.Multimaps; import com.google.common.net.HttpHeaders; import com.google.common.net.MediaType; import com.google.common.truth.Truth8; import google.registry.model.ImmutableObject; import google.registry.util.CloudTasksUtils; import google.registry.util.Retrier; +import java.io.Serializable; import java.net.URI; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -54,6 +53,9 @@ import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.function.Predicate; import javax.annotation.Nonnull; @@ -65,30 +67,37 @@ import javax.annotation.Nonnull; * helper methods because we have not yet encountered all the use cases with Cloud Tasks. As more * and more Task Queue API usage is migrated to Cloud Tasks we may replicate more methods from the * latter. + * + *

Note the use of {@link AtomicInteger} {@code nextInstanceId} here. When a {@link + * FakeCloudTasksClient} instance, and by extension the {@link CloudTasksHelper} instance that + * contains it is serialized/deserialized, as happens in a Beam pipeline, we to want to push tasks + * to the same test task container that the original instance pushes to, so that we can make + * assertions on them by accessing the original instance. We cannot make the test task container + * itself static because we do not want tasks enqueued in previous tests to interfere with latter + * tests, when they run on the same JVM (and therefore share the same static class members). To + * solve this we put the test container in a static map whose keys are the instance IDs. An + * explicitly created new {@link CloudTasksHelper} (as would be created for a new test method) would + * have a new ID allocated to it, and therefore stores its tasks in a distinct container. A + * deserialized {@link CloudTasksHelper}, on the other hand, will have the same instance ID and + * share the same test class container with its progenitor. */ -public class CloudTasksHelper { +public class CloudTasksHelper implements Serializable { + + private static final long serialVersionUID = -8949359648199614677L; + private static final AtomicInteger nextInstanceId = new AtomicInteger(0); + protected static ConcurrentMap> testTasks = + new ConcurrentHashMap<>(); private static final String PROJECT_ID = "test-project"; private static final String LOCATION_ID = "test-location"; private final Retrier retrier = new Retrier(new FakeSleeper(new FakeClock()), 1); - private final LinkedListMultimap testTasks = LinkedListMultimap.create(); - private final CloudTasksClient mockClient = mock(CloudTasksClient.class); + private final int instanceId = nextInstanceId.getAndIncrement(); private final CloudTasksUtils cloudTasksUtils = - new CloudTasksUtils(retrier, PROJECT_ID, LOCATION_ID, () -> mockClient); + new CloudTasksUtils(retrier, PROJECT_ID, LOCATION_ID, new FakeCloudTasksClient()); public CloudTasksHelper() { - when(mockClient.createTask(any(QueueName.class), any(Task.class))) - .thenAnswer( - invocation -> { - QueueName queue = invocation.getArgument(0); - Task task = invocation.getArgument(1); - if (task.getName().isEmpty()) { - task = task.toBuilder().setName(String.format("test-%d", testTasks.size())).build(); - } - testTasks.put(queue.getQueue(), task); - return task; - }); + testTasks.put(instanceId, Multimaps.synchronizedListMultimap(LinkedListMultimap.create())); } public CloudTasksUtils getTestCloudTasksUtils() { @@ -96,7 +105,7 @@ public class CloudTasksHelper { } public List getTestTasksFor(String queue) { - return testTasks.get(queue); + return new ArrayList<>(testTasks.get(instanceId).get(queue)); } /** @@ -160,6 +169,20 @@ public class CloudTasksHelper { } } + private class FakeCloudTasksClient extends CloudTasksUtils.SerializableCloudTasksClient { + + private static final long serialVersionUID = 6661964844791720639L; + + @Override + public Task enqueue(String projectId, String locationId, String queueName, Task task) { + if (task.getName().isEmpty()) { + task = task.toBuilder().setName(String.format("test-%d", testTasks.size())).build(); + } + testTasks.get(instanceId).put(queueName, task); + return task; + } + } + /** An adapter to clean up a {@link Task} for ease of matching. */ private static class MatchableTask extends ImmutableObject { diff --git a/util/src/main/java/google/registry/util/CloudTasksUtils.java b/util/src/main/java/google/registry/util/CloudTasksUtils.java index 312790c54..5604f254c 100644 --- a/util/src/main/java/google/registry/util/CloudTasksUtils.java +++ b/util/src/main/java/google/registry/util/CloudTasksUtils.java @@ -37,7 +37,7 @@ import com.google.protobuf.ByteString; import java.io.Serializable; import java.nio.charset.StandardCharsets; import java.util.Arrays; -import javax.inject.Provider; +import java.util.function.Supplier; /** Utilities for dealing with Cloud Tasks. */ public class CloudTasksUtils implements Serializable { @@ -48,17 +48,14 @@ public class CloudTasksUtils implements Serializable { private final Retrier retrier; private final String projectId; private final String locationId; - private final Provider clientProvider; + private final SerializableCloudTasksClient client; public CloudTasksUtils( - Retrier retrier, - String projectId, - String locationId, - Provider clientProvider) { + Retrier retrier, String projectId, String locationId, SerializableCloudTasksClient client) { this.retrier = retrier; this.projectId = projectId; this.locationId = locationId; - this.clientProvider = clientProvider; + this.client = client; } public Task enqueue(String queue, Task task) { @@ -69,9 +66,7 @@ public class CloudTasksUtils implements Serializable { queue, task.getAppEngineHttpRequest().getRelativeUri(), task.getAppEngineHttpRequest().getAppEngineRouting().getService()); - try (CloudTasksClient client = clientProvider.get()) { - return client.createTask(QueueName.of(projectId, locationId, queue), task); - } + return client.enqueue(projectId, locationId, queue, task); }, ApiException.class); } @@ -141,4 +136,28 @@ public class CloudTasksUtils implements Serializable { public static Task createGetTask(String path, String service, Multimap params) { return createTask(path, HttpMethod.GET, service, params); } + + public abstract static class SerializableCloudTasksClient implements Serializable { + public abstract Task enqueue(String projectId, String locationId, String queueName, Task task); + } + + public static class GcpCloudTasksClient extends SerializableCloudTasksClient { + + private static final long serialVersionUID = -5959253033129154037L; + + // Use a supplier so that we can use try-with-resources with the client, which implements + // Autocloseable. + private final Supplier clientSupplier; + + public GcpCloudTasksClient(Supplier clientSupplier) { + this.clientSupplier = clientSupplier; + } + + @Override + public Task enqueue(String projectId, String locationId, String queueName, Task task) { + try (CloudTasksClient client = clientSupplier.get()) { + return client.createTask(QueueName.of(projectId, locationId, queueName), task); + } + } + } } diff --git a/util/src/main/java/google/registry/util/GoogleCredentialsBundle.java b/util/src/main/java/google/registry/util/GoogleCredentialsBundle.java index b42fb13f5..b0d374100 100644 --- a/util/src/main/java/google/registry/util/GoogleCredentialsBundle.java +++ b/util/src/main/java/google/registry/util/GoogleCredentialsBundle.java @@ -22,25 +22,23 @@ import com.google.api.client.http.HttpTransport; import com.google.api.client.json.JsonFactory; import com.google.auth.http.HttpCredentialsAdapter; import com.google.auth.oauth2.GoogleCredentials; +import java.io.Serializable; /** * Helper class to provide {@link HttpTransport}, {@link JsonFactory} and {@link * HttpRequestInitializer} for a given {@link GoogleCredentials}. These classes are normally needed * for creating the instance of a GCP client. */ -public class GoogleCredentialsBundle { +public class GoogleCredentialsBundle implements Serializable { + + private static final HttpTransport HTTP_TRANSPORT = Utils.getDefaultTransport(); + private static final JsonFactory JSON_FACTORY = Utils.getDefaultJsonFactory(); private GoogleCredentials googleCredentials; - private HttpTransport httpTransport; - private JsonFactory jsonFactory; - private HttpRequestInitializer httpRequestInitializer; private GoogleCredentialsBundle(GoogleCredentials googleCredentials) { checkNotNull(googleCredentials); this.googleCredentials = googleCredentials; - this.httpTransport = Utils.getDefaultTransport(); - this.jsonFactory = Utils.getDefaultJsonFactory(); - this.httpRequestInitializer = new HttpCredentialsAdapter(googleCredentials); } /** Creates a {@link GoogleCredentialsBundle} instance from given {@link GoogleCredentials}. */ @@ -55,16 +53,16 @@ public class GoogleCredentialsBundle { /** Returns the instance of {@link HttpTransport}. */ public HttpTransport getHttpTransport() { - return httpTransport; + return HTTP_TRANSPORT; } /** Returns the instance of {@link JsonFactory}. */ public JsonFactory getJsonFactory() { - return jsonFactory; + return JSON_FACTORY; } /** Returns the instance of {@link HttpRequestInitializer}. */ public HttpRequestInitializer getHttpRequestInitializer() { - return httpRequestInitializer; + return new HttpCredentialsAdapter(googleCredentials); } } diff --git a/util/src/main/java/google/registry/util/PlaceholderEnvironment.java b/util/src/main/java/google/registry/util/PlaceholderEnvironment.java new file mode 100644 index 000000000..c8b80401b --- /dev/null +++ b/util/src/main/java/google/registry/util/PlaceholderEnvironment.java @@ -0,0 +1,82 @@ +// Copyright 2020 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.util; + +import com.google.apphosting.api.ApiProxy.Environment; +import com.google.common.collect.ImmutableMap; +import java.util.Map; + +/** A placeholder GAE environment class that is used when masquerading a thread as a GAE thread. */ +public final class PlaceholderEnvironment implements Environment { + + private static final PlaceholderEnvironment INSTANCE = new PlaceholderEnvironment(); + + public static PlaceholderEnvironment get() { + return INSTANCE; + } + + private PlaceholderEnvironment() {} + + @Override + public String getAppId() { + return "PlaceholderAppId"; + } + + @Override + public Map getAttributes() { + return ImmutableMap.of(); + } + + @Override + public String getModuleId() { + throw new UnsupportedOperationException(); + } + + @Override + public String getVersionId() { + throw new UnsupportedOperationException(); + } + + @Override + public String getEmail() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isLoggedIn() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isAdmin() { + throw new UnsupportedOperationException(); + } + + @Override + public String getAuthDomain() { + throw new UnsupportedOperationException(); + } + + @SuppressWarnings("deprecation") + @Override + public String getRequestNamespace() { + throw new UnsupportedOperationException(); + } + + @Override + public long getRemainingMillis() { + throw new UnsupportedOperationException(); + } +} diff --git a/util/src/test/java/google/registry/util/CloudTasksUtilsTest.java b/util/src/test/java/google/registry/util/CloudTasksUtilsTest.java index ea73df506..6a0bc6e36 100644 --- a/util/src/test/java/google/registry/util/CloudTasksUtilsTest.java +++ b/util/src/test/java/google/registry/util/CloudTasksUtilsTest.java @@ -17,18 +17,18 @@ package google.registry.util; import static com.google.common.truth.Truth.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import com.google.cloud.tasks.v2.CloudTasksClient; import com.google.cloud.tasks.v2.HttpMethod; -import com.google.cloud.tasks.v2.QueueName; import com.google.cloud.tasks.v2.Task; import com.google.common.collect.ImmutableList; import com.google.common.collect.LinkedListMultimap; import google.registry.testing.FakeClock; import google.registry.testing.FakeSleeper; +import google.registry.util.CloudTasksUtils.SerializableCloudTasksClient; import java.nio.charset.StandardCharsets; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -37,21 +37,18 @@ import org.junit.jupiter.api.Test; public class CloudTasksUtilsTest { // Use a LinkedListMultimap to preserve order of the inserted entries for assertion. private final LinkedListMultimap params = LinkedListMultimap.create(); - private final CloudTasksClient mockClient = mock(CloudTasksClient.class); + private final SerializableCloudTasksClient mockClient = mock(SerializableCloudTasksClient.class); private final CloudTasksUtils cloudTasksUtils = new CloudTasksUtils( - new Retrier(new FakeSleeper(new FakeClock()), 1), - "project", - "location", - () -> mockClient); + new Retrier(new FakeSleeper(new FakeClock()), 1), "project", "location", mockClient); @BeforeEach void beforeEach() { params.put("key1", "val1"); params.put("key2", "val2"); params.put("key1", "val3"); - when(mockClient.createTask(any(QueueName.class), any(Task.class))) - .thenAnswer(invocation -> invocation.getArgument(1)); + when(mockClient.enqueue(anyString(), anyString(), anyString(), any(Task.class))) + .thenAnswer(invocation -> invocation.getArgument(3)); } @Test @@ -94,7 +91,7 @@ public class CloudTasksUtilsTest { void testSuccess_enqueueTask() { Task task = CloudTasksUtils.createGetTask("/the/path", "myservice", params); cloudTasksUtils.enqueue("test-queue", task); - verify(mockClient).createTask(QueueName.of("project", "location", "test-queue"), task); + verify(mockClient).enqueue("project", "location", "test-queue", task); } @Test @@ -102,8 +99,8 @@ public class CloudTasksUtilsTest { Task task1 = CloudTasksUtils.createGetTask("/the/path", "myservice", params); Task task2 = CloudTasksUtils.createGetTask("/other/path", "yourservice", params); cloudTasksUtils.enqueue("test-queue", task1, task2); - verify(mockClient).createTask(QueueName.of("project", "location", "test-queue"), task1); - verify(mockClient).createTask(QueueName.of("project", "location", "test-queue"), task2); + verify(mockClient).enqueue("project", "location", "test-queue", task1); + verify(mockClient).enqueue("project", "location", "test-queue", task2); } @Test @@ -111,7 +108,7 @@ public class CloudTasksUtilsTest { Task task1 = CloudTasksUtils.createGetTask("/the/path", "myservice", params); Task task2 = CloudTasksUtils.createGetTask("/other/path", "yourservice", params); cloudTasksUtils.enqueue("test-queue", ImmutableList.of(task1, task2)); - verify(mockClient).createTask(QueueName.of("project", "location", "test-queue"), task1); - verify(mockClient).createTask(QueueName.of("project", "location", "test-queue"), task2); + verify(mockClient).enqueue("project", "location", "test-queue", task1); + verify(mockClient).enqueue("project", "location", "test-queue", task2); } }