diff --git a/core/src/main/java/google/registry/beam/invoicing/InvoicingPipeline.java b/core/src/main/java/google/registry/beam/invoicing/InvoicingPipeline.java index 0cebe5d79..dd763efe2 100644 --- a/core/src/main/java/google/registry/beam/invoicing/InvoicingPipeline.java +++ b/core/src/main/java/google/registry/beam/invoicing/InvoicingPipeline.java @@ -57,7 +57,8 @@ import org.apache.beam.sdk.values.TypeDescriptor; /** * Definition of a Dataflow Flex pipeline template, which generates a given month's invoices. * - *

To stage this template locally, run the {@code stage_beam_pipeline.sh} shell script. + *

To stage this template locally, run {@code ./nom_build :core:sBP --environment=alpha + * --pipeline=invoicing}. * *
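As a concrete illustration of running the staged template (see the sentence that follows), here is a minimal, hypothetical sketch using the same Dataflow client-library calls that RdeStagingAction makes later in this diff; the project, region, GCS path, job name, and parameter values are placeholders, not values from this change.

```java
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.LaunchFlexTemplateParameter;
import com.google.api.services.dataflow.model.LaunchFlexTemplateRequest;
import com.google.api.services.dataflow.model.LaunchFlexTemplateResponse;
import com.google.common.collect.ImmutableMap;

public class LaunchStagedTemplate {
  public static void main(String[] args) throws Exception {
    Dataflow dataflow = newDataflowClient(); // authenticated client; construction elided
    LaunchFlexTemplateParameter parameter =
        new LaunchFlexTemplateParameter()
            .setJobName("invoicing-2021-06")
            .setContainerSpecGcsPath("gs://my-staging-bucket/invoicing_pipeline_metadata.json")
            .setParameters(ImmutableMap.of("registryEnvironment", "ALPHA"));
    LaunchFlexTemplateResponse response =
        dataflow
            .projects()
            .locations()
            .flexTemplates()
            .launch(
                "my-project",
                "us-east1",
                new LaunchFlexTemplateRequest().setLaunchParameter(parameter))
            .execute();
    System.out.println("Launched job: " + response.getJob().getId());
  }

  private static Dataflow newDataflowClient() {
    // Placeholder: build with an HTTP transport, JSON factory, and application credentials.
    throw new UnsupportedOperationException();
  }
}
```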

Then, you can run the staged template via the API client library, gCloud or a raw REST call. * diff --git a/core/src/main/java/google/registry/beam/rde/RdeIO.java b/core/src/main/java/google/registry/beam/rde/RdeIO.java index d340d86c0..03cbb5d24 100644 --- a/core/src/main/java/google/registry/beam/rde/RdeIO.java +++ b/core/src/main/java/google/registry/beam/rde/RdeIO.java @@ -75,6 +75,8 @@ public class RdeIO { abstract static class Write extends PTransform>>, PDone> { + private static final long serialVersionUID = 3334807737227087760L; + abstract GcsUtils gcsUtils(); abstract CloudTasksUtils cloudTasksUtils(); @@ -113,8 +115,9 @@ public class RdeIO { .apply( "Write to GCS", ParDo.of(new RdeWriter(gcsUtils(), rdeBucket(), stagingKeyBytes(), validationMode()))) - .apply("Update cursors", ParDo.of(new CursorUpdater())) - .apply("Enqueue upload action", ParDo.of(new UploadEnqueuer(cloudTasksUtils()))); + .apply( + "Update cursor and enqueue next action", + ParDo.of(new CursorUpdater(cloudTasksUtils()))); return PDone.in(input.getPipeline()); } } @@ -123,6 +126,7 @@ public class RdeIO { extends DoFn>, KV> { private static final FluentLogger logger = FluentLogger.forEnclosingClass(); + private static final long serialVersionUID = 5496375923068400382L; private final GcsUtils gcsUtils; private final String rdeBucket; @@ -169,7 +173,7 @@ public class RdeIO { checkState(key.directoryWithTrailingSlash() != null, "Manual subdirectory not specified"); prefix = prefix + "/manual/" + key.directoryWithTrailingSlash() + basename; } else { - prefix = prefix + "/" + basename; + prefix = prefix + '/' + basename; } BlobId xmlFilename = BlobId.of(rdeBucket, prefix + ".xml.ghostryde"); // This file will contain the byte length (ASCII) of the raw unencrypted XML. @@ -250,12 +254,20 @@ public class RdeIO { } } - private static class CursorUpdater extends DoFn, PendingDeposit> { + private static class CursorUpdater extends DoFn, Void> { + private static final FluentLogger logger = FluentLogger.forEnclosingClass(); + private static final long serialVersionUID = 5822176227753327224L; + + private final CloudTasksUtils cloudTasksUtils; + + private CursorUpdater(CloudTasksUtils cloudTasksUtils) { + this.cloudTasksUtils = cloudTasksUtils; + } @ProcessElement public void processElement( - @Element KV input, OutputReceiver outputReceiver) { + @Element KV input, PipelineOptions options) { tm().transact( () -> { PendingDeposit key = input.getKey(); @@ -282,46 +294,32 @@ public class RdeIO { logger.atInfo().log( "Rolled forward %s on %s cursor to %s.", key.cursor(), key.tld(), newPosition); RdeRevision.saveRevision(key.tld(), key.watermark(), key.mode(), revision); + if (key.mode() == RdeMode.FULL) { + cloudTasksUtils.enqueue( + RDE_UPLOAD_QUEUE, + CloudTasksUtils.createPostTask( + RdeUploadAction.PATH, + Service.BACKEND.getServiceId(), + ImmutableMultimap.of( + RequestParameters.PARAM_TLD, + key.tld(), + RdeModule.PARAM_PREFIX, + options.getJobName() + '/'))); + } else { + cloudTasksUtils.enqueue( + BRDA_QUEUE, + CloudTasksUtils.createPostTask( + BrdaCopyAction.PATH, + Service.BACKEND.getServiceId(), + ImmutableMultimap.of( + RequestParameters.PARAM_TLD, + key.tld(), + RdeModule.PARAM_WATERMARK, + key.watermark().toString(), + RdeModule.PARAM_PREFIX, + options.getJobName() + '/'))); + } }); - outputReceiver.output(input.getKey()); - } - } - - private static class UploadEnqueuer extends DoFn { - - private final CloudTasksUtils cloudTasksUtils; - - private UploadEnqueuer(CloudTasksUtils cloudTasksUtils) { - 
this.cloudTasksUtils = cloudTasksUtils; - } - - @ProcessElement - public void processElement(@Element PendingDeposit input, PipelineOptions options) { - if (input.mode() == RdeMode.FULL) { - cloudTasksUtils.enqueue( - RDE_UPLOAD_QUEUE, - CloudTasksUtils.createPostTask( - RdeUploadAction.PATH, - Service.BACKEND.getServiceId(), - ImmutableMultimap.of( - RequestParameters.PARAM_TLD, - input.tld(), - RdeModule.PARAM_PREFIX, - options.getJobName() + '/'))); - } else { - cloudTasksUtils.enqueue( - BRDA_QUEUE, - CloudTasksUtils.createPostTask( - BrdaCopyAction.PATH, - Service.BACKEND.getServiceId(), - ImmutableMultimap.of( - RequestParameters.PARAM_TLD, - input.tld(), - RdeModule.PARAM_WATERMARK, - input.watermark().toString(), - RdeModule.PARAM_PREFIX, - options.getJobName() + '/'))); - } } } } diff --git a/core/src/main/java/google/registry/beam/rde/RdePipeline.java b/core/src/main/java/google/registry/beam/rde/RdePipeline.java index f0b337e9e..ced195ea5 100644 --- a/core/src/main/java/google/registry/beam/rde/RdePipeline.java +++ b/core/src/main/java/google/registry/beam/rde/RdePipeline.java @@ -14,36 +14,50 @@ package google.registry.beam.rde; +import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableSet.toImmutableSet; -import static google.registry.model.EppResourceUtils.loadAtPointInTimeAsync; +import static google.registry.beam.rde.RdePipeline.TupleTags.DOMAIN_FRAGMENTS; +import static google.registry.beam.rde.RdePipeline.TupleTags.EXTERNAL_HOST_FRAGMENTS; +import static google.registry.beam.rde.RdePipeline.TupleTags.HOST_TO_PENDING_DEPOSIT; +import static google.registry.beam.rde.RdePipeline.TupleTags.PENDING_DEPOSIT; +import static google.registry.beam.rde.RdePipeline.TupleTags.REFERENCED_CONTACTS; +import static google.registry.beam.rde.RdePipeline.TupleTags.REFERENCED_HOSTS; +import static google.registry.beam.rde.RdePipeline.TupleTags.REVISION_ID; +import static google.registry.beam.rde.RdePipeline.TupleTags.SUPERORDINATE_DOMAINS; +import static google.registry.model.reporting.HistoryEntryDao.RESOURCE_TYPES_TO_HISTORY_TYPES; import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; +import static org.apache.beam.sdk.values.TypeDescriptors.kvs; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.ImmutableSetMultimap; -import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import com.google.common.collect.Streams; import com.google.common.io.BaseEncoding; import dagger.BindsInstance; import dagger.Component; import google.registry.beam.common.RegistryJpaIO; +import google.registry.beam.common.RegistryPipelineOptions; import google.registry.config.CloudTasksUtilsModule; import google.registry.config.CredentialModule; import google.registry.config.RegistryConfig.ConfigModule; import google.registry.gcs.GcsUtils; import google.registry.model.EppResource; +import google.registry.model.contact.ContactHistory; import google.registry.model.contact.ContactResource; import google.registry.model.domain.DomainBase; +import google.registry.model.domain.DomainHistory; +import google.registry.model.host.HostHistory; import google.registry.model.host.HostResource; import google.registry.model.rde.RdeMode; import google.registry.model.registrar.Registrar; import google.registry.model.registrar.Registrar.Type; +import google.registry.model.reporting.HistoryEntry; +import google.registry.model.reporting.HistoryEntryDao; 
import google.registry.persistence.PersistenceModule.TransactionIsolationLevel; import google.registry.persistence.VKey; import google.registry.rde.DepositFragment; import google.registry.rde.PendingDeposit; import google.registry.rde.PendingDeposit.PendingDepositCoder; -import google.registry.rde.RdeFragmenter; import google.registry.rde.RdeMarshaller; import google.registry.util.CloudTasksUtils; import google.registry.util.UtilsModule; @@ -54,74 +68,158 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; -import java.util.function.Supplier; +import java.lang.reflect.InvocationTargetException; +import java.util.HashSet; import javax.inject.Inject; import javax.inject.Singleton; -import javax.persistence.Entity; +import javax.persistence.IdClass; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Filter; import org.apache.beam.sdk.transforms.FlatMapElements; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.join.CoGbkResult; +import org.apache.beam.sdk.transforms.join.CoGroupByKey; +import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; import org.apache.beam.sdk.values.TypeDescriptor; -import org.apache.beam.sdk.values.TypeDescriptors; import org.joda.time.DateTime; /** * Definition of a Dataflow Flex template, which generates RDE/BRDA deposits. * - *

To stage this template locally, run the {@code stage_beam_pipeline.sh} shell script. + *

To stage this template locally, run {@code ./nom_build :core:sBP --environment=alpha + * --pipeline=rde}. * *

Then, you can run the staged template via the API client library, gCloud or a raw REST call. * + *

This pipeline only works for pending deposits with the same watermark; {@link + * google.registry.rde.RdeStagingAction} batches such pending deposits together and launches + * multiple pipelines if multiple watermarks exist. + * + *
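A sketch of the launcher-side grouping that upholds this single-watermark invariant, using the same Guava collector that RdeStagingAction uses further down in this diff:

```java
// Sketch: partition pending deposits by watermark so that each Dataflow job
// only ever sees a single watermark.
import static com.google.common.collect.ImmutableSetMultimap.toImmutableSetMultimap;
import static java.util.function.Function.identity;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSetMultimap;
import google.registry.rde.PendingDeposit;
import org.joda.time.DateTime;

final class WatermarkBatcher {
  private WatermarkBatcher() {}

  /** Groups pending deposits by watermark; the launcher starts one pipeline per key. */
  static ImmutableSetMultimap<DateTime, PendingDeposit> byWatermark(
      ImmutableSet<PendingDeposit> pendings) {
    return pendings.stream()
        .collect(toImmutableSetMultimap(PendingDeposit::watermark, identity()));
  }
}
```

One Flex template launch is then issued per map key, which is what the revised launcher loop in RdeStagingAction below does.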

The pipeline is broadly divided into two parts -- creating the {@link DepositFragment}s, and + * processing them. + * + *

<h2>Creating {@link DepositFragment}</h2>

+ * + *

<h3>{@link Registrar}</h3>

+ * + * Non-test registrar entities are loaded from Cloud SQL and marshalled into deposit fragments. They + * are NOT rewound to the watermark. + * + *
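A sketch of that fan-out, assuming the marshaller and pending-deposit set fields this diff adds to RdePipeline:

```java
// Sketch: one fragment per registrar, replicated to every pending deposit,
// since registrars appear in all deposits regardless of TLD or mode.
import static com.google.common.collect.ImmutableSet.toImmutableSet;

import com.google.common.collect.ImmutableSet;
import google.registry.model.registrar.Registrar;
import google.registry.rde.DepositFragment;
import google.registry.rde.PendingDeposit;
import google.registry.rde.RdeMarshaller;
import org.apache.beam.sdk.values.KV;

final class RegistrarFanOut {
  static ImmutableSet<KV<PendingDeposit, DepositFragment>> fanOut(
      Registrar registrar,
      RdeMarshaller marshaller,
      ImmutableSet<PendingDeposit> pendingDeposits) {
    // Marshal once, then pair the fragment with every pending deposit.
    DepositFragment fragment = marshaller.marshalRegistrar(registrar);
    return pendingDeposits.stream()
        .map(pending -> KV.of(pending, fragment))
        .collect(toImmutableSet());
  }
}
```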

<h3>{@link EppResource}</h3>

+ * + * All EPP resources are loaded from the corresponding {@link HistoryEntry}, which has the resource + * embedded. In general, we find the most recent history entry before the watermark and filter out + * the ones that are soft-deleted at the watermark. The history is emitted as pairs of (resource + * repo ID: history revision ID) from the SQL query. + * + *
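For illustration, a simplified, domain-specific rendering of the templated query used by getMostRecentHistoryEntries below; the Read transform type is assumed from its use elsewhere in this diff, and the real query also excludes prober data and, for domains, non-real TLDs.

```java
// Sketch: select (repoId, revisionId) of the most recent DomainHistory at or
// before the watermark, skipping resources already deleted at the watermark.
import com.google.common.collect.ImmutableMap;
import google.registry.beam.common.RegistryJpaIO;
import org.apache.beam.sdk.values.KV;
import org.joda.time.DateTime;

final class MostRecentDomainHistory {
  static RegistryJpaIO.Read<Object[], KV<String, Long>> read(DateTime watermark) {
    return RegistryJpaIO.read(
        "SELECT domainRepoId, id FROM DomainHistory"
            + " WHERE (domainRepoId, modificationTime) IN"
            + " (SELECT domainRepoId, MAX(modificationTime) FROM DomainHistory"
            + "  WHERE modificationTime <= :watermark GROUP BY domainRepoId)"
            + " AND domainContent.deletionTime > :watermark",
        ImmutableMap.of("watermark", watermark),
        Object[].class,
        row -> KV.of((String) row[0], (long) row[1]));
  }
}
```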

<h3>{@link DomainBase}</h3>

+ * + * After the most recent (live) domain resources are loaded from the corresponding history objects, + * we marshal them into deposit fragments and emit the (pending deposit: deposit fragment) pairs + * for further processing. We also find all the contacts and hosts referenced by a given domain and + * emit pairs of (contact/host repo ID: pending deposit) for all RDE pending deposits for further + * processing. + * + *
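A skeleton of this multi-output step; the tags here are local stand-ins for the ones in RdePipeline.TupleTags, and the loading/marshalling body is elided.

```java
import google.registry.rde.DepositFragment;
import google.registry.rde.PendingDeposit;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

final class DomainFanOut {
  // Local stand-ins for the tags defined in RdePipeline.TupleTags.
  static final TupleTag<KV<PendingDeposit, DepositFragment>> FRAGMENTS =
      new TupleTag<KV<PendingDeposit, DepositFragment>>() {};
  static final TupleTag<KV<String, PendingDeposit>> CONTACTS =
      new TupleTag<KV<String, PendingDeposit>>() {};
  static final TupleTag<KV<String, PendingDeposit>> HOSTS =
      new TupleTag<KV<String, PendingDeposit>>() {};

  /** Emits domain fragments on the main output and referenced repo IDs on side outputs. */
  static PCollectionTuple expand(PCollection<KV<String, Long>> domainHistories) {
    return domainHistories.apply(
        "Fan out DomainHistory",
        ParDo.of(
                new DoFn<KV<String, Long>, KV<PendingDeposit, DepositFragment>>() {
                  @ProcessElement
                  public void processElement(
                      @Element KV<String, Long> repoIdToRevision, MultiOutputReceiver out) {
                    // Load the domain from its history entry, marshal it, then emit via
                    // out.get(FRAGMENTS), out.get(CONTACTS), and out.get(HOSTS); elided here.
                  }
                })
            .withOutputTags(FRAGMENTS, TupleTagList.of(CONTACTS).and(HOSTS)));
  }
}
```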

<h3>{@link ContactResource}</h3>

+ * + * We first join the most recent contact histories, represented by (contact repo ID: contact + * history revision ID) pairs, with referenced contacts, represented by (contact repo ID: pending + * deposit) pairs, on the contact repo ID, to remove unreferenced contact histories. Contact + * resources are then loaded from the remaining referenced contact histories and marshalled into + * (pending deposit: deposit fragment) pairs. + * + *
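The join itself is a Beam CoGroupByKey followed by a filter; a condensed sketch, with the tag instances passed in rather than taken from TupleTags:

```java
import google.registry.rde.PendingDeposit;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;

final class ReferencedOnly {
  static PCollection<KV<String, CoGbkResult>> join(
      PCollection<KV<String, PendingDeposit>> referencedContacts,
      PCollection<KV<String, Long>> contactHistories,
      TupleTag<PendingDeposit> pendingDepositTag,
      TupleTag<Long> revisionIdTag) {
    return KeyedPCollectionTuple.of(pendingDepositTag, referencedContacts)
        .and(revisionIdTag, contactHistories)
        .apply(CoGroupByKey.create())
        .apply(
            Filter.by(
                (KV<String, CoGbkResult> kv) ->
                    // Keep only repo IDs that are both referenced by some domain
                    // and backed by a history entry at the watermark.
                    kv.getValue().getAll(pendingDepositTag).iterator().hasNext()
                        && kv.getValue().getAll(revisionIdTag).iterator().hasNext()));
  }
}
```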

<h3>{@link HostResource}</h3>

+ * + * Similar to {@link ContactResource}, we join the most recent host histories with referenced hosts + * to find the most recent referenced hosts. External hosts get the same treatment as contacts, + * yielding the (pending deposit: deposit fragment) pairs. For subordinate hosts, we also need to + * find the superordinate domain in order to properly handle pending transfers in the deposit. So + * we first find the superordinate domain repo ID from the host and join the (superordinate + * domain repo ID: (subordinate host repo ID: (pending deposit: revision ID))) pair with the (domain + * repo ID: revision ID) pair obtained from the domain history query, in order to map the host at + * the watermark to the domain at the watermark. We then create the (pending deposit: deposit + * fragment) pairs for subordinate hosts using the added domain information. + * + *
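Illustrative only: the nested-pair notation above written out as Beam KVs, with made-up repo IDs and revision ID (in the actual diff the innermost values arrive bundled in a CoGbkResult rather than a plain KV):

```java
import google.registry.rde.PendingDeposit;
import org.apache.beam.sdk.values.KV;

final class SubordinateHostShape {
  // (superordinateDomainRepoId -> (hostRepoId -> (pendingDeposit -> revisionId)))
  static KV<String, KV<String, KV<PendingDeposit, Long>>> example(PendingDeposit deposit) {
    return KV.of(
        "8-SOY", // superordinate domain repo ID (hypothetical)
        KV.of(
            "9-SOY", // subordinate host repo ID (hypothetical)
            KV.of(deposit, 42L))); // pending deposit and domain history revision ID
  }
}
```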

<h2>Processing {@link DepositFragment}</h2>

+ * + * The (pending deposit: deposit fragment) pairs from different resources are combined and grouped + * by pending deposit. For each pending deposit, all the relevant deposit fragments are written into + * an encrypted file stored on GCS. The filename is uniquely determined by the Beam job ID so there + * is no need to lock the GCS write operation to prevent stomping. The cursor for staging the + * pending deposit is then rolled forward, and the next action is enqueued. The latter two + * operations are performed in a transaction so the cursor is rolled back if enqueuing fails. + * * @see <a href="https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates">Using * Flex Templates</a> */ @Singleton public class RdePipeline implements Serializable { + private static final long serialVersionUID = -4866795928854754666L; private final transient RdePipelineOptions options; private final ValidationMode mode; - private final ImmutableSetMultimap pendings; + private final ImmutableSet pendingDeposits; + private final DateTime watermark; private final String rdeBucket; private final byte[] stagingKeyBytes; private final GcsUtils gcsUtils; private final CloudTasksUtils cloudTasksUtils; + private final RdeMarshaller marshaller; // Registrars to be excluded from data escrow. Not including the sandbox-only OTE type so that // if it sneaks into production we would get an extra signal. private static final ImmutableSet IGNORED_REGISTRAR_TYPES = Sets.immutableEnumSet(Registrar.Type.MONITORING, Registrar.Type.TEST); - private static final String EPP_RESOURCE_QUERY = - "SELECT id FROM %entity% " - + "WHERE COALESCE(creationClientId, '') NOT LIKE 'prober-%' " - + "AND COALESCE(currentSponsorClientId, '') NOT LIKE 'prober-%' " - + "AND COALESCE(lastEppUpdateClientId, '') NOT LIKE 'prober-%'"; - - public static String createEppResourceQuery(Class clazz) { - return EPP_RESOURCE_QUERY.replace("%entity%", clazz.getAnnotation(Entity.class).name()) - + (clazz.equals(DomainBase.class) ? " AND tld in (:tlds)" : ""); - } + // The field name of the EPP resource embedded in its corresponding history entry. 
+ private static final ImmutableMap, String> EPP_RESOURCE_FIELD_NAME = + ImmutableMap.of( + DomainHistory.class, + "domainContent", + ContactHistory.class, + "contactBase", + HostHistory.class, + "hostBase"); @Inject RdePipeline(RdePipelineOptions options, GcsUtils gcsUtils, CloudTasksUtils cloudTasksUtils) { this.options = options; this.mode = ValidationMode.valueOf(options.getValidationMode()); - this.pendings = decodePendings(options.getPendings()); + this.pendingDeposits = decodePendingDeposits(options.getPendings()); + ImmutableSet potentialWatermarks = + pendingDeposits.stream() + .map(PendingDeposit::watermark) + .distinct() + .collect(toImmutableSet()); + checkArgument( + potentialWatermarks.size() == 1, + String.format( + "RDE pipeline should only work on pending deposits " + + "with the same watermark, but %d were given: %s", + potentialWatermarks.size(), potentialWatermarks)); + this.watermark = potentialWatermarks.asList().get(0); this.rdeBucket = options.getRdeStagingBucket(); this.stagingKeyBytes = BaseEncoding.base64Url().decode(options.getStagingKey()); this.gcsUtils = gcsUtils; this.cloudTasksUtils = cloudTasksUtils; + this.marshaller = new RdeMarshaller(mode); } PipelineResult run() { @@ -133,13 +231,46 @@ public class RdePipeline implements Serializable { } PCollection>> createFragments(Pipeline pipeline) { - return PCollectionList.of(processRegistrars(pipeline)) - .and(processNonRegistrarEntities(pipeline, DomainBase.class)) - .and(processNonRegistrarEntities(pipeline, ContactResource.class)) - .and(processNonRegistrarEntities(pipeline, HostResource.class)) - .apply(Flatten.pCollections()) + PCollection> registrarFragments = + processRegistrars(pipeline); + + PCollection> domainHistories = + getMostRecentHistoryEntries(pipeline, DomainHistory.class); + + PCollection> contactHistories = + getMostRecentHistoryEntries(pipeline, ContactHistory.class); + + PCollection> hostHistories = + getMostRecentHistoryEntries(pipeline, HostHistory.class); + + PCollectionTuple processedDomainHistories = processDomainHistories(domainHistories); + + PCollection> domainFragments = + processedDomainHistories.get(DOMAIN_FRAGMENTS); + + PCollection> contactFragments = + processContactHistories( + processedDomainHistories.get(REFERENCED_CONTACTS), contactHistories); + + PCollectionTuple processedHosts = + processHostHistories(processedDomainHistories.get(REFERENCED_HOSTS), hostHistories); + + PCollection> externalHostFragments = + processedHosts.get(EXTERNAL_HOST_FRAGMENTS); + + PCollection> subordinateHostFragments = + processSubordinateHosts(processedHosts.get(SUPERORDINATE_DOMAINS), domainHistories); + + return PCollectionList.of(registrarFragments) + .and(domainFragments) + .and(contactFragments) + .and(externalHostFragments) + .and(subordinateHostFragments) + .apply( + "Combine PendingDeposit:DepositFragment pairs from all entities", + Flatten.pCollections()) .setCoder(KvCoder.of(PendingDepositCoder.of(), SerializableCoder.of(DepositFragment.class))) - .apply("Group by PendingDeposit", GroupByKey.create()); + .apply("Group DepositFragment by PendingDeposit", GroupByKey.create()); } void persistData(PCollection>> input) { @@ -154,127 +285,398 @@ public class RdePipeline implements Serializable { .build()); } - PCollection> processRegistrars(Pipeline pipeline) { + private PCollection> processRegistrars(Pipeline pipeline) { + // Note that the namespace in the metric is not being used by Stackdriver, it just has to be + // non-empty. 
+ // See: + // https://stackoverflow.com/questions/48530496/google-dataflow-custom-metrics-not-showing-on-stackdriver + Counter includedRegistrarCounter = Metrics.counter("RDE", "IncludedRegistrar"); + Counter registrarFragmentCounter = Metrics.counter("RDE", "RegistrarFragment"); return pipeline .apply( - "Read all production Registrar entities", + "Read all production Registrars", RegistryJpaIO.read( "SELECT clientIdentifier FROM Registrar WHERE type NOT IN (:types)", ImmutableMap.of("types", IGNORED_REGISTRAR_TYPES), String.class, - // TODO: consider adding coders for entities and pass them directly instead of using - // VKeys. id -> VKey.createSql(Registrar.class, id))) .apply( - "Marshal Registrar into DepositFragment", + "Marshall Registrar into DepositFragment", FlatMapElements.into( - TypeDescriptors.kvs( + kvs( TypeDescriptor.of(PendingDeposit.class), TypeDescriptor.of(DepositFragment.class))) .via( (VKey key) -> { + includedRegistrarCounter.inc(); Registrar registrar = jpaTm().transact(() -> jpaTm().loadByKey(key)); - DepositFragment fragment = - new RdeMarshaller(mode).marshalRegistrar(registrar); - return pendings.values().stream() - .map(pending -> KV.of(pending, fragment)) - .collect(toImmutableSet()); + DepositFragment fragment = marshaller.marshalRegistrar(registrar); + ImmutableSet> fragments = + pendingDeposits.stream() + .map(pending -> KV.of(pending, fragment)) + .collect(toImmutableSet()); + registrarFragmentCounter.inc(fragments.size()); + return fragments; })); } - - PCollection> processNonRegistrarEntities( - Pipeline pipeline, Class clazz) { - return createInputs(pipeline, clazz) - .apply("Marshal " + clazz.getSimpleName() + " into DepositFragment", mapToFragments(clazz)) - .setCoder( - KvCoder.of(PendingDepositCoder.of(), SerializableCoder.of(DepositFragment.class))); + /** + * Load the most recent history entry before the watermark for a given history entry type. + * + *

Note that deleted and non-production resources are not included. + * + * @return A KV pair of (repoId, revisionId), used to reconstruct the composite key for the + * history entry. + */ + private PCollection> getMostRecentHistoryEntries( + Pipeline pipeline, Class historyClass) { + String repoIdFieldName = HistoryEntryDao.REPO_ID_FIELD_NAMES.get(historyClass); + String resourceFieldName = EPP_RESOURCE_FIELD_NAME.get(historyClass); + return pipeline + .apply( + String.format("Load most recent %s", historyClass.getSimpleName()), + RegistryJpaIO.read( + ("SELECT %repoIdField%, id FROM %entity% WHERE (%repoIdField%, modificationTime)" + + " IN (SELECT %repoIdField%, MAX(modificationTime) FROM %entity% WHERE" + + " modificationTime <= :watermark GROUP BY %repoIdField%) AND" + + " %resourceField%.deletionTime > :watermark AND" + + " COALESCE(%resourceField%.creationClientId, '') NOT LIKE 'prober-%' AND" + + " COALESCE(%resourceField%.currentSponsorClientId, '') NOT LIKE 'prober-%'" + + " AND COALESCE(%resourceField%.lastEppUpdateClientId, '') NOT LIKE" + + " 'prober-%' " + + (historyClass == DomainHistory.class + ? "AND %resourceField%.tld IN " + + "(SELECT id FROM Tld WHERE tldType = 'REAL')" + : "")) + .replace("%entity%", historyClass.getSimpleName()) + .replace("%repoIdField%", repoIdFieldName) + .replace("%resourceField%", resourceFieldName), + ImmutableMap.of("watermark", watermark), + Object[].class, + row -> KV.of((String) row[0], (long) row[1]))) + .setCoder(KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of())); } - PCollection> createInputs(Pipeline pipeline, Class clazz) { - return pipeline.apply( - "Read all production " + clazz.getSimpleName() + " entities", - RegistryJpaIO.read( - createEppResourceQuery(clazz), - clazz.equals(DomainBase.class) - ? ImmutableMap.of("tlds", pendings.keySet()) - : ImmutableMap.of(), - String.class, - // TODO: consider adding coders for entities and pass them directly instead of using - // VKeys. - x -> VKey.createSql(clazz, x))); + private EppResource loadResourceByHistoryEntryId( + Class historyEntryClazz, String repoId, long revisionId) { + try { + Class idClazz = historyEntryClazz.getAnnotation(IdClass.class).value(); + Serializable idObject = + (Serializable) + idClazz.getConstructor(String.class, long.class).newInstance(repoId, revisionId); + return jpaTm() + .transact(() -> jpaTm().loadByKey(VKey.createSql(historyEntryClazz, idObject))) + .getResourceAtPointInTime() + .map(resource -> resource.cloneProjectedAtTime(watermark)) + .get(); + } catch (NoSuchMethodException + | InvocationTargetException + | InstantiationException + | IllegalAccessException e) { + throw new RuntimeException( + String.format( + "Cannot load resource from %s with repoId %s and revisionId %s", + historyEntryClazz.getSimpleName(), repoId, revisionId), + e); + } } - - FlatMapElements, KV> mapToFragments(Class clazz) { - return FlatMapElements.into( - TypeDescriptors.kvs( - TypeDescriptor.of(PendingDeposit.class), TypeDescriptor.of(DepositFragment.class))) - .via( - (VKey key) -> { - T resource = jpaTm().transact(() -> jpaTm().loadByKey(key)); - // The set of all TLDs to which this resource should be emitted. - ImmutableSet tlds = - clazz.equals(DomainBase.class) - ? ImmutableSet.of(((DomainBase) resource).getTld()) - : pendings.keySet(); - // Get the set of all point-in-time watermarks we need, to minimize rewinding. 
- ImmutableSet dates = - tlds.stream() - .map(pendings::get) - .flatMap(ImmutableSet::stream) - .map(PendingDeposit::watermark) - .collect(toImmutableSet()); - // Launch asynchronous fetches of point-in-time representations of resource. - ImmutableMap> resourceAtTimes = - ImmutableMap.copyOf( - Maps.asMap(dates, input -> loadAtPointInTimeAsync(resource, input))); - // Convert resource to an XML fragment for each watermark/mode pair lazily and cache - // the result. - RdeFragmenter fragmenter = - new RdeFragmenter(resourceAtTimes, new RdeMarshaller(mode)); - List> results = new ArrayList<>(); - for (String tld : tlds) { - for (PendingDeposit pending : pendings.get(tld)) { - // Hosts and contacts don't get included in BRDA deposits. - if (pending.mode() == RdeMode.THIN && !clazz.equals(DomainBase.class)) { - continue; + /** + * Remove unreferenced resources by joining the (repoId, pendingDeposit) pair with the (repoId, + * revisionId) on the repoId. + * + *

The (repoId, pendingDeposit) pairs denote resources (contacts, hosts) that are referenced by + * a domain and are therefore to be included in the corresponding pending deposit. + * + *

The (repoId, revisionId) paris come from the most recent history entry query, which can be + * used to load the embedded resources themselves. + * + * @return a pair of (repoId, ([pendingDeposit], [revisionId])) where neither the pendingDeposit + * nor the revisionId list is empty. + */ + private static PCollection> removeUnreferencedResource( + PCollection> referencedResources, + PCollection> historyEntries, + Class resourceClazz) { + String resourceName = resourceClazz.getSimpleName(); + Class historyEntryClazz = + RESOURCE_TYPES_TO_HISTORY_TYPES.get(resourceClazz); + String historyEntryName = historyEntryClazz.getSimpleName(); + Counter referencedResourceCounter = Metrics.counter("RDE", "Referenced" + resourceName); + return KeyedPCollectionTuple.of(PENDING_DEPOSIT, referencedResources) + .and(REVISION_ID, historyEntries) + .apply( + String.format( + "Join PendingDeposit with %s revision ID on %s", historyEntryName, resourceName), + CoGroupByKey.create()) + .apply( + String.format("Remove unreferenced %s", resourceName), + Filter.by( + (KV kv) -> { + boolean toInclude = + // If a resource does not have corresponding pending deposit, it is not + // referenced and should not be included. + kv.getValue().getAll(PENDING_DEPOSIT).iterator().hasNext() + // If a resource does not have revision id (this should not happen, as + // every referenced resource must be valid at watermark time, therefore + // be embedded in a history entry valid at watermark time, otherwise + // the domain cannot reference it), there is no way for us to find the + // history entry and load the embedded resource. So we ignore the resource + // to keep the downstream process simple. + && kv.getValue().getAll(REVISION_ID).iterator().hasNext(); + if (toInclude) { + referencedResourceCounter.inc(); } - Optional fragment = - fragmenter.marshal(pending.watermark(), pending.mode()); - fragment.ifPresent( - depositFragment -> results.add(KV.of(pending, depositFragment))); - } - } - return results; - }); + return toInclude; + })); + } + + private PCollectionTuple processDomainHistories(PCollection> domainHistories) { + Counter activeDomainCounter = Metrics.counter("RDE", "ActiveDomainBase"); + Counter domainFragmentCounter = Metrics.counter("RDE", "DomainFragment"); + Counter referencedContactCounter = Metrics.counter("RDE", "ReferencedContactResource"); + Counter referencedHostCounter = Metrics.counter("RDE", "ReferencedHostResource"); + return domainHistories.apply( + "Map DomainHistory to DepositFragment " + + "and emit referenced ContactResource and HostResource", + ParDo.of( + new DoFn, KV>() { + @ProcessElement + public void processElement( + @Element KV kv, MultiOutputReceiver receiver) { + activeDomainCounter.inc(); + DomainBase domain = + (DomainBase) + loadResourceByHistoryEntryId( + DomainHistory.class, kv.getKey(), kv.getValue()); + pendingDeposits.stream() + .filter(pendingDeposit -> pendingDeposit.tld().equals(domain.getTld())) + .forEach( + pendingDeposit -> { + // Domains are always deposited in both modes. + domainFragmentCounter.inc(); + receiver + .get(DOMAIN_FRAGMENTS) + .output( + KV.of( + pendingDeposit, + marshaller.marshalDomain(domain, pendingDeposit.mode()))); + // Contacts and hosts are only deposited in RDE, not BRDA. + if (pendingDeposit.mode() == RdeMode.FULL) { + HashSet contacts = new HashSet<>(); + contacts.add(domain.getAdminContact().getSqlKey()); + contacts.add(domain.getTechContact().getSqlKey()); + contacts.add(domain.getRegistrant().getSqlKey()); + // Billing contact is not mandatory. 
+ if (domain.getBillingContact() != null) { + contacts.add(domain.getBillingContact().getSqlKey()); + } + referencedContactCounter.inc(contacts.size()); + contacts.forEach( + contactRepoId -> + receiver + .get(REFERENCED_CONTACTS) + .output(KV.of((String) contactRepoId, pendingDeposit))); + if (domain.getNsHosts() != null) { + referencedHostCounter.inc(domain.getNsHosts().size()); + domain + .getNsHosts() + .forEach( + hostKey -> + receiver + .get(REFERENCED_HOSTS) + .output( + KV.of( + (String) hostKey.getSqlKey(), + pendingDeposit))); + } + } + }); + } + }) + .withOutputTags( + DOMAIN_FRAGMENTS, TupleTagList.of(REFERENCED_CONTACTS).and(REFERENCED_HOSTS))); + } + + private PCollection> processContactHistories( + PCollection> referencedContacts, + PCollection> contactHistories) { + Counter contactFragmentCounter = Metrics.counter("RDE", "ContactFragment"); + return removeUnreferencedResource(referencedContacts, contactHistories, ContactResource.class) + .apply( + "Map ContactResource to DepositFragment", + FlatMapElements.into( + kvs( + TypeDescriptor.of(PendingDeposit.class), + TypeDescriptor.of(DepositFragment.class))) + .via( + (KV kv) -> { + ContactResource contact = + (ContactResource) + loadResourceByHistoryEntryId( + ContactHistory.class, + kv.getKey(), + kv.getValue().getOnly(REVISION_ID)); + DepositFragment fragment = marshaller.marshalContact(contact); + ImmutableSet> fragments = + Streams.stream(kv.getValue().getAll(PENDING_DEPOSIT)) + // The same contact could be used by multiple domains, therefore + // matched to the same pending deposit multiple times. + .distinct() + .map(pendingDeposit -> KV.of(pendingDeposit, fragment)) + .collect(toImmutableSet()); + contactFragmentCounter.inc(fragments.size()); + return fragments; + })); + } + + private PCollectionTuple processHostHistories( + PCollection> referencedHosts, + PCollection> hostHistories) { + Counter subordinateHostCounter = Metrics.counter("RDE", "SubordinateHostResource"); + Counter externalHostCounter = Metrics.counter("RDE", "ExternalHostResource"); + Counter externalHostFragmentCounter = Metrics.counter("RDE", "ExternalHostFragment"); + return removeUnreferencedResource(referencedHosts, hostHistories, HostResource.class) + .apply( + "Map external DomainResource to DepositFragment and process subordinate domains", + ParDo.of( + new DoFn, KV>() { + @ProcessElement + public void processElement( + @Element KV kv, MultiOutputReceiver receiver) { + HostResource host = + (HostResource) + loadResourceByHistoryEntryId( + HostHistory.class, + kv.getKey(), + kv.getValue().getOnly(REVISION_ID)); + // When a host is subordinate, we need to find it's superordinate domain and + // include it in the deposit as well. + if (host.isSubordinate()) { + subordinateHostCounter.inc(); + receiver + .get(SUPERORDINATE_DOMAINS) + .output( + // The output are pairs of + // (superordinateDomainRepoId, + // (subordinateHostRepoId, (pendingDeposit, revisionId))). + KV.of((String) host.getSuperordinateDomain().getSqlKey(), kv)); + } else { + externalHostCounter.inc(); + DepositFragment fragment = marshaller.marshalExternalHost(host); + Streams.stream(kv.getValue().getAll(PENDING_DEPOSIT)) + // The same host could be used by multiple domains, therefore + // matched to the same pending deposit multiple times. 
+ .distinct() + .forEach( + pendingDeposit -> { + externalHostFragmentCounter.inc(); + receiver + .get(EXTERNAL_HOST_FRAGMENTS) + .output(KV.of(pendingDeposit, fragment)); + }); + } + } + }) + .withOutputTags(EXTERNAL_HOST_FRAGMENTS, TupleTagList.of(SUPERORDINATE_DOMAINS))); + } + + /** + * Process subordinate hosts by making a deposit fragment with pending transfer information + * obtained from its superordinate domain. + * + * @param superordinateDomains Pairs of (superordinateDomainRepoId, (subordinateHostRepoId, + * (pendingDeposit, revisionId))). This collection maps the subordinate host and the pending + * deposit to include it to its superordinate domain. + * @param domainHistories Pairs of (domainRepoId, revisionId). This collection helps us find the + * historical superordinate domain from its history entry and is obtained from calling {@link + * #getMostRecentHistoryEntries} for domains. + */ + private PCollection> processSubordinateHosts( + PCollection>> superordinateDomains, + PCollection> domainHistories) { + Counter subordinateHostFragmentCounter = Metrics.counter("RDE", "SubordinateHostFragment"); + Counter referencedSubordinateHostCounter = Metrics.counter("RDE", "ReferencedSubordinateHost"); + return KeyedPCollectionTuple.of(HOST_TO_PENDING_DEPOSIT, superordinateDomains) + .and(REVISION_ID, domainHistories) + .apply( + "Join HostResource:PendingDeposits with DomainHistory on DomainResource", + CoGroupByKey.create()) + .apply( + " Remove unreferenced DomainResource", + Filter.by( + kv -> { + boolean toInclude = + kv.getValue().getAll(HOST_TO_PENDING_DEPOSIT).iterator().hasNext() + && kv.getValue().getAll(REVISION_ID).iterator().hasNext(); + if (toInclude) { + referencedSubordinateHostCounter.inc(); + } + return toInclude; + })) + .apply( + "Map subordinate HostResource to DepositFragment", + FlatMapElements.into( + kvs( + TypeDescriptor.of(PendingDeposit.class), + TypeDescriptor.of(DepositFragment.class))) + .via( + (KV kv) -> { + DomainBase superordinateDomain = + (DomainBase) + loadResourceByHistoryEntryId( + DomainHistory.class, + kv.getKey(), + kv.getValue().getOnly(REVISION_ID)); + ImmutableSet.Builder> results = + new ImmutableSet.Builder<>(); + for (KV hostToPendingDeposits : + kv.getValue().getAll(HOST_TO_PENDING_DEPOSIT)) { + HostResource host = + (HostResource) + loadResourceByHistoryEntryId( + HostHistory.class, + hostToPendingDeposits.getKey(), + hostToPendingDeposits.getValue().getOnly(REVISION_ID)); + DepositFragment fragment = + marshaller.marshalSubordinateHost(host, superordinateDomain); + Streams.stream(hostToPendingDeposits.getValue().getAll(PENDING_DEPOSIT)) + .distinct() + .forEach( + pendingDeposit -> { + subordinateHostFragmentCounter.inc(); + results.add(KV.of(pendingDeposit, fragment)); + }); + } + return results.build(); + })); } /** * Decodes the pipeline option extracted from the URL parameter sent by the pipeline launcher to - * the original TLD to pending deposit map. + * the original pending deposit set. 
*/ @SuppressWarnings("unchecked") - static ImmutableSetMultimap decodePendings(String encodedPending) { + static ImmutableSet decodePendingDeposits(String encodedPendingDeposits) { try (ObjectInputStream ois = new ObjectInputStream( new ByteArrayInputStream( - BaseEncoding.base64Url().omitPadding().decode(encodedPending)))) { - return (ImmutableSetMultimap) ois.readObject(); + BaseEncoding.base64Url().omitPadding().decode(encodedPendingDeposits)))) { + return (ImmutableSet) ois.readObject(); } catch (IOException | ClassNotFoundException e) { throw new IllegalArgumentException("Unable to parse encoded pending deposit map.", e); } } /** - * Encodes the TLD to pending deposit map in an URL safe string that is sent to the pipeline - * worker by the pipeline launcher as a pipeline option. + * Encodes the pending deposit set in an URL safe string that is sent to the pipeline worker by + * the pipeline launcher as a pipeline option. */ - public static String encodePendings(ImmutableSetMultimap pendings) + public static String encodePendingDeposits(ImmutableSet pendingDeposits) throws IOException { try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { ObjectOutputStream oos = new ObjectOutputStream(baos); - oos.writeObject(pendings); + oos.writeObject(pendingDeposits); oos.flush(); return BaseEncoding.base64Url().omitPadding().encode(baos.toByteArray()); } @@ -284,14 +686,40 @@ public class RdePipeline implements Serializable { PipelineOptionsFactory.register(RdePipelineOptions.class); RdePipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(RdePipelineOptions.class); - // RegistryPipelineWorkerInitializer only initializes before pipeline executions, after the - // main() function constructed the graph. We need the registry environment set up so that we - // can create a CloudTasksUtils which uses the environment-dependent config file. - options.getRegistryEnvironment().setup(); + RegistryPipelineOptions.validateRegistryPipelineOptions(options); options.setIsolationOverride(TransactionIsolationLevel.TRANSACTION_READ_COMMITTED); DaggerRdePipeline_RdePipelineComponent.builder().options(options).build().rdePipeline().run(); } + /** + * A utility class that contains {@link TupleTag}s when {@link PCollectionTuple}s and {@link + * CoGbkResult}s are used. + */ + protected abstract static class TupleTags { + protected static final TupleTag> DOMAIN_FRAGMENTS = + new TupleTag>() {}; + + protected static final TupleTag> REFERENCED_CONTACTS = + new TupleTag>() {}; + + protected static final TupleTag> REFERENCED_HOSTS = + new TupleTag>() {}; + + protected static final TupleTag>> SUPERORDINATE_DOMAINS = + new TupleTag>>() {}; + + protected static final TupleTag> EXTERNAL_HOST_FRAGMENTS = + new TupleTag>() {}; + + protected static final TupleTag PENDING_DEPOSIT = + new TupleTag() {}; + + protected static final TupleTag> HOST_TO_PENDING_DEPOSIT = + new TupleTag>() {}; + + protected static final TupleTag REVISION_ID = new TupleTag() {}; + } + @Singleton @Component( modules = { diff --git a/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java b/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java index 59042a9b9..c11b55cfe 100644 --- a/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java +++ b/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java @@ -61,7 +61,8 @@ import org.json.JSONObject; /** * Definition of a Dataflow Flex template, which generates a given month's spec11 report. * - *

To stage this template locally, run the {@code stage_beam_pipeline.sh} shell script. + *

To stage this template locally, run {@code ./nom_build :core:sBP --environment=alpha + * --pipeline=spec11}. * *

Then, you can run the staged template via the API client library, gCloud or a raw REST call. * diff --git a/core/src/main/java/google/registry/config/RegistryConfig.java b/core/src/main/java/google/registry/config/RegistryConfig.java index 1eea552bc..fb14077b2 100644 --- a/core/src/main/java/google/registry/config/RegistryConfig.java +++ b/core/src/main/java/google/registry/config/RegistryConfig.java @@ -557,11 +557,23 @@ public final class RegistryConfig { return config.registryPolicy.requireSslCertificates; } + /** + * Returns the GCE machine type that a CPU-demanding pipeline should use. + * + * @see google.registry.beam.rde.RdePipeline + */ + @Provides + @Config("highPerformanceMachineType") + public static String provideHighPerformanceMachineType(RegistryConfigSettings config) { + return config.beam.highPerformanceMachineType; + } + /** * Returns the default job region to run Apache Beam (Cloud Dataflow) jobs in. * * @see google.registry.beam.invoicing.InvoicingPipeline * @see google.registry.beam.spec11.Spec11Pipeline + * @see google.registry.beam.invoicing.InvoicingPipeline */ @Provides @Config("defaultJobRegion") diff --git a/core/src/main/java/google/registry/config/RegistryConfigSettings.java b/core/src/main/java/google/registry/config/RegistryConfigSettings.java index 0084625c3..84b4a674c 100644 --- a/core/src/main/java/google/registry/config/RegistryConfigSettings.java +++ b/core/src/main/java/google/registry/config/RegistryConfigSettings.java @@ -133,6 +133,7 @@ public class RegistryConfigSettings { /** Configuration for Apache Beam (Cloud Dataflow). */ public static class Beam { public String defaultJobRegion; + public String highPerformanceMachineType; public String stagingBucketUrl; } diff --git a/core/src/main/java/google/registry/config/files/default-config.yaml b/core/src/main/java/google/registry/config/files/default-config.yaml index e76d90dfc..2bca9c5f1 100644 --- a/core/src/main/java/google/registry/config/files/default-config.yaml +++ b/core/src/main/java/google/registry/config/files/default-config.yaml @@ -419,6 +419,14 @@ misc: beam: # The default region to run Apache Beam (Cloud Dataflow) jobs in. defaultJobRegion: us-east1 + # The GCE machine type to use when a job is CPU-intensive (e. g. RDE). Be sure + # to check the VM CPU quota for the job region. In a massively parallel + # pipeline this quota can be easily reached and needs to be raised, otherwise + # the job will run very slowly. Also note that there is a separate quota for + # external IPv4 address in a region, which means that machine type with higher + # core count per machine may be preferable in order to preserve IP addresses. 
+ # See: https://cloud.google.com/compute/quotas#cpu_quota + highPerformanceMachineType: n2-standard-4 stagingBucketUrl: gcs-bucket-with-staged-templates keyring: diff --git a/core/src/main/java/google/registry/rde/RdeStagingAction.java b/core/src/main/java/google/registry/rde/RdeStagingAction.java index d1c3fc04f..1dd0418a7 100644 --- a/core/src/main/java/google/registry/rde/RdeStagingAction.java +++ b/core/src/main/java/google/registry/rde/RdeStagingAction.java @@ -14,12 +14,14 @@ package google.registry.rde; +import static com.google.common.collect.ImmutableSetMultimap.toImmutableSetMultimap; import static google.registry.beam.BeamUtils.createJobName; import static google.registry.persistence.transaction.TransactionManagerFactory.tm; import static google.registry.request.Action.Method.GET; import static google.registry.request.Action.Method.POST; import static google.registry.xml.ValidationMode.LENIENT; import static google.registry.xml.ValidationMode.STRICT; +import static java.util.function.Function.identity; import static javax.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR; import static javax.servlet.http.HttpServletResponse.SC_NO_CONTENT; import static javax.servlet.http.HttpServletResponse.SC_OK; @@ -29,6 +31,7 @@ import com.google.api.services.dataflow.model.LaunchFlexTemplateParameter; import com.google.api.services.dataflow.model.LaunchFlexTemplateRequest; import com.google.api.services.dataflow.model.LaunchFlexTemplateResponse; import com.google.common.base.Ascii; +import com.google.common.base.Joiner; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -48,6 +51,7 @@ import google.registry.model.EppResource; import google.registry.model.common.Cursor; import google.registry.model.common.Cursor.CursorType; import google.registry.model.contact.ContactResource; +import google.registry.model.domain.DomainBase; import google.registry.model.host.HostResource; import google.registry.model.index.EppResourceIndex; import google.registry.model.rde.RdeMode; @@ -67,14 +71,17 @@ import org.joda.time.DateTime; import org.joda.time.Duration; /** - * MapReduce that idempotently stages escrow deposit XML files on GCS for RDE/BRDA for all TLDs. + * Action that kicks off either a MapReduce (for Datastore) or Dataflow (for Cloud SQL) job to stage + * escrow deposit XML files on GCS for RDE/BRDA for all TLDs. * - *

<h2>MapReduce Operation</h2>

+ *

<h2>Pending Deposits</h2>

* *

This task starts by asking {@link PendingDepositChecker} which deposits need to be generated. * If there's nothing to deposit, we return 204 No Content; otherwise, we fire off a MapReduce job * and redirect to its status GUI. The task can also be run in manual operation, as described below. * + *

<h2>MapReduce</h2>

+ * *

The mapreduce job scans every {@link EppResource} in Datastore. It maps a point-in-time * representation of each entity to the escrow XML files in which it should appear. * @@ -88,11 +95,26 @@ import org.joda.time.Duration; *

{@link Registrar} entities, both active and inactive, are included in all deposits. They are * not rewound point-in-time. * + *

<h2>Dataflow</h2>

+ * + * The Dataflow job finds the most recent history entry on or before the watermark for each + * resource type and loads the embedded resource from it, which is then projected to the watermark + * to account for things like pending transfers. + * + *
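A sketch of that load-and-project step for domains, following loadResourceByHistoryEntryId in this diff; it assumes DomainHistory.DomainHistoryId is the @IdClass that the diff's reflective code instantiates.

```java
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;

import google.registry.model.domain.DomainBase;
import google.registry.model.domain.DomainHistory;
import google.registry.persistence.VKey;
import org.joda.time.DateTime;

final class PointInTimeLoader {
  /** Loads the domain embedded in the given history entry, projected to the watermark. */
  static DomainBase loadDomain(String repoId, long revisionId, DateTime watermark) {
    DomainHistory history =
        jpaTm()
            .transact(
                () ->
                    jpaTm()
                        .loadByKey(
                            VKey.createSql(
                                DomainHistory.class,
                                new DomainHistory.DomainHistoryId(repoId, revisionId))));
    // Project the embedded resource forward to account for e.g. pending transfers.
    return (DomainBase)
        history.getResourceAtPointInTime().get().cloneProjectedAtTime(watermark);
  }
}
```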

Only {@link ContactResource}s and {@link HostResource}s that are referenced by an included + * {@link DomainBase} will be included in the corresponding pending deposit. + * + *

{@link Registrar} entities, both active and inactive, are included in all deposits. They are + * not rewound point-in-time. + * + *

<h2>Afterward</h2>

+ * *

The XML deposit files generated by this job are humongous. A tiny XML report file is generated * for each deposit, telling us how much of what it contains. * - *

Once a deposit is successfully generated, an {@link RdeUploadAction} is enqueued which will - * upload it via SFTP to the third-party escrow provider. + *

Once a deposit is successfully generated, for RDE an {@link RdeUploadAction} is enqueued, which + * will upload it via SFTP to the third-party escrow provider; for BRDA a {@link BrdaCopyAction} is + * enqueued, which will copy it to a GCS bucket from which it is rsynced to a third-party escrow + * provider. * *
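For reference, the RDE branch of the combined cursor-update transform in RdeIO (shown earlier in this diff) reduces to the following enqueue call; the BRDA branch is analogous but targets the BRDA queue and adds a watermark parameter:

```java
// RDE branch of RdeIO.CursorUpdater's enqueue step (from this diff). The task
// carries the TLD and the job-name-derived GCS prefix of the staged deposit.
cloudTasksUtils.enqueue(
    RDE_UPLOAD_QUEUE,
    CloudTasksUtils.createPostTask(
        RdeUploadAction.PATH,
        Service.BACKEND.getServiceId(),
        ImmutableMultimap.of(
            RequestParameters.PARAM_TLD, key.tld(),
            RdeModule.PARAM_PREFIX, options.getJobName() + '/')));
```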

To generate escrow deposits manually and locally, use the {@code nomulus} tool command {@code * GenerateEscrowDepositCommand}. @@ -106,7 +128,7 @@ import org.joda.time.Duration; * *

Valid model objects might not be valid to the RDE XML schema. A single invalid object will * cause the whole deposit to fail. You need to check the logs, find out which entities are broken, - * and perform Datastore surgery. + * and perform database surgery. * *

If a deposit fails, an error is emitted to the logs for each broken entity. It tells you the * key and shows you its representation in lenient XML. @@ -137,33 +159,40 @@ import org.joda.time.Duration; *

The deposit and report are encrypted using {@link Ghostryde}. Administrators can use the * {@code GhostrydeCommand} command in the {@code nomulus} tool to view them. * - *

Unencrypted XML fragments are stored temporarily between the map and reduce steps. The - * ghostryde encryption on the full archived deposits makes life a little more difficult for an - * attacker. But security ultimately depends on the bucket. + *

Unencrypted XML fragments are stored temporarily between the map and reduce steps and between + * Dataflow transforms. The ghostryde encryption on the full archived deposits makes life a little + * more difficult for an attacker. But security ultimately depends on the bucket. * *

<h2>Idempotency</h2>

* - *

We lock the reduce tasks. This is necessary because: a) App Engine tasks might get double - * executed; and b) Cloud Storage file handles get committed on close even if our code throws an - * exception. + *

We lock the reduce tasks for the MapReduce job. This is necessary because: a) App Engine tasks + * might get double executed; and b) Cloud Storage file handles get committed on close even if + * our code throws an exception. + * + *

For the Dataflow job we do not employ a lock because it is difficult to span a lock across + * three sequential transforms (save to GCS, roll forward cursor, enqueue next action). Instead, we + * get around the issue by saving the deposit to a unique folder named after the job name so there + * is no possibility of overwriting. + * + *
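A small sketch of that naming scheme; the bucket and basename values are hypothetical (the real basename is derived from the deposit's TLD, watermark, mode, and revision):

```java
// The GCS prefix embeds the unique Dataflow job name, so a re-run writes to a
// fresh "folder" instead of overwriting a previous run's files.
import com.google.cloud.storage.BlobId;

final class DepositNaming {
  static BlobId xmlFile(String jobName, String rdeBucket, String basename) {
    String prefix = jobName + '/' + basename; // e.g. "rde-...-123/soy_2000-01-01_full_S1_R0"
    return BlobId.of(rdeBucket, prefix + ".xml.ghostryde");
  }
}
```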

Deposits are generated serially for a given (watermark, mode) pair. A deposit is never started * beyond the cursor. Once a deposit is completed, its cursor is rolled forward transactionally. * Duplicate jobs may exist {@code <=cursor}. So a transaction will not bother changing the cursor * if it's already been rolled forward. * - *

Enqueuing {@code RdeUploadAction} is also part of the cursor transaction. This is necessary - * because the first thing the upload task does is check the staging cursor to verify it's been - * completed, so we can't enqueue before we roll. We also can't enqueue after the roll, because then - * if enqueuing fails, the upload might never be enqueued. + *

Enqueuing {@code RdeUploadAction} or {@code BrdaCopyAction} is also part of the cursor + * transaction. This is necessary because the first thing the upload task does is check the staging + * cursor to verify it's been completed, so we can't enqueue before we roll. We also can't enqueue + * after the roll, because then if enqueuing fails, the upload might never be enqueued. * *
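Condensed, this is the transactional coupling implemented by the new CursorUpdater in RdeIO, with cursor verification, logging, and the BRDA branch elided:

```java
// Cursor roll-forward, revision bump, and enqueue commit (or roll back) together.
tm().transact(
        () -> {
          // ... check the current RDE_STAGING cursor and persist newPosition ...
          RdeRevision.saveRevision(key.tld(), key.watermark(), key.mode(), revision);
          cloudTasksUtils.enqueue(RDE_UPLOAD_QUEUE, task); // a failure rolls everything back
        });
```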

<h2>Determinism</h2>

* *

The filename of an escrow deposit is deterministic for a given (TLD, watermark, {@linkplain * RdeMode mode}) triplet. Its generated contents is deterministic in all the ways that we care - * about. Its view of the database is strongly consistent. + * about. Its view of the database is strongly consistent in Cloud SQL automatically by nature of + * the initial query for the history entry running at {@code READ_COMMITTED} transaction isolation + * level. * - *

This is because: + *

This is also true in Datastore because: * *

    *
  1. {@code EppResource} queries are strongly consistent thanks to {@link EppResourceIndex} @@ -226,6 +255,11 @@ public final class RdeStagingAction implements Runnable { @Inject MapreduceRunner mrRunner; @Inject @Config("projectId") String projectId; @Inject @Config("defaultJobRegion") String jobRegion; + + @Inject + @Config("highPerformanceMachineType") + String machineType; + @Inject @Config("transactionCooldown") Duration transactionCooldown; @Inject @Config("beamStagingBucketUrl") String stagingBucketUrl; @Inject @Config("rdeBucket") String rdeBucket; @@ -270,43 +304,65 @@ public final class RdeStagingAction implements Runnable { new NullInput<>(), EppResourceInputs.createEntityInput(EppResource.class))) .sendLinkToMapreduceConsole(response); } else { - try { - LaunchFlexTemplateParameter parameter = - new LaunchFlexTemplateParameter() - .setJobName(createJobName("rde", clock)) - .setContainerSpecGcsPath( - String.format("%s/%s_metadata.json", stagingBucketUrl, PIPELINE_NAME)) - .setParameters( - ImmutableMap.of( - "pendings", - RdePipeline.encodePendings(pendings), - "validationMode", - validationMode.name(), - "rdeStagingBucket", - rdeBucket, - "stagingKey", - BaseEncoding.base64Url().omitPadding().encode(stagingKeyBytes), - "registryEnvironment", - RegistryEnvironment.get().name())); - LaunchFlexTemplateResponse launchResponse = - dataflow - .projects() - .locations() - .flexTemplates() - .launch( - projectId, - jobRegion, - new LaunchFlexTemplateRequest().setLaunchParameter(parameter)) - .execute(); - logger.atInfo().log("Got response: %s", launchResponse.getJob().toPrettyString()); - response.setStatus(SC_OK); - response.setPayload( - String.format("Launched RDE pipeline: %s", launchResponse.getJob().getId())); - } catch (IOException e) { - logger.atWarning().withCause(e).log("Pipeline Launch failed"); - response.setStatus(SC_INTERNAL_SERVER_ERROR); - response.setPayload(String.format("Pipeline launch failed: %s", e.getMessage())); - } + ImmutableList.Builder jobNameBuilder = new ImmutableList.Builder<>(); + pendings.values().stream() + .collect(toImmutableSetMultimap(PendingDeposit::watermark, identity())) + .asMap() + .forEach( + (watermark, pendingDeposits) -> { + try { + LaunchFlexTemplateParameter parameter = + new LaunchFlexTemplateParameter() + .setJobName( + createJobName( + String.format( + "rde-%s", watermark.toString("yyyy-MM-dd't'HH-mm-ss'z'")), + clock)) + .setContainerSpecGcsPath( + String.format("%s/%s_metadata.json", stagingBucketUrl, PIPELINE_NAME)) + .setParameters( + new ImmutableMap.Builder() + .put( + "pendings", + RdePipeline.encodePendingDeposits( + ImmutableSet.copyOf(pendingDeposits))) + .put("validationMode", validationMode.name()) + .put("rdeStagingBucket", rdeBucket) + .put( + "stagingKey", + BaseEncoding.base64Url() + .omitPadding() + .encode(stagingKeyBytes)) + .put("registryEnvironment", RegistryEnvironment.get().name()) + .put("workerMachineType", machineType) + // TODO (jianglai): Investigate turning off public IPs (for which + // there is a quota) in order to increase the total number of + // workers allowed (also under quota). 
+            // See:
+            // https://cloud.google.com/dataflow/docs/guides/routes-firewall
+            .put("usePublicIps", "true")
+            .build());
+                LaunchFlexTemplateResponse launchResponse =
+                    dataflow
+                        .projects()
+                        .locations()
+                        .flexTemplates()
+                        .launch(
+                            projectId,
+                            jobRegion,
+                            new LaunchFlexTemplateRequest().setLaunchParameter(parameter))
+                        .execute();
+                logger.atInfo().log("Got response: %s", launchResponse.getJob().toPrettyString());
+                jobNameBuilder.add(launchResponse.getJob().getId());
+              } catch (IOException e) {
+                logger.atWarning().withCause(e).log("Pipeline launch failed");
+                response.setStatus(SC_INTERNAL_SERVER_ERROR);
+                response.setPayload(String.format("Pipeline launch failed: %s", e.getMessage()));
+              }
+            });
+    response.setStatus(SC_OK);
+    response.setPayload(
+        String.format("Launched RDE pipeline: %s", Joiner.on(", ").join(jobNameBuilder.build())));
   }
 }
 
@@ -349,7 +405,7 @@ public final class RdeStagingAction implements Runnable {
       throw new BadRequestException("Directory must not start with a slash");
     }
     String directoryWithTrailingSlash =
-        directory.get().endsWith("/") ? directory.get() : (directory.get() + '/');
+        directory.get().endsWith("/") ? directory.get() : directory.get() + '/';
 
     if (modeStrings.isEmpty()) {
       throw new BadRequestException("Mode parameter required in manual operation");
@@ -381,7 +437,7 @@ public final class RdeStagingAction implements Runnable {
       }
     }
 
-    if (revision.isPresent() && (revision.get() < 0)) {
+    if (revision.isPresent() && revision.get() < 0) {
       throw new BadRequestException("Revision must be greater than or equal to zero");
     }
 
@@ -394,11 +450,7 @@ public final class RdeStagingAction implements Runnable {
           pendingsBuilder.put(
               tld,
               PendingDeposit.createInManualOperation(
-                  tld,
-                  watermark,
-                  mode,
-                  directoryWithTrailingSlash,
-                  revision.orElse(null)));
+                  tld, watermark, mode, directoryWithTrailingSlash, revision.orElse(null)));
         }
       }
     }
diff --git a/core/src/main/resources/google/registry/beam/rde_pipeline_metadata.json b/core/src/main/resources/google/registry/beam/rde_pipeline_metadata.json
index 3b39c5725..9f0fc7de9 100644
--- a/core/src/main/resources/google/registry/beam/rde_pipeline_metadata.json
+++ b/core/src/main/resources/google/registry/beam/rde_pipeline_metadata.json
@@ -43,6 +43,22 @@
       "regexes": [
         "[A-Za-z0-9\\-_]+"
       ]
+    },
+    {
+      "name": "workerMachineType",
+      "label": "The GCE machine type for the Dataflow job workers.",
+      "helpText": "See https://cloud.google.com/dataflow/quotas#compute-engine-quotas for available machine types.",
+      "regexes": [
+        "[a-z0-9\\-]+"
+      ]
+    },
+    {
+      "name": "usePublicIps",
+      "label": "Whether the GCE workers are assigned public IPs.",
+      "helpText": "Public IPs have an associated cost, and there is a per-region quota on the total number of public IPs assigned at any given time. If the service only needs to access GCP APIs, it is better not to use public IPs, but the network must then be configured accordingly. See https://cloud.google.com/dataflow/docs/guides/routes-firewall.",
+      "regexes": [
+        "true|false"
+      ]
     }
   ]
 }
diff --git a/core/src/test/java/google/registry/beam/BeamActionTestBase.java b/core/src/test/java/google/registry/beam/BeamActionTestBase.java
index 065a3e9ff..e6c9023dd 100644
--- a/core/src/test/java/google/registry/beam/BeamActionTestBase.java
+++ b/core/src/test/java/google/registry/beam/BeamActionTestBase.java
@@ -29,8 +29,10 @@ import com.google.api.services.dataflow.model.LaunchFlexTemplateRequest;
 import com.google.api.services.dataflow.model.LaunchFlexTemplateResponse;
 import google.registry.testing.FakeResponse;
 import org.junit.jupiter.api.BeforeEach;
+import org.mockito.invocation.InvocationOnMock;
 import org.mockito.junit.jupiter.MockitoSettings;
 import org.mockito.quality.Strictness;
+import org.mockito.stubbing.Answer;
 
 /** Base class for all actions that launch a Dataflow Flex template. */
 @MockitoSettings(strictness = Strictness.STRICT_STUBS)
@@ -42,8 +44,19 @@ public abstract class BeamActionTestBase {
   private Locations locations = mock(Locations.class);
   protected FlexTemplates templates = mock(FlexTemplates.class);
   protected Launch launch = mock(Launch.class);
-  private LaunchFlexTemplateResponse launchResponse =
-      new LaunchFlexTemplateResponse().setJob(new Job().setId("jobid"));
+  private Answer<LaunchFlexTemplateResponse> answer =
+      new Answer<LaunchFlexTemplateResponse>() {
+        private int times = 0;
+
+        @Override
+        public LaunchFlexTemplateResponse answer(InvocationOnMock invocation) throws Throwable {
+          LaunchFlexTemplateResponse response =
+              new LaunchFlexTemplateResponse()
+                  .setJob(new Job().setId("jobid" + (times == 0 ? "" : Integer.toString(times))));
+          times++;
+          return response;
+        }
+      };
 
   @BeforeEach
   protected void beforeEach() throws Exception {
@@ -52,6 +65,6 @@ public abstract class BeamActionTestBase {
     when(locations.flexTemplates()).thenReturn(templates);
     when(templates.launch(anyString(), anyString(), any(LaunchFlexTemplateRequest.class)))
         .thenReturn(launch);
-    when(launch.execute()).thenReturn(launchResponse);
+    when(launch.execute()).thenAnswer(answer);
   }
 }
diff --git a/core/src/test/java/google/registry/beam/rde/RdePipelineTest.java b/core/src/test/java/google/registry/beam/rde/RdePipelineTest.java
index 9b40de040..3b7e9621a 100644
--- a/core/src/test/java/google/registry/beam/rde/RdePipelineTest.java
+++ b/core/src/test/java/google/registry/beam/rde/RdePipelineTest.java
@@ -17,8 +17,8 @@ package google.registry.beam.rde;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.collect.ImmutableSet.toImmutableSet;
 import static com.google.common.truth.Truth.assertThat;
-import static google.registry.beam.rde.RdePipeline.decodePendings;
-import static google.registry.beam.rde.RdePipeline.encodePendings;
+import static google.registry.beam.rde.RdePipeline.decodePendingDeposits;
+import static google.registry.beam.rde.RdePipeline.encodePendingDeposits;
 import static google.registry.model.common.Cursor.CursorType.RDE_STAGING;
 import static google.registry.model.rde.RdeMode.FULL;
 import static google.registry.model.rde.RdeMode.THIN;
@@ -27,25 +27,26 @@ import static google.registry.rde.RdeResourceType.CONTACT;
 import static google.registry.rde.RdeResourceType.DOMAIN;
 import static google.registry.rde.RdeResourceType.HOST;
 import static google.registry.rde.RdeResourceType.REGISTRAR;
-import static google.registry.testing.AppEngineExtension.loadInitialData;
+import static google.registry.testing.AppEngineExtension.makeRegistrar1;
+import static google.registry.testing.AppEngineExtension.makeRegistrar2;
 import static google.registry.testing.DatabaseHelper.createTld;
-import static google.registry.testing.DatabaseHelper.newHostResource;
+import static google.registry.testing.DatabaseHelper.insertSimpleResources;
+import static google.registry.testing.DatabaseHelper.newDomainBase;
 import static google.registry.testing.DatabaseHelper.persistActiveContact;
 import static google.registry.testing.DatabaseHelper.persistActiveDomain;
-import static google.registry.testing.DatabaseHelper.persistDeletedDomain;
+import static google.registry.testing.DatabaseHelper.persistActiveHost;
 import static google.registry.testing.DatabaseHelper.persistEppResource;
 import static google.registry.testing.DatabaseHelper.persistNewRegistrar;
+import static google.registry.testing.DatabaseHelper.persistResource;
 import static google.registry.util.ResourceUtils.readResourceUtf8;
 import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.joda.time.Duration.standardDays;
+import static org.junit.Assert.assertThrows;
 
 import com.google.cloud.storage.BlobId;
 import com.google.cloud.storage.contrib.nio.testing.LocalStorageHelper;
-import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSetMultimap;
-import com.google.common.collect.Iterables;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Streams;
 import com.google.common.io.BaseEncoding;
 import google.registry.beam.TestPipelineExtension;
@@ -53,12 +54,26 @@ import google.registry.gcs.GcsUtils;
 import google.registry.keyring.api.PgpHelper;
 import google.registry.model.common.Cursor;
 import google.registry.model.common.Cursor.CursorType;
+import google.registry.model.contact.ContactBase;
+import google.registry.model.contact.ContactHistory;
+import google.registry.model.contact.ContactResource;
+import google.registry.model.domain.DesignatedContact;
+import google.registry.model.domain.DomainBase;
+import google.registry.model.domain.DomainContent;
+import google.registry.model.domain.DomainHistory;
+import google.registry.model.domain.Period;
+import google.registry.model.eppcommon.Trid;
+import google.registry.model.host.HostBase;
+import google.registry.model.host.HostHistory;
 import google.registry.model.host.HostResource;
 import google.registry.model.rde.RdeMode;
 import google.registry.model.rde.RdeRevision;
 import google.registry.model.rde.RdeRevision.RdeRevisionId;
 import google.registry.model.registrar.Registrar;
 import google.registry.model.registrar.Registrar.State;
+import google.registry.model.reporting.DomainTransactionRecord;
+import google.registry.model.reporting.DomainTransactionRecord.TransactionReportField;
+import google.registry.model.reporting.HistoryEntry;
 import google.registry.model.tld.Registry;
 import google.registry.persistence.VKey;
 import google.registry.persistence.transaction.JpaTestExtensions;
@@ -111,14 +126,11 @@ public class RdePipelineTest {
 
   // This is the default as-of time for the RDE/BRDA job.
   private final DateTime now = DateTime.parse("2000-01-01TZ");
 
-  private final ImmutableSetMultimap<String, PendingDeposit> pendings =
-      ImmutableSetMultimap.of(
-          "soy",
-          PendingDeposit.create("soy", now, FULL, RDE_STAGING, standardDays(1)),
-          "soy",
-          PendingDeposit.create("soy", now, THIN, RDE_STAGING, standardDays(1)),
-          "fun",
-          PendingDeposit.create("fun", now, FULL, RDE_STAGING, standardDays(1)));
+  private final ImmutableSet<PendingDeposit> pendings =
+      ImmutableSet.of(
+          PendingDeposit.create("soy", now, FULL, RDE_STAGING, Duration.standardDays(1)),
+          PendingDeposit.create("soy", now, THIN, RDE_STAGING, Duration.standardDays(1)),
+          PendingDeposit.create("fun", now, FULL, RDE_STAGING, Duration.standardDays(1)));
 
   private final ImmutableList<DepositFragment> brdaFragments =
       ImmutableList.of(
@@ -129,8 +141,8 @@ public class RdePipelineTest {
       ImmutableList.of(
           DepositFragment.create(RdeResourceType.DOMAIN, "\n", ""),
           DepositFragment.create(RdeResourceType.REGISTRAR, "\n", ""),
-          DepositFragment.create(RdeResourceType.CONTACT, "\n", ""),
-          DepositFragment.create(RdeResourceType.HOST, "\n", ""));
+          DepositFragment.create(CONTACT, "\n", ""),
+          DepositFragment.create(HOST, "\n", ""));
 
   private final GcsUtils gcsUtils = new GcsUtils(LocalStorageHelper.getOptions());
 
@@ -164,9 +176,68 @@ public class RdePipelineTest {
 
   private RdePipeline rdePipeline;
 
+  private ContactHistory persistContactHistory(ContactBase contact) {
+    return persistResource(
+        new ContactHistory.Builder()
+            .setType(HistoryEntry.Type.CONTACT_CREATE)
+            .setXmlBytes("".getBytes(UTF_8))
+            .setModificationTime(clock.nowUtc())
+            .setRegistrarId("TheRegistrar")
+            .setTrid(Trid.create("ABC-123", "server-trid"))
+            .setBySuperuser(false)
+            .setReason("reason")
+            .setRequestedByRegistrar(true)
+            .setContact(contact)
+            .setContactRepoId(contact.getRepoId())
+            .build());
+  }
+
+  private DomainHistory persistDomainHistory(DomainContent domain) {
+    DomainTransactionRecord transactionRecord =
+        new DomainTransactionRecord.Builder()
+            .setTld("soy")
+            .setReportingTime(clock.nowUtc())
+            .setReportField(TransactionReportField.NET_ADDS_1_YR)
+            .setReportAmount(1)
+            .build();
+
+    return persistResource(
+        new DomainHistory.Builder()
+            .setType(HistoryEntry.Type.DOMAIN_CREATE)
+            .setXmlBytes("".getBytes(UTF_8))
+            .setModificationTime(clock.nowUtc())
+            .setRegistrarId("TheRegistrar")
+            .setTrid(Trid.create("ABC-123", "server-trid"))
+            .setBySuperuser(false)
+            .setReason("reason")
+            .setRequestedByRegistrar(true)
+            .setDomain(domain)
+            .setDomainRepoId(domain.getRepoId())
+            .setDomainTransactionRecords(ImmutableSet.of(transactionRecord))
+            .setOtherRegistrarId("otherClient")
+            .setPeriod(Period.create(1, Period.Unit.YEARS))
+            .build());
+  }
+
+  private HostHistory persistHostHistory(HostBase hostBase) {
+    return persistResource(
+        new HostHistory.Builder()
+            .setType(HistoryEntry.Type.HOST_CREATE)
+            .setXmlBytes("".getBytes(UTF_8))
+            .setModificationTime(clock.nowUtc())
+            .setRegistrarId("TheRegistrar")
+            .setTrid(Trid.create("ABC-123", "server-trid"))
+            .setBySuperuser(false)
+            .setReason("reason")
+            .setRequestedByRegistrar(true)
+            .setHost(hostBase)
+            .setHostRepoId(hostBase.getRepoId())
+            .build());
+  }
+
   @BeforeEach
   void beforeEach() throws Exception {
-    loadInitialData();
+    insertSimpleResources(ImmutableList.of(makeRegistrar1(), makeRegistrar2()));
 
     // Two real registrars have been created by the setup above, named "New Registrar" and "The
    // Registrar". Create one included registrar (external_monitoring) and two excluded ones.
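Editor's note: the three persist*History helpers above exist because the Cloud SQL RDE pipeline derives deposits from history entries rather than from the live resource tables: for each resource it effectively takes the most recent history entry at or before the watermark, so a rename committed before the watermark appears in the deposit while one committed after it does not (exactly what the setup in the next hunk exercises). The standalone sketch below illustrates that "latest revision at or before the watermark" selection rule; the HistoryRow record and latestAsOf method are hypothetical illustrations, not Nomulus APIs.

import java.time.Instant;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class WatermarkSelectionSketch {

  // Hypothetical stand-in for a history entry: one recorded state of one resource.
  record HistoryRow(String repoId, Instant modificationTime, String snapshot) {}

  // For each resource, keep only the most recent revision at or before the watermark.
  static Map<String, String> latestAsOf(List<HistoryRow> rows, Instant watermark) {
    return rows.stream()
        .filter(row -> !row.modificationTime().isAfter(watermark))
        // Sort ascending so the merge function below always keeps the later revision.
        .sorted(Comparator.comparing(HistoryRow::modificationTime))
        .collect(
            Collectors.toMap(HistoryRow::repoId, HistoryRow::snapshot, (older, newer) -> newer));
  }

  public static void main(String[] args) {
    Instant watermark = Instant.parse("2000-01-01T00:00:00Z");
    List<HistoryRow> rows =
        List.of(
            new HistoryRow("contact-1", Instant.parse("1999-12-30T00:00:00Z"), "contact456"),
            new HistoryRow("contact-1", Instant.parse("1999-12-31T00:00:00Z"), "contactABC"),
            new HistoryRow("contact-1", Instant.parse("2000-01-02T00:00:00Z"), "contactXYZ"));
    // Prints {contact-1=contactABC}: the pre-watermark rename is visible, the later one is not.
    System.out.println(latestAsOf(rows, watermark));
  }
}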
@@ -192,26 +263,89 @@ public class RdePipelineTest {
     tm().transact(
             () -> {
               tm().put(Cursor.create(CursorType.BRDA, now, Registry.get("soy")));
-              tm().put(Cursor.create(CursorType.RDE_STAGING, now, Registry.get("soy")));
+              tm().put(Cursor.create(RDE_STAGING, now, Registry.get("soy")));
               RdeRevision.saveRevision("soy", now, THIN, 0);
               RdeRevision.saveRevision("soy", now, FULL, 0);
             });
 
-    // Also persists a "contact1234" contact in the process.
-    persistActiveDomain("hello.soy");
-    persistActiveDomain("kitty.fun");
-    // Should not appear anywhere.
-    persistActiveDomain("lol.cat");
-    persistDeletedDomain("deleted.soy", DateTime.parse("1999-12-30TZ"));
+    // This contact is never referenced.
+    persistContactHistory(persistActiveContact("contactX"));
+    ContactResource contact1 = persistActiveContact("contact1234");
+    persistContactHistory(contact1);
+    ContactResource contact2 = persistActiveContact("contact456");
+    persistContactHistory(contact2);
 
-    persistActiveContact("contact456");
+    // This host is never referenced.
+    persistHostHistory(persistActiveHost("ns0.domain.tld"));
+    HostResource host1 = persistActiveHost("ns1.external.tld");
+    persistHostHistory(host1);
+    DomainBase helloDomain =
+        persistEppResource(
+            newDomainBase("hello.soy", contact1)
+                .asBuilder()
+                .addNameserver(host1.createVKey())
+                .build());
+    persistDomainHistory(helloDomain);
+    persistHostHistory(persistActiveHost("not-used-subordinate.hello.soy"));
+    HostResource host2 = persistActiveHost("ns1.hello.soy");
+    persistHostHistory(host2);
+    DomainBase kittyDomain =
+        persistEppResource(
+            newDomainBase("kitty.fun", contact2)
+                .asBuilder()
+                .addNameservers(ImmutableSet.of(host1.createVKey(), host2.createVKey()))
+                .build());
+    persistDomainHistory(kittyDomain);
+    // Should not appear because the TLD is not included in a pending deposit.
+    persistDomainHistory(persistEppResource(newDomainBase("lol.cat", contact1)));
+    // To be deleted.
+    DomainBase deletedDomain = persistActiveDomain("deleted.soy");
+    persistDomainHistory(deletedDomain);
 
-    HostResource host = persistEppResource(newHostResource("old.host.test"));
-    // Set the clock to 2000-01-02, the updated host should NOT show up in RDE.
+    // Advance time so that subsequent changes are recorded with a later modification time.
+    clock.advanceOneMilli();
+    persistDomainHistory(deletedDomain.asBuilder().setDeletionTime(clock.nowUtc()).build());
+    kittyDomain = kittyDomain.asBuilder().setDomainName("cat.fun").build();
+    persistDomainHistory(kittyDomain);
+    ContactResource contact3 = persistActiveContact("contact789");
+    persistContactHistory(contact3);
+    // This host is subordinate to a domain in TLD .cat, which is not included in any pending
+    // deposit, but it should still appear as a host in the pending deposit for .soy, because
+    // hello.soy references it as a nameserver.
+    HostResource host3 = persistActiveHost("ns1.lol.cat");
+    persistHostHistory(host3);
+    persistDomainHistory(
+        helloDomain
+            .asBuilder()
+            .addContacts(
+                ImmutableSet.of(
+                    DesignatedContact.create(DesignatedContact.Type.ADMIN, contact3.createVKey())))
+            .addNameserver(host3.createVKey())
+            .build());
+    // contact456 is renamed to contactABC.
+    persistContactHistory(contact2.asBuilder().setContactId("contactABC").build());
+    // ns1.hello.soy is renamed to ns2.hello.soy.
+    persistHostHistory(host2.asBuilder().setHostName("ns2.hello.soy").build());
+
+    // Set the clock to 2000-01-02; any change hereafter should not show up in the
+    // resulting deposit fragments.
     clock.advanceBy(Duration.standardDays(2));
-    persistEppResource(host.asBuilder().setHostName("new.host.test").build());
+    persistDomainHistory(kittyDomain.asBuilder().setDeletionTime(clock.nowUtc()).build());
+    ContactResource futureContact = persistActiveContact("future-contact");
+    persistContactHistory(futureContact);
+    HostResource futureHost = persistActiveHost("ns1.future.tld");
+    persistHostHistory(futureHost);
+    persistDomainHistory(
+        persistEppResource(
+            newDomainBase("future.soy", futureContact)
+                .asBuilder()
+                .setNameservers(futureHost.createVKey())
+                .build()));
+    // contactABC is renamed to contactXYZ.
+    persistContactHistory(contact2.asBuilder().setContactId("contactXYZ").build());
+    // ns2.hello.soy is renamed to ns3.hello.soy.
+    persistHostHistory(host2.asBuilder().setHostName("ns3.hello.soy").build());
 
-    options.setPendings(encodePendings(pendings));
+    options.setPendings(encodePendingDeposits(pendings));
     // The EPP resources created in tests do not have all the fields populated; using a STRICT
     // validation mode would result in a lot of warnings during marshalling.
     options.setValidationMode("LENIENT");
@@ -223,9 +357,22 @@ public class RdePipelineTest {
   }
 
   @Test
-  void testSuccess_encodeAndDecodePendingsMap() throws Exception {
-    String encodedString = encodePendings(pendings);
-    assertThat(decodePendings(encodedString)).isEqualTo(pendings);
+  void testSuccess_encodeAndDecodePendingDeposits() throws Exception {
+    String encodedString = encodePendingDeposits(pendings);
+    assertThat(decodePendingDeposits(encodedString)).isEqualTo(pendings);
+  }
+
+  @Test
+  void testFailure_pendingDepositsWithDifferentWatermarks() throws Exception {
+    options.setPendings(
+        encodePendingDeposits(
+            ImmutableSet.of(
+                PendingDeposit.create("soy", now, FULL, RDE_STAGING, Duration.standardDays(1)),
+                PendingDeposit.create(
+                    "soy", now.plusSeconds(1), THIN, RDE_STAGING, Duration.standardDays(1)))));
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> new RdePipeline(options, gcsUtils, cloudTasksHelper.getTestCloudTasksUtils()));
   }
 
   @Test
@@ -243,36 +390,47 @@ public class RdePipelineTest {
             // The same registrars are attached to all the pending deposits.
             .containsExactly("New Registrar", "The Registrar", "external_monitoring");
         // Domain fragments.
-        assertThat(
-                getFragmentForType(kv, DOMAIN)
-                    .map(getXmlElement(DOMAIN_NAME_PATTERN))
-                    .anyMatch(
-                        domain ->
-                            // Deleted domain should not be included
-                            domain.equals("deleted.soy")
-                                // Only domains on the pending deposit's tld should
-                                // appear.
-                                || !kv.getKey()
-                                    .tld()
-                                    .equals(
-                                        Iterables.get(
-                                            Splitter.on('.').split(domain), 1))))
-            .isFalse();
+        if (kv.getKey().tld().equals("soy")) {
+          assertThat(
+                  getFragmentForType(kv, DOMAIN)
+                      .map(getXmlElement(DOMAIN_NAME_PATTERN))
+                      .collect(toImmutableSet()))
+              .containsExactly("hello.soy");
+        } else {
+          assertThat(
+                  getFragmentForType(kv, DOMAIN)
+                      .map(getXmlElement(DOMAIN_NAME_PATTERN))
+                      .collect(toImmutableSet()))
+              .containsExactly("cat.fun");
+        }
 
         if (kv.getKey().mode().equals(FULL)) {
-          // Contact fragments.
-          assertThat(
-                  getFragmentForType(kv, CONTACT)
-                      .map(getXmlElement(CONTACT_ID_PATTERN))
-                      .collect(toImmutableSet()))
-              // The same contacts are attached too all pending deposits.
-              .containsExactly("contact1234", "contact456");
-          // Host fragments.
-          assertThat(
-                  getFragmentForType(kv, HOST)
-                      .map(getXmlElement(HOST_NAME_PATTERN))
-                      .collect(toImmutableSet()))
-              // Should load the resource before update.
-              .containsExactly("old.host.test");
+          // Contact fragments for hello.soy.
+          if (kv.getKey().tld().equals("soy")) {
+            assertThat(
+                    getFragmentForType(kv, CONTACT)
+                        .map(getXmlElement(CONTACT_ID_PATTERN))
+                        .collect(toImmutableSet()))
+                .containsExactly("contact1234", "contact789");
+            // Host fragments for hello.soy.
+            assertThat(
+                    getFragmentForType(kv, HOST)
+                        .map(getXmlElement(HOST_NAME_PATTERN))
+                        .collect(toImmutableSet()))
+                .containsExactly("ns1.external.tld", "ns1.lol.cat");
+          } else {
+            // Contact fragments for cat.fun.
+            assertThat(
+                    getFragmentForType(kv, CONTACT)
+                        .map(getXmlElement(CONTACT_ID_PATTERN))
+                        .collect(toImmutableSet()))
+                .containsExactly("contactABC");
+            // Host fragments for cat.fun.
+            assertThat(
+                    getFragmentForType(kv, HOST)
+                        .map(getXmlElement(HOST_NAME_PATTERN))
+                        .collect(toImmutableSet()))
+                .containsExactly("ns1.external.tld", "ns2.hello.soy");
+          }
         } else {
           // BRDA does not contain contacts or hosts.
           assertThat(
@@ -295,7 +453,7 @@ public class RdePipelineTest {
     PendingDeposit brdaKey =
         PendingDeposit.create("soy", now, THIN, CursorType.BRDA, Duration.standardDays(1));
     PendingDeposit rdeKey =
-        PendingDeposit.create("soy", now, FULL, CursorType.RDE_STAGING, Duration.standardDays(1));
+        PendingDeposit.create("soy", now, FULL, RDE_STAGING, Duration.standardDays(1));
 
     verifyFiles(ImmutableMap.of(brdaKey, brdaFragments, rdeKey, rdeFragments), false);
 
@@ -311,7 +469,7 @@ public class RdePipelineTest {
         .isEquivalentAccordingToCompareTo(now.plus(Duration.standardDays(1)));
     assertThat(loadRevision(now, THIN)).isEqualTo(1);
 
-    assertThat(loadCursorTime(CursorType.RDE_STAGING))
+    assertThat(loadCursorTime(RDE_STAGING))
         .isEquivalentAccordingToCompareTo(now.plus(Duration.standardDays(1)));
     assertThat(loadRevision(now, FULL)).isEqualTo(1);
     cloudTasksHelper.assertTasksEnqueued(
@@ -350,7 +508,7 @@ public class RdePipelineTest {
     assertThat(loadCursorTime(CursorType.BRDA)).isEquivalentAccordingToCompareTo(now);
     assertThat(loadRevision(now, THIN)).isEqualTo(0);
 
-    assertThat(loadCursorTime(CursorType.RDE_STAGING)).isEquivalentAccordingToCompareTo(now);
+    assertThat(loadCursorTime(RDE_STAGING)).isEquivalentAccordingToCompareTo(now);
     assertThat(loadRevision(now, FULL)).isEqualTo(0);
     cloudTasksHelper.assertNoTasksEnqueued("brda", "rde-upload");
   }
diff --git a/core/src/test/java/google/registry/rde/RdeStagingActionCloudSqlTest.java b/core/src/test/java/google/registry/rde/RdeStagingActionCloudSqlTest.java
index 27b61d232..2dffd5b74 100644
--- a/core/src/test/java/google/registry/rde/RdeStagingActionCloudSqlTest.java
+++ b/core/src/test/java/google/registry/rde/RdeStagingActionCloudSqlTest.java
@@ -79,6 +79,7 @@ public class RdeStagingActionCloudSqlTest extends BeamActionTestBase {
     action.watermarks = ImmutableSet.of();
     action.revision = Optional.empty();
     action.dataflow = dataflow;
+    action.machineType = "machine-type";
   }
 
   @TestSqlOnly
@@ -156,7 +157,7 @@ public class RdeStagingActionCloudSqlTest extends BeamActionTestBase {
   }
 
   @TestSqlOnly
-  void testRun_afterTransactionCooldown_runsMapReduce() throws Exception {
+  void testRun_afterTransactionCooldown_runsPipeline() throws Exception {
     createTldWithEscrowEnabled("lol");
     clock.setTo(DateTime.parse("2000-01-01T00:05:00Z"));
     action.transactionCooldown = Duration.standardMinutes(5);
@@ -241,18 +242,19 @@ public class RdeStagingActionCloudSqlTest extends BeamActionTestBase {
   }
 
   @TestSqlOnly
-  void testManualRun_validParameters_runsMapReduce() throws Exception {
+  void testManualRun_validParameters_runsPipeline() throws Exception {
     createTldWithEscrowEnabled("lol");
     clock.setTo(DateTime.parse("2000-01-01TZ"));
     action.manual = true;
     action.directory = Optional.of("test/");
     action.modeStrings = ImmutableSet.of("full");
     action.tlds = ImmutableSet.of("lol");
-    action.watermarks = ImmutableSet.of(DateTime.parse("2001-01-01TZ"));
+    action.watermarks =
+        ImmutableSet.of(DateTime.parse("1999-12-31TZ"), DateTime.parse("2001-01-01TZ"));
     action.run();
     assertThat(response.getStatus()).isEqualTo(200);
-    assertThat(response.getPayload()).contains("Launched RDE pipeline: jobid");
-    verify(templates, times(1))
+    assertThat(response.getPayload()).contains("Launched RDE pipeline: jobid, jobid1");
+    verify(templates, times(2))
         .launch(eq("projectId"), eq("jobRegion"), any(LaunchFlexTemplateRequest.class));
   }
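Editor's note: with manual runs now launching one Dataflow job per watermark, BeamActionTestBase (above) stubs launch.execute() with a counting Answer so each call yields a fresh job ID ("jobid", "jobid1", ...). Below is a minimal, self-contained version of that stubbing pattern using only Mockito calls shown in this diff (mock, when, thenAnswer); the Launcher interface is invented for the example and stands in for the mocked Dataflow Launch object.

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.concurrent.atomic.AtomicInteger;

public class CountingAnswerSketch {

  // Invented stand-in for the mocked Dataflow Launch request object.
  interface Launcher {
    String execute();
  }

  public static void main(String[] args) {
    Launcher launcher = mock(Launcher.class);
    AtomicInteger calls = new AtomicInteger();
    // First call returns "jobid", subsequent calls "jobid1", "jobid2", ...
    when(launcher.execute())
        .thenAnswer(
            invocation -> {
              int n = calls.getAndIncrement();
              return "jobid" + (n == 0 ? "" : Integer.toString(n));
            });
    System.out.println(launcher.execute()); // jobid
    System.out.println(launcher.execute()); // jobid1
  }
}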
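Editor's note: encodePendings/decodePendings were renamed to encodePendingDeposits/decodePendingDeposits and now carry an ImmutableSet of PendingDeposit rather than a multimap keyed by TLD. The diff does not show the wire format, so the sketch below is only one plausible scheme (Java serialization plus Base64) using a stand-in Serializable record, not the real PendingDeposit; the actual RdePipeline encoding may differ.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Base64;
import java.util.Set;

public class PendingEncodingSketch {

  // Stand-in for PendingDeposit; the real class carries tld, watermark, mode, cursor, etc.
  record Pending(String tld, String watermark) implements Serializable {}

  // Serialize the set and Base64-encode it so it fits in a single string pipeline option.
  static String encode(Set<Pending> pendings) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(Set.copyOf(pendings));
    }
    return Base64.getEncoder().encodeToString(bytes.toByteArray());
  }

  @SuppressWarnings("unchecked")
  static Set<Pending> decode(String encoded) throws IOException, ClassNotFoundException {
    byte[] bytes = Base64.getDecoder().decode(encoded);
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
      return (Set<Pending>) in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    Set<Pending> pendings =
        Set.of(new Pending("soy", "2000-01-01"), new Pending("fun", "2000-01-01"));
    // Round-trips: decode(encode(x)) equals x, mirroring testSuccess_encodeAndDecodePendingDeposits.
    System.out.println(decode(encode(pendings)).equals(pendings)); // true
  }
}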