Finish RDE pipeline implementation in SQL mode (#1330)

This PR adds the final step in RDE pipeline (enqueueing the next action
  to Cloud Tasks) and makes some necessary changes, namely by making all
  CloudTasksUtils related classes serializable, so that they can be used
  on Beam.

<!-- Reviewable:start -->
This change is [<img src="https://reviewable.io/review_button.svg" height="34" align="absmiddle" alt="Reviewable"/>](https://reviewable.io/reviews/google/nomulus/1330)
<!-- Reviewable:end -->
This commit is contained in:
Lai Jiang 2021-10-04 21:02:44 -04:00 committed by GitHub
parent 6cb0cf5f6d
commit 57e58ce8b7
27 changed files with 903 additions and 206 deletions

View file

@ -783,27 +783,27 @@ createUberJar(
// User should install gcloud and login to GCP before invoking this tasks.
if (environment == 'alpha') {
def pipelines = [
InitSql :
initSql :
[
mainClass: 'google.registry.beam.initsql.InitSqlPipeline',
metaData : 'google/registry/beam/init_sql_pipeline_metadata.json'
],
BulkDeleteDatastore:
bulkDeleteDatastore:
[
mainClass: 'google.registry.beam.datastore.BulkDeleteDatastorePipeline',
metaData : 'google/registry/beam/bulk_delete_datastore_pipeline_metadata.json'
],
Spec11 :
spec11 :
[
mainClass: 'google.registry.beam.spec11.Spec11Pipeline',
metaData : 'google/registry/beam/spec11_pipeline_metadata.json'
],
Invoicing :
invoicing :
[
mainClass: 'google.registry.beam.invoicing.InvoicingPipeline',
metaData : 'google/registry/beam/invoicing_pipeline_metadata.json'
],
Rde :
rde :
[
mainClass: 'google.registry.beam.rde.RdePipeline',
metaData : 'google/registry/beam/rde_pipeline_metadata.json'

View file

@ -19,10 +19,13 @@ import static com.google.common.base.Verify.verify;
import static google.registry.model.common.Cursor.getCursorTimeOrStartOfTime;
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
import static google.registry.persistence.transaction.TransactionManagerUtil.transactIfJpaTm;
import static google.registry.rde.RdeModule.BRDA_QUEUE;
import static google.registry.rde.RdeModule.RDE_UPLOAD_QUEUE;
import static java.nio.charset.StandardCharsets.UTF_8;
import com.google.auto.value.AutoValue;
import com.google.cloud.storage.BlobId;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.flogger.FluentLogger;
import google.registry.gcs.GcsUtils;
import google.registry.keyring.api.PgpHelper;
@ -31,14 +34,20 @@ import google.registry.model.rde.RdeMode;
import google.registry.model.rde.RdeNamingUtils;
import google.registry.model.rde.RdeRevision;
import google.registry.model.tld.Registry;
import google.registry.rde.BrdaCopyAction;
import google.registry.rde.DepositFragment;
import google.registry.rde.Ghostryde;
import google.registry.rde.PendingDeposit;
import google.registry.rde.RdeCounter;
import google.registry.rde.RdeMarshaller;
import google.registry.rde.RdeModule;
import google.registry.rde.RdeResourceType;
import google.registry.rde.RdeUploadAction;
import google.registry.rde.RdeUtil;
import google.registry.request.Action.Service;
import google.registry.request.RequestParameters;
import google.registry.tldconfig.idn.IdnTableEnum;
import google.registry.util.CloudTasksUtils;
import google.registry.xjc.rdeheader.XjcRdeHeader;
import google.registry.xjc.rdeheader.XjcRdeHeaderElement;
import google.registry.xml.ValidationMode;
@ -68,6 +77,8 @@ public class RdeIO {
abstract GcsUtils gcsUtils();
abstract CloudTasksUtils cloudTasksUtils();
abstract String rdeBucket();
// It's OK to return a primitive array because we are only using it to construct the
@ -83,7 +94,9 @@ public class RdeIO {
@AutoValue.Builder
abstract static class Builder {
abstract Builder setGcsUtils(GcsUtils gcsUtils);
abstract Builder setGcsUtils(GcsUtils value);
abstract Builder setCloudTasksUtils(CloudTasksUtils value);
abstract Builder setRdeBucket(String value);
@ -100,7 +113,8 @@ public class RdeIO {
.apply(
"Write to GCS",
ParDo.of(new RdeWriter(gcsUtils(), rdeBucket(), stagingKeyBytes(), validationMode())))
.apply("Update cursors", ParDo.of(new CursorUpdater()));
.apply("Update cursors", ParDo.of(new CursorUpdater()))
.apply("Enqueue upload action", ParDo.of(new UploadEnqueuer(cloudTasksUtils())));
return PDone.in(input.getPipeline());
}
}
@ -236,11 +250,12 @@ public class RdeIO {
}
}
private static class CursorUpdater extends DoFn<KV<PendingDeposit, Integer>, Void> {
private static class CursorUpdater extends DoFn<KV<PendingDeposit, Integer>, PendingDeposit> {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
@ProcessElement
public void processElement(@Element KV<PendingDeposit, Integer> input) {
public void processElement(
@Element KV<PendingDeposit, Integer> input, OutputReceiver<PendingDeposit> outputReceiver) {
tm().transact(
() -> {
PendingDeposit key = input.getKey();
@ -268,6 +283,45 @@ public class RdeIO {
"Rolled forward %s on %s cursor to %s.", key.cursor(), key.tld(), newPosition);
RdeRevision.saveRevision(key.tld(), key.watermark(), key.mode(), revision);
});
outputReceiver.output(input.getKey());
}
}
private static class UploadEnqueuer extends DoFn<PendingDeposit, Void> {
private final CloudTasksUtils cloudTasksUtils;
private UploadEnqueuer(CloudTasksUtils cloudTasksUtils) {
this.cloudTasksUtils = cloudTasksUtils;
}
@ProcessElement
public void processElement(@Element PendingDeposit input, PipelineOptions options) {
if (input.mode() == RdeMode.FULL) {
cloudTasksUtils.enqueue(
RDE_UPLOAD_QUEUE,
CloudTasksUtils.createPostTask(
RdeUploadAction.PATH,
Service.BACKEND.getServiceId(),
ImmutableMultimap.of(
RequestParameters.PARAM_TLD,
input.tld(),
RdeModule.PARAM_PREFIX,
options.getJobName() + '/')));
} else {
cloudTasksUtils.enqueue(
BRDA_QUEUE,
CloudTasksUtils.createPostTask(
BrdaCopyAction.PATH,
Service.BACKEND.getServiceId(),
ImmutableMultimap.of(
RequestParameters.PARAM_TLD,
input.tld(),
RdeModule.PARAM_WATERMARK,
input.watermark().toString(),
RdeModule.PARAM_PREFIX,
options.getJobName() + '/')));
}
}
}
}

View file

@ -27,6 +27,7 @@ import com.google.common.io.BaseEncoding;
import dagger.BindsInstance;
import dagger.Component;
import google.registry.beam.common.RegistryJpaIO;
import google.registry.config.CloudTasksUtilsModule;
import google.registry.config.CredentialModule;
import google.registry.config.RegistryConfig.ConfigModule;
import google.registry.gcs.GcsUtils;
@ -44,6 +45,8 @@ import google.registry.rde.PendingDeposit;
import google.registry.rde.PendingDeposit.PendingDepositCoder;
import google.registry.rde.RdeFragmenter;
import google.registry.rde.RdeMarshaller;
import google.registry.util.CloudTasksUtils;
import google.registry.util.UtilsModule;
import google.registry.xml.ValidationMode;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@ -66,7 +69,6 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
@ -93,6 +95,7 @@ public class RdePipeline implements Serializable {
private final String rdeBucket;
private final byte[] stagingKeyBytes;
private final GcsUtils gcsUtils;
private final CloudTasksUtils cloudTasksUtils;
// Registrars to be excluded from data escrow. Not including the sandbox-only OTE type so that
// if sneaks into production we would get an extra signal.
@ -111,13 +114,14 @@ public class RdePipeline implements Serializable {
}
@Inject
RdePipeline(RdePipelineOptions options, GcsUtils gcsUtils) {
RdePipeline(RdePipelineOptions options, GcsUtils gcsUtils, CloudTasksUtils cloudTasksUtils) {
this.options = options;
this.mode = ValidationMode.valueOf(options.getValidationMode());
this.pendings = decodePendings(options.getPendings());
this.rdeBucket = options.getGcsBucket();
this.rdeBucket = options.getRdeStagingBucket();
this.stagingKeyBytes = BaseEncoding.base64Url().decode(options.getStagingKey());
this.gcsUtils = gcsUtils;
this.cloudTasksUtils = cloudTasksUtils;
}
PipelineResult run() {
@ -140,10 +144,11 @@ public class RdePipeline implements Serializable {
void persistData(PCollection<KV<PendingDeposit, Iterable<DepositFragment>>> input) {
input.apply(
"Write to GCS and update cursors",
"Write to GCS, update cursors, and enqueue upload tasks",
RdeIO.Write.builder()
.setRdeBucket(rdeBucket)
.setGcsUtils(gcsUtils)
.setCloudTasksUtils(cloudTasksUtils)
.setValidationMode(mode)
.setStagingKeyBytes(stagingKeyBytes)
.build());
@ -177,18 +182,13 @@ public class RdePipeline implements Serializable {
}));
}
@SuppressWarnings("deprecation") // Reshuffle is still recommended by Dataflow.
<T extends EppResource>
PCollection<KV<PendingDeposit, DepositFragment>> processNonRegistrarEntities(
Pipeline pipeline, Class<T> clazz) {
return createInputs(pipeline, clazz)
.apply("Marshal " + clazz.getSimpleName() + " into DepositFragment", mapToFragments(clazz))
.setCoder(KvCoder.of(PendingDepositCoder.of(), SerializableCoder.of(DepositFragment.class)))
.apply(
"Reshuffle KV<PendingDeposit, DepositFragment> of "
+ clazz.getSimpleName()
+ " to prevent fusion",
Reshuffle.of());
.setCoder(
KvCoder.of(PendingDepositCoder.of(), SerializableCoder.of(DepositFragment.class)));
}
<T extends EppResource> PCollection<VKey<T>> createInputs(Pipeline pipeline, Class<T> clazz) {
@ -202,7 +202,7 @@ public class RdePipeline implements Serializable {
String.class,
// TODO: consider adding coders for entities and pass them directly instead of using
// VKeys.
x -> VKey.create(clazz, x)));
x -> VKey.createSql(clazz, x)));
}
<T extends EppResource>
@ -270,7 +270,7 @@ public class RdePipeline implements Serializable {
* Encodes the TLD to pending deposit map in an URL safe string that is sent to the pipeline
* worker by the pipeline launcher as a pipeline option.
*/
static String encodePendings(ImmutableSetMultimap<String, PendingDeposit> pendings)
public static String encodePendings(ImmutableSetMultimap<String, PendingDeposit> pendings)
throws IOException {
try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
ObjectOutputStream oos = new ObjectOutputStream(baos);
@ -282,13 +282,24 @@ public class RdePipeline implements Serializable {
public static void main(String[] args) throws IOException, ClassNotFoundException {
PipelineOptionsFactory.register(RdePipelineOptions.class);
RdePipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(RdePipelineOptions.class);
RdePipelineOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(RdePipelineOptions.class);
// RegistryPipelineWorkerInitializer only initializes before pipeline executions, after the
// main() function constructed the graph. We need the registry environment set up so that we
// can create a CloudTasksUtils which uses the environment-dependent config file.
options.getRegistryEnvironment().setup();
options.setIsolationOverride(TransactionIsolationLevel.TRANSACTION_READ_COMMITTED);
DaggerRdePipeline_RdePipelineComponent.builder().options(options).build().rdePipeline().run();
}
@Singleton
@Component(modules = {CredentialModule.class, ConfigModule.class})
@Component(
modules = {
CredentialModule.class,
ConfigModule.class,
CloudTasksUtilsModule.class,
UtilsModule.class
})
interface RdePipelineComponent {
RdePipeline rdePipeline();

View file

@ -31,9 +31,9 @@ public interface RdePipelineOptions extends RegistryPipelineOptions {
void setValidationMode(String value);
@Description("The GCS bucket where the encrypted RDE deposits will be uploaded to.")
String getGcsBucket();
String getRdeStagingBucket();
void setGcsBucket(String value);
void setRdeStagingBucket(String value);
@Description("The Base64-encoded PGP public key to encrypt the deposits.")
String getStagingKey();

View file

@ -22,10 +22,13 @@ import dagger.Provides;
import google.registry.config.CredentialModule.DefaultCredential;
import google.registry.config.RegistryConfig.Config;
import google.registry.util.CloudTasksUtils;
import google.registry.util.CloudTasksUtils.GcpCloudTasksClient;
import google.registry.util.CloudTasksUtils.SerializableCloudTasksClient;
import google.registry.util.GoogleCredentialsBundle;
import google.registry.util.Retrier;
import java.io.IOException;
import javax.inject.Provider;
import java.io.Serializable;
import java.util.function.Supplier;
import javax.inject.Singleton;
/**
@ -42,24 +45,35 @@ public abstract class CloudTasksUtilsModule {
public static CloudTasksUtils provideCloudTasksUtils(
@Config("projectId") String projectId,
@Config("locationId") String locationId,
// Use a provider so that we can use try-with-resources with the client, which implements
// Autocloseable.
Provider<CloudTasksClient> clientProvider,
SerializableCloudTasksClient client,
Retrier retrier) {
return new CloudTasksUtils(retrier, projectId, locationId, clientProvider);
return new CloudTasksUtils(retrier, projectId, locationId, client);
}
// Provides a supplier instead of using a Dagger @Provider because the latter is not serializable.
@Provides
public static Supplier<CloudTasksClient> provideCloudTasksClientSupplier(
@DefaultCredential GoogleCredentialsBundle credentials) {
return (Supplier<CloudTasksClient> & Serializable)
() -> {
CloudTasksClient client;
try {
client =
CloudTasksClient.create(
CloudTasksSettings.newBuilder()
.setCredentialsProvider(
FixedCredentialsProvider.create(credentials.getGoogleCredentials()))
.build());
} catch (IOException e) {
throw new RuntimeException(e);
}
return client;
};
}
@Provides
public static CloudTasksClient provideCloudTasksClient(
@DefaultCredential GoogleCredentialsBundle credentials) {
try {
return CloudTasksClient.create(
CloudTasksSettings.newBuilder()
.setCredentialsProvider(
FixedCredentialsProvider.create(credentials.getGoogleCredentials()))
.build());
} catch (IOException e) {
throw new RuntimeException(e);
}
public static SerializableCloudTasksClient provideSerializableCloudTasksClient(
final Supplier<CloudTasksClient> clientSupplier) {
return new GcpCloudTasksClient(clientSupplier);
}
}

View file

@ -31,6 +31,7 @@ import google.registry.request.auth.Auth;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Optional;
import javax.inject.Inject;
import org.bouncycastle.openpgp.PGPKeyPair;
import org.bouncycastle.openpgp.PGPPrivateKey;
@ -60,7 +61,7 @@ import org.joda.time.DateTime;
auth = Auth.AUTH_INTERNAL_OR_ADMIN)
public final class BrdaCopyAction implements Runnable {
static final String PATH = "/_dr/task/brdaCopy";
public static final String PATH = "/_dr/task/brdaCopy";
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
@ -69,6 +70,7 @@ public final class BrdaCopyAction implements Runnable {
@Inject @Config("rdeBucket") String stagingBucket;
@Inject @Parameter(RequestParameters.PARAM_TLD) String tld;
@Inject @Parameter(RdeModule.PARAM_WATERMARK) DateTime watermark;
@Inject @Parameter(RdeModule.PARAM_PREFIX) Optional<String> prefix;
@Inject @Key("brdaReceiverKey") PGPPublicKey receiverKey;
@Inject @Key("brdaSigningKey") PGPKeyPair signingKey;
@Inject @Key("rdeStagingDecryptionKey") PGPPrivateKey stagingDecryptionKey;
@ -84,11 +86,12 @@ public final class BrdaCopyAction implements Runnable {
}
private void copyAsRyde() throws IOException {
String prefix = RdeNamingUtils.makeRydeFilename(tld, watermark, THIN, 1, 0);
BlobId xmlFilename = BlobId.of(stagingBucket, prefix + ".xml.ghostryde");
BlobId xmlLengthFilename = BlobId.of(stagingBucket, prefix + ".xml.length");
BlobId rydeFile = BlobId.of(brdaBucket, prefix + ".ryde");
BlobId sigFile = BlobId.of(brdaBucket, prefix + ".sig");
String nameWithoutPrefix = RdeNamingUtils.makeRydeFilename(tld, watermark, THIN, 1, 0);
String name = prefix.orElse("") + nameWithoutPrefix;
BlobId xmlFilename = BlobId.of(stagingBucket, name + ".xml.ghostryde");
BlobId xmlLengthFilename = BlobId.of(stagingBucket, name + ".xml.length");
BlobId rydeFile = BlobId.of(brdaBucket, nameWithoutPrefix + ".ryde");
BlobId sigFile = BlobId.of(brdaBucket, nameWithoutPrefix + ".sig");
long xmlLength = readXmlLength(xmlLengthFilename);
@ -97,11 +100,12 @@ public final class BrdaCopyAction implements Runnable {
InputStream ghostrydeDecoder = Ghostryde.decoder(gcsInput, stagingDecryptionKey);
OutputStream rydeOut = gcsUtils.openOutputStream(rydeFile);
OutputStream sigOut = gcsUtils.openOutputStream(sigFile);
RydeEncoder rydeEncoder = new RydeEncoder.Builder()
.setRydeOutput(rydeOut, receiverKey)
.setSignatureOutput(sigOut, signingKey)
.setFileMetadata(prefix, xmlLength, watermark)
.build()) {
RydeEncoder rydeEncoder =
new RydeEncoder.Builder()
.setRydeOutput(rydeOut, receiverKey)
.setSignatureOutput(sigOut, signingKey)
.setFileMetadata(nameWithoutPrefix, xmlLength, watermark)
.build()) {
ByteStreams.copy(ghostrydeDecoder, rydeEncoder);
}
}

View file

@ -49,6 +49,10 @@ public abstract class RdeModule {
public static final String PARAM_MODE = "mode";
public static final String PARAM_REVISION = "revision";
public static final String PARAM_LENIENT = "lenient";
public static final String PARAM_PREFIX = "prefix";
public static final String RDE_UPLOAD_QUEUE = "rde-upload";
public static final String RDE_REPORT_QUEUE = "rde-report";
public static final String BRDA_QUEUE = "brda";
@Provides
@Parameter(PARAM_WATERMARK)
@ -92,10 +96,11 @@ public abstract class RdeModule {
return extractBooleanParameter(req, PARAM_LENIENT);
}
// TODO (jianglai): Make it a required parameter once we migrate to Cloud SQL.
@Provides
@Named("brda")
static Queue provideQueueBrda() {
return getQueue("brda");
@Parameter(PARAM_PREFIX)
static Optional<String> providePrefix(HttpServletRequest req) {
return extractOptionalParameter(req, PARAM_PREFIX);
}
@Provides

View file

@ -68,6 +68,7 @@ public final class RdeReportAction implements Runnable, EscrowTask {
@Inject Response response;
@Inject RdeReporter reporter;
@Inject @Parameter(RequestParameters.PARAM_TLD) String tld;
@Inject @Parameter(RdeModule.PARAM_PREFIX) Optional<String> prefix;
@Inject @Config("rdeBucket") String bucket;
@Inject @Config("rdeInterval") Duration interval;
@Inject @Config("rdeReportLockTimeout") Duration timeout;
@ -96,8 +97,9 @@ public final class RdeReportAction implements Runnable, EscrowTask {
RdeRevision.getCurrentRevision(tld, watermark, FULL)
.orElseThrow(
() -> new IllegalStateException("RdeRevision was not set on generated deposit"));
String prefix = RdeNamingUtils.makeRydeFilename(tld, watermark, FULL, 1, revision);
BlobId reportFilename = BlobId.of(bucket, prefix + "-report.xml.ghostryde");
String name =
prefix.orElse("") + RdeNamingUtils.makeRydeFilename(tld, watermark, FULL, 1, revision);
BlobId reportFilename = BlobId.of(bucket, name + "-report.xml.ghostryde");
verify(gcsUtils.existsAndNotEmpty(reportFilename), "Missing file: %s", reportFilename);
reporter.send(readReportFromGcs(reportFilename));
response.setContentType(PLAIN_TEXT_UTF_8);

View file

@ -14,20 +14,33 @@
package google.registry.rde;
import static google.registry.beam.BeamUtils.createJobName;
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
import static google.registry.request.Action.Method.GET;
import static google.registry.request.Action.Method.POST;
import static google.registry.xml.ValidationMode.LENIENT;
import static google.registry.xml.ValidationMode.STRICT;
import static javax.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
import static javax.servlet.http.HttpServletResponse.SC_NO_CONTENT;
import static javax.servlet.http.HttpServletResponse.SC_OK;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.LaunchFlexTemplateParameter;
import com.google.api.services.dataflow.model.LaunchFlexTemplateRequest;
import com.google.api.services.dataflow.model.LaunchFlexTemplateResponse;
import com.google.common.base.Ascii;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSetMultimap;
import com.google.common.collect.Multimaps;
import com.google.common.flogger.FluentLogger;
import com.google.common.io.BaseEncoding;
import google.registry.beam.rde.RdePipeline;
import google.registry.config.RegistryConfig.Config;
import google.registry.config.RegistryEnvironment;
import google.registry.gcs.GcsUtils;
import google.registry.keyring.api.KeyModule.Key;
import google.registry.mapreduce.MapreduceRunner;
import google.registry.mapreduce.inputs.EppResourceInputs;
import google.registry.mapreduce.inputs.NullInput;
@ -47,6 +60,7 @@ import google.registry.request.Response;
import google.registry.request.auth.Auth;
import google.registry.util.Clock;
import google.registry.xml.ValidationMode;
import java.io.IOException;
import java.util.Optional;
import javax.inject.Inject;
import org.joda.time.DateTime;
@ -201,6 +215,7 @@ public final class RdeStagingAction implements Runnable {
public static final String PATH = "/_dr/task/rdeStaging";
private static final String PIPELINE_NAME = "rde_pipeline";
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
@Inject Clock clock;
@ -209,7 +224,11 @@ public final class RdeStagingAction implements Runnable {
@Inject Response response;
@Inject GcsUtils gcsUtils;
@Inject MapreduceRunner mrRunner;
@Inject @Config("projectId") String projectId;
@Inject @Config("defaultJobRegion") String jobRegion;
@Inject @Config("transactionCooldown") Duration transactionCooldown;
@Inject @Config("beamStagingBucketUrl") String stagingBucketUrl;
@Inject @Config("rdeBucket") String rdeBucket;
@Inject @Parameter(RdeModule.PARAM_MANUAL) boolean manual;
@Inject @Parameter(RdeModule.PARAM_DIRECTORY) Optional<String> directory;
@Inject @Parameter(RdeModule.PARAM_MODE) ImmutableSet<String> modeStrings;
@ -217,7 +236,8 @@ public final class RdeStagingAction implements Runnable {
@Inject @Parameter(RdeModule.PARAM_WATERMARKS) ImmutableSet<DateTime> watermarks;
@Inject @Parameter(RdeModule.PARAM_REVISION) Optional<Integer> revision;
@Inject @Parameter(RdeModule.PARAM_LENIENT) boolean lenient;
@Inject @Key("rdeStagingEncryptionKey") byte[] stagingKeyBytes;
@Inject Dataflow dataflow;
@Inject RdeStagingAction() {}
@Override
@ -228,27 +248,66 @@ public final class RdeStagingAction implements Runnable {
String message = "Nothing needs to be deposited.";
logger.atInfo().log(message);
response.setStatus(SC_NO_CONTENT);
response.setPayload(message);
// No need to set payload as HTTP 204 response status code does not allow a payload.
return;
}
for (PendingDeposit pending : pendings.values()) {
logger.atInfo().log("Pending deposit: %s", pending);
}
ValidationMode validationMode = lenient ? LENIENT : STRICT;
RdeStagingMapper mapper = new RdeStagingMapper(validationMode, pendings);
RdeStagingReducer reducer = reducerFactory.create(validationMode, gcsUtils);
mrRunner
.setJobName("Stage escrow deposits for all TLDs")
.setModuleName("backend")
.setDefaultReduceShards(pendings.size())
.runMapreduce(
mapper,
reducer,
ImmutableList.of(
// Add an extra shard that maps over a null resource. See the mapper code for why.
new NullInput<>(), EppResourceInputs.createEntityInput(EppResource.class)))
.sendLinkToMapreduceConsole(response);
if (tm().isOfy()) {
RdeStagingMapper mapper = new RdeStagingMapper(validationMode, pendings);
RdeStagingReducer reducer = reducerFactory.create(validationMode, gcsUtils);
mrRunner
.setJobName("Stage escrow deposits for all TLDs")
.setModuleName("backend")
.setDefaultReduceShards(pendings.size())
.runMapreduce(
mapper,
reducer,
ImmutableList.of(
// Add an extra shard that maps over a null resource. See the mapper code for why.
new NullInput<>(), EppResourceInputs.createEntityInput(EppResource.class)))
.sendLinkToMapreduceConsole(response);
} else {
try {
LaunchFlexTemplateParameter parameter =
new LaunchFlexTemplateParameter()
.setJobName(createJobName("rde", clock))
.setContainerSpecGcsPath(
String.format("%s/%s_metadata.json", stagingBucketUrl, PIPELINE_NAME))
.setParameters(
ImmutableMap.of(
"pendings",
RdePipeline.encodePendings(pendings),
"validationMode",
validationMode.name(),
"rdeStagingBucket",
rdeBucket,
"stagingKey",
BaseEncoding.base64Url().omitPadding().encode(stagingKeyBytes),
"registryEnvironment",
RegistryEnvironment.get().name()));
LaunchFlexTemplateResponse launchResponse =
dataflow
.projects()
.locations()
.flexTemplates()
.launch(
projectId,
jobRegion,
new LaunchFlexTemplateRequest().setLaunchParameter(parameter))
.execute();
logger.atInfo().log("Got response: %s", launchResponse.getJob().toPrettyString());
response.setStatus(SC_OK);
response.setPayload(
String.format("Launched RDE pipeline: %s", launchResponse.getJob().getId()));
} catch (IOException e) {
logger.atWarning().withCause(e).log("Pipeline Launch failed");
response.setStatus(SC_INTERNAL_SERVER_ERROR);
response.setPayload(String.format("Pipeline launch failed: %s", e.getMessage()));
}
}
}
private ImmutableSetMultimap<String, PendingDeposit> getStandardPendingDeposits() {

View file

@ -14,7 +14,6 @@
package google.registry.rde;
import static com.google.appengine.api.taskqueue.TaskOptions.Builder.withUrl;
import static com.google.common.base.Verify.verify;
import static com.google.common.net.MediaType.PLAIN_TEXT_UTF_8;
import static com.jcraft.jsch.ChannelSftp.OVERWRITE;
@ -24,14 +23,15 @@ import static google.registry.model.common.Cursor.getCursorTimeOrStartOfTime;
import static google.registry.model.rde.RdeMode.FULL;
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
import static google.registry.persistence.transaction.TransactionManagerUtil.transactIfJpaTm;
import static google.registry.rde.RdeModule.RDE_REPORT_QUEUE;
import static google.registry.request.Action.Method.POST;
import static google.registry.util.DateTimeUtils.START_OF_TIME;
import static google.registry.util.DateTimeUtils.isBeforeOrAt;
import static java.util.Arrays.asList;
import com.google.appengine.api.taskqueue.Queue;
import com.google.cloud.storage.BlobId;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.HashMultimap;
import com.google.common.flogger.FluentLogger;
import com.google.common.io.ByteStreams;
import com.jcraft.jsch.JSch;
@ -49,14 +49,15 @@ import google.registry.model.tld.Registry;
import google.registry.rde.EscrowTaskRunner.EscrowTask;
import google.registry.rde.JSchSshSession.JSchSshSessionFactory;
import google.registry.request.Action;
import google.registry.request.Action.Service;
import google.registry.request.HttpException.NoContentException;
import google.registry.request.Parameter;
import google.registry.request.RequestParameters;
import google.registry.request.Response;
import google.registry.request.auth.Auth;
import google.registry.util.Clock;
import google.registry.util.CloudTasksUtils;
import google.registry.util.Retrier;
import google.registry.util.TaskQueueUtils;
import google.registry.util.TeeOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@ -66,7 +67,6 @@ import java.io.OutputStream;
import java.net.URI;
import java.util.Optional;
import javax.inject.Inject;
import javax.inject.Named;
import org.bouncycastle.openpgp.PGPKeyPair;
import org.bouncycastle.openpgp.PGPPrivateKey;
import org.bouncycastle.openpgp.PGPPublicKey;
@ -90,7 +90,7 @@ import org.joda.time.Duration;
auth = Auth.AUTH_INTERNAL_OR_ADMIN)
public final class RdeUploadAction implements Runnable, EscrowTask {
static final String PATH = "/_dr/task/rdeUpload";
public static final String PATH = "/_dr/task/rdeUpload";
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
@ -109,9 +109,10 @@ public final class RdeUploadAction implements Runnable, EscrowTask {
@Inject JSchSshSessionFactory jschSshSessionFactory;
@Inject Response response;
@Inject SftpProgressMonitor sftpProgressMonitor;
@Inject TaskQueueUtils taskQueueUtils;
@Inject CloudTasksUtils cloudTasksUtils;
@Inject Retrier retrier;
@Inject @Parameter(RequestParameters.PARAM_TLD) String tld;
@Inject @Parameter(RdeModule.PARAM_PREFIX) Optional<String> prefix;
@Inject @Config("rdeBucket") String bucket;
@Inject @Config("rdeInterval") Duration interval;
@Inject @Config("rdeUploadLockTimeout") Duration timeout;
@ -120,15 +121,21 @@ public final class RdeUploadAction implements Runnable, EscrowTask {
@Inject @Key("rdeReceiverKey") PGPPublicKey receiverKey;
@Inject @Key("rdeSigningKey") PGPKeyPair signingKey;
@Inject @Key("rdeStagingDecryptionKey") PGPPrivateKey stagingDecryptionKey;
@Inject @Named("rde-report") Queue reportQueue;
@Inject RdeUploadAction() {}
@Override
public void run() {
logger.atInfo().log("Attempting to acquire RDE upload lock for TLD '%s'.", tld);
runner.lockRunAndRollForward(this, Registry.get(tld), timeout, CursorType.RDE_UPLOAD, interval);
taskQueueUtils.enqueue(
reportQueue, withUrl(RdeReportAction.PATH).param(RequestParameters.PARAM_TLD, tld));
HashMultimap<String, String> params = HashMultimap.create();
params.put(RequestParameters.PARAM_TLD, tld);
if (prefix.isPresent()) {
params.put(RdeModule.PARAM_PREFIX, prefix.get());
}
cloudTasksUtils.enqueue(
RDE_REPORT_QUEUE,
CloudTasksUtils.createPostTask(
RdeReportAction.PATH, Service.BACKEND.getServiceId(), params));
}
@Override
@ -164,7 +171,9 @@ public final class RdeUploadAction implements Runnable, EscrowTask {
RdeRevision.getCurrentRevision(tld, watermark, FULL)
.orElseThrow(
() -> new IllegalStateException("RdeRevision was not set on generated deposit"));
final String name = RdeNamingUtils.makeRydeFilename(tld, watermark, FULL, 1, revision);
final String nameWithoutPrefix =
RdeNamingUtils.makeRydeFilename(tld, watermark, FULL, 1, revision);
final String name = prefix.orElse("") + nameWithoutPrefix;
final BlobId xmlFilename = BlobId.of(bucket, name + ".xml.ghostryde");
final BlobId xmlLengthFilename = BlobId.of(bucket, name + ".xml.length");
BlobId reportFilename = BlobId.of(bucket, name + "-report.xml.ghostryde");
@ -174,7 +183,8 @@ public final class RdeUploadAction implements Runnable, EscrowTask {
logger.atInfo().log("Commencing RDE upload for TLD '%s' to '%s'.", tld, uploadUrl);
final long xmlLength = readXmlLength(xmlLengthFilename);
retrier.callWithRetry(
() -> upload(xmlFilename, xmlLength, watermark, name), JSchException.class);
() -> upload(xmlFilename, xmlLength, watermark, name, nameWithoutPrefix),
JSchException.class);
logger.atInfo().log(
"Updating RDE cursor '%s' for TLD '%s' following successful upload.", RDE_UPLOAD_SFTP, tld);
tm().transact(
@ -210,7 +220,8 @@ public final class RdeUploadAction implements Runnable, EscrowTask {
* }</pre>
*/
@VisibleForTesting
protected void upload(BlobId xmlFile, long xmlLength, DateTime watermark, String name)
protected void upload(
BlobId xmlFile, long xmlLength, DateTime watermark, String name, String nameWithoutPrefix)
throws Exception {
logger.atInfo().log("Uploading XML file '%s' to remote path '%s'.", xmlFile, uploadUrl);
try (InputStream gcsInput = gcsUtils.openInputStream(xmlFile);
@ -218,8 +229,8 @@ public final class RdeUploadAction implements Runnable, EscrowTask {
try (JSchSshSession session = jschSshSessionFactory.create(lazyJsch.get(), uploadUrl);
JSchSftpChannel ftpChan = session.openSftpChannel()) {
ByteArrayOutputStream sigOut = new ByteArrayOutputStream();
String rydeFilename = name + ".ryde";
BlobId rydeGcsFilename = BlobId.of(bucket, rydeFilename);
String rydeFilename = nameWithoutPrefix + ".ryde";
BlobId rydeGcsFilename = BlobId.of(bucket, name + ".ryde");
try (OutputStream ftpOutput =
ftpChan.get().put(rydeFilename, sftpProgressMonitor, OVERWRITE);
OutputStream gcsOutput = gcsUtils.openOutputStream(rydeGcsFilename);
@ -228,14 +239,15 @@ public final class RdeUploadAction implements Runnable, EscrowTask {
new RydeEncoder.Builder()
.setRydeOutput(teeOutput, receiverKey)
.setSignatureOutput(sigOut, signingKey)
.setFileMetadata(name, xmlLength, watermark)
.setFileMetadata(nameWithoutPrefix, xmlLength, watermark)
.build()) {
long bytesCopied = ByteStreams.copy(ghostrydeDecoder, rydeEncoder);
logger.atInfo().log("Uploaded %,d bytes to path '%s'.", bytesCopied, rydeFilename);
}
String sigFilename = name + ".sig";
String sigFilename = nameWithoutPrefix + ".sig";
BlobId sigGcsFilename = BlobId.of(bucket, name + ".sig");
byte[] signature = sigOut.toByteArray();
gcsUtils.createFromBytes(BlobId.of(bucket, sigFilename), signature);
gcsUtils.createFromBytes(sigGcsFilename, signature);
ftpChan.get().put(new ByteArrayInputStream(signature), sigFilename);
logger.atInfo().log("Uploaded %,d bytes to path '%s'.", signature.length, sigFilename);
}

View file

@ -30,6 +30,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.flogger.FluentLogger;
import com.google.common.net.MediaType;
import google.registry.config.RegistryConfig.Config;
import google.registry.config.RegistryEnvironment;
import google.registry.model.common.DatabaseMigrationStateSchedule.PrimaryDatabase;
import google.registry.reporting.ReportingModule;
import google.registry.request.Action;
@ -127,7 +128,9 @@ public class GenerateInvoicesAction implements Runnable {
"database",
database.name(),
"billingBucketUrl",
billingBucketUrl));
billingBucketUrl,
"registryEnvironment",
RegistryEnvironment.get().name()));
LaunchFlexTemplateResponse launchResponse =
dataflow
.projects()

View file

@ -6,9 +6,9 @@
"name": "registryEnvironment",
"label": "The Registry environment.",
"helpText": "The Registry environment, required if environment-specific initialization (such as JPA) is needed on worker VMs.",
"is_optional": true,
"is_optional": false,
"regexes": [
"^[0-9A-Z_]+$"
"^PRODUCTION|SANDBOX|CRASH|QA|ALPHA$"
]
},
{

View file

@ -2,12 +2,21 @@
"name": "RDE/BRDA Deposit Generation",
"description": "An Apache Beam pipeline generates RDE or BRDA deposits and deposits them to GCS with GhostRyde encryption.",
"parameters": [
{
"name": "registryEnvironment",
"label": "The Registry environment.",
"helpText": "The Registry environment, required if environment-specific initialization (such as JPA) is needed on worker VMs.",
"is_optional": false,
"regexes": [
"^PRODUCTION|SANDBOX|CRASH|QA|ALPHA$"
]
},
{
"name": "pendings",
"label": "The pendings deposits to generate.",
"helpText": "A TLD to PendingDeposit map that is serialized and Base64 URL-safe encoded.",
"regexes": [
"A-Za-z0-9\\-_"
"[A-Za-z0-9\\-_]+"
]
},
{
@ -19,12 +28,12 @@
]
},
{
"name": "gcsBucket",
"label": "The GCS bucket that where the resulting files will be stored.",
"name": "rdeStagingBucket",
"label": "The GCS bucket that where the resulting files will be stored.",
"helpText": "Only the bucket name itself, without the leading \"gs://\".",
"is_optional": false,
"regexes": [
"^[a-zA-Z0-9_\\-]+$"
"[a-zA-Z0-9_\\-]+$"
]
},
{
@ -32,7 +41,7 @@
"label": "The PGP public key used to encrypt the RDE/BRDA deposit files.",
"helpText": "The key is Base64 URL-safe encoded.",
"regexes": [
"A-Za-z0-9\\-_"
"[A-Za-z0-9\\-_]+"
]
}
]

View file

@ -6,9 +6,9 @@
"name": "registryEnvironment",
"label": "The Registry environment.",
"helpText": "The Registry environment, required if environment-specific initialization (such as JPA) is needed on worker VMs.",
"is_optional": true,
"is_optional": false,
"regexes": [
"^[0-9A-Z_]+$"
"^PRODUCTION|SANDBOX|CRASH|QA|ALPHA$"
]
},
{

View file

@ -40,13 +40,13 @@ public abstract class BeamActionTestBase {
protected Dataflow dataflow = mock(Dataflow.class);
private Projects projects = mock(Projects.class);
private Locations locations = mock(Locations.class);
private FlexTemplates templates = mock(FlexTemplates.class);
protected FlexTemplates templates = mock(FlexTemplates.class);
protected Launch launch = mock(Launch.class);
private LaunchFlexTemplateResponse launchResponse =
new LaunchFlexTemplateResponse().setJob(new Job().setId("jobid"));
@BeforeEach
void beforeEach() throws Exception {
protected void beforeEach() throws Exception {
when(dataflow.projects()).thenReturn(projects);
when(projects.locations()).thenReturn(locations);
when(locations.flexTemplates()).thenReturn(templates);

View file

@ -69,6 +69,8 @@ import google.registry.rde.DepositFragment;
import google.registry.rde.Ghostryde;
import google.registry.rde.PendingDeposit;
import google.registry.rde.RdeResourceType;
import google.registry.testing.CloudTasksHelper;
import google.registry.testing.CloudTasksHelper.TaskMatcher;
import google.registry.testing.DatastoreEntityExtension;
import google.registry.testing.FakeClock;
import google.registry.testing.FakeKeyringModule;
@ -134,6 +136,8 @@ public class RdePipelineTest {
private final GcsUtils gcsUtils = new GcsUtils(LocalStorageHelper.getOptions());
private final CloudTasksHelper cloudTasksHelper = new CloudTasksHelper();
private final PGPPublicKey encryptionKey =
new FakeKeyringModule().get().getRdeStagingEncryptionKey();
@ -212,9 +216,9 @@ public class RdePipelineTest {
options.setValidationMode("LENIENT");
options.setStagingKey(
BaseEncoding.base64Url().encode(PgpHelper.convertPublicKeyToBytes(encryptionKey)));
options.setGcsBucket("gcs-bucket");
options.setRdeStagingBucket("gcs-bucket");
options.setJobName("rde-job");
rdePipeline = new RdePipeline(options, gcsUtils);
rdePipeline = new RdePipeline(options, gcsUtils, cloudTasksHelper.getTestCloudTasksUtils());
}
@AfterEach
@ -314,6 +318,21 @@ public class RdePipelineTest {
assertThat(loadCursorTime(CursorType.RDE_STAGING))
.isEquivalentAccordingToCompareTo(now.plus(Duration.standardDays(1)));
assertThat(loadRevision(now, FULL)).isEqualTo(1);
cloudTasksHelper.assertTasksEnqueued(
"brda",
new TaskMatcher()
.url("/_dr/task/brdaCopy")
.service("backend")
.param("tld", "soy")
.param("watermark", now.toString())
.param("prefix", "rde-job/"));
cloudTasksHelper.assertTasksEnqueued(
"rde-upload",
new TaskMatcher()
.url("/_dr/task/rdeUpload")
.service("backend")
.param("tld", "soy")
.param("prefix", "rde-job/"));
}
// The GCS folder listing can be a bit flaky, so retry if necessary
@ -337,6 +356,7 @@ public class RdePipelineTest {
assertThat(loadCursorTime(CursorType.RDE_STAGING)).isEquivalentAccordingToCompareTo(now);
assertThat(loadRevision(now, FULL)).isEqualTo(0);
cloudTasksHelper.assertNoTasksEnqueued("brda", "rde-upload");
}
private void verifyFiles(

View file

@ -35,26 +35,28 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Optional;
import org.bouncycastle.openpgp.PGPKeyPair;
import org.bouncycastle.openpgp.PGPPrivateKey;
import org.bouncycastle.openpgp.PGPPublicKey;
import org.joda.time.DateTime;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
/** Unit tests for {@link BrdaCopyAction}. */
public class BrdaCopyActionTest {
private static final ByteSource DEPOSIT_XML = RdeTestData.loadBytes("deposit_full.xml");
private static final String STAGE_FILENAME = "lol_2010-10-17_thin_S1_R0";
private static final BlobId RYDE_FILE =
BlobId.of("tub", String.format("%s.ryde", STAGE_FILENAME));
private static final BlobId SIG_FILE = BlobId.of("tub", String.format("%s.sig", STAGE_FILENAME));
private static final BlobId STAGE_FILE =
BlobId.of("keg", "lol_2010-10-17_thin_S1_R0.xml.ghostryde");
private static final BlobId STAGE_LENGTH_FILE =
BlobId.of("keg", "lol_2010-10-17_thin_S1_R0.xml.length");
private static final BlobId RYDE_FILE = BlobId.of("tub", "lol_2010-10-17_thin_S1_R0.ryde");
private static final BlobId SIG_FILE = BlobId.of("tub", "lol_2010-10-17_thin_S1_R0.sig");
private BlobId stageFile;
private BlobId stageLengthFile;
@RegisterExtension
public final BouncyCastleProviderExtension bouncy = new BouncyCastleProviderExtension();
@ -87,6 +89,16 @@ public class BrdaCopyActionTest {
private final GcsUtils gcsUtils = new GcsUtils(LocalStorageHelper.getOptions());
private final BrdaCopyAction action = new BrdaCopyAction();
private void runAction(String prefix) throws IOException {
stageFile = BlobId.of("keg", String.format("%s%s.xml.ghostryde", prefix, STAGE_FILENAME));
stageLengthFile = BlobId.of("keg", String.format("%s%s.xml.length", prefix, STAGE_FILENAME));
byte[] xml = DEPOSIT_XML.read();
gcsUtils.createFromBytes(stageFile, Ghostryde.encode(xml, encryptKey));
gcsUtils.createFromBytes(stageLengthFile, Long.toString(xml.length).getBytes(UTF_8));
action.prefix = prefix.isEmpty() ? Optional.empty() : Optional.of(prefix);
action.run();
}
@BeforeEach
void beforeEach() throws Exception {
action.gcsUtils = gcsUtils;
@ -97,24 +109,22 @@ public class BrdaCopyActionTest {
action.receiverKey = receiverKey;
action.signingKey = signingKey;
action.stagingDecryptionKey = decryptKey;
byte[] xml = DEPOSIT_XML.read();
gcsUtils.createFromBytes(STAGE_FILE, Ghostryde.encode(xml, encryptKey));
gcsUtils.createFromBytes(STAGE_LENGTH_FILE, Long.toString(xml.length).getBytes(UTF_8));
}
@Test
void testRun() {
action.run();
assertThat(gcsUtils.existsAndNotEmpty(STAGE_FILE)).isTrue();
assertThat(gcsUtils.existsAndNotEmpty(RYDE_FILE)).isTrue();
@ParameterizedTest
@ValueSource(strings = {"", "job-name/"})
void testRun(String prefix) throws Exception {
runAction(prefix);
assertThat(gcsUtils.existsAndNotEmpty(stageFile)).isTrue();
assertThat(gcsUtils.existsAndNotEmpty(stageLengthFile)).isTrue();
assertThat(gcsUtils.existsAndNotEmpty(SIG_FILE)).isTrue();
}
@Test
void testRun_rydeFormat() throws Exception {
@ParameterizedTest
@ValueSource(strings = {"", "job-name/"})
void testRun_rydeFormat(String prefix) throws Exception {
assumeTrue(hasCommand("gpg --version"));
action.run();
runAction(prefix);
File rydeTmp = new File(gpg.getCwd(), "ryde");
Files.write(gcsUtils.readBytesFrom(RYDE_FILE), rydeTmp);
@ -158,10 +168,11 @@ public class BrdaCopyActionTest {
.contains("ID 7F9084EE54E1EB0F");
}
@Test
void testRun_rydeSignature() throws Exception {
@ParameterizedTest
@ValueSource(strings = {"", "job-name/"})
void testRun_rydeSignature(String prefix) throws Exception {
assumeTrue(hasCommand("gpg --version"));
action.run();
runAction(prefix);
File rydeTmp = new File(gpg.getCwd(), "ryde");
File sigTmp = new File(gpg.getCwd(), "ryde.sig");

View file

@ -65,6 +65,7 @@ import google.registry.xml.XmlException;
import java.io.ByteArrayInputStream;
import java.net.SocketTimeoutException;
import java.util.Map;
import java.util.Optional;
import org.bouncycastle.openpgp.PGPPublicKey;
import org.joda.time.DateTime;
import org.junit.jupiter.api.BeforeEach;
@ -91,7 +92,8 @@ public class RdeReportActionTest {
private final URLFetchService urlFetchService = mock(URLFetchService.class);
private final ArgumentCaptor<HTTPRequest> request = ArgumentCaptor.forClass(HTTPRequest.class);
private final HTTPResponse httpResponse = mock(HTTPResponse.class);
private final PGPPublicKey encryptKey =
new FakeKeyringModule().get().getRdeStagingEncryptionKey();
private final GcsUtils gcsUtils = new GcsUtils(LocalStorageHelper.getOptions());
private final BlobId reportFile =
BlobId.of("tub", "test_2006-06-06_full_S1_R0-report.xml.ghostryde");
@ -112,12 +114,12 @@ public class RdeReportActionTest {
action.timeout = standardSeconds(30);
action.stagingDecryptionKey = new FakeKeyringModule().get().getRdeStagingDecryptionKey();
action.runner = runner;
action.prefix = Optional.empty();
return action;
}
@BeforeEach
void beforeEach() throws Exception {
PGPPublicKey encryptKey = new FakeKeyringModule().get().getRdeStagingEncryptionKey();
createTld("test");
persistResource(
Cursor.create(RDE_REPORT, DateTime.parse("2006-06-06TZ"), Registry.get("test")));
@ -148,6 +150,37 @@ public class RdeReportActionTest {
assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8);
assertThat(response.getPayload()).isEqualTo("OK test 2006-06-06T00:00:00.000Z\n");
// Verify the HTTP request was correct.
assertThat(request.getValue().getMethod()).isSameInstanceAs(PUT);
assertThat(request.getValue().getURL().getProtocol()).isEqualTo("https");
assertThat(request.getValue().getURL().getPath()).endsWith("/test/20101017001");
Map<String, String> headers = mapifyHeaders(request.getValue().getHeaders());
assertThat(headers).containsEntry("CONTENT_TYPE", "text/xml");
assertThat(headers).containsEntry("AUTHORIZATION", "Basic dGVzdF9yeTpmb28=");
// Verify the payload XML was the same as what's in testdata/report.xml.
XjcRdeReportReport report = parseReport(request.getValue().getPayload());
assertThat(report.getId()).isEqualTo("20101017001");
assertThat(report.getCrDate()).isEqualTo(DateTime.parse("2010-10-17T00:15:00.0Z"));
assertThat(report.getWatermark()).isEqualTo(DateTime.parse("2010-10-17T00:00:00Z"));
}
@TestOfyAndSql
void testRunWithLock_withPrefix() throws Exception {
when(httpResponse.getResponseCode()).thenReturn(SC_OK);
when(httpResponse.getContent()).thenReturn(IIRDEA_GOOD_XML.read());
when(urlFetchService.fetch(request.capture())).thenReturn(httpResponse);
RdeReportAction action = createAction();
action.prefix = Optional.of("job-name/");
gcsUtils.delete(reportFile);
gcsUtils.createFromBytes(
BlobId.of("tub", "job-name/test_2006-06-06_full_S1_R0-report.xml.ghostryde"),
Ghostryde.encode(REPORT_XML.read(), encryptKey));
action.runWithLock(loadRdeReportCursor());
assertThat(response.getStatus()).isEqualTo(200);
assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8);
assertThat(response.getPayload()).isEqualTo("OK test 2006-06-06T00:00:00.000Z\n");
// Verify the HTTP request was correct.
assertThat(request.getValue().getMethod()).isSameInstanceAs(PUT);
assertThat(request.getValue().getURL().getProtocol()).isEqualTo("https");

View file

@ -0,0 +1,263 @@
// Copyright 2017 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.rde;
import static com.google.common.truth.Truth.assertThat;
import static google.registry.testing.DatabaseHelper.createTld;
import static google.registry.testing.DatabaseHelper.persistResource;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import com.google.api.services.dataflow.model.LaunchFlexTemplateRequest;
import com.google.cloud.storage.contrib.nio.testing.LocalStorageHelper;
import com.google.common.collect.ImmutableSet;
import google.registry.beam.BeamActionTestBase;
import google.registry.gcs.GcsUtils;
import google.registry.model.tld.Registry;
import google.registry.request.HttpException.BadRequestException;
import google.registry.testing.AppEngineExtension;
import google.registry.testing.DualDatabaseTest;
import google.registry.testing.FakeClock;
import google.registry.testing.TestSqlOnly;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import org.joda.time.DateTime;
import org.joda.time.DateTimeConstants;
import org.joda.time.Duration;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.RegisterExtension;
/** Unit tests for {@link RdeStagingAction} in Cloud SQL. */
@DualDatabaseTest
public class RdeStagingActionCloudSqlTest extends BeamActionTestBase {
private final FakeClock clock = new FakeClock();
private final GcsUtils gcsUtils = new GcsUtils(LocalStorageHelper.getOptions());
private final RdeStagingAction action = new RdeStagingAction();
@RegisterExtension
public final AppEngineExtension extension =
AppEngineExtension.builder().withClock(clock).withDatastoreAndCloudSql().build();
@BeforeEach
@Override
public void beforeEach() throws Exception {
super.beforeEach();
action.clock = clock;
action.lenient = false;
action.projectId = "projectId";
action.jobRegion = "jobRegion";
action.rdeBucket = "rde-bucket";
action.pendingDepositChecker = new PendingDepositChecker();
action.pendingDepositChecker.brdaDayOfWeek = DateTimeConstants.TUESDAY;
action.pendingDepositChecker.brdaInterval = Duration.standardDays(7);
action.pendingDepositChecker.clock = clock;
action.pendingDepositChecker.rdeInterval = Duration.standardDays(1);
action.gcsUtils = gcsUtils;
action.response = response;
action.transactionCooldown = Duration.ZERO;
action.stagingKeyBytes = "ABCE".getBytes(StandardCharsets.UTF_8);
action.directory = Optional.empty();
action.modeStrings = ImmutableSet.of();
action.tlds = ImmutableSet.of();
action.watermarks = ImmutableSet.of();
action.revision = Optional.empty();
action.dataflow = dataflow;
}
@TestSqlOnly
void testRun_modeInNonManualMode_throwsException() {
createTldWithEscrowEnabled("lol");
clock.setTo(DateTime.parse("2000-01-01TZ"));
action.modeStrings = ImmutableSet.of("full");
assertThrows(BadRequestException.class, action::run);
verifyNoMoreInteractions(dataflow);
}
@TestSqlOnly
void testRun_tldInNonManualMode_throwsException() {
createTldWithEscrowEnabled("lol");
clock.setTo(DateTime.parse("2000-01-01TZ"));
action.tlds = ImmutableSet.of("tld");
assertThrows(BadRequestException.class, action::run);
verifyNoMoreInteractions(dataflow);
}
@TestSqlOnly
void testRun_watermarkInNonManualMode_throwsException() {
createTldWithEscrowEnabled("lol");
clock.setTo(DateTime.parse("2000-01-01TZ"));
action.watermarks = ImmutableSet.of(clock.nowUtc());
assertThrows(BadRequestException.class, action::run);
verifyNoMoreInteractions(dataflow);
}
@TestSqlOnly
void testRun_revisionInNonManualMode_throwsException() {
createTldWithEscrowEnabled("lol");
clock.setTo(DateTime.parse("2000-01-01TZ"));
action.revision = Optional.of(42);
assertThrows(BadRequestException.class, action::run);
verifyNoMoreInteractions(dataflow);
}
@TestSqlOnly
void testRun_noTlds_returns204() {
action.run();
assertThat(response.getStatus()).isEqualTo(204);
verifyNoMoreInteractions(dataflow);
}
@TestSqlOnly
void testRun_tldWithoutEscrowEnabled_returns204() {
createTld("lol");
persistResource(Registry.get("lol").asBuilder().setEscrowEnabled(false).build());
clock.setTo(DateTime.parse("2000-01-01TZ"));
action.run();
assertThat(response.getStatus()).isEqualTo(204);
verifyNoMoreInteractions(dataflow);
}
@TestSqlOnly
void testRun_tldWithEscrowEnabled_launchesPipeline() throws Exception {
createTldWithEscrowEnabled("lol");
clock.setTo(DateTime.parse("2000-01-01TZ"));
action.run();
assertThat(response.getStatus()).isEqualTo(200);
assertThat(response.getPayload()).contains("Launched RDE pipeline: jobid");
verify(templates, times(1))
.launch(eq("projectId"), eq("jobRegion"), any(LaunchFlexTemplateRequest.class));
}
@TestSqlOnly
void testRun_withinTransactionCooldown_getsExcludedAndReturns204() {
createTldWithEscrowEnabled("lol");
clock.setTo(DateTime.parse("2000-01-01T00:04:59Z"));
action.transactionCooldown = Duration.standardMinutes(5);
action.run();
assertThat(response.getStatus()).isEqualTo(204);
verifyNoMoreInteractions(dataflow);
}
@TestSqlOnly
void testRun_afterTransactionCooldown_runsMapReduce() throws Exception {
createTldWithEscrowEnabled("lol");
clock.setTo(DateTime.parse("2000-01-01T00:05:00Z"));
action.transactionCooldown = Duration.standardMinutes(5);
action.run();
assertThat(response.getStatus()).isEqualTo(200);
assertThat(response.getPayload()).contains("Launched RDE pipeline: jobid");
verify(templates, times(1))
.launch(eq("projectId"), eq("jobRegion"), any(LaunchFlexTemplateRequest.class));
}
@TestSqlOnly
void testManualRun_emptyMode_throwsException() {
createTldWithEscrowEnabled("lol");
clock.setTo(DateTime.parse("2000-01-01TZ"));
action.manual = true;
action.directory = Optional.of("test/");
action.modeStrings = ImmutableSet.of();
action.tlds = ImmutableSet.of("lol");
action.watermarks = ImmutableSet.of(clock.nowUtc());
assertThrows(BadRequestException.class, action::run);
}
@TestSqlOnly
void testManualRun_invalidMode_throwsException() {
createTldWithEscrowEnabled("lol");
clock.setTo(DateTime.parse("2000-01-01TZ"));
action.manual = true;
action.directory = Optional.of("test/");
action.modeStrings = ImmutableSet.of("full", "thing");
action.tlds = ImmutableSet.of("lol");
action.watermarks = ImmutableSet.of(clock.nowUtc());
assertThrows(BadRequestException.class, action::run);
}
@TestSqlOnly
void testManualRun_emptyTld_throwsException() {
createTldWithEscrowEnabled("lol");
clock.setTo(DateTime.parse("2000-01-01TZ"));
action.manual = true;
action.directory = Optional.of("test/");
action.modeStrings = ImmutableSet.of("full");
action.tlds = ImmutableSet.of();
action.watermarks = ImmutableSet.of(clock.nowUtc());
assertThrows(BadRequestException.class, action::run);
}
@TestSqlOnly
void testManualRun_emptyWatermark_throwsException() {
createTldWithEscrowEnabled("lol");
clock.setTo(DateTime.parse("2000-01-01TZ"));
action.manual = true;
action.directory = Optional.of("test/");
action.modeStrings = ImmutableSet.of("full");
action.tlds = ImmutableSet.of("lol");
action.watermarks = ImmutableSet.of();
assertThrows(BadRequestException.class, action::run);
}
@TestSqlOnly
void testManualRun_nonDayStartWatermark_throwsException() {
createTldWithEscrowEnabled("lol");
clock.setTo(DateTime.parse("2000-01-01TZ"));
action.manual = true;
action.directory = Optional.of("test/");
action.modeStrings = ImmutableSet.of("full");
action.tlds = ImmutableSet.of("lol");
action.watermarks = ImmutableSet.of(DateTime.parse("2001-01-01T01:36:45Z"));
assertThrows(BadRequestException.class, action::run);
}
@TestSqlOnly
void testManualRun_invalidRevision_throwsException() {
createTldWithEscrowEnabled("lol");
clock.setTo(DateTime.parse("2000-01-01TZ"));
action.manual = true;
action.directory = Optional.of("test/");
action.modeStrings = ImmutableSet.of("full");
action.tlds = ImmutableSet.of("lol");
action.watermarks = ImmutableSet.of(DateTime.parse("2001-01-01T00:00:00Z"));
action.revision = Optional.of(-1);
assertThrows(BadRequestException.class, action::run);
}
@TestSqlOnly
void testManualRun_validParameters_runsMapReduce() throws Exception {
createTldWithEscrowEnabled("lol");
clock.setTo(DateTime.parse("2000-01-01TZ"));
action.manual = true;
action.directory = Optional.of("test/");
action.modeStrings = ImmutableSet.of("full");
action.tlds = ImmutableSet.of("lol");
action.watermarks = ImmutableSet.of(DateTime.parse("2001-01-01TZ"));
action.run();
assertThat(response.getStatus()).isEqualTo(200);
assertThat(response.getPayload()).contains("Launched RDE pipeline: jobid");
verify(templates, times(1))
.launch(eq("projectId"), eq("jobRegion"), any(LaunchFlexTemplateRequest.class));
}
private static void createTldWithEscrowEnabled(final String tld) {
createTld(tld);
persistResource(Registry.get(tld).asBuilder().setEscrowEnabled(true).build());
}
}

View file

@ -90,8 +90,8 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
/** Unit tests for {@link RdeStagingAction}. */
public class RdeStagingActionTest extends MapreduceTestCase<RdeStagingAction> {
/** Unit tests for {@link RdeStagingAction} in Datastore. */
public class RdeStagingActionDatastoreTest extends MapreduceTestCase<RdeStagingAction> {
private static final BlobId XML_FILE =
BlobId.of("rde-bucket", "lol_2000-01-01_full_S1_R0.xml.ghostryde");

View file

@ -27,7 +27,8 @@ import org.junit.runner.RunWith;
GhostrydeGpgIntegrationTest.class,
GhostrydeTest.class,
HostResourceToXjcConverterTest.class,
RdeStagingActionTest.class,
RdeStagingActionDatastoreTest.class,
RdeStagingActionCloudSqlTest.class,
RdeUploadActionTest.class,
RdeReportActionTest.class,
RegistrarToXjcConverterTest.class,

View file

@ -25,8 +25,6 @@ import static google.registry.testing.DatabaseHelper.createTld;
import static google.registry.testing.DatabaseHelper.persistResource;
import static google.registry.testing.DatabaseHelper.persistSimpleResource;
import static google.registry.testing.SystemInfo.hasCommand;
import static google.registry.testing.TaskQueueHelper.assertNoTasksEnqueued;
import static google.registry.testing.TaskQueueHelper.assertTasksEnqueued;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.joda.time.Duration.standardDays;
import static org.joda.time.Duration.standardHours;
@ -41,7 +39,6 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import com.google.appengine.api.taskqueue.QueueFactory;
import com.google.appengine.api.utils.SystemProperty;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.contrib.nio.testing.LocalStorageHelper;
@ -62,6 +59,8 @@ import google.registry.request.HttpException.NoContentException;
import google.registry.request.RequestParameters;
import google.registry.testing.AppEngineExtension;
import google.registry.testing.BouncyCastleProviderExtension;
import google.registry.testing.CloudTasksHelper;
import google.registry.testing.CloudTasksHelper.TaskMatcher;
import google.registry.testing.DualDatabaseTest;
import google.registry.testing.FakeClock;
import google.registry.testing.FakeKeyringModule;
@ -69,17 +68,16 @@ import google.registry.testing.FakeResponse;
import google.registry.testing.FakeSleeper;
import google.registry.testing.GpgSystemCommandExtension;
import google.registry.testing.Lazies;
import google.registry.testing.TaskQueueHelper.TaskMatcher;
import google.registry.testing.TestOfyAndSql;
import google.registry.testing.sftp.SftpServerExtension;
import google.registry.util.Retrier;
import google.registry.util.TaskQueueUtils;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.Socket;
import java.net.URI;
import java.util.Optional;
import org.bouncycastle.openpgp.PGPPublicKey;
import org.joda.time.DateTime;
import org.junit.jupiter.api.BeforeEach;
@ -100,6 +98,12 @@ public class RdeUploadActionTest {
BlobId.of("bucket", "tld_2010-10-17_full_S1_R0.xml.length");
private static final BlobId REPORT_FILE =
BlobId.of("bucket", "tld_2010-10-17_full_S1_R0-report.xml.ghostryde");
private static final BlobId GHOSTRYDE_FILE_WITH_PREFIX =
BlobId.of("bucket", "job-name/tld_2010-10-17_full_S1_R0.xml.ghostryde");
private static final BlobId LENGTH_FILE_WITH_PREFIX =
BlobId.of("bucket", "job-name/tld_2010-10-17_full_S1_R0.xml.length");
private static final BlobId REPORT_FILE_WITH_PREFIX =
BlobId.of("bucket", "job-name/tld_2010-10-17_full_S1_R0-report.xml.ghostryde");
private static final BlobId GHOSTRYDE_R1_FILE =
BlobId.of("bucket", "tld_2010-10-17_full_S1_R1.xml.ghostryde");
@ -109,6 +113,7 @@ public class RdeUploadActionTest {
BlobId.of("bucket", "tld_2010-10-17_full_S1_R1-report.xml.ghostryde");
private final GcsUtils gcsUtils = new GcsUtils(LocalStorageHelper.getOptions());
private final CloudTasksHelper cloudTasksHelper = new CloudTasksHelper();
@RegisterExtension final SftpServerExtension sftpd = new SftpServerExtension();
@ -129,6 +134,8 @@ public class RdeUploadActionTest {
public final AppEngineExtension appEngine =
AppEngineExtension.builder().withDatastoreAndCloudSql().withTaskQueue().build();
private final PGPPublicKey encryptKey =
new FakeKeyringModule().get().getRdeStagingEncryptionKey();
private final FakeResponse response = new FakeResponse();
private final EscrowTaskRunner runner = mock(EscrowTaskRunner.class);
private final FakeClock clock = new FakeClock(DateTime.parse("2010-10-17TZ"));
@ -155,10 +162,10 @@ public class RdeUploadActionTest {
action.receiverKey = keyring.getRdeReceiverKey();
action.signingKey = keyring.getRdeSigningKey();
action.stagingDecryptionKey = keyring.getRdeStagingDecryptionKey();
action.reportQueue = QueueFactory.getQueue("rde-report");
action.runner = runner;
action.taskQueueUtils = new TaskQueueUtils(new Retrier(null, 1));
action.cloudTasksUtils = cloudTasksHelper.getTestCloudTasksUtils();
action.retrier = new Retrier(new FakeSleeper(clock), 3);
action.prefix = Optional.empty();
return action;
}
}
@ -181,15 +188,13 @@ public class RdeUploadActionTest {
SystemProperty.environment.set(SystemProperty.Environment.Value.Development);
createTld("tld");
PGPPublicKey encryptKey = new FakeKeyringModule().get().getRdeStagingEncryptionKey();
gcsUtils.createFromBytes(GHOSTRYDE_FILE, Ghostryde.encode(DEPOSIT_XML.read(), encryptKey));
gcsUtils.createFromBytes(GHOSTRYDE_R1_FILE, Ghostryde.encode(DEPOSIT_XML.read(), encryptKey));
gcsUtils.createFromBytes(LENGTH_FILE, Long.toString(DEPOSIT_XML.size()).getBytes(UTF_8));
gcsUtils.createFromBytes(LENGTH_R1_FILE, Long.toString(DEPOSIT_XML.size()).getBytes(UTF_8));
gcsUtils.createFromBytes(REPORT_FILE, Ghostryde.encode(REPORT_XML.read(), encryptKey));
gcsUtils.createFromBytes(REPORT_R1_FILE, Ghostryde.encode(REPORT_XML.read(), encryptKey));
tm()
.transact(
tm().transact(
() -> {
RdeRevision.saveRevision("lol", DateTime.parse("2010-10-17TZ"), FULL, 0);
RdeRevision.saveRevision("tld", DateTime.parse("2010-10-17TZ"), FULL, 0);
@ -210,11 +215,48 @@ public class RdeUploadActionTest {
RdeUploadAction action = createAction(null);
action.tld = "lol";
action.run();
verify(runner).lockRunAndRollForward(
action, Registry.get("lol"), standardSeconds(23), CursorType.RDE_UPLOAD, standardDays(1));
assertTasksEnqueued("rde-report", new TaskMatcher()
.url(RdeReportAction.PATH)
.param(RequestParameters.PARAM_TLD, "lol"));
verify(runner)
.lockRunAndRollForward(
action,
Registry.get("lol"),
standardSeconds(23),
CursorType.RDE_UPLOAD,
standardDays(1));
cloudTasksHelper.assertTasksEnqueued(
"rde-report",
new TaskMatcher().url(RdeReportAction.PATH).param(RequestParameters.PARAM_TLD, "lol"));
verifyNoMoreInteractions(runner);
}
@TestOfyAndSql
void testRun_withPrefix() throws Exception {
createTld("lol");
RdeUploadAction action = createAction(null);
action.prefix = Optional.of("job-name/");
action.tld = "lol";
gcsUtils.delete(GHOSTRYDE_FILE);
gcsUtils.createFromBytes(
GHOSTRYDE_FILE_WITH_PREFIX, Ghostryde.encode(DEPOSIT_XML.read(), encryptKey));
gcsUtils.delete(LENGTH_FILE);
gcsUtils.createFromBytes(
LENGTH_FILE_WITH_PREFIX, Long.toString(DEPOSIT_XML.size()).getBytes(UTF_8));
gcsUtils.delete(REPORT_FILE);
gcsUtils.createFromBytes(
REPORT_FILE_WITH_PREFIX, Ghostryde.encode(REPORT_XML.read(), encryptKey));
action.run();
verify(runner)
.lockRunAndRollForward(
action,
Registry.get("lol"),
standardSeconds(23),
CursorType.RDE_UPLOAD,
standardDays(1));
cloudTasksHelper.assertTasksEnqueued(
"rde-report",
new TaskMatcher()
.url(RdeReportAction.PATH)
.param(RequestParameters.PARAM_TLD, "lol")
.param(RdeModule.PARAM_PREFIX, "job-name/"));
verifyNoMoreInteractions(runner);
}
@ -231,7 +273,7 @@ public class RdeUploadActionTest {
assertThat(response.getStatus()).isEqualTo(200);
assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8);
assertThat(response.getPayload()).isEqualTo("OK tld 2010-10-17T00:00:00.000Z\n");
assertNoTasksEnqueued("rde-upload");
cloudTasksHelper.assertNoTasksEnqueued("rde-upload");
assertThat(folder.list())
.asList()
.containsExactly("tld_2010-10-17_full_S1_R0.ryde", "tld_2010-10-17_full_S1_R0.sig");
@ -262,7 +304,7 @@ public class RdeUploadActionTest {
assertThat(response.getStatus()).isEqualTo(200);
assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8);
assertThat(response.getPayload()).isEqualTo("OK tld 2010-10-17T00:00:00.000Z\n");
assertNoTasksEnqueued("rde-upload");
cloudTasksHelper.assertNoTasksEnqueued("rde-upload");
// Assert that both files are written to SFTP and GCS, and that the contents are identical.
String rydeFilename = "tld_2010-10-17_full_S1_R0.ryde";
String sigFilename = "tld_2010-10-17_full_S1_R0.sig";
@ -273,6 +315,41 @@ public class RdeUploadActionTest {
.isEqualTo(Files.toByteArray(new File(folder, sigFilename)));
}
@TestOfyAndSql
void testRunWithLock_copiesOnGcs_withPrefix() throws Exception {
int port = sftpd.serve("user", "password", folder);
URI uploadUrl = URI.create(String.format("sftp://user:password@localhost:%d/", port));
DateTime stagingCursor = DateTime.parse("2010-10-18TZ");
DateTime uploadCursor = DateTime.parse("2010-10-17TZ");
persistResource(Cursor.create(RDE_STAGING, stagingCursor, Registry.get("tld")));
RdeUploadAction action = createAction(uploadUrl);
action.prefix = Optional.of("job-name/");
gcsUtils.delete(GHOSTRYDE_FILE);
gcsUtils.createFromBytes(
GHOSTRYDE_FILE_WITH_PREFIX, Ghostryde.encode(DEPOSIT_XML.read(), encryptKey));
gcsUtils.delete(LENGTH_FILE);
gcsUtils.createFromBytes(
LENGTH_FILE_WITH_PREFIX, Long.toString(DEPOSIT_XML.size()).getBytes(UTF_8));
gcsUtils.delete(REPORT_FILE);
gcsUtils.createFromBytes(
REPORT_FILE_WITH_PREFIX, Ghostryde.encode(REPORT_XML.read(), encryptKey));
action.runWithLock(uploadCursor);
assertThat(response.getStatus()).isEqualTo(200);
assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8);
assertThat(response.getPayload()).isEqualTo("OK tld 2010-10-17T00:00:00.000Z\n");
cloudTasksHelper.assertNoTasksEnqueued("rde-upload");
// Assert that both files are written to SFTP and GCS, and that the contents are identical.
String rydeFilename = "tld_2010-10-17_full_S1_R0.ryde";
String rydeGcsFilename = "job-name/tld_2010-10-17_full_S1_R0.ryde";
String sigFilename = "tld_2010-10-17_full_S1_R0.sig";
String sigGcsFilename = "job-name/tld_2010-10-17_full_S1_R0.sig";
assertThat(folder.list()).asList().containsExactly(rydeFilename, sigFilename);
assertThat(gcsUtils.readBytesFrom(BlobId.of("bucket", rydeGcsFilename)))
.isEqualTo(Files.toByteArray(new File(folder, rydeFilename)));
assertThat(gcsUtils.readBytesFrom(BlobId.of("bucket", sigGcsFilename)))
.isEqualTo(Files.toByteArray(new File(folder, sigFilename)));
}
@TestOfyAndSql
void testRunWithLock_resend() throws Exception {
tm().transact(() -> RdeRevision.saveRevision("tld", DateTime.parse("2010-10-17TZ"), FULL, 1));
@ -285,7 +362,7 @@ public class RdeUploadActionTest {
assertThat(response.getStatus()).isEqualTo(200);
assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8);
assertThat(response.getPayload()).isEqualTo("OK tld 2010-10-17T00:00:00.000Z\n");
assertNoTasksEnqueued("rde-upload");
cloudTasksHelper.assertNoTasksEnqueued("rde-upload");
assertThat(folder.list())
.asList()
.containsExactly("tld_2010-10-17_full_S1_R1.ryde", "tld_2010-10-17_full_S1_R1.sig");
@ -327,7 +404,7 @@ public class RdeUploadActionTest {
.isEqualTo(
"Waiting on RdeStagingAction for TLD tld to send 2010-10-17T00:00:00.000Z upload; last"
+ " RDE staging completion was at 1970-01-01T00:00:00.000Z");
assertNoTasksEnqueued("rde-upload");
cloudTasksHelper.assertNoTasksEnqueued("rde-upload");
assertThat(folder.list()).isEmpty();
}

View file

@ -23,13 +23,8 @@ import static com.google.common.truth.Truth.assertWithMessage;
import static google.registry.util.DiffUtils.prettyPrintEntityDeepDiff;
import static java.util.Arrays.asList;
import static java.util.stream.Collectors.joining;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.google.cloud.tasks.v2.CloudTasksClient;
import com.google.cloud.tasks.v2.HttpMethod;
import com.google.cloud.tasks.v2.QueueName;
import com.google.cloud.tasks.v2.Task;
import com.google.common.base.Ascii;
import com.google.common.base.Joiner;
@ -37,16 +32,20 @@ import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableMultiset;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.google.common.net.HttpHeaders;
import com.google.common.net.MediaType;
import com.google.common.truth.Truth8;
import google.registry.model.ImmutableObject;
import google.registry.util.CloudTasksUtils;
import google.registry.util.Retrier;
import java.io.Serializable;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@ -54,6 +53,9 @@ import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;
import javax.annotation.Nonnull;
@ -65,30 +67,37 @@ import javax.annotation.Nonnull;
* helper methods because we have not yet encountered all the use cases with Cloud Tasks. As more
* and more Task Queue API usage is migrated to Cloud Tasks we may replicate more methods from the
* latter.
*
* <p>Note the use of {@link AtomicInteger} {@code nextInstanceId} here. When a {@link
* FakeCloudTasksClient} instance, and by extension the {@link CloudTasksHelper} instance that
* contains it is serialized/deserialized, as happens in a Beam pipeline, we to want to push tasks
* to the same test task container that the original instance pushes to, so that we can make
* assertions on them by accessing the original instance. We cannot make the test task container
* itself static because we do not want tasks enqueued in previous tests to interfere with latter
* tests, when they run on the same JVM (and therefore share the same static class members). To
* solve this we put the test container in a static map whose keys are the instance IDs. An
* explicitly created new {@link CloudTasksHelper} (as would be created for a new test method) would
* have a new ID allocated to it, and therefore stores its tasks in a distinct container. A
* deserialized {@link CloudTasksHelper}, on the other hand, will have the same instance ID and
* share the same test class container with its progenitor.
*/
public class CloudTasksHelper {
public class CloudTasksHelper implements Serializable {
private static final long serialVersionUID = -8949359648199614677L;
private static final AtomicInteger nextInstanceId = new AtomicInteger(0);
protected static ConcurrentMap<Integer, ListMultimap<String, Task>> testTasks =
new ConcurrentHashMap<>();
private static final String PROJECT_ID = "test-project";
private static final String LOCATION_ID = "test-location";
private final Retrier retrier = new Retrier(new FakeSleeper(new FakeClock()), 1);
private final LinkedListMultimap<String, Task> testTasks = LinkedListMultimap.create();
private final CloudTasksClient mockClient = mock(CloudTasksClient.class);
private final int instanceId = nextInstanceId.getAndIncrement();
private final CloudTasksUtils cloudTasksUtils =
new CloudTasksUtils(retrier, PROJECT_ID, LOCATION_ID, () -> mockClient);
new CloudTasksUtils(retrier, PROJECT_ID, LOCATION_ID, new FakeCloudTasksClient());
public CloudTasksHelper() {
when(mockClient.createTask(any(QueueName.class), any(Task.class)))
.thenAnswer(
invocation -> {
QueueName queue = invocation.getArgument(0);
Task task = invocation.getArgument(1);
if (task.getName().isEmpty()) {
task = task.toBuilder().setName(String.format("test-%d", testTasks.size())).build();
}
testTasks.put(queue.getQueue(), task);
return task;
});
testTasks.put(instanceId, Multimaps.synchronizedListMultimap(LinkedListMultimap.create()));
}
public CloudTasksUtils getTestCloudTasksUtils() {
@ -96,7 +105,7 @@ public class CloudTasksHelper {
}
public List<Task> getTestTasksFor(String queue) {
return testTasks.get(queue);
return new ArrayList<>(testTasks.get(instanceId).get(queue));
}
/**
@ -160,6 +169,20 @@ public class CloudTasksHelper {
}
}
private class FakeCloudTasksClient extends CloudTasksUtils.SerializableCloudTasksClient {
private static final long serialVersionUID = 6661964844791720639L;
@Override
public Task enqueue(String projectId, String locationId, String queueName, Task task) {
if (task.getName().isEmpty()) {
task = task.toBuilder().setName(String.format("test-%d", testTasks.size())).build();
}
testTasks.get(instanceId).put(queueName, task);
return task;
}
}
/** An adapter to clean up a {@link Task} for ease of matching. */
private static class MatchableTask extends ImmutableObject {

View file

@ -37,7 +37,7 @@ import com.google.protobuf.ByteString;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import javax.inject.Provider;
import java.util.function.Supplier;
/** Utilities for dealing with Cloud Tasks. */
public class CloudTasksUtils implements Serializable {
@ -48,17 +48,14 @@ public class CloudTasksUtils implements Serializable {
private final Retrier retrier;
private final String projectId;
private final String locationId;
private final Provider<CloudTasksClient> clientProvider;
private final SerializableCloudTasksClient client;
public CloudTasksUtils(
Retrier retrier,
String projectId,
String locationId,
Provider<CloudTasksClient> clientProvider) {
Retrier retrier, String projectId, String locationId, SerializableCloudTasksClient client) {
this.retrier = retrier;
this.projectId = projectId;
this.locationId = locationId;
this.clientProvider = clientProvider;
this.client = client;
}
public Task enqueue(String queue, Task task) {
@ -69,9 +66,7 @@ public class CloudTasksUtils implements Serializable {
queue,
task.getAppEngineHttpRequest().getRelativeUri(),
task.getAppEngineHttpRequest().getAppEngineRouting().getService());
try (CloudTasksClient client = clientProvider.get()) {
return client.createTask(QueueName.of(projectId, locationId, queue), task);
}
return client.enqueue(projectId, locationId, queue, task);
},
ApiException.class);
}
@ -141,4 +136,28 @@ public class CloudTasksUtils implements Serializable {
public static Task createGetTask(String path, String service, Multimap<String, String> params) {
return createTask(path, HttpMethod.GET, service, params);
}
public abstract static class SerializableCloudTasksClient implements Serializable {
public abstract Task enqueue(String projectId, String locationId, String queueName, Task task);
}
public static class GcpCloudTasksClient extends SerializableCloudTasksClient {
private static final long serialVersionUID = -5959253033129154037L;
// Use a supplier so that we can use try-with-resources with the client, which implements
// Autocloseable.
private final Supplier<CloudTasksClient> clientSupplier;
public GcpCloudTasksClient(Supplier<CloudTasksClient> clientSupplier) {
this.clientSupplier = clientSupplier;
}
@Override
public Task enqueue(String projectId, String locationId, String queueName, Task task) {
try (CloudTasksClient client = clientSupplier.get()) {
return client.createTask(QueueName.of(projectId, locationId, queueName), task);
}
}
}
}

View file

@ -22,25 +22,23 @@ import com.google.api.client.http.HttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.auth.http.HttpCredentialsAdapter;
import com.google.auth.oauth2.GoogleCredentials;
import java.io.Serializable;
/**
* Helper class to provide {@link HttpTransport}, {@link JsonFactory} and {@link
* HttpRequestInitializer} for a given {@link GoogleCredentials}. These classes are normally needed
* for creating the instance of a GCP client.
*/
public class GoogleCredentialsBundle {
public class GoogleCredentialsBundle implements Serializable {
private static final HttpTransport HTTP_TRANSPORT = Utils.getDefaultTransport();
private static final JsonFactory JSON_FACTORY = Utils.getDefaultJsonFactory();
private GoogleCredentials googleCredentials;
private HttpTransport httpTransport;
private JsonFactory jsonFactory;
private HttpRequestInitializer httpRequestInitializer;
private GoogleCredentialsBundle(GoogleCredentials googleCredentials) {
checkNotNull(googleCredentials);
this.googleCredentials = googleCredentials;
this.httpTransport = Utils.getDefaultTransport();
this.jsonFactory = Utils.getDefaultJsonFactory();
this.httpRequestInitializer = new HttpCredentialsAdapter(googleCredentials);
}
/** Creates a {@link GoogleCredentialsBundle} instance from given {@link GoogleCredentials}. */
@ -55,16 +53,16 @@ public class GoogleCredentialsBundle {
/** Returns the instance of {@link HttpTransport}. */
public HttpTransport getHttpTransport() {
return httpTransport;
return HTTP_TRANSPORT;
}
/** Returns the instance of {@link JsonFactory}. */
public JsonFactory getJsonFactory() {
return jsonFactory;
return JSON_FACTORY;
}
/** Returns the instance of {@link HttpRequestInitializer}. */
public HttpRequestInitializer getHttpRequestInitializer() {
return httpRequestInitializer;
return new HttpCredentialsAdapter(googleCredentials);
}
}

View file

@ -0,0 +1,82 @@
// Copyright 2020 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.util;
import com.google.apphosting.api.ApiProxy.Environment;
import com.google.common.collect.ImmutableMap;
import java.util.Map;
/** A placeholder GAE environment class that is used when masquerading a thread as a GAE thread. */
public final class PlaceholderEnvironment implements Environment {
private static final PlaceholderEnvironment INSTANCE = new PlaceholderEnvironment();
public static PlaceholderEnvironment get() {
return INSTANCE;
}
private PlaceholderEnvironment() {}
@Override
public String getAppId() {
return "PlaceholderAppId";
}
@Override
public Map<String, Object> getAttributes() {
return ImmutableMap.of();
}
@Override
public String getModuleId() {
throw new UnsupportedOperationException();
}
@Override
public String getVersionId() {
throw new UnsupportedOperationException();
}
@Override
public String getEmail() {
throw new UnsupportedOperationException();
}
@Override
public boolean isLoggedIn() {
throw new UnsupportedOperationException();
}
@Override
public boolean isAdmin() {
throw new UnsupportedOperationException();
}
@Override
public String getAuthDomain() {
throw new UnsupportedOperationException();
}
@SuppressWarnings("deprecation")
@Override
public String getRequestNamespace() {
throw new UnsupportedOperationException();
}
@Override
public long getRemainingMillis() {
throw new UnsupportedOperationException();
}
}

View file

@ -17,18 +17,18 @@ package google.registry.util;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.google.cloud.tasks.v2.CloudTasksClient;
import com.google.cloud.tasks.v2.HttpMethod;
import com.google.cloud.tasks.v2.QueueName;
import com.google.cloud.tasks.v2.Task;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.LinkedListMultimap;
import google.registry.testing.FakeClock;
import google.registry.testing.FakeSleeper;
import google.registry.util.CloudTasksUtils.SerializableCloudTasksClient;
import java.nio.charset.StandardCharsets;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -37,21 +37,18 @@ import org.junit.jupiter.api.Test;
public class CloudTasksUtilsTest {
// Use a LinkedListMultimap to preserve order of the inserted entries for assertion.
private final LinkedListMultimap<String, String> params = LinkedListMultimap.create();
private final CloudTasksClient mockClient = mock(CloudTasksClient.class);
private final SerializableCloudTasksClient mockClient = mock(SerializableCloudTasksClient.class);
private final CloudTasksUtils cloudTasksUtils =
new CloudTasksUtils(
new Retrier(new FakeSleeper(new FakeClock()), 1),
"project",
"location",
() -> mockClient);
new Retrier(new FakeSleeper(new FakeClock()), 1), "project", "location", mockClient);
@BeforeEach
void beforeEach() {
params.put("key1", "val1");
params.put("key2", "val2");
params.put("key1", "val3");
when(mockClient.createTask(any(QueueName.class), any(Task.class)))
.thenAnswer(invocation -> invocation.getArgument(1));
when(mockClient.enqueue(anyString(), anyString(), anyString(), any(Task.class)))
.thenAnswer(invocation -> invocation.getArgument(3));
}
@Test
@ -94,7 +91,7 @@ public class CloudTasksUtilsTest {
void testSuccess_enqueueTask() {
Task task = CloudTasksUtils.createGetTask("/the/path", "myservice", params);
cloudTasksUtils.enqueue("test-queue", task);
verify(mockClient).createTask(QueueName.of("project", "location", "test-queue"), task);
verify(mockClient).enqueue("project", "location", "test-queue", task);
}
@Test
@ -102,8 +99,8 @@ public class CloudTasksUtilsTest {
Task task1 = CloudTasksUtils.createGetTask("/the/path", "myservice", params);
Task task2 = CloudTasksUtils.createGetTask("/other/path", "yourservice", params);
cloudTasksUtils.enqueue("test-queue", task1, task2);
verify(mockClient).createTask(QueueName.of("project", "location", "test-queue"), task1);
verify(mockClient).createTask(QueueName.of("project", "location", "test-queue"), task2);
verify(mockClient).enqueue("project", "location", "test-queue", task1);
verify(mockClient).enqueue("project", "location", "test-queue", task2);
}
@Test
@ -111,7 +108,7 @@ public class CloudTasksUtilsTest {
Task task1 = CloudTasksUtils.createGetTask("/the/path", "myservice", params);
Task task2 = CloudTasksUtils.createGetTask("/other/path", "yourservice", params);
cloudTasksUtils.enqueue("test-queue", ImmutableList.of(task1, task2));
verify(mockClient).createTask(QueueName.of("project", "location", "test-queue"), task1);
verify(mockClient).createTask(QueueName.of("project", "location", "test-queue"), task2);
verify(mockClient).enqueue("project", "location", "test-queue", task1);
verify(mockClient).enqueue("project", "location", "test-queue", task2);
}
}