diff --git a/core/src/main/java/google/registry/beam/rde/RdeIO.java b/core/src/main/java/google/registry/beam/rde/RdeIO.java new file mode 100644 index 000000000..a17c7f3e1 --- /dev/null +++ b/core/src/main/java/google/registry/beam/rde/RdeIO.java @@ -0,0 +1,273 @@ +// Copyright 2021 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.beam.rde; + +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Verify.verify; +import static google.registry.model.common.Cursor.getCursorTimeOrStartOfTime; +import static google.registry.persistence.transaction.TransactionManagerFactory.tm; +import static google.registry.persistence.transaction.TransactionManagerUtil.transactIfJpaTm; +import static java.nio.charset.StandardCharsets.UTF_8; + +import com.google.auto.value.AutoValue; +import com.google.cloud.storage.BlobId; +import com.google.common.flogger.FluentLogger; +import google.registry.gcs.GcsUtils; +import google.registry.keyring.api.PgpHelper; +import google.registry.model.common.Cursor; +import google.registry.model.rde.RdeMode; +import google.registry.model.rde.RdeNamingUtils; +import google.registry.model.rde.RdeRevision; +import google.registry.model.registry.Registry; +import google.registry.rde.DepositFragment; +import google.registry.rde.Ghostryde; +import google.registry.rde.PendingDeposit; +import google.registry.rde.RdeCounter; +import 
google.registry.rde.RdeMarshaller; +import google.registry.rde.RdeResourceType; +import google.registry.rde.RdeUtil; +import google.registry.tldconfig.idn.IdnTableEnum; +import google.registry.xjc.rdeheader.XjcRdeHeader; +import google.registry.xjc.rdeheader.XjcRdeHeaderElement; +import google.registry.xml.ValidationMode; +import google.registry.xml.XmlException; +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.security.Security; +import java.util.Optional; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.bouncycastle.jce.provider.BouncyCastleProvider; +import org.bouncycastle.openpgp.PGPPublicKey; +import org.joda.time.DateTime; + +public class RdeIO { + + @AutoValue + abstract static class Write + extends PTransform>>, PDone> { + + abstract GcsUtils gcsUtils(); + + abstract String rdeBucket(); + + // It's OK to return a primitive array because we are only using it to construct the + // PGPPublicKey, which is not serializable. 
+ @SuppressWarnings("mutable") + abstract byte[] stagingKeyBytes(); + + abstract ValidationMode validationMode(); + + static Builder builder() { + return new AutoValue_RdeIO_Write.Builder(); + } + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setGcsUtils(GcsUtils gcsUtils); + + abstract Builder setRdeBucket(String value); + + abstract Builder setStagingKeyBytes(byte[] value); + + abstract Builder setValidationMode(ValidationMode value); + + abstract Write build(); + } + + @Override + public PDone expand(PCollection>> input) { + input + .apply( + "Write to GCS", + ParDo.of(new RdeWriter(gcsUtils(), rdeBucket(), stagingKeyBytes(), validationMode()))) + .apply("Update cursors", ParDo.of(new CursorUpdater())); + return PDone.in(input.getPipeline()); + } + } + + private static class RdeWriter + extends DoFn>, KV> { + + private static final FluentLogger logger = FluentLogger.forEnclosingClass(); + + private final GcsUtils gcsUtils; + private final String rdeBucket; + private final byte[] stagingKeyBytes; + private final RdeMarshaller marshaller; + + protected RdeWriter( + GcsUtils gcsUtils, + String rdeBucket, + byte[] stagingKeyBytes, + ValidationMode validationMode) { + this.gcsUtils = gcsUtils; + this.rdeBucket = rdeBucket; + this.stagingKeyBytes = stagingKeyBytes; + this.marshaller = new RdeMarshaller(validationMode); + } + + @Setup + public void setup() { + Security.addProvider(new BouncyCastleProvider()); + } + + @ProcessElement + public void processElement( + @Element KV> kv, + PipelineOptions options, + OutputReceiver> outputReceiver) { + PGPPublicKey stagingKey = PgpHelper.loadPublicKeyBytes(stagingKeyBytes); + PendingDeposit key = kv.getKey(); + Iterable fragments = kv.getValue(); + RdeCounter counter = new RdeCounter(); + + // Determine some basic things about the deposit. 
+ final RdeMode mode = key.mode(); + final String tld = key.tld(); + final DateTime watermark = key.watermark(); + final int revision = + Optional.ofNullable(key.revision()) + .orElseGet(() -> RdeRevision.getNextRevision(tld, watermark, mode)); + String id = RdeUtil.timestampToId(watermark); + String prefix = options.getJobName(); + String basename = RdeNamingUtils.makeRydeFilename(tld, watermark, mode, 1, revision); + if (key.manual()) { + checkState(key.directoryWithTrailingSlash() != null, "Manual subdirectory not specified"); + prefix = prefix + "/manual/" + key.directoryWithTrailingSlash() + basename; + } else { + prefix = prefix + "/" + basename; + } + BlobId xmlFilename = BlobId.of(rdeBucket, prefix + ".xml.ghostryde"); + // This file will contain the byte length (ASCII) of the raw unencrypted XML. + // + // This is necessary because RdeUploadAction creates a tar file which requires that the length + // be outputted. We don't want to have to decrypt the entire ghostryde file to determine the + // length, so we just save it separately. + BlobId xmlLengthFilename = BlobId.of(rdeBucket, prefix + ".xml.length"); + BlobId reportFilename = BlobId.of(rdeBucket, prefix + "-report.xml.ghostryde"); + + // These variables will be populated as we write the deposit XML and used for other files. + boolean failed = false; + XjcRdeHeader header; + + // Write a gigantic XML file to GCS. We'll start by opening encrypted out/err file handles. + + logger.atInfo().log("Writing %s and %s", xmlFilename, xmlLengthFilename); + try (OutputStream gcsOutput = gcsUtils.openOutputStream(xmlFilename); + OutputStream lengthOutput = gcsUtils.openOutputStream(xmlLengthFilename); + OutputStream ghostrydeEncoder = Ghostryde.encoder(gcsOutput, stagingKey, lengthOutput); + Writer output = new OutputStreamWriter(ghostrydeEncoder, UTF_8)) { + + // Output the top portion of the XML document. 
+ output.write(marshaller.makeHeader(id, watermark, RdeResourceType.getUris(mode), revision)); + + // Output XML fragments while counting them. + for (DepositFragment fragment : fragments) { + if (!fragment.xml().isEmpty()) { + output.write(fragment.xml()); + counter.increment(fragment.type()); + } + if (!fragment.error().isEmpty()) { + failed = true; + logger.atSevere().log("Fragment error: %s", fragment.error()); + } + } + + // Don't write the IDN elements for BRDA. + if (mode == RdeMode.FULL) { + for (IdnTableEnum idn : IdnTableEnum.values()) { + output.write(marshaller.marshalIdn(idn.getTable())); + counter.increment(RdeResourceType.IDN); + } + } + + // Output XML that says how many resources were emitted. + header = counter.makeHeader(tld, mode); + output.write(marshaller.marshalOrDie(new XjcRdeHeaderElement(header))); + + // Output the bottom of the XML document. + output.write(marshaller.makeFooter()); + + } catch (IOException e) { + throw new RuntimeException(e); + } + + // If an entity was broken, abort after writing as much logs/deposit data as possible. + verify(!failed, "RDE staging failed for TLD %s", tld); + + // Write a tiny XML file to GCS containing some information about the deposit. + // + // This will be sent to ICANN once we're done uploading the big XML to the escrow provider. + if (mode == RdeMode.FULL) { + logger.atInfo().log("Writing %s", reportFilename); + try (OutputStream gcsOutput = gcsUtils.openOutputStream(reportFilename); + OutputStream ghostrydeEncoder = Ghostryde.encoder(gcsOutput, stagingKey)) { + counter.makeReport(id, watermark, header, revision).marshal(ghostrydeEncoder, UTF_8); + } catch (IOException | XmlException e) { + throw new RuntimeException(e); + } + } + // Now that we're done, output the deposit so that the cursor can be rolled forward. 
+ if (key.manual()) { + logger.atInfo().log("Manual operation; not advancing cursor or enqueuing upload task"); + } else { + outputReceiver.output(KV.of(key, revision)); + } + } + } + + private static class CursorUpdater extends DoFn, Void> { + private static final FluentLogger logger = FluentLogger.forEnclosingClass(); + + @ProcessElement + public void processElement(@Element KV input) { + tm().transact( + () -> { + PendingDeposit key = input.getKey(); + int revision = input.getValue(); + Registry registry = Registry.get(key.tld()); + Optional cursor = + transactIfJpaTm( + () -> + tm().loadByKeyIfPresent( + Cursor.createVKey(key.cursor(), registry.getTldStr()))); + DateTime position = getCursorTimeOrStartOfTime(cursor); + checkState(key.interval() != null, "Interval must be present"); + DateTime newPosition = key.watermark().plus(key.interval()); + if (!position.isBefore(newPosition)) { + logger.atWarning().log("Cursor has already been rolled forward."); + return; + } + verify( + position.equals(key.watermark()), + "Partial ordering of RDE deposits broken: %s %s", + position, + key); + tm().put(Cursor.create(key.cursor(), newPosition, registry)); + logger.atInfo().log( + "Rolled forward %s on %s cursor to %s", key.cursor(), key.tld(), newPosition); + RdeRevision.saveRevision(key.tld(), key.watermark(), key.mode(), revision); + }); + } + } +} diff --git a/core/src/main/java/google/registry/beam/rde/RdePipeline.java b/core/src/main/java/google/registry/beam/rde/RdePipeline.java index c59f19eee..3c70a0071 100644 --- a/core/src/main/java/google/registry/beam/rde/RdePipeline.java +++ b/core/src/main/java/google/registry/beam/rde/RdePipeline.java @@ -18,14 +18,18 @@ import static com.google.common.collect.ImmutableSet.toImmutableSet; import static google.registry.model.EppResourceUtils.loadAtPointInTimeAsync; import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; -import com.google.common.annotations.VisibleForTesting; import 
com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSetMultimap; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.io.BaseEncoding; +import dagger.BindsInstance; +import dagger.Component; import google.registry.beam.common.RegistryJpaIO; +import google.registry.config.CredentialModule; +import google.registry.config.RegistryConfig.ConfigModule; +import google.registry.gcs.GcsUtils; import google.registry.model.EppResource; import google.registry.model.contact.ContactResource; import google.registry.model.domain.DomainBase; @@ -33,6 +37,7 @@ import google.registry.model.host.HostResource; import google.registry.model.rde.RdeMode; import google.registry.model.registrar.Registrar; import google.registry.model.registrar.Registrar.Type; +import google.registry.persistence.PersistenceModule.TransactionIsolationLevel; import google.registry.persistence.VKey; import google.registry.rde.DepositFragment; import google.registry.rde.PendingDeposit; @@ -50,6 +55,8 @@ import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.function.Supplier; +import javax.inject.Inject; +import javax.inject.Singleton; import javax.persistence.Entity; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; @@ -58,6 +65,7 @@ import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.FlatMapElements; import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.Reshuffle; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -76,10 +84,15 @@ import org.joda.time.DateTime; * @see Using * Flex Templates */ +@Singleton public class RdePipeline implements Serializable { - private final RdeMarshaller marshaller; + 
private final transient RdePipelineOptions options; + private final ValidationMode mode; private final ImmutableSetMultimap pendings; + private final String rdeBucket; + private final byte[] stagingKeyBytes; + private final GcsUtils gcsUtils; // Registrars to be excluded from data escrow. Not including the sandbox-only OTE type so that // if sneaks into production we would get an extra signal. @@ -97,31 +110,43 @@ public class RdePipeline implements Serializable { + (clazz.equals(DomainBase.class) ? " AND tld in (:tlds)" : ""); } - RdePipeline(RdePipelineOptions options) throws IOException, ClassNotFoundException { - this.marshaller = new RdeMarshaller(ValidationMode.valueOf(options.getValidationMode())); + @Inject + RdePipeline(RdePipelineOptions options, GcsUtils gcsUtils) { + this.options = options; + this.mode = ValidationMode.valueOf(options.getValidationMode()); this.pendings = decodePendings(options.getPendings()); - } - - @VisibleForTesting - PipelineResult run(Pipeline pipeline) { - createFragments(pipeline); - return pipeline.run(); + this.rdeBucket = options.getGcsBucket(); + this.stagingKeyBytes = BaseEncoding.base64Url().decode(options.getStagingKey()); + this.gcsUtils = gcsUtils; } PipelineResult run() { - return run(Pipeline.create()); + Pipeline pipeline = Pipeline.create(options); + PCollection>> fragments = + createFragments(pipeline); + persistData(fragments); + return pipeline.run(); } - PCollection> createFragments(Pipeline pipeline) { - PCollection> fragments = - PCollectionList.of(processRegistrars(pipeline)) - .and(processNonRegistrarEntities(pipeline, DomainBase.class)) - .and(processNonRegistrarEntities(pipeline, ContactResource.class)) - .and(processNonRegistrarEntities(pipeline, HostResource.class)) - .apply(Flatten.pCollections()) - .setCoder( - KvCoder.of(PendingDepositCoder.of(), SerializableCoder.of(DepositFragment.class))); - return fragments; + PCollection>> createFragments(Pipeline pipeline) { + return 
PCollectionList.of(processRegistrars(pipeline)) + .and(processNonRegistrarEntities(pipeline, DomainBase.class)) + .and(processNonRegistrarEntities(pipeline, ContactResource.class)) + .and(processNonRegistrarEntities(pipeline, HostResource.class)) + .apply(Flatten.pCollections()) + .setCoder(KvCoder.of(PendingDepositCoder.of(), SerializableCoder.of(DepositFragment.class))) + .apply("Group by PendingDeposit", GroupByKey.create()); + } + + void persistData(PCollection>> input) { + input.apply( + "Write to GCS and update cursors", + RdeIO.Write.builder() + .setRdeBucket(rdeBucket) + .setGcsUtils(gcsUtils) + .setValidationMode(mode) + .setStagingKeyBytes(stagingKeyBytes) + .build()); } PCollection> processRegistrars(Pipeline pipeline) { @@ -144,7 +169,8 @@ public class RdePipeline implements Serializable { .via( (VKey key) -> { Registrar registrar = jpaTm().transact(() -> jpaTm().loadByKey(key)); - DepositFragment fragment = marshaller.marshalRegistrar(registrar); + DepositFragment fragment = + new RdeMarshaller(mode).marshalRegistrar(registrar); return pendings.values().stream() .map(pending -> KV.of(pending, fragment)) .collect(toImmutableSet()); @@ -205,7 +231,8 @@ public class RdePipeline implements Serializable { Maps.asMap(dates, input -> loadAtPointInTimeAsync(resource, input))); // Convert resource to an XML fragment for each watermark/mode pair lazily and cache // the result. - RdeFragmenter fragmenter = new RdeFragmenter(resourceAtTimes, marshaller); + RdeFragmenter fragmenter = + new RdeFragmenter(resourceAtTimes, new RdeMarshaller(mode)); List> results = new ArrayList<>(); for (String tld : tlds) { for (PendingDeposit pending : pendings.get(tld)) { @@ -228,13 +255,14 @@ public class RdePipeline implements Serializable { * the original TLD to pending deposit map. 
*/ @SuppressWarnings("unchecked") - static ImmutableSetMultimap decodePendings(String encodedPending) - throws IOException, ClassNotFoundException { + static ImmutableSetMultimap decodePendings(String encodedPending) { try (ObjectInputStream ois = new ObjectInputStream( new ByteArrayInputStream( BaseEncoding.base64Url().omitPadding().decode(encodedPending)))) { return (ImmutableSetMultimap) ois.readObject(); + } catch (IOException | ClassNotFoundException e) { + throw new IllegalArgumentException("Unable to parse encoded pending deposit map.", e); } } @@ -242,7 +270,7 @@ public class RdePipeline implements Serializable { * Encodes the TLD to pending deposit map in an URL safe string that is sent to the pipeline * worker by the pipeline launcher as a pipeline option. */ - public static String encodePendings(ImmutableSetMultimap pendings) + static String encodePendings(ImmutableSetMultimap pendings) throws IOException { try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { ObjectOutputStream oos = new ObjectOutputStream(baos); @@ -255,6 +283,21 @@ public class RdePipeline implements Serializable { public static void main(String[] args) throws IOException, ClassNotFoundException { PipelineOptionsFactory.register(RdePipelineOptions.class); RdePipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(RdePipelineOptions.class); - new RdePipeline(options).run(); + options.setIsolationOverride(TransactionIsolationLevel.TRANSACTION_READ_COMMITTED); + DaggerRdePipeline_RdePipelineComponent.builder().options(options).build().rdePipeline().run(); + } + + @Singleton + @Component(modules = {CredentialModule.class, ConfigModule.class}) + interface RdePipelineComponent { + RdePipeline rdePipeline(); + + @Component.Builder + interface Builder { + @BindsInstance + Builder options(RdePipelineOptions options); + + RdePipelineComponent build(); + } } } diff --git a/core/src/main/java/google/registry/beam/rde/RdePipelineOptions.java 
b/core/src/main/java/google/registry/beam/rde/RdePipelineOptions.java index d82de1ce8..333491ff0 100644 --- a/core/src/main/java/google/registry/beam/rde/RdePipelineOptions.java +++ b/core/src/main/java/google/registry/beam/rde/RdePipelineOptions.java @@ -29,4 +29,14 @@ public interface RdePipelineOptions extends RegistryPipelineOptions { String getValidationMode(); void setValidationMode(String value); + + @Description("The GCS bucket where the encrypted RDE deposits will be uploaded to.") + String getGcsBucket(); + + void setGcsBucket(String value); + + @Description("The Base64-encoded PGP public key to encrypt the deposits.") + String getStagingKey(); + + void setStagingKey(String value); } diff --git a/core/src/main/java/google/registry/model/rde/RdeRevision.java b/core/src/main/java/google/registry/model/rde/RdeRevision.java index 58b0afdb1..cd0675647 100644 --- a/core/src/main/java/google/registry/model/rde/RdeRevision.java +++ b/core/src/main/java/google/registry/model/rde/RdeRevision.java @@ -154,7 +154,7 @@ public final class RdeRevision extends BackupGroupRoot implements NonReplicatedE } /** Class to represent the composite primary key of {@link RdeRevision} entity. */ - static class RdeRevisionId extends ImmutableObject implements Serializable { + public static class RdeRevisionId extends ImmutableObject implements Serializable { String tld; @@ -169,7 +169,7 @@ public final class RdeRevision extends BackupGroupRoot implements NonReplicatedE /** Hibernate requires this default constructor. 
*/ private RdeRevisionId() {} - static RdeRevisionId create(String tld, LocalDate date, RdeMode mode) { + public static RdeRevisionId create(String tld, LocalDate date, RdeMode mode) { RdeRevisionId instance = new RdeRevisionId(); instance.tld = tld; instance.date = date; diff --git a/core/src/main/java/google/registry/rde/PendingDeposit.java b/core/src/main/java/google/registry/rde/PendingDeposit.java index c8bca8823..1ba7e2a61 100644 --- a/core/src/main/java/google/registry/rde/PendingDeposit.java +++ b/core/src/main/java/google/registry/rde/PendingDeposit.java @@ -83,7 +83,7 @@ public abstract class PendingDeposit implements Serializable { return new AutoValue_PendingDeposit(false, tld, watermark, mode, cursor, interval, null, null); } - static PendingDeposit createInManualOperation( + public static PendingDeposit createInManualOperation( String tld, DateTime watermark, RdeMode mode, diff --git a/core/src/main/resources/google/registry/beam/rde_pipeline_metadata.json b/core/src/main/resources/google/registry/beam/rde_pipeline_metadata.json index a88f69dd0..5911623f0 100644 --- a/core/src/main/resources/google/registry/beam/rde_pipeline_metadata.json +++ b/core/src/main/resources/google/registry/beam/rde_pipeline_metadata.json @@ -14,10 +14,26 @@ "name": "validationMode", "label": "How strict the marshaller validates the given EPP resources.", "helpText": "If set to LENIENT the marshaller will not warn about missing data on the EPP resources.", - "is_optional": true, "regexes": [ "^STRICT|LENIENT$" ] + }, + { + "name": "gcsBucket", + "label": "The GCS bucket where the resulting files will be stored.", + "helpText": "Only the bucket name itself, without the leading \"gs://\".", + "is_optional": false, + "regexes": [ + "^[a-zA-Z0-9_\\-]+$" + ] + }, + { + "name": "stagingKey", + "label": "The PGP public key used to encrypt the RDE/BRDA deposit files.", + "helpText": "The key is Base64 URL-safe encoded.", + "regexes": [ + "A-Za-z0-9\\-_" + ] } ] } diff --git 
a/core/src/test/java/google/registry/beam/rde/RdePipelineTest.java b/core/src/test/java/google/registry/beam/rde/RdePipelineTest.java index 1da0ebda9..53c73999d 100644 --- a/core/src/test/java/google/registry/beam/rde/RdePipelineTest.java +++ b/core/src/test/java/google/registry/beam/rde/RdePipelineTest.java @@ -36,33 +36,54 @@ import static google.registry.testing.DatabaseHelper.persistActiveDomain; import static google.registry.testing.DatabaseHelper.persistDeletedDomain; import static google.registry.testing.DatabaseHelper.persistEppResource; import static google.registry.testing.DatabaseHelper.persistNewRegistrar; +import static google.registry.util.ResourceUtils.readResourceUtf8; +import static java.nio.charset.StandardCharsets.UTF_8; import static org.joda.time.Duration.standardDays; +import com.google.cloud.storage.BlobId; import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSetMultimap; import com.google.common.collect.Iterables; import com.google.common.collect.Streams; +import com.google.common.io.BaseEncoding; import google.registry.beam.TestPipelineExtension; +import google.registry.gcs.GcsUtils; +import google.registry.gcs.backport.LocalStorageHelper; +import google.registry.keyring.api.PgpHelper; +import google.registry.model.common.Cursor; +import google.registry.model.common.Cursor.CursorType; import google.registry.model.host.HostResource; +import google.registry.model.rde.RdeMode; +import google.registry.model.rde.RdeRevision; +import google.registry.model.rde.RdeRevision.RdeRevisionId; import google.registry.model.registrar.Registrar; import google.registry.model.registrar.Registrar.State; +import google.registry.model.registry.Registry; +import google.registry.persistence.VKey; import google.registry.persistence.transaction.JpaTestRules; import 
google.registry.persistence.transaction.JpaTestRules.JpaIntegrationTestExtension; import google.registry.persistence.transaction.TransactionManagerFactory; import google.registry.rde.DepositFragment; +import google.registry.rde.Ghostryde; import google.registry.rde.PendingDeposit; import google.registry.rde.RdeResourceType; import google.registry.testing.DatastoreEntityExtension; import google.registry.testing.FakeClock; +import google.registry.testing.FakeKeyringModule; +import java.io.IOException; import java.util.function.Function; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Stream; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.bouncycastle.openpgp.PGPPrivateKey; +import org.bouncycastle.openpgp.PGPPublicKey; import org.joda.time.DateTime; import org.joda.time.Duration; import org.junit.jupiter.api.AfterEach; @@ -83,21 +104,44 @@ public class RdePipelineTest { private static final String HOST_NAME_PATTERN = "(.*)"; - private static final ImmutableSetMultimap PENDINGS = - ImmutableSetMultimap.of( - "pal", - PendingDeposit.create( - "pal", DateTime.parse("2000-01-01TZ"), FULL, RDE_STAGING, standardDays(1)), - "pal", - PendingDeposit.create( - "pal", DateTime.parse("2000-01-01TZ"), THIN, RDE_STAGING, standardDays(1)), - "fun", - PendingDeposit.create( - "fun", DateTime.parse("2000-01-01TZ"), FULL, RDE_STAGING, standardDays(1))); - // This is the default creation time for test data. private final FakeClock clock = new FakeClock(DateTime.parse("1999-12-31TZ")); + // This is the default as-of time of the RDE/BRDA job. 
+ private final DateTime now = DateTime.parse("2000-01-01TZ"); + + private final ImmutableSetMultimap pendings = + ImmutableSetMultimap.of( + "soy", + PendingDeposit.create("soy", now, FULL, RDE_STAGING, standardDays(1)), + "soy", + PendingDeposit.create("soy", now, THIN, RDE_STAGING, standardDays(1)), + "fun", + PendingDeposit.create("fun", now, FULL, RDE_STAGING, standardDays(1))); + + private final ImmutableList brdaFragments = + ImmutableList.of( + DepositFragment.create(RdeResourceType.DOMAIN, "\n", ""), + DepositFragment.create(RdeResourceType.REGISTRAR, "\n", "")); + + private final ImmutableList rdeFragments = + ImmutableList.of( + DepositFragment.create(RdeResourceType.DOMAIN, "\n", ""), + DepositFragment.create(RdeResourceType.REGISTRAR, "\n", ""), + DepositFragment.create(RdeResourceType.CONTACT, "\n", ""), + DepositFragment.create(RdeResourceType.HOST, "\n", "")); + + private final GcsUtils gcsUtils = new GcsUtils(LocalStorageHelper.getOptions()); + + private final PGPPublicKey encryptionKey = + new FakeKeyringModule().get().getRdeStagingEncryptionKey(); + + private final PGPPrivateKey decryptionKey = + new FakeKeyringModule().get().getRdeStagingDecryptionKey(); + + private final RdePipelineOptions options = + PipelineOptionsFactory.create().as(RdePipelineOptions.class); + // The pipeline runs in a different thread, which needs to be masqueraded as a GAE thread as well. 
@RegisterExtension @Order(Order.DEFAULT - 1) @@ -109,10 +153,7 @@ public class RdePipelineTest { @RegisterExtension final TestPipelineExtension pipeline = - TestPipelineExtension.create().enableAbandonedNodeEnforcement(true); - - private final RdePipelineOptions options = - PipelineOptionsFactory.create().as(RdePipelineOptions.class); + TestPipelineExtension.fromOptions(options).enableAbandonedNodeEnforcement(true); private RdePipeline rdePipeline; @@ -138,16 +179,24 @@ public class RdePipelineTest { testRegistrar.asBuilder().setState(State.ACTIVE).build(), monitoringRegistrar.asBuilder().setState(State.ACTIVE).build()))); - createTld("pal"); + createTld("soy"); createTld("fun"); createTld("cat"); + tm().transact( + () -> { + tm().put(Cursor.create(CursorType.BRDA, now, Registry.get("soy"))); + tm().put(Cursor.create(CursorType.RDE_STAGING, now, Registry.get("soy"))); + RdeRevision.saveRevision("soy", now, THIN, 0); + RdeRevision.saveRevision("soy", now, FULL, 0); + }); + // Also persists a "contact1234" contact in the process. - persistActiveDomain("hello.pal"); + persistActiveDomain("hello.soy"); persistActiveDomain("kitty.fun"); // Should not appear anywhere. persistActiveDomain("lol.cat"); - persistDeletedDomain("deleted.pal", DateTime.parse("1999-12-30TZ")); + persistDeletedDomain("deleted.soy", DateTime.parse("1999-12-30TZ")); persistActiveContact("contact456"); @@ -156,11 +205,15 @@ public class RdePipelineTest { clock.advanceBy(Duration.standardDays(2)); persistEppResource(host.asBuilder().setHostName("new.host.test").build()); - options.setPendings(encodePendings(PENDINGS)); + options.setPendings(encodePendings(pendings)); // The EPP resources created in tests do not have all the fields populated, using a STRICT // validation mode will result in a lot of warnings during marshalling. 
options.setValidationMode("LENIENT"); - rdePipeline = new RdePipeline(options); + options.setStagingKey( + BaseEncoding.base64Url().encode(PgpHelper.convertPublicKeyToBytes(encryptionKey))); + options.setGcsBucket("gcs-bucket"); + options.setJobName("rde-job"); + rdePipeline = new RdePipeline(options, gcsUtils); } @AfterEach @@ -170,16 +223,13 @@ public class RdePipelineTest { @Test void testSuccess_encodeAndDecodePendingsMap() throws Exception { - String encodedString = encodePendings(PENDINGS); - assertThat(decodePendings(encodedString)).isEqualTo(PENDINGS); + String encodedString = encodePendings(pendings); + assertThat(decodePendings(encodedString)).isEqualTo(pendings); } @Test void testSuccess_createFragments() { - PAssert.that( - rdePipeline - .createFragments(pipeline) - .apply("Group by PendingDeposit", GroupByKey.create())) + PAssert.that(rdePipeline.createFragments(pipeline)) .satisfies( kvs -> { kvs.forEach( @@ -198,7 +248,7 @@ public class RdePipelineTest { .anyMatch( domain -> // Deleted domain should not be included - domain.equals("deleted.pal") + domain.equals("deleted.soy") // Only domains on the pending deposit's tld should // appear. 
|| !kv.getKey() @@ -238,6 +288,116 @@ public class RdePipelineTest { pipeline.run().waitUntilFinish(); } + @Test + void testSuccess_persistData() throws Exception { + PendingDeposit brdaKey = + PendingDeposit.create("soy", now, THIN, CursorType.BRDA, Duration.standardDays(1)); + PendingDeposit rdeKey = + PendingDeposit.create("soy", now, FULL, CursorType.RDE_STAGING, Duration.standardDays(1)); + + verifyFiles(ImmutableMap.of(brdaKey, brdaFragments, rdeKey, rdeFragments), false); + + assertThat(gcsUtils.listFolderObjects("gcs-bucket", "rde-job/")) + .containsExactly( + "soy_2000-01-01_thin_S1_R1.xml.length", + "soy_2000-01-01_thin_S1_R1.xml.ghostryde", + "soy_2000-01-01_full_S1_R1.xml.length", + "soy_2000-01-01_full_S1_R1.xml.ghostryde", + "soy_2000-01-01_full_S1_R1-report.xml.ghostryde"); + + assertThat(loadCursorTime(CursorType.BRDA)) + .isEquivalentAccordingToCompareTo(now.plus(Duration.standardDays(1))); + assertThat(loadRevision(now, THIN)).isEqualTo(1); + + assertThat(loadCursorTime(CursorType.RDE_STAGING)) + .isEquivalentAccordingToCompareTo(now.plus(Duration.standardDays(1))); + assertThat(loadRevision(now, FULL)).isEqualTo(1); + } + + @Test + void testSuccess_persistData_manual() throws Exception { + PendingDeposit brdaKey = PendingDeposit.createInManualOperation("soy", now, THIN, "test/", 0); + PendingDeposit rdeKey = PendingDeposit.createInManualOperation("soy", now, FULL, "test/", 0); + + verifyFiles(ImmutableMap.of(brdaKey, brdaFragments, rdeKey, rdeFragments), true); + + assertThat(gcsUtils.listFolderObjects("gcs-bucket", "rde-job/")) + .containsExactly( + "manual/test/soy_2000-01-01_thin_S1_R0.xml.length", + "manual/test/soy_2000-01-01_thin_S1_R0.xml.ghostryde", + "manual/test/soy_2000-01-01_full_S1_R0.xml.length", + "manual/test/soy_2000-01-01_full_S1_R0.xml.ghostryde", + "manual/test/soy_2000-01-01_full_S1_R0-report.xml.ghostryde"); + + assertThat(loadCursorTime(CursorType.BRDA)).isEquivalentAccordingToCompareTo(now); + assertThat(loadRevision(now, 
THIN)).isEqualTo(0); + + assertThat(loadCursorTime(CursorType.RDE_STAGING)).isEquivalentAccordingToCompareTo(now); + assertThat(loadRevision(now, FULL)).isEqualTo(0); + } + + private void verifyFiles( + ImmutableMap> input, boolean manual) + throws Exception { + PCollection>> fragments = + pipeline.apply("Create Input", Create.of(input)); + rdePipeline.persistData(fragments); + pipeline.run().waitUntilFinish(); + + String prefix = manual ? "rde-job/manual/test/" : "rde-job/"; + String revision = manual ? "R0" : "R1"; + + // BRDA + String brdaOutputFile = + decryptGhostrydeGcsFile(prefix + "soy_2000-01-01_thin_S1_" + revision + ".xml.ghostryde"); + assertThat(brdaOutputFile) + .isEqualTo( + readResourceUtf8(this.getClass(), "reducer_brda.xml") + .replace("%RESEND%", manual ? "" : " resend=\"1\"")); + compareLength(brdaOutputFile, prefix + "soy_2000-01-01_thin_S1_" + revision + ".xml.length"); + + // RDE + String rdeOutputFile = + decryptGhostrydeGcsFile(prefix + "soy_2000-01-01_full_S1_" + revision + ".xml.ghostryde"); + assertThat(rdeOutputFile) + .isEqualTo( + readResourceUtf8(RdePipelineTest.class, "reducer_rde.xml") + .replace("%RESEND%", manual ? "" : " resend=\"1\"")); + compareLength(rdeOutputFile, prefix + "soy_2000-01-01_full_S1_" + revision + ".xml.length"); + assertThat( + decryptGhostrydeGcsFile( + prefix + "soy_2000-01-01_full_S1_" + revision + "-report.xml.ghostryde")) + .isEqualTo( + readResourceUtf8(RdePipelineTest.class, "reducer_rde_report.xml") + .replace("%RESEND%", manual ? 
"0" : "1")); + } + + private String decryptGhostrydeGcsFile(String filename) throws IOException { + return new String( + Ghostryde.decode(gcsUtils.readBytesFrom(BlobId.of("gcs-bucket", filename)), decryptionKey), + UTF_8); + } + + private void compareLength(String outputFile, String lengthFilename) throws IOException { + assertThat(String.valueOf(outputFile.getBytes(UTF_8).length)) + .isEqualTo( + new String(gcsUtils.readBytesFrom(BlobId.of("gcs-bucket", lengthFilename)), UTF_8)); + } + + private static int loadRevision(DateTime now, RdeMode mode) { + return tm().transact( + () -> + tm().loadByKey( + VKey.createSql( + RdeRevision.class, + RdeRevisionId.create("soy", now.toLocalDate(), mode))) + .getRevision()); + } + + private static DateTime loadCursorTime(CursorType type) { + return tm().transact(() -> tm().loadByKey(Cursor.createVKey(type, "soy")).getCursorTime()); + } + private static Function getXmlElement(String pattern) { return (fragment) -> { Matcher matcher = Pattern.compile(pattern).matcher(fragment.xml()); diff --git a/core/src/test/java/google/registry/rde/RdeStagingReducerTest.java b/core/src/test/java/google/registry/rde/RdeStagingReducerTest.java index f2fa21bef..5f06b891e 100644 --- a/core/src/test/java/google/registry/rde/RdeStagingReducerTest.java +++ b/core/src/test/java/google/registry/rde/RdeStagingReducerTest.java @@ -30,6 +30,7 @@ import com.google.appengine.tools.mapreduce.ReducerInput; import com.google.cloud.storage.BlobId; import com.google.cloud.storage.StorageException; import com.google.common.collect.ImmutableList; +import google.registry.beam.rde.RdePipelineTest; import google.registry.gcs.GcsUtils; import google.registry.gcs.backport.LocalStorageHelper; import google.registry.keyring.api.PgpHelper; @@ -120,7 +121,7 @@ class RdeStagingReducerTest { String outputFile = decryptGhostrydeGcsFile("soy_2000-01-01_thin_S1_R1.xml.ghostryde"); assertThat(outputFile) .isEqualTo( - readResourceUtf8(RdeStagingReducerTest.class, 
"reducer_brda.xml") + readResourceUtf8(RdePipelineTest.class, "reducer_brda.xml") .replace("%RESEND%", " resend=\"1\"")); compareLength(outputFile, "soy_2000-01-01_thin_S1_R1.xml.length"); // BRDA doesn't write a report file. @@ -147,8 +148,7 @@ class RdeStagingReducerTest { String outputFile = decryptGhostrydeGcsFile("manual/soy_2000-01-01_thin_S1_R0.xml.ghostryde"); assertThat(outputFile) .isEqualTo( - readResourceUtf8(RdeStagingReducerTest.class, "reducer_brda.xml") - .replace("%RESEND%", "")); + readResourceUtf8(RdePipelineTest.class, "reducer_brda.xml").replace("%RESEND%", "")); compareLength(outputFile, "manual/soy_2000-01-01_thin_S1_R0.xml.length"); // BRDA doesn't write a report file. assertThrows( @@ -169,12 +169,12 @@ class RdeStagingReducerTest { String outputFile = decryptGhostrydeGcsFile("soy_2000-01-01_full_S1_R1.xml.ghostryde"); assertThat(outputFile) .isEqualTo( - readResourceUtf8(RdeStagingReducerTest.class, "reducer_rde.xml") + readResourceUtf8(RdePipelineTest.class, "reducer_rde.xml") .replace("%RESEND%", " resend=\"1\"")); compareLength(outputFile, "soy_2000-01-01_full_S1_R1.xml.length"); assertThat(decryptGhostrydeGcsFile("soy_2000-01-01_full_S1_R1-report.xml.ghostryde")) .isEqualTo( - readResourceUtf8(RdeStagingReducerTest.class, "reducer_rde_report.xml") + readResourceUtf8(RdePipelineTest.class, "reducer_rde_report.xml") .replace("%RESEND%", "1")); assertThat(loadCursorTime(CursorType.RDE_STAGING)) .isEquivalentAccordingToCompareTo(now.plus(Duration.standardDays(1))); @@ -191,12 +191,11 @@ class RdeStagingReducerTest { String outputFile = decryptGhostrydeGcsFile("manual/soy_2000-01-01_full_S1_R0.xml.ghostryde"); assertThat(outputFile) .isEqualTo( - readResourceUtf8(RdeStagingReducerTest.class, "reducer_rde.xml") - .replace("%RESEND%", "")); + readResourceUtf8(RdePipelineTest.class, "reducer_rde.xml").replace("%RESEND%", "")); compareLength(outputFile, "manual/soy_2000-01-01_full_S1_R0.xml.length"); 
assertThat(decryptGhostrydeGcsFile("manual/soy_2000-01-01_full_S1_R0-report.xml.ghostryde")) .isEqualTo( - readResourceUtf8(RdeStagingReducerTest.class, "reducer_rde_report.xml") + readResourceUtf8(RdePipelineTest.class, "reducer_rde_report.xml") .replace("%RESEND%", "0")); // No extra operations in manual mode. assertThat(loadCursorTime(CursorType.RDE_STAGING)).isEquivalentAccordingToCompareTo(now); diff --git a/core/src/test/resources/google/registry/rde/reducer_brda.xml b/core/src/test/resources/google/registry/beam/rde/reducer_brda.xml similarity index 100% rename from core/src/test/resources/google/registry/rde/reducer_brda.xml rename to core/src/test/resources/google/registry/beam/rde/reducer_brda.xml diff --git a/core/src/test/resources/google/registry/rde/reducer_rde.xml b/core/src/test/resources/google/registry/beam/rde/reducer_rde.xml similarity index 100% rename from core/src/test/resources/google/registry/rde/reducer_rde.xml rename to core/src/test/resources/google/registry/beam/rde/reducer_rde.xml diff --git a/core/src/test/resources/google/registry/rde/reducer_rde_report.xml b/core/src/test/resources/google/registry/beam/rde/reducer_rde_report.xml similarity index 100% rename from core/src/test/resources/google/registry/rde/reducer_rde_report.xml rename to core/src/test/resources/google/registry/beam/rde/reducer_rde_report.xml