Write RDE files and advance cursors in Beam pipeline (#1249)

This PR re-implements most of the logic in the RdeStagingReducer, with
the exception of the last enqueue operations, due to the fact that the
task queue API is not available outside of App Engine SDK. This part
will come in a separate PR.

Another deviation from the reducer is that we forwent the lock -- it is
difficult to do across different Beam transforms. Instead we write each
report to a different folder according to its unique beam job name. When
enqueueing the publish tasks we will then pass the folder prefix as a
URL parameter.

<!-- Reviewable:start -->
---
This change is [<img src="https://reviewable.io/review_button.svg" height="34" align="absmiddle" alt="Reviewable"/>](https://reviewable.io/reviews/google/nomulus/1249)
<!-- Reviewable:end -->
This commit is contained in:
Lai Jiang 2021-07-30 16:24:58 -04:00 committed by GitHub
parent 04ecc2e78f
commit 917b34701f
11 changed files with 569 additions and 68 deletions

View file

@ -0,0 +1,273 @@
// Copyright 2021 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.beam.rde;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static google.registry.model.common.Cursor.getCursorTimeOrStartOfTime;
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
import static google.registry.persistence.transaction.TransactionManagerUtil.transactIfJpaTm;
import static java.nio.charset.StandardCharsets.UTF_8;
import com.google.auto.value.AutoValue;
import com.google.cloud.storage.BlobId;
import com.google.common.flogger.FluentLogger;
import google.registry.gcs.GcsUtils;
import google.registry.keyring.api.PgpHelper;
import google.registry.model.common.Cursor;
import google.registry.model.rde.RdeMode;
import google.registry.model.rde.RdeNamingUtils;
import google.registry.model.rde.RdeRevision;
import google.registry.model.registry.Registry;
import google.registry.rde.DepositFragment;
import google.registry.rde.Ghostryde;
import google.registry.rde.PendingDeposit;
import google.registry.rde.RdeCounter;
import google.registry.rde.RdeMarshaller;
import google.registry.rde.RdeResourceType;
import google.registry.rde.RdeUtil;
import google.registry.tldconfig.idn.IdnTableEnum;
import google.registry.xjc.rdeheader.XjcRdeHeader;
import google.registry.xjc.rdeheader.XjcRdeHeaderElement;
import google.registry.xml.ValidationMode;
import google.registry.xml.XmlException;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.security.Security;
import java.util.Optional;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.bouncycastle.openpgp.PGPPublicKey;
import org.joda.time.DateTime;
public class RdeIO {
/**
 * Sink transform for grouped deposit fragments.
 *
 * <p>Each input element is first handed to {@link RdeWriter}, which writes the deposit files to
 * GCS; whatever that step emits is then handed to {@link CursorUpdater}.
 */
@AutoValue
abstract static class Write
    extends PTransform<PCollection<KV<PendingDeposit, Iterable<DepositFragment>>>, PDone> {

  /** Helper through which output streams to GCS objects are opened. */
  abstract GcsUtils gcsUtils();

  /** Name of the bucket in which every output object is created. */
  abstract String rdeBucket();

  // Returning a primitive array is acceptable here: the bytes are only used to construct the
  // PGPPublicKey, which is not serializable.
  @SuppressWarnings("mutable")
  abstract byte[] stagingKeyBytes();

  /** Validation mode given to the {@link RdeMarshaller} used by the writer. */
  abstract ValidationMode validationMode();

  /** Returns a fresh builder for this transform. */
  static Builder builder() {
    return new AutoValue_RdeIO_Write.Builder();
  }

  /** Builder for {@link Write}. */
  @AutoValue.Builder
  abstract static class Builder {
    abstract Builder setGcsUtils(GcsUtils gcsUtils);

    abstract Builder setRdeBucket(String value);

    abstract Builder setStagingKeyBytes(byte[] value);

    abstract Builder setValidationMode(ValidationMode value);

    abstract Write build();
  }

  @Override
  public PDone expand(PCollection<KV<PendingDeposit, Iterable<DepositFragment>>> input) {
    RdeWriter writer =
        new RdeWriter(gcsUtils(), rdeBucket(), stagingKeyBytes(), validationMode());
    // The writer emits (deposit, revision) pairs, which drive the cursor update step.
    PCollection<KV<PendingDeposit, Integer>> written =
        input.apply("Write to GCS", ParDo.of(writer));
    written.apply("Update cursors", ParDo.of(new CursorUpdater()));
    return PDone.in(input.getPipeline());
  }
}
/**
 * {@link DoFn} that writes the files of one deposit per input element to GCS.
 *
 * <p>For every element it writes the ghostryde-encoded deposit XML together with a companion
 * length file and, when the deposit mode is {@link RdeMode#FULL}, a ghostryde-encoded report
 * file. All object names start with the Beam job name. Non-manual deposits are emitted together
 * with their revision once the files are written; manual deposits emit nothing.
 */
private static class RdeWriter
    extends DoFn<KV<PendingDeposit, Iterable<DepositFragment>>, KV<PendingDeposit, Integer>> {

  private static final FluentLogger logger = FluentLogger.forEnclosingClass();

  private final GcsUtils gcsUtils;
  private final String rdeBucket;
  private final byte[] stagingKeyBytes;
  private final RdeMarshaller marshaller;

  /**
   * @param gcsUtils helper used to open output streams to GCS objects
   * @param rdeBucket bucket in which all output objects are created
   * @param stagingKeyBytes bytes from which the PGP public key is loaded for each element
   * @param validationMode validation mode given to the {@link RdeMarshaller}
   */
  protected RdeWriter(
      GcsUtils gcsUtils,
      String rdeBucket,
      byte[] stagingKeyBytes,
      ValidationMode validationMode) {
    this.marshaller = new RdeMarshaller(validationMode);
    this.stagingKeyBytes = stagingKeyBytes;
    this.rdeBucket = rdeBucket;
    this.gcsUtils = gcsUtils;
  }

  /** Registers the BouncyCastle security provider. */
  @Setup
  public void setup() {
    Security.addProvider(new BouncyCastleProvider());
  }

  /**
   * Writes the deposit, length and (for full deposits) report files for one pending deposit.
   *
   * @throws RuntimeException wrapping any {@link IOException} or {@link XmlException} raised
   *     while writing
   */
  @ProcessElement
  public void processElement(
      @Element KV<PendingDeposit, Iterable<DepositFragment>> kv,
      PipelineOptions options,
      OutputReceiver<KV<PendingDeposit, Integer>> outputReceiver) {
    PGPPublicKey encryptionKey = PgpHelper.loadPublicKeyBytes(stagingKeyBytes);
    PendingDeposit deposit = kv.getKey();
    Iterable<DepositFragment> fragments = kv.getValue();
    RdeCounter counter = new RdeCounter();

    // Basic facts about the deposit.
    RdeMode mode = deposit.mode();
    String tld = deposit.tld();
    DateTime watermark = deposit.watermark();
    // Use the revision carried by the deposit if there is one, otherwise ask for the next one.
    Integer presetRevision = deposit.revision();
    int revision =
        presetRevision != null
            ? presetRevision
            : RdeRevision.getNextRevision(tld, watermark, mode);
    String id = RdeUtil.timestampToId(watermark);
    String jobName = options.getJobName();
    String basename = RdeNamingUtils.makeRydeFilename(tld, watermark, mode, 1, revision);
    String objectPrefix = resolveObjectPrefix(deposit, jobName, basename);

    BlobId xmlBlob = BlobId.of(rdeBucket, objectPrefix + ".xml.ghostryde");
    // Holds the byte length (ASCII) of the raw unencrypted XML.
    //
    // RdeUploadAction creates a tar file which requires the length to be outputted; saving it
    // separately avoids decrypting the whole ghostryde file just to determine the length.
    BlobId lengthBlob = BlobId.of(rdeBucket, objectPrefix + ".xml.length");
    BlobId reportBlob = BlobId.of(rdeBucket, objectPrefix + "-report.xml.ghostryde");

    // Both are assigned while the deposit XML is written and consumed afterwards.
    boolean failed;
    XjcRdeHeader header;

    // Write the (potentially very large) deposit XML through the ghostryde encoder.
    logger.atInfo().log("Writing %s and %s", xmlBlob, lengthBlob);
    try (OutputStream xmlStream = gcsUtils.openOutputStream(xmlBlob);
        OutputStream lengthStream = gcsUtils.openOutputStream(lengthBlob);
        OutputStream encodedStream = Ghostryde.encoder(xmlStream, encryptionKey, lengthStream);
        Writer xmlWriter = new OutputStreamWriter(encodedStream, UTF_8)) {
      // Top portion of the XML document.
      xmlWriter.write(
          marshaller.makeHeader(id, watermark, RdeResourceType.getUris(mode), revision));
      failed = writeFragments(xmlWriter, fragments, counter);
      // IDN elements are not written for BRDA.
      if (mode == RdeMode.FULL) {
        writeIdnTables(xmlWriter, counter);
      }
      // Element stating how many resources were emitted.
      header = counter.makeHeader(tld, mode);
      xmlWriter.write(marshaller.marshalOrDie(new XjcRdeHeaderElement(header)));
      // Bottom of the XML document.
      xmlWriter.write(marshaller.makeFooter());
    } catch (IOException e) {
      throw new RuntimeException(e);
    }

    // A broken entity aborts the deposit only after as much data/logging as possible is written.
    verify(!failed, "RDE staging failed for TLD %s", tld);

    // Small XML report with information about the deposit.
    //
    // This will be sent to ICANN once the big XML has been uploaded to the escrow provider.
    if (mode == RdeMode.FULL) {
      logger.atInfo().log("Writing %s", reportBlob);
      try (OutputStream reportStream = gcsUtils.openOutputStream(reportBlob);
          OutputStream encodedReport = Ghostryde.encoder(reportStream, encryptionKey)) {
        counter.makeReport(id, watermark, header, revision).marshal(encodedReport, UTF_8);
      } catch (IOException | XmlException e) {
        throw new RuntimeException(e);
      }
    }

    // Everything is written; emit the deposit so that its cursor can be rolled forward.
    if (!deposit.manual()) {
      outputReceiver.output(KV.of(deposit, revision));
    } else {
      logger.atInfo().log("Manual operation; not advancing cursor or enqueuing upload task");
    }
  }

  /** Builds the object name prefix (job name, optional manual subdirectory, base name). */
  private static String resolveObjectPrefix(
      PendingDeposit deposit, String jobName, String basename) {
    if (!deposit.manual()) {
      return jobName + "/" + basename;
    }
    checkState(
        deposit.directoryWithTrailingSlash() != null, "Manual subdirectory not specified");
    return jobName + "/manual/" + deposit.directoryWithTrailingSlash() + basename;
  }

  /** Writes and counts every non-empty fragment; returns whether any fragment had an error. */
  private static boolean writeFragments(
      Writer xmlWriter, Iterable<DepositFragment> fragments, RdeCounter counter)
      throws IOException {
    boolean sawError = false;
    for (DepositFragment fragment : fragments) {
      if (!fragment.xml().isEmpty()) {
        xmlWriter.write(fragment.xml());
        counter.increment(fragment.type());
      }
      if (!fragment.error().isEmpty()) {
        sawError = true;
        logger.atSevere().log("Fragment error: %s", fragment.error());
      }
    }
    return sawError;
  }

  /** Writes one marshalled IDN element per known IDN table, counting each one. */
  private void writeIdnTables(Writer xmlWriter, RdeCounter counter) throws IOException {
    for (IdnTableEnum idn : IdnTableEnum.values()) {
      xmlWriter.write(marshaller.marshalIdn(idn.getTable()));
      counter.increment(RdeResourceType.IDN);
    }
  }
}
/**
 * {@link DoFn} that, inside one transaction per element, rolls a deposit's cursor forward by the
 * deposit interval and saves the deposit's revision.
 */
private static class CursorUpdater extends DoFn<KV<PendingDeposit, Integer>, Void> {

  private static final FluentLogger logger = FluentLogger.forEnclosingClass();

  @ProcessElement
  public void processElement(@Element KV<PendingDeposit, Integer> input) {
    tm().transact(
        () -> {
          rollForward(input.getKey(), input.getValue());
        });
  }

  /**
   * Advances the cursor of {@code deposit} to watermark + interval and stores {@code revision}.
   *
   * <p>Only logs a warning when the cursor is already at or past the new position. Otherwise
   * the current cursor position must equal the deposit's watermark.
   */
  private static void rollForward(PendingDeposit deposit, int revision) {
    Registry registry = Registry.get(deposit.tld());
    Optional<Cursor> existing =
        transactIfJpaTm(
            () ->
                tm().loadByKeyIfPresent(
                        Cursor.createVKey(deposit.cursor(), registry.getTldStr())));
    DateTime current = getCursorTimeOrStartOfTime(existing);
    checkState(deposit.interval() != null, "Interval must be present");
    DateTime target = deposit.watermark().plus(deposit.interval());
    if (current.isBefore(target)) {
      verify(
          current.equals(deposit.watermark()),
          "Partial ordering of RDE deposits broken: %s %s",
          current,
          deposit);
      tm().put(Cursor.create(deposit.cursor(), target, registry));
      logger.atInfo().log(
          "Rolled forward %s on %s cursor to %s", deposit.cursor(), deposit.tld(), target);
      RdeRevision.saveRevision(deposit.tld(), deposit.watermark(), deposit.mode(), revision);
    } else {
      logger.atWarning().log("Cursor has already been rolled forward.");
    }
  }
}
}

View file

@ -18,14 +18,18 @@ import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static google.registry.model.EppResourceUtils.loadAtPointInTimeAsync;
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSetMultimap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.BaseEncoding;
import dagger.BindsInstance;
import dagger.Component;
import google.registry.beam.common.RegistryJpaIO;
import google.registry.config.CredentialModule;
import google.registry.config.RegistryConfig.ConfigModule;
import google.registry.gcs.GcsUtils;
import google.registry.model.EppResource;
import google.registry.model.contact.ContactResource;
import google.registry.model.domain.DomainBase;
@ -33,6 +37,7 @@ import google.registry.model.host.HostResource;
import google.registry.model.rde.RdeMode;
import google.registry.model.registrar.Registrar;
import google.registry.model.registrar.Registrar.Type;
import google.registry.persistence.PersistenceModule.TransactionIsolationLevel;
import google.registry.persistence.VKey;
import google.registry.rde.DepositFragment;
import google.registry.rde.PendingDeposit;
@ -50,6 +55,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;
import javax.inject.Inject;
import javax.inject.Singleton;
import javax.persistence.Entity;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
@ -58,6 +65,7 @@ import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@ -76,10 +84,15 @@ import org.joda.time.DateTime;
* @see <a href="https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates">Using
* Flex Templates</a>
*/
@Singleton
public class RdePipeline implements Serializable {
private final RdeMarshaller marshaller;
private final transient RdePipelineOptions options;
private final ValidationMode mode;
private final ImmutableSetMultimap<String, PendingDeposit> pendings;
private final String rdeBucket;
private final byte[] stagingKeyBytes;
private final GcsUtils gcsUtils;
// Registrars to be excluded from data escrow. Not including the sandbox-only OTE type so that
// if it sneaks into production we would get an extra signal.
@ -97,31 +110,43 @@ public class RdePipeline implements Serializable {
+ (clazz.equals(DomainBase.class) ? " AND tld in (:tlds)" : "");
}
RdePipeline(RdePipelineOptions options) throws IOException, ClassNotFoundException {
this.marshaller = new RdeMarshaller(ValidationMode.valueOf(options.getValidationMode()));
@Inject
RdePipeline(RdePipelineOptions options, GcsUtils gcsUtils) {
this.options = options;
this.mode = ValidationMode.valueOf(options.getValidationMode());
this.pendings = decodePendings(options.getPendings());
}
@VisibleForTesting
PipelineResult run(Pipeline pipeline) {
createFragments(pipeline);
return pipeline.run();
this.rdeBucket = options.getGcsBucket();
this.stagingKeyBytes = BaseEncoding.base64Url().decode(options.getStagingKey());
this.gcsUtils = gcsUtils;
}
PipelineResult run() {
return run(Pipeline.create());
Pipeline pipeline = Pipeline.create(options);
PCollection<KV<PendingDeposit, Iterable<DepositFragment>>> fragments =
createFragments(pipeline);
persistData(fragments);
return pipeline.run();
}
PCollection<KV<PendingDeposit, DepositFragment>> createFragments(Pipeline pipeline) {
PCollection<KV<PendingDeposit, DepositFragment>> fragments =
PCollectionList.of(processRegistrars(pipeline))
.and(processNonRegistrarEntities(pipeline, DomainBase.class))
.and(processNonRegistrarEntities(pipeline, ContactResource.class))
.and(processNonRegistrarEntities(pipeline, HostResource.class))
.apply(Flatten.pCollections())
.setCoder(
KvCoder.of(PendingDepositCoder.of(), SerializableCoder.of(DepositFragment.class)));
return fragments;
PCollection<KV<PendingDeposit, Iterable<DepositFragment>>> createFragments(Pipeline pipeline) {
return PCollectionList.of(processRegistrars(pipeline))
.and(processNonRegistrarEntities(pipeline, DomainBase.class))
.and(processNonRegistrarEntities(pipeline, ContactResource.class))
.and(processNonRegistrarEntities(pipeline, HostResource.class))
.apply(Flatten.pCollections())
.setCoder(KvCoder.of(PendingDepositCoder.of(), SerializableCoder.of(DepositFragment.class)))
.apply("Group by PendingDeposit", GroupByKey.create());
}
void persistData(PCollection<KV<PendingDeposit, Iterable<DepositFragment>>> input) {
input.apply(
"Write to GCS and update cursors",
RdeIO.Write.builder()
.setRdeBucket(rdeBucket)
.setGcsUtils(gcsUtils)
.setValidationMode(mode)
.setStagingKeyBytes(stagingKeyBytes)
.build());
}
PCollection<KV<PendingDeposit, DepositFragment>> processRegistrars(Pipeline pipeline) {
@ -144,7 +169,8 @@ public class RdePipeline implements Serializable {
.via(
(VKey<Registrar> key) -> {
Registrar registrar = jpaTm().transact(() -> jpaTm().loadByKey(key));
DepositFragment fragment = marshaller.marshalRegistrar(registrar);
DepositFragment fragment =
new RdeMarshaller(mode).marshalRegistrar(registrar);
return pendings.values().stream()
.map(pending -> KV.of(pending, fragment))
.collect(toImmutableSet());
@ -205,7 +231,8 @@ public class RdePipeline implements Serializable {
Maps.asMap(dates, input -> loadAtPointInTimeAsync(resource, input)));
// Convert resource to an XML fragment for each watermark/mode pair lazily and cache
// the result.
RdeFragmenter fragmenter = new RdeFragmenter(resourceAtTimes, marshaller);
RdeFragmenter fragmenter =
new RdeFragmenter(resourceAtTimes, new RdeMarshaller(mode));
List<KV<PendingDeposit, DepositFragment>> results = new ArrayList<>();
for (String tld : tlds) {
for (PendingDeposit pending : pendings.get(tld)) {
@ -228,13 +255,14 @@ public class RdePipeline implements Serializable {
* the original TLD to pending deposit map.
*/
@SuppressWarnings("unchecked")
static ImmutableSetMultimap<String, PendingDeposit> decodePendings(String encodedPending)
throws IOException, ClassNotFoundException {
static ImmutableSetMultimap<String, PendingDeposit> decodePendings(String encodedPending) {
try (ObjectInputStream ois =
new ObjectInputStream(
new ByteArrayInputStream(
BaseEncoding.base64Url().omitPadding().decode(encodedPending)))) {
return (ImmutableSetMultimap<String, PendingDeposit>) ois.readObject();
} catch (IOException | ClassNotFoundException e) {
throw new IllegalArgumentException("Unable to parse encoded pending deposit map.", e);
}
}
@ -242,7 +270,7 @@ public class RdePipeline implements Serializable {
* Encodes the TLD to pending deposit map in an URL safe string that is sent to the pipeline
* worker by the pipeline launcher as a pipeline option.
*/
public static String encodePendings(ImmutableSetMultimap<String, PendingDeposit> pendings)
static String encodePendings(ImmutableSetMultimap<String, PendingDeposit> pendings)
throws IOException {
try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
ObjectOutputStream oos = new ObjectOutputStream(baos);
@ -255,6 +283,21 @@ public class RdePipeline implements Serializable {
public static void main(String[] args) throws IOException, ClassNotFoundException {
PipelineOptionsFactory.register(RdePipelineOptions.class);
RdePipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(RdePipelineOptions.class);
new RdePipeline(options).run();
options.setIsolationOverride(TransactionIsolationLevel.TRANSACTION_READ_COMMITTED);
DaggerRdePipeline_RdePipelineComponent.builder().options(options).build().rdePipeline().run();
}
@Singleton
@Component(modules = {CredentialModule.class, ConfigModule.class})
interface RdePipelineComponent {
RdePipeline rdePipeline();
@Component.Builder
interface Builder {
@BindsInstance
Builder options(RdePipelineOptions options);
RdePipelineComponent build();
}
}
}

View file

@ -29,4 +29,14 @@ public interface RdePipelineOptions extends RegistryPipelineOptions {
String getValidationMode();
void setValidationMode(String value);
@Description("The GCS bucket where the encrypted RDE deposits will be uploaded to.")
String getGcsBucket();
void setGcsBucket(String value);
@Description("The Base64-encoded PGP public key to encrypt the deposits.")
String getStagingKey();
void setStagingKey(String value);
}

View file

@ -154,7 +154,7 @@ public final class RdeRevision extends BackupGroupRoot implements NonReplicatedE
}
/** Class to represent the composite primary key of {@link RdeRevision} entity. */
static class RdeRevisionId extends ImmutableObject implements Serializable {
public static class RdeRevisionId extends ImmutableObject implements Serializable {
String tld;
@ -169,7 +169,7 @@ public final class RdeRevision extends BackupGroupRoot implements NonReplicatedE
/** Hibernate requires this default constructor. */
private RdeRevisionId() {}
static RdeRevisionId create(String tld, LocalDate date, RdeMode mode) {
public static RdeRevisionId create(String tld, LocalDate date, RdeMode mode) {
RdeRevisionId instance = new RdeRevisionId();
instance.tld = tld;
instance.date = date;

View file

@ -83,7 +83,7 @@ public abstract class PendingDeposit implements Serializable {
return new AutoValue_PendingDeposit(false, tld, watermark, mode, cursor, interval, null, null);
}
static PendingDeposit createInManualOperation(
public static PendingDeposit createInManualOperation(
String tld,
DateTime watermark,
RdeMode mode,

View file

@ -14,10 +14,26 @@
"name": "validationMode",
"label": "How strict the marshaller validates the given EPP resources.",
"helpText": "If set to LENIENT the marshaller will not warn about missing data on the EPP resources.",
"is_optional": true,
"regexes": [
"^STRICT|LENIENT$"
]
},
{
"name": "gcsBucket",
"label": "The GCS bucket that where the resulting files will be stored.",
"helpText": "Only the bucket name itself, without the leading \"gs://\".",
"is_optional": false,
"regexes": [
"^[a-zA-Z0-9_\\-]+$"
]
},
{
"name": "stagingKey",
"label": "The PGP public key used to encrypt the RDE/BRDA deposit files.",
"helpText": "The key is Base64 URL-safe encoded.",
"regexes": [
"A-Za-z0-9\\-_"
]
}
]
}

View file

@ -36,33 +36,54 @@ import static google.registry.testing.DatabaseHelper.persistActiveDomain;
import static google.registry.testing.DatabaseHelper.persistDeletedDomain;
import static google.registry.testing.DatabaseHelper.persistEppResource;
import static google.registry.testing.DatabaseHelper.persistNewRegistrar;
import static google.registry.util.ResourceUtils.readResourceUtf8;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.joda.time.Duration.standardDays;
import com.google.cloud.storage.BlobId;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSetMultimap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Streams;
import com.google.common.io.BaseEncoding;
import google.registry.beam.TestPipelineExtension;
import google.registry.gcs.GcsUtils;
import google.registry.gcs.backport.LocalStorageHelper;
import google.registry.keyring.api.PgpHelper;
import google.registry.model.common.Cursor;
import google.registry.model.common.Cursor.CursorType;
import google.registry.model.host.HostResource;
import google.registry.model.rde.RdeMode;
import google.registry.model.rde.RdeRevision;
import google.registry.model.rde.RdeRevision.RdeRevisionId;
import google.registry.model.registrar.Registrar;
import google.registry.model.registrar.Registrar.State;
import google.registry.model.registry.Registry;
import google.registry.persistence.VKey;
import google.registry.persistence.transaction.JpaTestRules;
import google.registry.persistence.transaction.JpaTestRules.JpaIntegrationTestExtension;
import google.registry.persistence.transaction.TransactionManagerFactory;
import google.registry.rde.DepositFragment;
import google.registry.rde.Ghostryde;
import google.registry.rde.PendingDeposit;
import google.registry.rde.RdeResourceType;
import google.registry.testing.DatastoreEntityExtension;
import google.registry.testing.FakeClock;
import google.registry.testing.FakeKeyringModule;
import java.io.IOException;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.bouncycastle.openpgp.PGPPrivateKey;
import org.bouncycastle.openpgp.PGPPublicKey;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.junit.jupiter.api.AfterEach;
@ -83,21 +104,44 @@ public class RdePipelineTest {
private static final String HOST_NAME_PATTERN = "<rdeHost:name>(.*)</rdeHost:name>";
private static final ImmutableSetMultimap<String, PendingDeposit> PENDINGS =
ImmutableSetMultimap.of(
"pal",
PendingDeposit.create(
"pal", DateTime.parse("2000-01-01TZ"), FULL, RDE_STAGING, standardDays(1)),
"pal",
PendingDeposit.create(
"pal", DateTime.parse("2000-01-01TZ"), THIN, RDE_STAGING, standardDays(1)),
"fun",
PendingDeposit.create(
"fun", DateTime.parse("2000-01-01TZ"), FULL, RDE_STAGING, standardDays(1)));
// This is the default creation time for test data.
private final FakeClock clock = new FakeClock(DateTime.parse("1999-12-31TZ"));
// This is the default as-of time of the RDE/BRDA job.
private final DateTime now = DateTime.parse("2000-01-01TZ");
private final ImmutableSetMultimap<String, PendingDeposit> pendings =
ImmutableSetMultimap.of(
"soy",
PendingDeposit.create("soy", now, FULL, RDE_STAGING, standardDays(1)),
"soy",
PendingDeposit.create("soy", now, THIN, RDE_STAGING, standardDays(1)),
"fun",
PendingDeposit.create("fun", now, FULL, RDE_STAGING, standardDays(1)));
private final ImmutableList<DepositFragment> brdaFragments =
ImmutableList.of(
DepositFragment.create(RdeResourceType.DOMAIN, "<rdeDomain:domain/>\n", ""),
DepositFragment.create(RdeResourceType.REGISTRAR, "<rdeRegistrar:registrar/>\n", ""));
private final ImmutableList<DepositFragment> rdeFragments =
ImmutableList.of(
DepositFragment.create(RdeResourceType.DOMAIN, "<rdeDomain:domain/>\n", ""),
DepositFragment.create(RdeResourceType.REGISTRAR, "<rdeRegistrar:registrar/>\n", ""),
DepositFragment.create(RdeResourceType.CONTACT, "<rdeContact:contact/>\n", ""),
DepositFragment.create(RdeResourceType.HOST, "<rdeHost:host/>\n", ""));
private final GcsUtils gcsUtils = new GcsUtils(LocalStorageHelper.getOptions());
private final PGPPublicKey encryptionKey =
new FakeKeyringModule().get().getRdeStagingEncryptionKey();
private final PGPPrivateKey decryptionKey =
new FakeKeyringModule().get().getRdeStagingDecryptionKey();
private final RdePipelineOptions options =
PipelineOptionsFactory.create().as(RdePipelineOptions.class);
// The pipeline runs in a different thread, which needs to be masqueraded as a GAE thread as well.
@RegisterExtension
@Order(Order.DEFAULT - 1)
@ -109,10 +153,7 @@ public class RdePipelineTest {
@RegisterExtension
final TestPipelineExtension pipeline =
TestPipelineExtension.create().enableAbandonedNodeEnforcement(true);
private final RdePipelineOptions options =
PipelineOptionsFactory.create().as(RdePipelineOptions.class);
TestPipelineExtension.fromOptions(options).enableAbandonedNodeEnforcement(true);
private RdePipeline rdePipeline;
@ -138,16 +179,24 @@ public class RdePipelineTest {
testRegistrar.asBuilder().setState(State.ACTIVE).build(),
monitoringRegistrar.asBuilder().setState(State.ACTIVE).build())));
createTld("pal");
createTld("soy");
createTld("fun");
createTld("cat");
tm().transact(
() -> {
tm().put(Cursor.create(CursorType.BRDA, now, Registry.get("soy")));
tm().put(Cursor.create(CursorType.RDE_STAGING, now, Registry.get("soy")));
RdeRevision.saveRevision("soy", now, THIN, 0);
RdeRevision.saveRevision("soy", now, FULL, 0);
});
// Also persists a "contact1234" contact in the process.
persistActiveDomain("hello.pal");
persistActiveDomain("hello.soy");
persistActiveDomain("kitty.fun");
// Should not appear anywhere.
persistActiveDomain("lol.cat");
persistDeletedDomain("deleted.pal", DateTime.parse("1999-12-30TZ"));
persistDeletedDomain("deleted.soy", DateTime.parse("1999-12-30TZ"));
persistActiveContact("contact456");
@ -156,11 +205,15 @@ public class RdePipelineTest {
clock.advanceBy(Duration.standardDays(2));
persistEppResource(host.asBuilder().setHostName("new.host.test").build());
options.setPendings(encodePendings(PENDINGS));
options.setPendings(encodePendings(pendings));
// The EPP resources created in tests do not have all the fields populated, using a STRICT
// validation mode will result in a lot of warnings during marshalling.
options.setValidationMode("LENIENT");
rdePipeline = new RdePipeline(options);
options.setStagingKey(
BaseEncoding.base64Url().encode(PgpHelper.convertPublicKeyToBytes(encryptionKey)));
options.setGcsBucket("gcs-bucket");
options.setJobName("rde-job");
rdePipeline = new RdePipeline(options, gcsUtils);
}
@AfterEach
@ -170,16 +223,13 @@ public class RdePipelineTest {
@Test
void testSuccess_encodeAndDecodePendingsMap() throws Exception {
String encodedString = encodePendings(PENDINGS);
assertThat(decodePendings(encodedString)).isEqualTo(PENDINGS);
String encodedString = encodePendings(pendings);
assertThat(decodePendings(encodedString)).isEqualTo(pendings);
}
@Test
void testSuccess_createFragments() {
PAssert.that(
rdePipeline
.createFragments(pipeline)
.apply("Group by PendingDeposit", GroupByKey.create()))
PAssert.that(rdePipeline.createFragments(pipeline))
.satisfies(
kvs -> {
kvs.forEach(
@ -198,7 +248,7 @@ public class RdePipelineTest {
.anyMatch(
domain ->
// Deleted domain should not be included
domain.equals("deleted.pal")
domain.equals("deleted.soy")
// Only domains on the pending deposit's tld should
// appear.
|| !kv.getKey()
@ -238,6 +288,116 @@ public class RdePipelineTest {
pipeline.run().waitUntilFinish();
}
@Test
void testSuccess_persistData() throws Exception {
PendingDeposit brdaKey =
PendingDeposit.create("soy", now, THIN, CursorType.BRDA, Duration.standardDays(1));
PendingDeposit rdeKey =
PendingDeposit.create("soy", now, FULL, CursorType.RDE_STAGING, Duration.standardDays(1));
verifyFiles(ImmutableMap.of(brdaKey, brdaFragments, rdeKey, rdeFragments), false);
assertThat(gcsUtils.listFolderObjects("gcs-bucket", "rde-job/"))
.containsExactly(
"soy_2000-01-01_thin_S1_R1.xml.length",
"soy_2000-01-01_thin_S1_R1.xml.ghostryde",
"soy_2000-01-01_full_S1_R1.xml.length",
"soy_2000-01-01_full_S1_R1.xml.ghostryde",
"soy_2000-01-01_full_S1_R1-report.xml.ghostryde");
assertThat(loadCursorTime(CursorType.BRDA))
.isEquivalentAccordingToCompareTo(now.plus(Duration.standardDays(1)));
assertThat(loadRevision(now, THIN)).isEqualTo(1);
assertThat(loadCursorTime(CursorType.RDE_STAGING))
.isEquivalentAccordingToCompareTo(now.plus(Duration.standardDays(1)));
assertThat(loadRevision(now, FULL)).isEqualTo(1);
}
@Test
void testSuccess_persistData_manual() throws Exception {
PendingDeposit brdaKey = PendingDeposit.createInManualOperation("soy", now, THIN, "test/", 0);
PendingDeposit rdeKey = PendingDeposit.createInManualOperation("soy", now, FULL, "test/", 0);
verifyFiles(ImmutableMap.of(brdaKey, brdaFragments, rdeKey, rdeFragments), true);
assertThat(gcsUtils.listFolderObjects("gcs-bucket", "rde-job/"))
.containsExactly(
"manual/test/soy_2000-01-01_thin_S1_R0.xml.length",
"manual/test/soy_2000-01-01_thin_S1_R0.xml.ghostryde",
"manual/test/soy_2000-01-01_full_S1_R0.xml.length",
"manual/test/soy_2000-01-01_full_S1_R0.xml.ghostryde",
"manual/test/soy_2000-01-01_full_S1_R0-report.xml.ghostryde");
assertThat(loadCursorTime(CursorType.BRDA)).isEquivalentAccordingToCompareTo(now);
assertThat(loadRevision(now, THIN)).isEqualTo(0);
assertThat(loadCursorTime(CursorType.RDE_STAGING)).isEquivalentAccordingToCompareTo(now);
assertThat(loadRevision(now, FULL)).isEqualTo(0);
}
/**
 * Runs {@code persistData} over {@code input} and verifies the staged BRDA and RDE output.
 *
 * <p>The input map is turned into a {@code PCollection}, handed to {@code
 * rdePipeline.persistData}, and the test pipeline is run to completion. Afterwards the thin
 * (BRDA) and full (RDE) deposits are read back from {@code gcs-bucket}, decrypted, and compared
 * with the golden resources {@code reducer_brda.xml} and {@code reducer_rde.xml}; the matching
 * {@code .length} objects are checked against the decrypted byte counts, and the RDE report is
 * compared with {@code reducer_rde_report.xml}. All golden resources are resolved against
 * {@code RdePipelineTest.class}.
 *
 * @param input deposits and their fragments to feed into the pipeline
 * @param manual when true, files are expected under {@code rde-job/manual/test/} at revision
 *     {@code R0} with no resend attribute; otherwise under {@code rde-job/} at revision {@code
 *     R1} with {@code resend="1"}. The {@code test/} path segment is hard-coded here, so manual
 *     callers are assumed to create their deposits with that directory.
 * @throws Exception if reading or decrypting any staged object fails
 */
private void verifyFiles(
    ImmutableMap<PendingDeposit, Iterable<DepositFragment>> input, boolean manual)
    throws Exception {
  PCollection<KV<PendingDeposit, Iterable<DepositFragment>>> fragments =
      pipeline.apply("Create Input", Create.of(input));
  rdePipeline.persistData(fragments);
  pipeline.run().waitUntilFinish();

  String prefix = manual ? "rde-job/manual/test/" : "rde-job/";
  String revision = manual ? "R0" : "R1";
  // Replacement for the %RESEND% placeholder inside the deposit XML golden files.
  String resendAttribute = manual ? "" : " resend=\"1\"";
  // Replacement for the %RESEND% placeholder inside the report golden file (a bare number).
  String resendCount = manual ? "0" : "1";
  // Object names without their extension, shared by the deposit, length and report objects.
  String brdaBaseName = prefix + "soy_2000-01-01_thin_S1_" + revision;
  String rdeBaseName = prefix + "soy_2000-01-01_full_S1_" + revision;

  // BRDA
  String brdaOutputFile = decryptGhostrydeGcsFile(brdaBaseName + ".xml.ghostryde");
  assertThat(brdaOutputFile)
      .isEqualTo(
          readResourceUtf8(RdePipelineTest.class, "reducer_brda.xml")
              .replace("%RESEND%", resendAttribute));
  compareLength(brdaOutputFile, brdaBaseName + ".xml.length");

  // RDE
  String rdeOutputFile = decryptGhostrydeGcsFile(rdeBaseName + ".xml.ghostryde");
  assertThat(rdeOutputFile)
      .isEqualTo(
          readResourceUtf8(RdePipelineTest.class, "reducer_rde.xml")
              .replace("%RESEND%", resendAttribute));
  compareLength(rdeOutputFile, rdeBaseName + ".xml.length");

  // RDE report
  assertThat(decryptGhostrydeGcsFile(rdeBaseName + "-report.xml.ghostryde"))
      .isEqualTo(
          readResourceUtf8(RdePipelineTest.class, "reducer_rde_report.xml")
              .replace("%RESEND%", resendCount));
}
/**
 * Reads the object {@code filename} from {@code gcs-bucket}, decodes it with {@code
 * Ghostryde.decode} using the test's {@code decryptionKey}, and returns the result as UTF-8 text.
 *
 * @param filename object name inside {@code gcs-bucket}
 * @return the decoded payload interpreted as a UTF-8 string
 * @throws IOException declared for failures while reading or decoding the object
 */
private String decryptGhostrydeGcsFile(String filename) throws IOException {
  BlobId location = BlobId.of("gcs-bucket", filename);
  byte[] encrypted = gcsUtils.readBytesFrom(location);
  byte[] decrypted = Ghostryde.decode(encrypted, decryptionKey);
  return new String(decrypted, UTF_8);
}
/**
 * Asserts that the object {@code lengthFilename} in {@code gcs-bucket} contains, as UTF-8 text,
 * the number of bytes {@code outputFile} occupies when encoded as UTF-8.
 *
 * @param outputFile decrypted deposit content whose UTF-8 byte count is the expected length
 * @param lengthFilename object name of the companion length file inside {@code gcs-bucket}
 * @throws IOException declared for failures while reading the length object
 */
private void compareLength(String outputFile, String lengthFilename) throws IOException {
  int byteCount = outputFile.getBytes(UTF_8).length;
  String actualLength = String.valueOf(byteCount);
  byte[] recorded = gcsUtils.readBytesFrom(BlobId.of("gcs-bucket", lengthFilename));
  assertThat(actualLength).isEqualTo(new String(recorded, UTF_8));
}
/**
 * Loads, inside a transaction, the {@code RdeRevision} keyed by TLD {@code soy}, the local date
 * of {@code now} and {@code mode}, and returns its revision number.
 *
 * @param now timestamp whose local date identifies the revision row
 * @param mode deposit mode that identifies the revision row
 * @return the value of {@code getRevision()} on the loaded entity
 */
private static int loadRevision(DateTime now, RdeMode mode) {
  return tm().transact(
          () -> {
            RdeRevision stored =
                tm().loadByKey(
                        VKey.createSql(
                            RdeRevision.class,
                            RdeRevisionId.create("soy", now.toLocalDate(), mode)));
            return stored.getRevision();
          });
}
/**
 * Loads, inside a transaction, the cursor of the given {@code type} for TLD {@code soy} and
 * returns its cursor time.
 *
 * @param type which cursor to look up
 * @return the value of {@code getCursorTime()} on the loaded cursor
 */
private static DateTime loadCursorTime(CursorType type) {
  return tm().transact(
          () -> {
            Cursor stored = tm().loadByKey(Cursor.createVKey(type, "soy"));
            return stored.getCursorTime();
          });
}
private static Function<DepositFragment, String> getXmlElement(String pattern) {
return (fragment) -> {
Matcher matcher = Pattern.compile(pattern).matcher(fragment.xml());

View file

@ -30,6 +30,7 @@ import com.google.appengine.tools.mapreduce.ReducerInput;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.StorageException;
import com.google.common.collect.ImmutableList;
import google.registry.beam.rde.RdePipelineTest;
import google.registry.gcs.GcsUtils;
import google.registry.gcs.backport.LocalStorageHelper;
import google.registry.keyring.api.PgpHelper;
@ -120,7 +121,7 @@ class RdeStagingReducerTest {
String outputFile = decryptGhostrydeGcsFile("soy_2000-01-01_thin_S1_R1.xml.ghostryde");
assertThat(outputFile)
.isEqualTo(
readResourceUtf8(RdeStagingReducerTest.class, "reducer_brda.xml")
readResourceUtf8(RdePipelineTest.class, "reducer_brda.xml")
.replace("%RESEND%", " resend=\"1\""));
compareLength(outputFile, "soy_2000-01-01_thin_S1_R1.xml.length");
// BRDA doesn't write a report file.
@ -147,8 +148,7 @@ class RdeStagingReducerTest {
String outputFile = decryptGhostrydeGcsFile("manual/soy_2000-01-01_thin_S1_R0.xml.ghostryde");
assertThat(outputFile)
.isEqualTo(
readResourceUtf8(RdeStagingReducerTest.class, "reducer_brda.xml")
.replace("%RESEND%", ""));
readResourceUtf8(RdePipelineTest.class, "reducer_brda.xml").replace("%RESEND%", ""));
compareLength(outputFile, "manual/soy_2000-01-01_thin_S1_R0.xml.length");
// BRDA doesn't write a report file.
assertThrows(
@ -169,12 +169,12 @@ class RdeStagingReducerTest {
String outputFile = decryptGhostrydeGcsFile("soy_2000-01-01_full_S1_R1.xml.ghostryde");
assertThat(outputFile)
.isEqualTo(
readResourceUtf8(RdeStagingReducerTest.class, "reducer_rde.xml")
readResourceUtf8(RdePipelineTest.class, "reducer_rde.xml")
.replace("%RESEND%", " resend=\"1\""));
compareLength(outputFile, "soy_2000-01-01_full_S1_R1.xml.length");
assertThat(decryptGhostrydeGcsFile("soy_2000-01-01_full_S1_R1-report.xml.ghostryde"))
.isEqualTo(
readResourceUtf8(RdeStagingReducerTest.class, "reducer_rde_report.xml")
readResourceUtf8(RdePipelineTest.class, "reducer_rde_report.xml")
.replace("%RESEND%", "1"));
assertThat(loadCursorTime(CursorType.RDE_STAGING))
.isEquivalentAccordingToCompareTo(now.plus(Duration.standardDays(1)));
@ -191,12 +191,11 @@ class RdeStagingReducerTest {
String outputFile = decryptGhostrydeGcsFile("manual/soy_2000-01-01_full_S1_R0.xml.ghostryde");
assertThat(outputFile)
.isEqualTo(
readResourceUtf8(RdeStagingReducerTest.class, "reducer_rde.xml")
.replace("%RESEND%", ""));
readResourceUtf8(RdePipelineTest.class, "reducer_rde.xml").replace("%RESEND%", ""));
compareLength(outputFile, "manual/soy_2000-01-01_full_S1_R0.xml.length");
assertThat(decryptGhostrydeGcsFile("manual/soy_2000-01-01_full_S1_R0-report.xml.ghostryde"))
.isEqualTo(
readResourceUtf8(RdeStagingReducerTest.class, "reducer_rde_report.xml")
readResourceUtf8(RdePipelineTest.class, "reducer_rde_report.xml")
.replace("%RESEND%", "0"));
// No extra operations in manual mode.
assertThat(loadCursorTime(CursorType.RDE_STAGING)).isEquivalentAccordingToCompareTo(now);