From 917b34701f08a82bd0ecbf69081ab5d5d48cbf0f Mon Sep 17 00:00:00 2001 From: Lai Jiang Date: Fri, 30 Jul 2021 16:24:58 -0400 Subject: [PATCH] Write RDE files and advance cursors in Beam pipeline (#1249) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This PR re-implements most of the logic in the RdeStagingReducer, with the exception of the last enqueue operations, due to the fact that the task queue API is not available outside of the App Engine SDK. This part will come in a separate PR. Another deviation from the reducer is that we forwent the lock -- it is difficult to do it across different beam transforms. Instead we write each report to a different folder according to its unique beam job name. When enqueueing the publish tasks we will then pass the folder prefix as a URL parameter. --- This change is [Reviewable](https://reviewable.io/reviews/google/nomulus/1249) --- .../java/google/registry/beam/rde/RdeIO.java | 273 ++++++++++++++++++ .../google/registry/beam/rde/RdePipeline.java | 97 +++++-- .../registry/beam/rde/RdePipelineOptions.java | 10 + .../registry/model/rde/RdeRevision.java | 4 +- .../google/registry/rde/PendingDeposit.java | 2 +- .../registry/beam/rde_pipeline_metadata.json | 18 +- .../registry/beam/rde/RdePipelineTest.java | 218 ++++++++++++-- .../registry/rde/RdeStagingReducerTest.java | 15 +- .../registry/{ => beam}/rde/reducer_brda.xml | 0 .../registry/{ => beam}/rde/reducer_rde.xml | 0 .../registry/{ => beam}/rde/reducer_rde_report.xml | 0 11 files changed, 569 insertions(+), 68 deletions(-) create mode 100644 core/src/main/java/google/registry/beam/rde/RdeIO.java rename core/src/test/resources/google/registry/{ => beam}/rde/reducer_brda.xml (100%) rename core/src/test/resources/google/registry/{ => beam}/rde/reducer_rde.xml (100%) rename core/src/test/resources/google/registry/{ => beam}/rde/reducer_rde_report.xml (100%) diff --git a/core/src/main/java/google/registry/beam/rde/RdeIO.java 
b/core/src/main/java/google/registry/beam/rde/RdeIO.java new file mode 100644 index 000000000..a17c7f3e1 --- /dev/null +++ b/core/src/main/java/google/registry/beam/rde/RdeIO.java @@ -0,0 +1,273 @@ +// Copyright 2021 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.beam.rde; + +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Verify.verify; +import static google.registry.model.common.Cursor.getCursorTimeOrStartOfTime; +import static google.registry.persistence.transaction.TransactionManagerFactory.tm; +import static google.registry.persistence.transaction.TransactionManagerUtil.transactIfJpaTm; +import static java.nio.charset.StandardCharsets.UTF_8; + +import com.google.auto.value.AutoValue; +import com.google.cloud.storage.BlobId; +import com.google.common.flogger.FluentLogger; +import google.registry.gcs.GcsUtils; +import google.registry.keyring.api.PgpHelper; +import google.registry.model.common.Cursor; +import google.registry.model.rde.RdeMode; +import google.registry.model.rde.RdeNamingUtils; +import google.registry.model.rde.RdeRevision; +import google.registry.model.registry.Registry; +import google.registry.rde.DepositFragment; +import google.registry.rde.Ghostryde; +import google.registry.rde.PendingDeposit; +import google.registry.rde.RdeCounter; +import google.registry.rde.RdeMarshaller; +import 
google.registry.rde.RdeResourceType; +import google.registry.rde.RdeUtil; +import google.registry.tldconfig.idn.IdnTableEnum; +import google.registry.xjc.rdeheader.XjcRdeHeader; +import google.registry.xjc.rdeheader.XjcRdeHeaderElement; +import google.registry.xml.ValidationMode; +import google.registry.xml.XmlException; +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.security.Security; +import java.util.Optional; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.bouncycastle.jce.provider.BouncyCastleProvider; +import org.bouncycastle.openpgp.PGPPublicKey; +import org.joda.time.DateTime; + +public class RdeIO { + + @AutoValue + abstract static class Write + extends PTransform>>, PDone> { + + abstract GcsUtils gcsUtils(); + + abstract String rdeBucket(); + + // It's OK to return a primitive array because we are only using it to construct the + // PGPPublicKey, which is not serializable. 
+ @SuppressWarnings("mutable") + abstract byte[] stagingKeyBytes(); + + abstract ValidationMode validationMode(); + + static Builder builder() { + return new AutoValue_RdeIO_Write.Builder(); + } + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setGcsUtils(GcsUtils gcsUtils); + + abstract Builder setRdeBucket(String value); + + abstract Builder setStagingKeyBytes(byte[] value); + + abstract Builder setValidationMode(ValidationMode value); + + abstract Write build(); + } + + @Override + public PDone expand(PCollection>> input) { + input + .apply( + "Write to GCS", + ParDo.of(new RdeWriter(gcsUtils(), rdeBucket(), stagingKeyBytes(), validationMode()))) + .apply("Update cursors", ParDo.of(new CursorUpdater())); + return PDone.in(input.getPipeline()); + } + } + + private static class RdeWriter + extends DoFn>, KV> { + + private static final FluentLogger logger = FluentLogger.forEnclosingClass(); + + private final GcsUtils gcsUtils; + private final String rdeBucket; + private final byte[] stagingKeyBytes; + private final RdeMarshaller marshaller; + + protected RdeWriter( + GcsUtils gcsUtils, + String rdeBucket, + byte[] stagingKeyBytes, + ValidationMode validationMode) { + this.gcsUtils = gcsUtils; + this.rdeBucket = rdeBucket; + this.stagingKeyBytes = stagingKeyBytes; + this.marshaller = new RdeMarshaller(validationMode); + } + + @Setup + public void setup() { + Security.addProvider(new BouncyCastleProvider()); + } + + @ProcessElement + public void processElement( + @Element KV> kv, + PipelineOptions options, + OutputReceiver> outputReceiver) { + PGPPublicKey stagingKey = PgpHelper.loadPublicKeyBytes(stagingKeyBytes); + PendingDeposit key = kv.getKey(); + Iterable fragments = kv.getValue(); + RdeCounter counter = new RdeCounter(); + + // Determine some basic things about the deposit. 
+ final RdeMode mode = key.mode(); + final String tld = key.tld(); + final DateTime watermark = key.watermark(); + final int revision = + Optional.ofNullable(key.revision()) + .orElseGet(() -> RdeRevision.getNextRevision(tld, watermark, mode)); + String id = RdeUtil.timestampToId(watermark); + String prefix = options.getJobName(); + String basename = RdeNamingUtils.makeRydeFilename(tld, watermark, mode, 1, revision); + if (key.manual()) { + checkState(key.directoryWithTrailingSlash() != null, "Manual subdirectory not specified"); + prefix = prefix + "/manual/" + key.directoryWithTrailingSlash() + basename; + } else { + prefix = prefix + "/" + basename; + } + BlobId xmlFilename = BlobId.of(rdeBucket, prefix + ".xml.ghostryde"); + // This file will contain the byte length (ASCII) of the raw unencrypted XML. + // + // This is necessary because RdeUploadAction creates a tar file which requires that the length + // be outputted. We don't want to have to decrypt the entire ghostryde file to determine the + // length, so we just save it separately. + BlobId xmlLengthFilename = BlobId.of(rdeBucket, prefix + ".xml.length"); + BlobId reportFilename = BlobId.of(rdeBucket, prefix + "-report.xml.ghostryde"); + + // These variables will be populated as we write the deposit XML and used for other files. + boolean failed = false; + XjcRdeHeader header; + + // Write a gigantic XML file to GCS. We'll start by opening encrypted out/err file handles. + + logger.atInfo().log("Writing %s and %s", xmlFilename, xmlLengthFilename); + try (OutputStream gcsOutput = gcsUtils.openOutputStream(xmlFilename); + OutputStream lengthOutput = gcsUtils.openOutputStream(xmlLengthFilename); + OutputStream ghostrydeEncoder = Ghostryde.encoder(gcsOutput, stagingKey, lengthOutput); + Writer output = new OutputStreamWriter(ghostrydeEncoder, UTF_8)) { + + // Output the top portion of the XML document. 
+ output.write(marshaller.makeHeader(id, watermark, RdeResourceType.getUris(mode), revision)); + + // Output XML fragments while counting them. + for (DepositFragment fragment : fragments) { + if (!fragment.xml().isEmpty()) { + output.write(fragment.xml()); + counter.increment(fragment.type()); + } + if (!fragment.error().isEmpty()) { + failed = true; + logger.atSevere().log("Fragment error: %s", fragment.error()); + } + } + + // Don't write the IDN elements for BRDA. + if (mode == RdeMode.FULL) { + for (IdnTableEnum idn : IdnTableEnum.values()) { + output.write(marshaller.marshalIdn(idn.getTable())); + counter.increment(RdeResourceType.IDN); + } + } + + // Output XML that says how many resources were emitted. + header = counter.makeHeader(tld, mode); + output.write(marshaller.marshalOrDie(new XjcRdeHeaderElement(header))); + + // Output the bottom of the XML document. + output.write(marshaller.makeFooter()); + + } catch (IOException e) { + throw new RuntimeException(e); + } + + // If an entity was broken, abort after writing as much logs/deposit data as possible. + verify(!failed, "RDE staging failed for TLD %s", tld); + + // Write a tiny XML file to GCS containing some information about the deposit. + // + // This will be sent to ICANN once we're done uploading the big XML to the escrow provider. + if (mode == RdeMode.FULL) { + logger.atInfo().log("Writing %s", reportFilename); + try (OutputStream gcsOutput = gcsUtils.openOutputStream(reportFilename); + OutputStream ghostrydeEncoder = Ghostryde.encoder(gcsOutput, stagingKey)) { + counter.makeReport(id, watermark, header, revision).marshal(ghostrydeEncoder, UTF_8); + } catch (IOException | XmlException e) { + throw new RuntimeException(e); + } + } + // Now that we're done, output roll the cursor forward. 
+ if (key.manual()) { + logger.atInfo().log("Manual operation; not advancing cursor or enqueuing upload task"); + } else { + outputReceiver.output(KV.of(key, revision)); + } + } + } + + private static class CursorUpdater extends DoFn, Void> { + private static final FluentLogger logger = FluentLogger.forEnclosingClass(); + + @ProcessElement + public void processElement(@Element KV input) { + tm().transact( + () -> { + PendingDeposit key = input.getKey(); + int revision = input.getValue(); + Registry registry = Registry.get(key.tld()); + Optional cursor = + transactIfJpaTm( + () -> + tm().loadByKeyIfPresent( + Cursor.createVKey(key.cursor(), registry.getTldStr()))); + DateTime position = getCursorTimeOrStartOfTime(cursor); + checkState(key.interval() != null, "Interval must be present"); + DateTime newPosition = key.watermark().plus(key.interval()); + if (!position.isBefore(newPosition)) { + logger.atWarning().log("Cursor has already been rolled forward."); + return; + } + verify( + position.equals(key.watermark()), + "Partial ordering of RDE deposits broken: %s %s", + position, + key); + tm().put(Cursor.create(key.cursor(), newPosition, registry)); + logger.atInfo().log( + "Rolled forward %s on %s cursor to %s", key.cursor(), key.tld(), newPosition); + RdeRevision.saveRevision(key.tld(), key.watermark(), key.mode(), revision); + }); + } + } +} diff --git a/core/src/main/java/google/registry/beam/rde/RdePipeline.java b/core/src/main/java/google/registry/beam/rde/RdePipeline.java index c59f19eee..3c70a0071 100644 --- a/core/src/main/java/google/registry/beam/rde/RdePipeline.java +++ b/core/src/main/java/google/registry/beam/rde/RdePipeline.java @@ -18,14 +18,18 @@ import static com.google.common.collect.ImmutableSet.toImmutableSet; import static google.registry.model.EppResourceUtils.loadAtPointInTimeAsync; import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; -import com.google.common.annotations.VisibleForTesting; import 
com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSetMultimap; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.io.BaseEncoding; +import dagger.BindsInstance; +import dagger.Component; import google.registry.beam.common.RegistryJpaIO; +import google.registry.config.CredentialModule; +import google.registry.config.RegistryConfig.ConfigModule; +import google.registry.gcs.GcsUtils; import google.registry.model.EppResource; import google.registry.model.contact.ContactResource; import google.registry.model.domain.DomainBase; @@ -33,6 +37,7 @@ import google.registry.model.host.HostResource; import google.registry.model.rde.RdeMode; import google.registry.model.registrar.Registrar; import google.registry.model.registrar.Registrar.Type; +import google.registry.persistence.PersistenceModule.TransactionIsolationLevel; import google.registry.persistence.VKey; import google.registry.rde.DepositFragment; import google.registry.rde.PendingDeposit; @@ -50,6 +55,8 @@ import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.function.Supplier; +import javax.inject.Inject; +import javax.inject.Singleton; import javax.persistence.Entity; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; @@ -58,6 +65,7 @@ import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.FlatMapElements; import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.Reshuffle; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -76,10 +84,15 @@ import org.joda.time.DateTime; * @see Using * Flex Templates */ +@Singleton public class RdePipeline implements Serializable { - private final RdeMarshaller marshaller; + 
private final transient RdePipelineOptions options; + private final ValidationMode mode; private final ImmutableSetMultimap pendings; + private final String rdeBucket; + private final byte[] stagingKeyBytes; + private final GcsUtils gcsUtils; // Registrars to be excluded from data escrow. Not including the sandbox-only OTE type so that // if sneaks into production we would get an extra signal. @@ -97,31 +110,43 @@ public class RdePipeline implements Serializable { + (clazz.equals(DomainBase.class) ? " AND tld in (:tlds)" : ""); } - RdePipeline(RdePipelineOptions options) throws IOException, ClassNotFoundException { - this.marshaller = new RdeMarshaller(ValidationMode.valueOf(options.getValidationMode())); + @Inject + RdePipeline(RdePipelineOptions options, GcsUtils gcsUtils) { + this.options = options; + this.mode = ValidationMode.valueOf(options.getValidationMode()); this.pendings = decodePendings(options.getPendings()); - } - - @VisibleForTesting - PipelineResult run(Pipeline pipeline) { - createFragments(pipeline); - return pipeline.run(); + this.rdeBucket = options.getGcsBucket(); + this.stagingKeyBytes = BaseEncoding.base64Url().decode(options.getStagingKey()); + this.gcsUtils = gcsUtils; } PipelineResult run() { - return run(Pipeline.create()); + Pipeline pipeline = Pipeline.create(options); + PCollection>> fragments = + createFragments(pipeline); + persistData(fragments); + return pipeline.run(); } - PCollection> createFragments(Pipeline pipeline) { - PCollection> fragments = - PCollectionList.of(processRegistrars(pipeline)) - .and(processNonRegistrarEntities(pipeline, DomainBase.class)) - .and(processNonRegistrarEntities(pipeline, ContactResource.class)) - .and(processNonRegistrarEntities(pipeline, HostResource.class)) - .apply(Flatten.pCollections()) - .setCoder( - KvCoder.of(PendingDepositCoder.of(), SerializableCoder.of(DepositFragment.class))); - return fragments; + PCollection>> createFragments(Pipeline pipeline) { + return 
PCollectionList.of(processRegistrars(pipeline)) + .and(processNonRegistrarEntities(pipeline, DomainBase.class)) + .and(processNonRegistrarEntities(pipeline, ContactResource.class)) + .and(processNonRegistrarEntities(pipeline, HostResource.class)) + .apply(Flatten.pCollections()) + .setCoder(KvCoder.of(PendingDepositCoder.of(), SerializableCoder.of(DepositFragment.class))) + .apply("Group by PendingDeposit", GroupByKey.create()); + } + + void persistData(PCollection>> input) { + input.apply( + "Write to GCS and update cursors", + RdeIO.Write.builder() + .setRdeBucket(rdeBucket) + .setGcsUtils(gcsUtils) + .setValidationMode(mode) + .setStagingKeyBytes(stagingKeyBytes) + .build()); } PCollection> processRegistrars(Pipeline pipeline) { @@ -144,7 +169,8 @@ public class RdePipeline implements Serializable { .via( (VKey key) -> { Registrar registrar = jpaTm().transact(() -> jpaTm().loadByKey(key)); - DepositFragment fragment = marshaller.marshalRegistrar(registrar); + DepositFragment fragment = + new RdeMarshaller(mode).marshalRegistrar(registrar); return pendings.values().stream() .map(pending -> KV.of(pending, fragment)) .collect(toImmutableSet()); @@ -205,7 +231,8 @@ public class RdePipeline implements Serializable { Maps.asMap(dates, input -> loadAtPointInTimeAsync(resource, input))); // Convert resource to an XML fragment for each watermark/mode pair lazily and cache // the result. - RdeFragmenter fragmenter = new RdeFragmenter(resourceAtTimes, marshaller); + RdeFragmenter fragmenter = + new RdeFragmenter(resourceAtTimes, new RdeMarshaller(mode)); List> results = new ArrayList<>(); for (String tld : tlds) { for (PendingDeposit pending : pendings.get(tld)) { @@ -228,13 +255,14 @@ public class RdePipeline implements Serializable { * the original TLD to pending deposit map. 
*/ @SuppressWarnings("unchecked") - static ImmutableSetMultimap decodePendings(String encodedPending) - throws IOException, ClassNotFoundException { + static ImmutableSetMultimap decodePendings(String encodedPending) { try (ObjectInputStream ois = new ObjectInputStream( new ByteArrayInputStream( BaseEncoding.base64Url().omitPadding().decode(encodedPending)))) { return (ImmutableSetMultimap) ois.readObject(); + } catch (IOException | ClassNotFoundException e) { + throw new IllegalArgumentException("Unable to parse encoded pending deposit map.", e); } } @@ -242,7 +270,7 @@ public class RdePipeline implements Serializable { * Encodes the TLD to pending deposit map in an URL safe string that is sent to the pipeline * worker by the pipeline launcher as a pipeline option. */ - public static String encodePendings(ImmutableSetMultimap pendings) + static String encodePendings(ImmutableSetMultimap pendings) throws IOException { try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { ObjectOutputStream oos = new ObjectOutputStream(baos); @@ -255,6 +283,21 @@ public class RdePipeline implements Serializable { public static void main(String[] args) throws IOException, ClassNotFoundException { PipelineOptionsFactory.register(RdePipelineOptions.class); RdePipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(RdePipelineOptions.class); - new RdePipeline(options).run(); + options.setIsolationOverride(TransactionIsolationLevel.TRANSACTION_READ_COMMITTED); + DaggerRdePipeline_RdePipelineComponent.builder().options(options).build().rdePipeline().run(); + } + + @Singleton + @Component(modules = {CredentialModule.class, ConfigModule.class}) + interface RdePipelineComponent { + RdePipeline rdePipeline(); + + @Component.Builder + interface Builder { + @BindsInstance + Builder options(RdePipelineOptions options); + + RdePipelineComponent build(); + } } } diff --git a/core/src/main/java/google/registry/beam/rde/RdePipelineOptions.java 
b/core/src/main/java/google/registry/beam/rde/RdePipelineOptions.java index d82de1ce8..333491ff0 100644 --- a/core/src/main/java/google/registry/beam/rde/RdePipelineOptions.java +++ b/core/src/main/java/google/registry/beam/rde/RdePipelineOptions.java @@ -29,4 +29,14 @@ public interface RdePipelineOptions extends RegistryPipelineOptions { String getValidationMode(); void setValidationMode(String value); + + @Description("The GCS bucket where the encrypted RDE deposits will be uploaded to.") + String getGcsBucket(); + + void setGcsBucket(String value); + + @Description("The Base64-encoded PGP public key to encrypt the deposits.") + String getStagingKey(); + + void setStagingKey(String value); } diff --git a/core/src/main/java/google/registry/model/rde/RdeRevision.java b/core/src/main/java/google/registry/model/rde/RdeRevision.java index 58b0afdb1..cd0675647 100644 --- a/core/src/main/java/google/registry/model/rde/RdeRevision.java +++ b/core/src/main/java/google/registry/model/rde/RdeRevision.java @@ -154,7 +154,7 @@ public final class RdeRevision extends BackupGroupRoot implements NonReplicatedE } /** Class to represent the composite primary key of {@link RdeRevision} entity. */ - static class RdeRevisionId extends ImmutableObject implements Serializable { + public static class RdeRevisionId extends ImmutableObject implements Serializable { String tld; @@ -169,7 +169,7 @@ public final class RdeRevision extends BackupGroupRoot implements NonReplicatedE /** Hibernate requires this default constructor. 
*/ private RdeRevisionId() {} - static RdeRevisionId create(String tld, LocalDate date, RdeMode mode) { + public static RdeRevisionId create(String tld, LocalDate date, RdeMode mode) { RdeRevisionId instance = new RdeRevisionId(); instance.tld = tld; instance.date = date; diff --git a/core/src/main/java/google/registry/rde/PendingDeposit.java b/core/src/main/java/google/registry/rde/PendingDeposit.java index c8bca8823..1ba7e2a61 100644 --- a/core/src/main/java/google/registry/rde/PendingDeposit.java +++ b/core/src/main/java/google/registry/rde/PendingDeposit.java @@ -83,7 +83,7 @@ public abstract class PendingDeposit implements Serializable { return new AutoValue_PendingDeposit(false, tld, watermark, mode, cursor, interval, null, null); } - static PendingDeposit createInManualOperation( + public static PendingDeposit createInManualOperation( String tld, DateTime watermark, RdeMode mode, diff --git a/core/src/main/resources/google/registry/beam/rde_pipeline_metadata.json b/core/src/main/resources/google/registry/beam/rde_pipeline_metadata.json index a88f69dd0..5911623f0 100644 --- a/core/src/main/resources/google/registry/beam/rde_pipeline_metadata.json +++ b/core/src/main/resources/google/registry/beam/rde_pipeline_metadata.json @@ -14,10 +14,26 @@ "name": "validationMode", "label": "How strict the marshaller validates the given EPP resources.", "helpText": "If set to LENIENT the marshaller will not warn about missing data on the EPP resources.", - "is_optional": true, "regexes": [ "^STRICT|LENIENT$" ] + }, + { + "name": "gcsBucket", + "label": "The GCS bucket that where the resulting files will be stored.", + "helpText": "Only the bucket name itself, without the leading \"gs://\".", + "is_optional": false, + "regexes": [ + "^[a-zA-Z0-9_\\-]+$" + ] + }, + { + "name": "stagingKey", + "label": "The PGP public key used to encrypt the RDE/BRDA deposit files.", + "helpText": "The key is Base64 URL-safe encoded.", + "regexes": [ + "A-Za-z0-9\\-_" + ] } ] } diff --git 
a/core/src/test/java/google/registry/beam/rde/RdePipelineTest.java b/core/src/test/java/google/registry/beam/rde/RdePipelineTest.java index 1da0ebda9..53c73999d 100644 --- a/core/src/test/java/google/registry/beam/rde/RdePipelineTest.java +++ b/core/src/test/java/google/registry/beam/rde/RdePipelineTest.java @@ -36,33 +36,54 @@ import static google.registry.testing.DatabaseHelper.persistActiveDomain; import static google.registry.testing.DatabaseHelper.persistDeletedDomain; import static google.registry.testing.DatabaseHelper.persistEppResource; import static google.registry.testing.DatabaseHelper.persistNewRegistrar; +import static google.registry.util.ResourceUtils.readResourceUtf8; +import static java.nio.charset.StandardCharsets.UTF_8; import static org.joda.time.Duration.standardDays; +import com.google.cloud.storage.BlobId; import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSetMultimap; import com.google.common.collect.Iterables; import com.google.common.collect.Streams; +import com.google.common.io.BaseEncoding; import google.registry.beam.TestPipelineExtension; +import google.registry.gcs.GcsUtils; +import google.registry.gcs.backport.LocalStorageHelper; +import google.registry.keyring.api.PgpHelper; +import google.registry.model.common.Cursor; +import google.registry.model.common.Cursor.CursorType; import google.registry.model.host.HostResource; +import google.registry.model.rde.RdeMode; +import google.registry.model.rde.RdeRevision; +import google.registry.model.rde.RdeRevision.RdeRevisionId; import google.registry.model.registrar.Registrar; import google.registry.model.registrar.Registrar.State; +import google.registry.model.registry.Registry; +import google.registry.persistence.VKey; import google.registry.persistence.transaction.JpaTestRules; import 
google.registry.persistence.transaction.JpaTestRules.JpaIntegrationTestExtension; import google.registry.persistence.transaction.TransactionManagerFactory; import google.registry.rde.DepositFragment; +import google.registry.rde.Ghostryde; import google.registry.rde.PendingDeposit; import google.registry.rde.RdeResourceType; import google.registry.testing.DatastoreEntityExtension; import google.registry.testing.FakeClock; +import google.registry.testing.FakeKeyringModule; +import java.io.IOException; import java.util.function.Function; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Stream; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.bouncycastle.openpgp.PGPPrivateKey; +import org.bouncycastle.openpgp.PGPPublicKey; import org.joda.time.DateTime; import org.joda.time.Duration; import org.junit.jupiter.api.AfterEach; @@ -83,21 +104,44 @@ public class RdePipelineTest { private static final String HOST_NAME_PATTERN = "(.*)"; - private static final ImmutableSetMultimap PENDINGS = - ImmutableSetMultimap.of( - "pal", - PendingDeposit.create( - "pal", DateTime.parse("2000-01-01TZ"), FULL, RDE_STAGING, standardDays(1)), - "pal", - PendingDeposit.create( - "pal", DateTime.parse("2000-01-01TZ"), THIN, RDE_STAGING, standardDays(1)), - "fun", - PendingDeposit.create( - "fun", DateTime.parse("2000-01-01TZ"), FULL, RDE_STAGING, standardDays(1))); - // This is the default creation time for test data. private final FakeClock clock = new FakeClock(DateTime.parse("1999-12-31TZ")); + // This is the default as-of time of the RDE/BRDA job. 
+ private final DateTime now = DateTime.parse("2000-01-01TZ"); + + private final ImmutableSetMultimap pendings = + ImmutableSetMultimap.of( + "soy", + PendingDeposit.create("soy", now, FULL, RDE_STAGING, standardDays(1)), + "soy", + PendingDeposit.create("soy", now, THIN, RDE_STAGING, standardDays(1)), + "fun", + PendingDeposit.create("fun", now, FULL, RDE_STAGING, standardDays(1))); + + private final ImmutableList brdaFragments = + ImmutableList.of( + DepositFragment.create(RdeResourceType.DOMAIN, "\n", ""), + DepositFragment.create(RdeResourceType.REGISTRAR, "\n", "")); + + private final ImmutableList rdeFragments = + ImmutableList.of( + DepositFragment.create(RdeResourceType.DOMAIN, "\n", ""), + DepositFragment.create(RdeResourceType.REGISTRAR, "\n", ""), + DepositFragment.create(RdeResourceType.CONTACT, "\n", ""), + DepositFragment.create(RdeResourceType.HOST, "\n", "")); + + private final GcsUtils gcsUtils = new GcsUtils(LocalStorageHelper.getOptions()); + + private final PGPPublicKey encryptionKey = + new FakeKeyringModule().get().getRdeStagingEncryptionKey(); + + private final PGPPrivateKey decryptionKey = + new FakeKeyringModule().get().getRdeStagingDecryptionKey(); + + private final RdePipelineOptions options = + PipelineOptionsFactory.create().as(RdePipelineOptions.class); + // The pipeline runs in a different thread, which needs to be masqueraded as a GAE thread as well. 
@RegisterExtension @Order(Order.DEFAULT - 1) @@ -109,10 +153,7 @@ public class RdePipelineTest { @RegisterExtension final TestPipelineExtension pipeline = - TestPipelineExtension.create().enableAbandonedNodeEnforcement(true); - - private final RdePipelineOptions options = - PipelineOptionsFactory.create().as(RdePipelineOptions.class); + TestPipelineExtension.fromOptions(options).enableAbandonedNodeEnforcement(true); private RdePipeline rdePipeline; @@ -138,16 +179,24 @@ public class RdePipelineTest { testRegistrar.asBuilder().setState(State.ACTIVE).build(), monitoringRegistrar.asBuilder().setState(State.ACTIVE).build()))); - createTld("pal"); + createTld("soy"); createTld("fun"); createTld("cat"); + tm().transact( + () -> { + tm().put(Cursor.create(CursorType.BRDA, now, Registry.get("soy"))); + tm().put(Cursor.create(CursorType.RDE_STAGING, now, Registry.get("soy"))); + RdeRevision.saveRevision("soy", now, THIN, 0); + RdeRevision.saveRevision("soy", now, FULL, 0); + }); + // Also persists a "contact1234" contact in the process. - persistActiveDomain("hello.pal"); + persistActiveDomain("hello.soy"); persistActiveDomain("kitty.fun"); // Should not appear anywhere. persistActiveDomain("lol.cat"); - persistDeletedDomain("deleted.pal", DateTime.parse("1999-12-30TZ")); + persistDeletedDomain("deleted.soy", DateTime.parse("1999-12-30TZ")); persistActiveContact("contact456"); @@ -156,11 +205,15 @@ public class RdePipelineTest { clock.advanceBy(Duration.standardDays(2)); persistEppResource(host.asBuilder().setHostName("new.host.test").build()); - options.setPendings(encodePendings(PENDINGS)); + options.setPendings(encodePendings(pendings)); // The EPP resources created in tests do not have all the fields populated, using a STRICT // validation mode will result in a lot of warnings during marshalling. 
options.setValidationMode("LENIENT"); - rdePipeline = new RdePipeline(options); + options.setStagingKey( + BaseEncoding.base64Url().encode(PgpHelper.convertPublicKeyToBytes(encryptionKey))); + options.setGcsBucket("gcs-bucket"); + options.setJobName("rde-job"); + rdePipeline = new RdePipeline(options, gcsUtils); } @AfterEach @@ -170,16 +223,13 @@ public class RdePipelineTest { @Test void testSuccess_encodeAndDecodePendingsMap() throws Exception { - String encodedString = encodePendings(PENDINGS); - assertThat(decodePendings(encodedString)).isEqualTo(PENDINGS); + String encodedString = encodePendings(pendings); + assertThat(decodePendings(encodedString)).isEqualTo(pendings); } @Test void testSuccess_createFragments() { - PAssert.that( - rdePipeline - .createFragments(pipeline) - .apply("Group by PendingDeposit", GroupByKey.create())) + PAssert.that(rdePipeline.createFragments(pipeline)) .satisfies( kvs -> { kvs.forEach( @@ -198,7 +248,7 @@ public class RdePipelineTest { .anyMatch( domain -> // Deleted domain should not be included - domain.equals("deleted.pal") + domain.equals("deleted.soy") // Only domains on the pending deposit's tld should // appear. 
|| !kv.getKey() @@ -238,6 +288,116 @@ public class RdePipelineTest { pipeline.run().waitUntilFinish(); } + @Test + void testSuccess_persistData() throws Exception { + PendingDeposit brdaKey = + PendingDeposit.create("soy", now, THIN, CursorType.BRDA, Duration.standardDays(1)); + PendingDeposit rdeKey = + PendingDeposit.create("soy", now, FULL, CursorType.RDE_STAGING, Duration.standardDays(1)); + + verifyFiles(ImmutableMap.of(brdaKey, brdaFragments, rdeKey, rdeFragments), false); + + assertThat(gcsUtils.listFolderObjects("gcs-bucket", "rde-job/")) + .containsExactly( + "soy_2000-01-01_thin_S1_R1.xml.length", + "soy_2000-01-01_thin_S1_R1.xml.ghostryde", + "soy_2000-01-01_full_S1_R1.xml.length", + "soy_2000-01-01_full_S1_R1.xml.ghostryde", + "soy_2000-01-01_full_S1_R1-report.xml.ghostryde"); + + assertThat(loadCursorTime(CursorType.BRDA)) + .isEquivalentAccordingToCompareTo(now.plus(Duration.standardDays(1))); + assertThat(loadRevision(now, THIN)).isEqualTo(1); + + assertThat(loadCursorTime(CursorType.RDE_STAGING)) + .isEquivalentAccordingToCompareTo(now.plus(Duration.standardDays(1))); + assertThat(loadRevision(now, FULL)).isEqualTo(1); + } + + @Test + void testSuccess_persistData_manual() throws Exception { + PendingDeposit brdaKey = PendingDeposit.createInManualOperation("soy", now, THIN, "test/", 0); + PendingDeposit rdeKey = PendingDeposit.createInManualOperation("soy", now, FULL, "test/", 0); + + verifyFiles(ImmutableMap.of(brdaKey, brdaFragments, rdeKey, rdeFragments), true); + + assertThat(gcsUtils.listFolderObjects("gcs-bucket", "rde-job/")) + .containsExactly( + "manual/test/soy_2000-01-01_thin_S1_R0.xml.length", + "manual/test/soy_2000-01-01_thin_S1_R0.xml.ghostryde", + "manual/test/soy_2000-01-01_full_S1_R0.xml.length", + "manual/test/soy_2000-01-01_full_S1_R0.xml.ghostryde", + "manual/test/soy_2000-01-01_full_S1_R0-report.xml.ghostryde"); + + assertThat(loadCursorTime(CursorType.BRDA)).isEquivalentAccordingToCompareTo(now); + assertThat(loadRevision(now, 
THIN)).isEqualTo(0); + + assertThat(loadCursorTime(CursorType.RDE_STAGING)).isEquivalentAccordingToCompareTo(now); + assertThat(loadRevision(now, FULL)).isEqualTo(0); + } + + private void verifyFiles( + ImmutableMap> input, boolean manual) + throws Exception { + PCollection>> fragments = + pipeline.apply("Create Input", Create.of(input)); + rdePipeline.persistData(fragments); + pipeline.run().waitUntilFinish(); + + String prefix = manual ? "rde-job/manual/test/" : "rde-job/"; + String revision = manual ? "R0" : "R1"; + + // BRDA + String brdaOutputFile = + decryptGhostrydeGcsFile(prefix + "soy_2000-01-01_thin_S1_" + revision + ".xml.ghostryde"); + assertThat(brdaOutputFile) + .isEqualTo( + readResourceUtf8(this.getClass(), "reducer_brda.xml") + .replace("%RESEND%", manual ? "" : " resend=\"1\"")); + compareLength(brdaOutputFile, prefix + "soy_2000-01-01_thin_S1_" + revision + ".xml.length"); + + // RDE + String rdeOutputFile = + decryptGhostrydeGcsFile(prefix + "soy_2000-01-01_full_S1_" + revision + ".xml.ghostryde"); + assertThat(rdeOutputFile) + .isEqualTo( + readResourceUtf8(RdePipelineTest.class, "reducer_rde.xml") + .replace("%RESEND%", manual ? "" : " resend=\"1\"")); + compareLength(rdeOutputFile, prefix + "soy_2000-01-01_full_S1_" + revision + ".xml.length"); + assertThat( + decryptGhostrydeGcsFile( + prefix + "soy_2000-01-01_full_S1_" + revision + "-report.xml.ghostryde")) + .isEqualTo( + readResourceUtf8(RdePipelineTest.class, "reducer_rde_report.xml") + .replace("%RESEND%", manual ? 
"0" : "1")); + } + + private String decryptGhostrydeGcsFile(String filename) throws IOException { + return new String( + Ghostryde.decode(gcsUtils.readBytesFrom(BlobId.of("gcs-bucket", filename)), decryptionKey), + UTF_8); + } + + private void compareLength(String outputFile, String lengthFilename) throws IOException { + assertThat(String.valueOf(outputFile.getBytes(UTF_8).length)) + .isEqualTo( + new String(gcsUtils.readBytesFrom(BlobId.of("gcs-bucket", lengthFilename)), UTF_8)); + } + + private static int loadRevision(DateTime now, RdeMode mode) { + return tm().transact( + () -> + tm().loadByKey( + VKey.createSql( + RdeRevision.class, + RdeRevisionId.create("soy", now.toLocalDate(), mode))) + .getRevision()); + } + + private static DateTime loadCursorTime(CursorType type) { + return tm().transact(() -> tm().loadByKey(Cursor.createVKey(type, "soy")).getCursorTime()); + } + private static Function getXmlElement(String pattern) { return (fragment) -> { Matcher matcher = Pattern.compile(pattern).matcher(fragment.xml()); diff --git a/core/src/test/java/google/registry/rde/RdeStagingReducerTest.java b/core/src/test/java/google/registry/rde/RdeStagingReducerTest.java index f2fa21bef..5f06b891e 100644 --- a/core/src/test/java/google/registry/rde/RdeStagingReducerTest.java +++ b/core/src/test/java/google/registry/rde/RdeStagingReducerTest.java @@ -30,6 +30,7 @@ import com.google.appengine.tools.mapreduce.ReducerInput; import com.google.cloud.storage.BlobId; import com.google.cloud.storage.StorageException; import com.google.common.collect.ImmutableList; +import google.registry.beam.rde.RdePipelineTest; import google.registry.gcs.GcsUtils; import google.registry.gcs.backport.LocalStorageHelper; import google.registry.keyring.api.PgpHelper; @@ -120,7 +121,7 @@ class RdeStagingReducerTest { String outputFile = decryptGhostrydeGcsFile("soy_2000-01-01_thin_S1_R1.xml.ghostryde"); assertThat(outputFile) .isEqualTo( - readResourceUtf8(RdeStagingReducerTest.class, 
"reducer_brda.xml") + readResourceUtf8(RdePipelineTest.class, "reducer_brda.xml") .replace("%RESEND%", " resend=\"1\"")); compareLength(outputFile, "soy_2000-01-01_thin_S1_R1.xml.length"); // BRDA doesn't write a report file. @@ -147,8 +148,7 @@ class RdeStagingReducerTest { String outputFile = decryptGhostrydeGcsFile("manual/soy_2000-01-01_thin_S1_R0.xml.ghostryde"); assertThat(outputFile) .isEqualTo( - readResourceUtf8(RdeStagingReducerTest.class, "reducer_brda.xml") - .replace("%RESEND%", "")); + readResourceUtf8(RdePipelineTest.class, "reducer_brda.xml").replace("%RESEND%", "")); compareLength(outputFile, "manual/soy_2000-01-01_thin_S1_R0.xml.length"); // BRDA doesn't write a report file. assertThrows( @@ -169,12 +169,12 @@ class RdeStagingReducerTest { String outputFile = decryptGhostrydeGcsFile("soy_2000-01-01_full_S1_R1.xml.ghostryde"); assertThat(outputFile) .isEqualTo( - readResourceUtf8(RdeStagingReducerTest.class, "reducer_rde.xml") + readResourceUtf8(RdePipelineTest.class, "reducer_rde.xml") .replace("%RESEND%", " resend=\"1\"")); compareLength(outputFile, "soy_2000-01-01_full_S1_R1.xml.length"); assertThat(decryptGhostrydeGcsFile("soy_2000-01-01_full_S1_R1-report.xml.ghostryde")) .isEqualTo( - readResourceUtf8(RdeStagingReducerTest.class, "reducer_rde_report.xml") + readResourceUtf8(RdePipelineTest.class, "reducer_rde_report.xml") .replace("%RESEND%", "1")); assertThat(loadCursorTime(CursorType.RDE_STAGING)) .isEquivalentAccordingToCompareTo(now.plus(Duration.standardDays(1))); @@ -191,12 +191,11 @@ class RdeStagingReducerTest { String outputFile = decryptGhostrydeGcsFile("manual/soy_2000-01-01_full_S1_R0.xml.ghostryde"); assertThat(outputFile) .isEqualTo( - readResourceUtf8(RdeStagingReducerTest.class, "reducer_rde.xml") - .replace("%RESEND%", "")); + readResourceUtf8(RdePipelineTest.class, "reducer_rde.xml").replace("%RESEND%", "")); compareLength(outputFile, "manual/soy_2000-01-01_full_S1_R0.xml.length"); 
assertThat(decryptGhostrydeGcsFile("manual/soy_2000-01-01_full_S1_R0-report.xml.ghostryde")) .isEqualTo( - readResourceUtf8(RdeStagingReducerTest.class, "reducer_rde_report.xml") + readResourceUtf8(RdePipelineTest.class, "reducer_rde_report.xml") .replace("%RESEND%", "0")); // No extra operations in manual mode. assertThat(loadCursorTime(CursorType.RDE_STAGING)).isEquivalentAccordingToCompareTo(now); diff --git a/core/src/test/resources/google/registry/rde/reducer_brda.xml b/core/src/test/resources/google/registry/beam/rde/reducer_brda.xml similarity index 100% rename from core/src/test/resources/google/registry/rde/reducer_brda.xml rename to core/src/test/resources/google/registry/beam/rde/reducer_brda.xml diff --git a/core/src/test/resources/google/registry/rde/reducer_rde.xml b/core/src/test/resources/google/registry/beam/rde/reducer_rde.xml similarity index 100% rename from core/src/test/resources/google/registry/rde/reducer_rde.xml rename to core/src/test/resources/google/registry/beam/rde/reducer_rde.xml diff --git a/core/src/test/resources/google/registry/rde/reducer_rde_report.xml b/core/src/test/resources/google/registry/beam/rde/reducer_rde_report.xml similarity index 100% rename from core/src/test/resources/google/registry/rde/reducer_rde_report.xml rename to core/src/test/resources/google/registry/beam/rde/reducer_rde_report.xml