Mirror of https://github.com/google/nomulus.git
Refactor RDE pipeline (#1427)
The original RDE pipeline was a direct translation of the App Engine MapReduce logic. It turned out to be too slow (taking more than a day to run) because of the way it found the most recent history entry for each resource. This PR overhauls the pipeline to use the EPP resource entities embedded inside history entries (only available in SQL) and to find the most recent entries with the SQL engine itself, cutting the running time to ~2h. Note that GCP imposes per-region, per-project quotas on CPU cores and external IP addresses, which need to accommodate the resource requirements of the pipeline. More details are provided in comments. Also merged the update-cursor stage and the enqueue-next-action stage in RdeIO so that they are done within one transaction, the same way MapReduce handles them.
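Background on the speedup: instead of scanning every revision inside the pipeline, the new pipeline lets the SQL engine select the most recent history entry at or before the watermark for each resource. The sketch below is a hypothetical simplification for domains only; the real, templated query lives in RdePipeline#getMostRecentHistoryEntries in the diff below and additionally filters out prober data and resources soft-deleted by the watermark.

  // Hypothetical simplification (domains only) of the "most recent history
  // entry" lookup that is now pushed down to the SQL engine:
  private static final String LATEST_DOMAIN_HISTORY_QUERY =
      "SELECT domainRepoId, id FROM DomainHistory "
          + "WHERE (domainRepoId, modificationTime) IN "
          + "(SELECT domainRepoId, MAX(modificationTime) FROM DomainHistory "
          + "WHERE modificationTime <= :watermark GROUP BY domainRepoId)";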
Parent: bf4b6978a7 · Commit: 65c8769c68
12 changed files with 985 additions and 295 deletions
@@ -57,7 +57,8 @@ import org.apache.beam.sdk.values.TypeDescriptor;
 /**
  * Definition of a Dataflow Flex pipeline template, which generates a given month's invoices.
  *
- * <p>To stage this template locally, run the {@code stage_beam_pipeline.sh} shell script.
+ * <p>To stage this template locally, run {@code ./nom_build :core:sBP --environment=alpha
+ * --pipeline=invoicing}.
  *
  * <p>Then, you can run the staged template via the API client library, gCloud or a raw REST call.
  *
@@ -75,6 +75,8 @@ public class RdeIO {
   abstract static class Write
       extends PTransform<PCollection<KV<PendingDeposit, Iterable<DepositFragment>>>, PDone> {

     private static final long serialVersionUID = 3334807737227087760L;

     abstract GcsUtils gcsUtils();
+
+    abstract CloudTasksUtils cloudTasksUtils();
@@ -113,8 +115,9 @@ public class RdeIO {
           .apply(
               "Write to GCS",
               ParDo.of(new RdeWriter(gcsUtils(), rdeBucket(), stagingKeyBytes(), validationMode())))
-          .apply("Update cursors", ParDo.of(new CursorUpdater()))
-          .apply("Enqueue upload action", ParDo.of(new UploadEnqueuer(cloudTasksUtils())));
+          .apply(
+              "Update cursor and enqueue next action",
+              ParDo.of(new CursorUpdater(cloudTasksUtils())));
       return PDone.in(input.getPipeline());
     }
   }
@@ -123,6 +126,7 @@ public class RdeIO {
       extends DoFn<KV<PendingDeposit, Iterable<DepositFragment>>, KV<PendingDeposit, Integer>> {

     private static final FluentLogger logger = FluentLogger.forEnclosingClass();
     private static final long serialVersionUID = 5496375923068400382L;

     private final GcsUtils gcsUtils;
     private final String rdeBucket;
@@ -169,7 +173,7 @@ public class RdeIO {
         checkState(key.directoryWithTrailingSlash() != null, "Manual subdirectory not specified");
         prefix = prefix + "/manual/" + key.directoryWithTrailingSlash() + basename;
       } else {
-        prefix = prefix + "/" + basename;
+        prefix = prefix + '/' + basename;
       }
       BlobId xmlFilename = BlobId.of(rdeBucket, prefix + ".xml.ghostryde");
       // This file will contain the byte length (ASCII) of the raw unencrypted XML.
@@ -250,12 +254,20 @@ public class RdeIO {
     }
   }

-  private static class CursorUpdater extends DoFn<KV<PendingDeposit, Integer>, PendingDeposit> {
+  private static class CursorUpdater extends DoFn<KV<PendingDeposit, Integer>, Void> {

     private static final FluentLogger logger = FluentLogger.forEnclosingClass();
     private static final long serialVersionUID = 5822176227753327224L;

+    private final CloudTasksUtils cloudTasksUtils;
+
+    private CursorUpdater(CloudTasksUtils cloudTasksUtils) {
+      this.cloudTasksUtils = cloudTasksUtils;
+    }
+
     @ProcessElement
     public void processElement(
-        @Element KV<PendingDeposit, Integer> input, OutputReceiver<PendingDeposit> outputReceiver) {
+        @Element KV<PendingDeposit, Integer> input, PipelineOptions options) {
       tm().transact(
           () -> {
             PendingDeposit key = input.getKey();
@@ -282,46 +294,32 @@ public class RdeIO {
             logger.atInfo().log(
                 "Rolled forward %s on %s cursor to %s.", key.cursor(), key.tld(), newPosition);
             RdeRevision.saveRevision(key.tld(), key.watermark(), key.mode(), revision);
+            if (key.mode() == RdeMode.FULL) {
+              cloudTasksUtils.enqueue(
+                  RDE_UPLOAD_QUEUE,
+                  CloudTasksUtils.createPostTask(
+                      RdeUploadAction.PATH,
+                      Service.BACKEND.getServiceId(),
+                      ImmutableMultimap.of(
+                          RequestParameters.PARAM_TLD,
+                          key.tld(),
+                          RdeModule.PARAM_PREFIX,
+                          options.getJobName() + '/')));
+            } else {
+              cloudTasksUtils.enqueue(
+                  BRDA_QUEUE,
+                  CloudTasksUtils.createPostTask(
+                      BrdaCopyAction.PATH,
+                      Service.BACKEND.getServiceId(),
+                      ImmutableMultimap.of(
+                          RequestParameters.PARAM_TLD,
+                          key.tld(),
+                          RdeModule.PARAM_WATERMARK,
+                          key.watermark().toString(),
+                          RdeModule.PARAM_PREFIX,
+                          options.getJobName() + '/')));
+            }
           });
-      outputReceiver.output(input.getKey());
     }
   }
-
-  private static class UploadEnqueuer extends DoFn<PendingDeposit, Void> {
-
-    private final CloudTasksUtils cloudTasksUtils;
-
-    private UploadEnqueuer(CloudTasksUtils cloudTasksUtils) {
-      this.cloudTasksUtils = cloudTasksUtils;
-    }
-
-    @ProcessElement
-    public void processElement(@Element PendingDeposit input, PipelineOptions options) {
-      if (input.mode() == RdeMode.FULL) {
-        cloudTasksUtils.enqueue(
-            RDE_UPLOAD_QUEUE,
-            CloudTasksUtils.createPostTask(
-                RdeUploadAction.PATH,
-                Service.BACKEND.getServiceId(),
-                ImmutableMultimap.of(
-                    RequestParameters.PARAM_TLD,
-                    input.tld(),
-                    RdeModule.PARAM_PREFIX,
-                    options.getJobName() + '/')));
-      } else {
-        cloudTasksUtils.enqueue(
-            BRDA_QUEUE,
-            CloudTasksUtils.createPostTask(
-                BrdaCopyAction.PATH,
-                Service.BACKEND.getServiceId(),
-                ImmutableMultimap.of(
-                    RequestParameters.PARAM_TLD,
-                    input.tld(),
-                    RdeModule.PARAM_WATERMARK,
-                    input.watermark().toString(),
-                    RdeModule.PARAM_PREFIX,
-                    options.getJobName() + '/')));
-      }
-    }
-  }
 }
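For context on the change above: rolling the cursor forward and enqueueing the next action now happen inside one transaction, so a failed enqueue also rolls back the cursor. A condensed, non-authoritative sketch of the shape of the new CursorUpdater body, using only names that appear in this diff (task construction, logging, and cursor bookkeeping elided):

  tm().transact(
      () -> {
        // Record the revision and roll the staging cursor forward...
        RdeRevision.saveRevision(key.tld(), key.watermark(), key.mode(), revision);
        // ...and enqueue RdeUploadAction (FULL/RDE) or BrdaCopyAction (THIN/BRDA)
        // in the same transaction: if the enqueue throws, the cursor update is
        // rolled back with it, mirroring the old MapReduce behavior.
        cloudTasksUtils.enqueue(
            key.mode() == RdeMode.FULL ? RDE_UPLOAD_QUEUE : BRDA_QUEUE,
            task); // the POST task to the corresponding action path, elided here
      });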
@@ -14,36 +14,50 @@

 package google.registry.beam.rde;

+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.collect.ImmutableSet.toImmutableSet;
-import static google.registry.model.EppResourceUtils.loadAtPointInTimeAsync;
+import static google.registry.beam.rde.RdePipeline.TupleTags.DOMAIN_FRAGMENTS;
+import static google.registry.beam.rde.RdePipeline.TupleTags.EXTERNAL_HOST_FRAGMENTS;
+import static google.registry.beam.rde.RdePipeline.TupleTags.HOST_TO_PENDING_DEPOSIT;
+import static google.registry.beam.rde.RdePipeline.TupleTags.PENDING_DEPOSIT;
+import static google.registry.beam.rde.RdePipeline.TupleTags.REFERENCED_CONTACTS;
+import static google.registry.beam.rde.RdePipeline.TupleTags.REFERENCED_HOSTS;
+import static google.registry.beam.rde.RdePipeline.TupleTags.REVISION_ID;
+import static google.registry.beam.rde.RdePipeline.TupleTags.SUPERORDINATE_DOMAINS;
+import static google.registry.model.reporting.HistoryEntryDao.RESOURCE_TYPES_TO_HISTORY_TYPES;
 import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
+import static org.apache.beam.sdk.values.TypeDescriptors.kvs;

 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ImmutableSetMultimap;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.collect.Streams;
 import com.google.common.io.BaseEncoding;
 import dagger.BindsInstance;
 import dagger.Component;
 import google.registry.beam.common.RegistryJpaIO;
 import google.registry.beam.common.RegistryPipelineOptions;
 import google.registry.config.CloudTasksUtilsModule;
 import google.registry.config.CredentialModule;
 import google.registry.config.RegistryConfig.ConfigModule;
 import google.registry.gcs.GcsUtils;
 import google.registry.model.EppResource;
 import google.registry.model.contact.ContactHistory;
 import google.registry.model.contact.ContactResource;
 import google.registry.model.domain.DomainBase;
 import google.registry.model.domain.DomainHistory;
 import google.registry.model.host.HostHistory;
 import google.registry.model.host.HostResource;
 import google.registry.model.rde.RdeMode;
 import google.registry.model.registrar.Registrar;
 import google.registry.model.registrar.Registrar.Type;
 import google.registry.model.reporting.HistoryEntry;
 import google.registry.model.reporting.HistoryEntryDao;
 import google.registry.persistence.PersistenceModule.TransactionIsolationLevel;
 import google.registry.persistence.VKey;
 import google.registry.rde.DepositFragment;
 import google.registry.rde.PendingDeposit;
 import google.registry.rde.PendingDeposit.PendingDepositCoder;
 import google.registry.rde.RdeFragmenter;
 import google.registry.rde.RdeMarshaller;
 import google.registry.util.CloudTasksUtils;
 import google.registry.util.UtilsModule;
@@ -54,74 +68,158 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.function.Supplier;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashSet;
 import javax.inject.Inject;
 import javax.inject.Singleton;
 import javax.persistence.Entity;
+import javax.persistence.IdClass;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Filter;
 import org.apache.beam.sdk.transforms.FlatMapElements;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.join.CoGbkResult;
+import org.apache.beam.sdk.transforms.join.CoGroupByKey;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.TypeDescriptors;
 import org.joda.time.DateTime;

 /**
  * Definition of a Dataflow Flex template, which generates RDE/BRDA deposits.
  *
- * <p>To stage this template locally, run the {@code stage_beam_pipeline.sh} shell script.
+ * <p>To stage this template locally, run {@code ./nom_build :core:sBP --environment=alpha
+ * --pipeline=rde}.
  *
  * <p>Then, you can run the staged template via the API client library, gCloud or a raw REST call.
  *
+ * <p>This pipeline only works for pending deposits with the same watermark; {@link
+ * google.registry.rde.RdeStagingAction} will batch such pending deposits together and launch
+ * multiple pipelines if multiple watermarks exist.
+ *
+ * <p>The pipeline is broadly divided into two parts -- creating the {@link DepositFragment}s, and
+ * processing them.
+ *
+ * <h1>Creating {@link DepositFragment}</h1>
+ *
+ * <h2>{@link Registrar}</h2>
+ *
+ * Non-test registrar entities are loaded from Cloud SQL and marshalled into deposit fragments. They
+ * are <b>NOT</b> rewound to the watermark.
+ *
+ * <h2>{@link EppResource}</h2>
+ *
+ * All EPP resources are loaded from the corresponding {@link HistoryEntry}, which has the resource
+ * embedded. In general we find the most recent history entry before the watermark and filter out
+ * the ones that are soft-deleted by the watermark. The history is emitted as pairs of (resource
+ * repo ID: history revision ID) from the SQL query.
+ *
+ * <h3>{@link DomainBase}</h3>
+ *
+ * After the most recent (live) domain resources are loaded from the corresponding history objects,
+ * we marshal them to deposit fragments and emit the (pending deposit: deposit fragment) pairs for
+ * further processing. We also find all the contacts and hosts referenced by a given domain and emit
+ * pairs of (contact/host repo ID: pending deposit) for all RDE pending deposits for further
+ * processing.
+ *
+ * <h3>{@link ContactResource}</h3>
+ *
+ * We first join the most recent contact histories, represented by (contact repo ID: contact history
+ * revision ID) pairs, with referenced contacts, represented by (contact repo ID: pending deposit)
+ * pairs, on the contact repo ID, to remove unreferenced contact histories. Contact resources are
+ * then loaded from the remaining referenced contact histories, and marshalled into (pending
+ * deposit: deposit fragment) pairs.
+ *
+ * <h3>{@link HostResource}</h3>
+ *
+ * Similar to {@link ContactResource}, we join the most recent host histories with referenced hosts
+ * to find the most recent referenced hosts. External hosts get the same treatment as contacts,
+ * yielding (pending deposit: deposit fragment) pairs. For subordinate hosts, we also need to find
+ * the superordinate domain in order to properly handle pending transfers in the deposit. So we
+ * first find the superordinate domain repo ID from the host and join the (superordinate domain repo
+ * ID: (subordinate host repo ID: (pending deposit: revision ID))) pair with the (domain repo ID:
+ * revision ID) pair obtained from the domain history query, in order to map the host at the
+ * watermark to the domain at the watermark. We then proceed to create the (pending deposit: deposit
+ * fragment) pair for subordinate hosts using the added domain information.
+ *
+ * <h1>Processing {@link DepositFragment}</h1>
+ *
+ * The (pending deposit: deposit fragment) pairs from different resources are combined and grouped
+ * by pending deposit. For each pending deposit, all the relevant deposit fragments are written into
+ * an encrypted file stored on GCS. The filename is uniquely determined by the Beam job ID, so there
+ * is no need to lock the GCS write operation to prevent stomping. The cursor for staging the
+ * pending deposit is then rolled forward, and the next action is enqueued. The latter two
+ * operations are performed in a transaction so that the cursor is rolled back if enqueueing fails.
+ *
 * @see <a href="https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates">Using
 *     Flex Templates</a>
 */
 @Singleton
 public class RdePipeline implements Serializable {

   private static final long serialVersionUID = -4866795928854754666L;
   private final transient RdePipelineOptions options;
   private final ValidationMode mode;
-  private final ImmutableSetMultimap<String, PendingDeposit> pendings;
+  private final ImmutableSet<PendingDeposit> pendingDeposits;
+  private final DateTime watermark;
   private final String rdeBucket;
   private final byte[] stagingKeyBytes;
   private final GcsUtils gcsUtils;
+  private final CloudTasksUtils cloudTasksUtils;
+  private final RdeMarshaller marshaller;

   // Registrars to be excluded from data escrow. Not including the sandbox-only OTE type so that
   // if it sneaks into production we would get an extra signal.
   private static final ImmutableSet<Type> IGNORED_REGISTRAR_TYPES =
       Sets.immutableEnumSet(Registrar.Type.MONITORING, Registrar.Type.TEST);

   private static final String EPP_RESOURCE_QUERY =
       "SELECT id FROM %entity% "
           + "WHERE COALESCE(creationClientId, '') NOT LIKE 'prober-%' "
           + "AND COALESCE(currentSponsorClientId, '') NOT LIKE 'prober-%' "
           + "AND COALESCE(lastEppUpdateClientId, '') NOT LIKE 'prober-%'";

   public static String createEppResourceQuery(Class<? extends EppResource> clazz) {
     return EPP_RESOURCE_QUERY.replace("%entity%", clazz.getAnnotation(Entity.class).name())
         + (clazz.equals(DomainBase.class) ? " AND tld in (:tlds)" : "");
   }

+  // The field name of the EPP resource embedded in its corresponding history entry.
+  private static final ImmutableMap<Class<? extends HistoryEntry>, String> EPP_RESOURCE_FIELD_NAME =
+      ImmutableMap.of(
+          DomainHistory.class,
+          "domainContent",
+          ContactHistory.class,
+          "contactBase",
+          HostHistory.class,
+          "hostBase");

   @Inject
   RdePipeline(RdePipelineOptions options, GcsUtils gcsUtils, CloudTasksUtils cloudTasksUtils) {
     this.options = options;
     this.mode = ValidationMode.valueOf(options.getValidationMode());
-    this.pendings = decodePendings(options.getPendings());
+    this.pendingDeposits = decodePendingDeposits(options.getPendings());
+    ImmutableSet<DateTime> potentialWatermarks =
+        pendingDeposits.stream()
+            .map(PendingDeposit::watermark)
+            .distinct()
+            .collect(toImmutableSet());
+    checkArgument(
+        potentialWatermarks.size() == 1,
+        String.format(
+            "RDE pipeline should only work on pending deposits "
+                + "with the same watermark, but %d were given: %s",
+            potentialWatermarks.size(), potentialWatermarks));
+    this.watermark = potentialWatermarks.asList().get(0);
     this.rdeBucket = options.getRdeStagingBucket();
     this.stagingKeyBytes = BaseEncoding.base64Url().decode(options.getStagingKey());
     this.gcsUtils = gcsUtils;
+    this.cloudTasksUtils = cloudTasksUtils;
+    this.marshaller = new RdeMarshaller(mode);
   }

   PipelineResult run() {
@@ -133,13 +231,46 @@ public class RdePipeline implements Serializable {
   }

   PCollection<KV<PendingDeposit, Iterable<DepositFragment>>> createFragments(Pipeline pipeline) {
-    return PCollectionList.of(processRegistrars(pipeline))
-        .and(processNonRegistrarEntities(pipeline, DomainBase.class))
-        .and(processNonRegistrarEntities(pipeline, ContactResource.class))
-        .and(processNonRegistrarEntities(pipeline, HostResource.class))
-        .apply(Flatten.pCollections())
+    PCollection<KV<PendingDeposit, DepositFragment>> registrarFragments =
+        processRegistrars(pipeline);
+
+    PCollection<KV<String, Long>> domainHistories =
+        getMostRecentHistoryEntries(pipeline, DomainHistory.class);
+
+    PCollection<KV<String, Long>> contactHistories =
+        getMostRecentHistoryEntries(pipeline, ContactHistory.class);
+
+    PCollection<KV<String, Long>> hostHistories =
+        getMostRecentHistoryEntries(pipeline, HostHistory.class);
+
+    PCollectionTuple processedDomainHistories = processDomainHistories(domainHistories);
+
+    PCollection<KV<PendingDeposit, DepositFragment>> domainFragments =
+        processedDomainHistories.get(DOMAIN_FRAGMENTS);
+
+    PCollection<KV<PendingDeposit, DepositFragment>> contactFragments =
+        processContactHistories(
+            processedDomainHistories.get(REFERENCED_CONTACTS), contactHistories);
+
+    PCollectionTuple processedHosts =
+        processHostHistories(processedDomainHistories.get(REFERENCED_HOSTS), hostHistories);
+
+    PCollection<KV<PendingDeposit, DepositFragment>> externalHostFragments =
+        processedHosts.get(EXTERNAL_HOST_FRAGMENTS);
+
+    PCollection<KV<PendingDeposit, DepositFragment>> subordinateHostFragments =
+        processSubordinateHosts(processedHosts.get(SUPERORDINATE_DOMAINS), domainHistories);
+
+    return PCollectionList.of(registrarFragments)
+        .and(domainFragments)
+        .and(contactFragments)
+        .and(externalHostFragments)
+        .and(subordinateHostFragments)
+        .apply(
+            "Combine PendingDeposit:DepositFragment pairs from all entities",
+            Flatten.pCollections())
         .setCoder(KvCoder.of(PendingDepositCoder.of(), SerializableCoder.of(DepositFragment.class)))
-        .apply("Group by PendingDeposit", GroupByKey.create());
+        .apply("Group DepositFragment by PendingDeposit", GroupByKey.create());
   }

   void persistData(PCollection<KV<PendingDeposit, Iterable<DepositFragment>>> input) {
@@ -154,127 +285,398 @@ public class RdePipeline implements Serializable {
             .build());
   }

-  PCollection<KV<PendingDeposit, DepositFragment>> processRegistrars(Pipeline pipeline) {
+  private PCollection<KV<PendingDeposit, DepositFragment>> processRegistrars(Pipeline pipeline) {
+    // Note that the namespace in the metric is not being used by Stackdriver, it just has to be
+    // non-empty.
+    // See:
+    // https://stackoverflow.com/questions/48530496/google-dataflow-custom-metrics-not-showing-on-stackdriver
+    Counter includedRegistrarCounter = Metrics.counter("RDE", "IncludedRegistrar");
+    Counter registrarFragmentCounter = Metrics.counter("RDE", "RegistrarFragment");
     return pipeline
         .apply(
-            "Read all production Registrar entities",
+            "Read all production Registrars",
             RegistryJpaIO.read(
                 "SELECT clientIdentifier FROM Registrar WHERE type NOT IN (:types)",
                 ImmutableMap.of("types", IGNORED_REGISTRAR_TYPES),
                 String.class,
                 // TODO: consider adding coders for entities and pass them directly instead of
                 // using VKeys.
                 id -> VKey.createSql(Registrar.class, id)))
         .apply(
-            "Marshal Registrar into DepositFragment",
+            "Marshall Registrar into DepositFragment",
             FlatMapElements.into(
-                    TypeDescriptors.kvs(
+                    kvs(
                         TypeDescriptor.of(PendingDeposit.class),
                         TypeDescriptor.of(DepositFragment.class)))
                 .via(
                     (VKey<Registrar> key) -> {
+                      includedRegistrarCounter.inc();
                      Registrar registrar = jpaTm().transact(() -> jpaTm().loadByKey(key));
-                      DepositFragment fragment =
-                          new RdeMarshaller(mode).marshalRegistrar(registrar);
-                      return pendings.values().stream()
-                          .map(pending -> KV.of(pending, fragment))
-                          .collect(toImmutableSet());
+                      DepositFragment fragment = marshaller.marshalRegistrar(registrar);
+                      ImmutableSet<KV<PendingDeposit, DepositFragment>> fragments =
+                          pendingDeposits.stream()
+                              .map(pending -> KV.of(pending, fragment))
+                              .collect(toImmutableSet());
+                      registrarFragmentCounter.inc(fragments.size());
+                      return fragments;
                     }));
   }

-  <T extends EppResource>
-      PCollection<KV<PendingDeposit, DepositFragment>> processNonRegistrarEntities(
-          Pipeline pipeline, Class<T> clazz) {
-    return createInputs(pipeline, clazz)
-        .apply("Marshal " + clazz.getSimpleName() + " into DepositFragment", mapToFragments(clazz))
-        .setCoder(
-            KvCoder.of(PendingDepositCoder.of(), SerializableCoder.of(DepositFragment.class)));
-  }
+  /**
+   * Load the most recent history entry before the watermark for a given history entry type.
+   *
+   * <p>Note that deleted and non-production resources are not included.
+   *
+   * @return A KV pair of (repoId, revisionId), used to reconstruct the composite key for the
+   *     history entry.
+   */
+  private <T extends HistoryEntry> PCollection<KV<String, Long>> getMostRecentHistoryEntries(
+      Pipeline pipeline, Class<T> historyClass) {
+    String repoIdFieldName = HistoryEntryDao.REPO_ID_FIELD_NAMES.get(historyClass);
+    String resourceFieldName = EPP_RESOURCE_FIELD_NAME.get(historyClass);
+    return pipeline
+        .apply(
+            String.format("Load most recent %s", historyClass.getSimpleName()),
+            RegistryJpaIO.read(
+                ("SELECT %repoIdField%, id FROM %entity% WHERE (%repoIdField%, modificationTime)"
+                        + " IN (SELECT %repoIdField%, MAX(modificationTime) FROM %entity% WHERE"
+                        + " modificationTime <= :watermark GROUP BY %repoIdField%) AND"
+                        + " %resourceField%.deletionTime > :watermark AND"
+                        + " COALESCE(%resourceField%.creationClientId, '') NOT LIKE 'prober-%' AND"
+                        + " COALESCE(%resourceField%.currentSponsorClientId, '') NOT LIKE 'prober-%'"
+                        + " AND COALESCE(%resourceField%.lastEppUpdateClientId, '') NOT LIKE"
+                        + " 'prober-%' "
+                        + (historyClass == DomainHistory.class
+                            ? "AND %resourceField%.tld IN "
+                                + "(SELECT id FROM Tld WHERE tldType = 'REAL')"
+                            : ""))
+                    .replace("%entity%", historyClass.getSimpleName())
+                    .replace("%repoIdField%", repoIdFieldName)
+                    .replace("%resourceField%", resourceFieldName),
+                ImmutableMap.of("watermark", watermark),
+                Object[].class,
+                row -> KV.of((String) row[0], (long) row[1])))
+        .setCoder(KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of()));
+  }

-  <T extends EppResource> PCollection<VKey<T>> createInputs(Pipeline pipeline, Class<T> clazz) {
-    return pipeline.apply(
-        "Read all production " + clazz.getSimpleName() + " entities",
-        RegistryJpaIO.read(
-            createEppResourceQuery(clazz),
-            clazz.equals(DomainBase.class)
-                ? ImmutableMap.of("tlds", pendings.keySet())
-                : ImmutableMap.of(),
-            String.class,
-            // TODO: consider adding coders for entities and pass them directly instead of using
-            // VKeys.
-            x -> VKey.createSql(clazz, x)));
-  }
+  private <T extends HistoryEntry> EppResource loadResourceByHistoryEntryId(
+      Class<T> historyEntryClazz, String repoId, long revisionId) {
+    try {
+      Class<?> idClazz = historyEntryClazz.getAnnotation(IdClass.class).value();
+      Serializable idObject =
+          (Serializable)
+              idClazz.getConstructor(String.class, long.class).newInstance(repoId, revisionId);
+      return jpaTm()
+          .transact(() -> jpaTm().loadByKey(VKey.createSql(historyEntryClazz, idObject)))
+          .getResourceAtPointInTime()
+          .map(resource -> resource.cloneProjectedAtTime(watermark))
+          .get();
+    } catch (NoSuchMethodException
+        | InvocationTargetException
+        | InstantiationException
+        | IllegalAccessException e) {
+      throw new RuntimeException(
+          String.format(
+              "Cannot load resource from %s with repoId %s and revisionId %s",
+              historyEntryClazz.getSimpleName(), repoId, revisionId),
+          e);
+    }
+  }

-  <T extends EppResource>
-      FlatMapElements<VKey<T>, KV<PendingDeposit, DepositFragment>> mapToFragments(Class<T> clazz) {
-    return FlatMapElements.into(
-            TypeDescriptors.kvs(
-                TypeDescriptor.of(PendingDeposit.class), TypeDescriptor.of(DepositFragment.class)))
-        .via(
-            (VKey<T> key) -> {
-              T resource = jpaTm().transact(() -> jpaTm().loadByKey(key));
-              // The set of all TLDs to which this resource should be emitted.
-              ImmutableSet<String> tlds =
-                  clazz.equals(DomainBase.class)
-                      ? ImmutableSet.of(((DomainBase) resource).getTld())
-                      : pendings.keySet();
-              // Get the set of all point-in-time watermarks we need, to minimize rewinding.
-              ImmutableSet<DateTime> dates =
-                  tlds.stream()
-                      .map(pendings::get)
-                      .flatMap(ImmutableSet::stream)
-                      .map(PendingDeposit::watermark)
-                      .collect(toImmutableSet());
-              // Launch asynchronous fetches of point-in-time representations of resource.
-              ImmutableMap<DateTime, Supplier<EppResource>> resourceAtTimes =
-                  ImmutableMap.copyOf(
-                      Maps.asMap(dates, input -> loadAtPointInTimeAsync(resource, input)));
-              // Convert resource to an XML fragment for each watermark/mode pair lazily and cache
-              // the result.
-              RdeFragmenter fragmenter =
-                  new RdeFragmenter(resourceAtTimes, new RdeMarshaller(mode));
-              List<KV<PendingDeposit, DepositFragment>> results = new ArrayList<>();
-              for (String tld : tlds) {
-                for (PendingDeposit pending : pendings.get(tld)) {
-                  // Hosts and contacts don't get included in BRDA deposits.
-                  if (pending.mode() == RdeMode.THIN && !clazz.equals(DomainBase.class)) {
-                    continue;
-                  }
-                  Optional<DepositFragment> fragment =
-                      fragmenter.marshal(pending.watermark(), pending.mode());
-                  fragment.ifPresent(
-                      depositFragment -> results.add(KV.of(pending, depositFragment)));
-                }
-              }
-              return results;
-            });

+  /**
+   * Remove unreferenced resources by joining the (repoId, pendingDeposit) pair with the (repoId,
+   * revisionId) pair on the repoId.
+   *
+   * <p>The (repoId, pendingDeposit) pairs denote resources (contact, host) that are referenced from
+   * a domain, that are to be included in the corresponding pending deposit.
+   *
+   * <p>The (repoId, revisionId) pairs come from the most recent history entry query, which can be
+   * used to load the embedded resources themselves.
+   *
+   * @return a pair of (repoId, ([pendingDeposit], [revisionId])) where neither the pendingDeposit
+   *     nor the revisionId list is empty.
+   */
+  private static PCollection<KV<String, CoGbkResult>> removeUnreferencedResource(
+      PCollection<KV<String, PendingDeposit>> referencedResources,
+      PCollection<KV<String, Long>> historyEntries,
+      Class<? extends EppResource> resourceClazz) {
+    String resourceName = resourceClazz.getSimpleName();
+    Class<? extends HistoryEntry> historyEntryClazz =
+        RESOURCE_TYPES_TO_HISTORY_TYPES.get(resourceClazz);
+    String historyEntryName = historyEntryClazz.getSimpleName();
+    Counter referencedResourceCounter = Metrics.counter("RDE", "Referenced" + resourceName);
+    return KeyedPCollectionTuple.of(PENDING_DEPOSIT, referencedResources)
+        .and(REVISION_ID, historyEntries)
+        .apply(
+            String.format(
+                "Join PendingDeposit with %s revision ID on %s", historyEntryName, resourceName),
+            CoGroupByKey.create())
+        .apply(
+            String.format("Remove unreferenced %s", resourceName),
+            Filter.by(
+                (KV<String, CoGbkResult> kv) -> {
+                  boolean toInclude =
+                      // If a resource does not have a corresponding pending deposit, it is not
+                      // referenced and should not be included.
+                      kv.getValue().getAll(PENDING_DEPOSIT).iterator().hasNext()
+                          // If a resource does not have a revision id (this should not happen, as
+                          // every referenced resource must be valid at watermark time, therefore
+                          // be embedded in a history entry valid at watermark time, otherwise
+                          // the domain cannot reference it), there is no way for us to find the
+                          // history entry and load the embedded resource. So we ignore the
+                          // resource to keep the downstream process simple.
+                          && kv.getValue().getAll(REVISION_ID).iterator().hasNext();
+                  if (toInclude) {
+                    referencedResourceCounter.inc();
+                  }
+                  return toInclude;
+                }));
+  }

+  private PCollectionTuple processDomainHistories(PCollection<KV<String, Long>> domainHistories) {
+    Counter activeDomainCounter = Metrics.counter("RDE", "ActiveDomainBase");
+    Counter domainFragmentCounter = Metrics.counter("RDE", "DomainFragment");
+    Counter referencedContactCounter = Metrics.counter("RDE", "ReferencedContactResource");
+    Counter referencedHostCounter = Metrics.counter("RDE", "ReferencedHostResource");
+    return domainHistories.apply(
+        "Map DomainHistory to DepositFragment "
+            + "and emit referenced ContactResource and HostResource",
+        ParDo.of(
+                new DoFn<KV<String, Long>, KV<PendingDeposit, DepositFragment>>() {
+                  @ProcessElement
+                  public void processElement(
+                      @Element KV<String, Long> kv, MultiOutputReceiver receiver) {
+                    activeDomainCounter.inc();
+                    DomainBase domain =
+                        (DomainBase)
+                            loadResourceByHistoryEntryId(
+                                DomainHistory.class, kv.getKey(), kv.getValue());
+                    pendingDeposits.stream()
+                        .filter(pendingDeposit -> pendingDeposit.tld().equals(domain.getTld()))
+                        .forEach(
+                            pendingDeposit -> {
+                              // Domains are always deposited in both modes.
+                              domainFragmentCounter.inc();
+                              receiver
+                                  .get(DOMAIN_FRAGMENTS)
+                                  .output(
+                                      KV.of(
+                                          pendingDeposit,
+                                          marshaller.marshalDomain(domain, pendingDeposit.mode())));
+                              // Contacts and hosts are only deposited in RDE, not BRDA.
+                              if (pendingDeposit.mode() == RdeMode.FULL) {
+                                HashSet<Serializable> contacts = new HashSet<>();
+                                contacts.add(domain.getAdminContact().getSqlKey());
+                                contacts.add(domain.getTechContact().getSqlKey());
+                                contacts.add(domain.getRegistrant().getSqlKey());
+                                // Billing contact is not mandatory.
+                                if (domain.getBillingContact() != null) {
+                                  contacts.add(domain.getBillingContact().getSqlKey());
+                                }
+                                referencedContactCounter.inc(contacts.size());
+                                contacts.forEach(
+                                    contactRepoId ->
+                                        receiver
+                                            .get(REFERENCED_CONTACTS)
+                                            .output(
+                                                KV.of((String) contactRepoId, pendingDeposit)));
+                                if (domain.getNsHosts() != null) {
+                                  referencedHostCounter.inc(domain.getNsHosts().size());
+                                  domain
+                                      .getNsHosts()
+                                      .forEach(
+                                          hostKey ->
+                                              receiver
+                                                  .get(REFERENCED_HOSTS)
+                                                  .output(
+                                                      KV.of(
+                                                          (String) hostKey.getSqlKey(),
+                                                          pendingDeposit)));
+                                }
+                              }
+                            });
+                  }
+                })
+            .withOutputTags(
+                DOMAIN_FRAGMENTS, TupleTagList.of(REFERENCED_CONTACTS).and(REFERENCED_HOSTS)));
+  }

+  private PCollection<KV<PendingDeposit, DepositFragment>> processContactHistories(
+      PCollection<KV<String, PendingDeposit>> referencedContacts,
+      PCollection<KV<String, Long>> contactHistories) {
+    Counter contactFragmentCounter = Metrics.counter("RDE", "ContactFragment");
+    return removeUnreferencedResource(referencedContacts, contactHistories, ContactResource.class)
+        .apply(
+            "Map ContactResource to DepositFragment",
+            FlatMapElements.into(
+                    kvs(
+                        TypeDescriptor.of(PendingDeposit.class),
+                        TypeDescriptor.of(DepositFragment.class)))
+                .via(
+                    (KV<String, CoGbkResult> kv) -> {
+                      ContactResource contact =
+                          (ContactResource)
+                              loadResourceByHistoryEntryId(
+                                  ContactHistory.class,
+                                  kv.getKey(),
+                                  kv.getValue().getOnly(REVISION_ID));
+                      DepositFragment fragment = marshaller.marshalContact(contact);
+                      ImmutableSet<KV<PendingDeposit, DepositFragment>> fragments =
+                          Streams.stream(kv.getValue().getAll(PENDING_DEPOSIT))
+                              // The same contact could be used by multiple domains, therefore
+                              // matched to the same pending deposit multiple times.
+                              .distinct()
+                              .map(pendingDeposit -> KV.of(pendingDeposit, fragment))
+                              .collect(toImmutableSet());
+                      contactFragmentCounter.inc(fragments.size());
+                      return fragments;
+                    }));
+  }

+  private PCollectionTuple processHostHistories(
+      PCollection<KV<String, PendingDeposit>> referencedHosts,
+      PCollection<KV<String, Long>> hostHistories) {
+    Counter subordinateHostCounter = Metrics.counter("RDE", "SubordinateHostResource");
+    Counter externalHostCounter = Metrics.counter("RDE", "ExternalHostResource");
+    Counter externalHostFragmentCounter = Metrics.counter("RDE", "ExternalHostFragment");
+    return removeUnreferencedResource(referencedHosts, hostHistories, HostResource.class)
+        .apply(
+            "Map external DomainResource to DepositFragment and process subordinate domains",
+            ParDo.of(
+                    new DoFn<KV<String, CoGbkResult>, KV<PendingDeposit, DepositFragment>>() {
+                      @ProcessElement
+                      public void processElement(
+                          @Element KV<String, CoGbkResult> kv, MultiOutputReceiver receiver) {
+                        HostResource host =
+                            (HostResource)
+                                loadResourceByHistoryEntryId(
+                                    HostHistory.class,
+                                    kv.getKey(),
+                                    kv.getValue().getOnly(REVISION_ID));
+                        // When a host is subordinate, we need to find its superordinate domain and
+                        // include it in the deposit as well.
+                        if (host.isSubordinate()) {
+                          subordinateHostCounter.inc();
+                          receiver
+                              .get(SUPERORDINATE_DOMAINS)
+                              .output(
+                                  // The outputs are pairs of
+                                  // (superordinateDomainRepoId,
+                                  // (subordinateHostRepoId, (pendingDeposit, revisionId))).
+                                  KV.of((String) host.getSuperordinateDomain().getSqlKey(), kv));
+                        } else {
+                          externalHostCounter.inc();
+                          DepositFragment fragment = marshaller.marshalExternalHost(host);
+                          Streams.stream(kv.getValue().getAll(PENDING_DEPOSIT))
+                              // The same host could be used by multiple domains, therefore
+                              // matched to the same pending deposit multiple times.
+                              .distinct()
+                              .forEach(
+                                  pendingDeposit -> {
+                                    externalHostFragmentCounter.inc();
+                                    receiver
+                                        .get(EXTERNAL_HOST_FRAGMENTS)
+                                        .output(KV.of(pendingDeposit, fragment));
+                                  });
+                        }
+                      }
+                    })
+                .withOutputTags(EXTERNAL_HOST_FRAGMENTS, TupleTagList.of(SUPERORDINATE_DOMAINS)));
+  }

+  /**
+   * Process subordinate hosts by making a deposit fragment with pending transfer information
+   * obtained from its superordinate domain.
+   *
+   * @param superordinateDomains Pairs of (superordinateDomainRepoId, (subordinateHostRepoId,
+   *     (pendingDeposit, revisionId))). This collection maps the subordinate host (and the pending
+   *     deposit that should include it) to its superordinate domain.
+   * @param domainHistories Pairs of (domainRepoId, revisionId). This collection helps us find the
+   *     historical superordinate domain from its history entry and is obtained from calling {@link
+   *     #getMostRecentHistoryEntries} for domains.
+   */
+  private PCollection<KV<PendingDeposit, DepositFragment>> processSubordinateHosts(
+      PCollection<KV<String, KV<String, CoGbkResult>>> superordinateDomains,
+      PCollection<KV<String, Long>> domainHistories) {
+    Counter subordinateHostFragmentCounter = Metrics.counter("RDE", "SubordinateHostFragment");
+    Counter referencedSubordinateHostCounter = Metrics.counter("RDE", "ReferencedSubordinateHost");
+    return KeyedPCollectionTuple.of(HOST_TO_PENDING_DEPOSIT, superordinateDomains)
+        .and(REVISION_ID, domainHistories)
+        .apply(
+            "Join HostResource:PendingDeposits with DomainHistory on DomainResource",
+            CoGroupByKey.create())
+        .apply(
+            " Remove unreferenced DomainResource",
+            Filter.by(
+                kv -> {
+                  boolean toInclude =
+                      kv.getValue().getAll(HOST_TO_PENDING_DEPOSIT).iterator().hasNext()
+                          && kv.getValue().getAll(REVISION_ID).iterator().hasNext();
+                  if (toInclude) {
+                    referencedSubordinateHostCounter.inc();
+                  }
+                  return toInclude;
+                }))
+        .apply(
+            "Map subordinate HostResource to DepositFragment",
+            FlatMapElements.into(
+                    kvs(
+                        TypeDescriptor.of(PendingDeposit.class),
+                        TypeDescriptor.of(DepositFragment.class)))
+                .via(
+                    (KV<String, CoGbkResult> kv) -> {
+                      DomainBase superordinateDomain =
+                          (DomainBase)
+                              loadResourceByHistoryEntryId(
+                                  DomainHistory.class,
+                                  kv.getKey(),
+                                  kv.getValue().getOnly(REVISION_ID));
+                      ImmutableSet.Builder<KV<PendingDeposit, DepositFragment>> results =
+                          new ImmutableSet.Builder<>();
+                      for (KV<String, CoGbkResult> hostToPendingDeposits :
+                          kv.getValue().getAll(HOST_TO_PENDING_DEPOSIT)) {
+                        HostResource host =
+                            (HostResource)
+                                loadResourceByHistoryEntryId(
+                                    HostHistory.class,
+                                    hostToPendingDeposits.getKey(),
+                                    hostToPendingDeposits.getValue().getOnly(REVISION_ID));
+                        DepositFragment fragment =
+                            marshaller.marshalSubordinateHost(host, superordinateDomain);
+                        Streams.stream(hostToPendingDeposits.getValue().getAll(PENDING_DEPOSIT))
+                            .distinct()
+                            .forEach(
+                                pendingDeposit -> {
+                                  subordinateHostFragmentCounter.inc();
+                                  results.add(KV.of(pendingDeposit, fragment));
+                                });
+                      }
+                      return results.build();
+                    }));
+  }

   /**
    * Decodes the pipeline option extracted from the URL parameter sent by the pipeline launcher to
-   * the original TLD to pending deposit map.
+   * the original pending deposit set.
    */
   @SuppressWarnings("unchecked")
-  static ImmutableSetMultimap<String, PendingDeposit> decodePendings(String encodedPending) {
+  static ImmutableSet<PendingDeposit> decodePendingDeposits(String encodedPendingDeposits) {
     try (ObjectInputStream ois =
         new ObjectInputStream(
             new ByteArrayInputStream(
-                BaseEncoding.base64Url().omitPadding().decode(encodedPending)))) {
-      return (ImmutableSetMultimap<String, PendingDeposit>) ois.readObject();
+                BaseEncoding.base64Url().omitPadding().decode(encodedPendingDeposits)))) {
+      return (ImmutableSet<PendingDeposit>) ois.readObject();
     } catch (IOException | ClassNotFoundException e) {
       throw new IllegalArgumentException("Unable to parse encoded pending deposit map.", e);
     }
   }

   /**
-   * Encodes the TLD to pending deposit map in a URL-safe string that is sent to the pipeline
-   * worker by the pipeline launcher as a pipeline option.
+   * Encodes the pending deposit set in a URL-safe string that is sent to the pipeline worker by
+   * the pipeline launcher as a pipeline option.
    */
-  public static String encodePendings(ImmutableSetMultimap<String, PendingDeposit> pendings)
+  public static String encodePendingDeposits(ImmutableSet<PendingDeposit> pendingDeposits)
       throws IOException {
     try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
       ObjectOutputStream oos = new ObjectOutputStream(baos);
-      oos.writeObject(pendings);
+      oos.writeObject(pendingDeposits);
       oos.flush();
       return BaseEncoding.base64Url().omitPadding().encode(baos.toByteArray());
     }
@@ -284,14 +686,40 @@ public class RdePipeline implements Serializable {
     PipelineOptionsFactory.register(RdePipelineOptions.class);
     RdePipelineOptions options =
         PipelineOptionsFactory.fromArgs(args).withValidation().as(RdePipelineOptions.class);
+    // RegistryPipelineWorkerInitializer only initializes before pipeline executions, after the
+    // main() function constructed the graph. We need the registry environment set up so that we
+    // can create a CloudTasksUtils which uses the environment-dependent config file.
+    options.getRegistryEnvironment().setup();
     RegistryPipelineOptions.validateRegistryPipelineOptions(options);
+    options.setIsolationOverride(TransactionIsolationLevel.TRANSACTION_READ_COMMITTED);
     DaggerRdePipeline_RdePipelineComponent.builder().options(options).build().rdePipeline().run();
   }

+  /**
+   * A utility class containing the {@link TupleTag}s used with {@link PCollectionTuple}s and
+   * {@link CoGbkResult}s.
+   */
+  protected abstract static class TupleTags {
+    protected static final TupleTag<KV<PendingDeposit, DepositFragment>> DOMAIN_FRAGMENTS =
+        new TupleTag<KV<PendingDeposit, DepositFragment>>() {};
+
+    protected static final TupleTag<KV<String, PendingDeposit>> REFERENCED_CONTACTS =
+        new TupleTag<KV<String, PendingDeposit>>() {};
+
+    protected static final TupleTag<KV<String, PendingDeposit>> REFERENCED_HOSTS =
+        new TupleTag<KV<String, PendingDeposit>>() {};
+
+    protected static final TupleTag<KV<String, KV<String, CoGbkResult>>> SUPERORDINATE_DOMAINS =
+        new TupleTag<KV<String, KV<String, CoGbkResult>>>() {};
+
+    protected static final TupleTag<KV<PendingDeposit, DepositFragment>> EXTERNAL_HOST_FRAGMENTS =
+        new TupleTag<KV<PendingDeposit, DepositFragment>>() {};
+
+    protected static final TupleTag<PendingDeposit> PENDING_DEPOSIT =
+        new TupleTag<PendingDeposit>() {};
+
+    protected static final TupleTag<KV<String, CoGbkResult>> HOST_TO_PENDING_DEPOSIT =
+        new TupleTag<KV<String, CoGbkResult>>() {};
+
+    protected static final TupleTag<Long> REVISION_ID = new TupleTag<Long>() {};
+  }

   @Singleton
   @Component(
       modules = {
@@ -61,7 +61,8 @@ import org.json.JSONObject;
 /**
  * Definition of a Dataflow Flex template, which generates a given month's spec11 report.
  *
- * <p>To stage this template locally, run the {@code stage_beam_pipeline.sh} shell script.
+ * <p>To stage this template locally, run {@code ./nom_build :core:sBP --environment=alpha
+ * --pipeline=spec11}.
  *
  * <p>Then, you can run the staged template via the API client library, gCloud or a raw REST call.
  *
@@ -557,11 +557,23 @@ public final class RegistryConfig {
     return config.registryPolicy.requireSslCertificates;
   }

+  /**
+   * Returns the GCE machine type that a CPU-demanding pipeline should use.
+   *
+   * @see google.registry.beam.rde.RdePipeline
+   */
+  @Provides
+  @Config("highPerformanceMachineType")
+  public static String provideHighPerformanceMachineType(RegistryConfigSettings config) {
+    return config.beam.highPerformanceMachineType;
+  }
+
   /**
    * Returns the default job region to run Apache Beam (Cloud Dataflow) jobs in.
    *
-   * @see google.registry.beam.invoicing.InvoicingPipeline
    * @see google.registry.beam.spec11.Spec11Pipeline
+   * @see google.registry.beam.invoicing.InvoicingPipeline
    */
   @Provides
   @Config("defaultJobRegion")
@@ -133,6 +133,7 @@ public class RegistryConfigSettings {
   /** Configuration for Apache Beam (Cloud Dataflow). */
   public static class Beam {
     public String defaultJobRegion;
+    public String highPerformanceMachineType;
     public String stagingBucketUrl;
   }
@@ -419,6 +419,14 @@ misc:
 beam:
   # The default region to run Apache Beam (Cloud Dataflow) jobs in.
   defaultJobRegion: us-east1
+  # The GCE machine type to use when a job is CPU-intensive (e.g. RDE). Be sure
+  # to check the VM CPU quota for the job region. In a massively parallel
+  # pipeline this quota can easily be reached and needs to be raised, otherwise
+  # the job will run very slowly. Also note that there is a separate quota for
+  # external IPv4 addresses in a region, which means that machine types with a
+  # higher core count per machine may be preferable in order to conserve IP
+  # addresses. See: https://cloud.google.com/compute/quotas#cpu_quota
+  highPerformanceMachineType: n2-standard-4
   stagingBucketUrl: gcs-bucket-with-staged-templates

 keyring:
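To put the quota comment above in concrete (hypothetical) numbers: an n2-standard-4 VM has 4 vCPUs, so 250 such workers consume 1,000 vCPUs of the regional CPU quota but only 250 in-use external IPv4 addresses, while reaching the same 1,000 vCPUs with single-vCPU machines would require 1,000 addresses.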
@@ -14,12 +14,14 @@

 package google.registry.rde;

+import static com.google.common.collect.ImmutableSetMultimap.toImmutableSetMultimap;
 import static google.registry.beam.BeamUtils.createJobName;
 import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
 import static google.registry.request.Action.Method.GET;
 import static google.registry.request.Action.Method.POST;
 import static google.registry.xml.ValidationMode.LENIENT;
 import static google.registry.xml.ValidationMode.STRICT;
+import static java.util.function.Function.identity;
 import static javax.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
 import static javax.servlet.http.HttpServletResponse.SC_NO_CONTENT;
 import static javax.servlet.http.HttpServletResponse.SC_OK;
@@ -29,6 +31,7 @@ import com.google.api.services.dataflow.model.LaunchFlexTemplateParameter;
 import com.google.api.services.dataflow.model.LaunchFlexTemplateRequest;
 import com.google.api.services.dataflow.model.LaunchFlexTemplateResponse;
 import com.google.common.base.Ascii;
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
@@ -48,6 +51,7 @@ import google.registry.model.EppResource;
 import google.registry.model.common.Cursor;
 import google.registry.model.common.Cursor.CursorType;
 import google.registry.model.contact.ContactResource;
 import google.registry.model.domain.DomainBase;
 import google.registry.model.host.HostResource;
 import google.registry.model.index.EppResourceIndex;
 import google.registry.model.rde.RdeMode;
@@ -67,14 +71,17 @@ import org.joda.time.DateTime;
 import org.joda.time.Duration;

 /**
- * MapReduce that idempotently stages escrow deposit XML files on GCS for RDE/BRDA for all TLDs.
+ * Action that kicks off either a MapReduce (for Datastore) or Dataflow (for Cloud SQL) job to stage
+ * escrow deposit XML files on GCS for RDE/BRDA for all TLDs.
 *
- * <h3>MapReduce Operation</h3>
+ * <h3>Pending Deposits</h3>
 *
 * <p>This task starts by asking {@link PendingDepositChecker} which deposits need to be generated.
 * If there's nothing to deposit, we return 204 No Content; otherwise, we fire off a MapReduce job
 * and redirect to its status GUI. The task can also be run in manual operation, as described below.
 *
+ * <h3>MapReduce</h3>
+ *
 * <p>The mapreduce job scans every {@link EppResource} in Datastore. It maps a point-in-time
 * representation of each entity to the escrow XML files in which it should appear.
 *
@@ -88,11 +95,26 @@ import org.joda.time.Duration;
 * <p>{@link Registrar} entities, both active and inactive, are included in all deposits. They are
 * not rewound point-in-time.
 *
+ * <h3>Dataflow</h3>
+ *
+ * The Dataflow job finds the most recent history entry on or before the watermark for each
+ * resource type and loads the embedded resource from it, which is then projected to the watermark
+ * time to account for things like pending transfers.
+ *
+ * <p>Only {@link ContactResource}s and {@link HostResource}s that are referenced by an included
+ * {@link DomainBase} will be included in the corresponding pending deposit.
+ *
+ * <p>{@link Registrar} entities, both active and inactive, are included in all deposits. They are
+ * not rewound point-in-time.
+ *
 * <h3>Afterward</h3>
 *
 * <p>The XML deposit files generated by this job are humongous. A tiny XML report file is generated
 * for each deposit, telling us how much of what it contains.
 *
- * <p>Once a deposit is successfully generated, an {@link RdeUploadAction} is enqueued which will
- * upload it via SFTP to the third-party escrow provider.
+ * <p>Once a deposit is successfully generated, for RDE an {@link RdeUploadAction} is enqueued which
+ * will upload it via SFTP to the third-party escrow provider; for BRDA a {@link BrdaCopyAction} is
+ * enqueued which will copy it to a GCS bucket to be rsynced to a third-party escrow provider.
 *
 * <p>To generate escrow deposits manually and locally, use the {@code nomulus} tool command {@code
 * GenerateEscrowDepositCommand}.
@@ -106,7 +128,7 @@ import org.joda.time.Duration;
 *
 * <p>Valid model objects might not be valid to the RDE XML schema. A single invalid object will
 * cause the whole deposit to fail. You need to check the logs, find out which entities are broken,
- * and perform Datastore surgery.
+ * and perform database surgery.
 *
 * <p>If a deposit fails, an error is emitted to the logs for each broken entity. It tells you the
 * key and shows you its representation in lenient XML.
@ -137,33 +159,40 @@ import org.joda.time.Duration;
|
|||
* <p>The deposit and report are encrypted using {@link Ghostryde}. Administrators can use the
|
||||
* {@code GhostrydeCommand} command in the {@code nomulus} tool to view them.
|
||||
*
|
||||
* <p>Unencrypted XML fragments are stored temporarily between the map and reduce steps. The
|
||||
* ghostryde encryption on the full archived deposits makes life a little more difficult for an
|
||||
* attacker. But security ultimately depends on the bucket.
|
||||
* <p>Unencrypted XML fragments are stored temporarily between the map and reduce steps and between
|
||||
* Dataflow transforms. The ghostryde encryption on the full archived deposits makes life a little
|
||||
* more difficult for an attacker. But security ultimately depends on the bucket.
|
||||
*
|
||||
* <h3>Idempotency</h3>
|
||||
*
|
||||
* <p>We lock the reduce tasks. This is necessary because: a) App Engine tasks might get double
|
||||
* executed; and b) Cloud Storage file handles get committed on close <i>even if our code throws an
|
||||
* exception.</i>
|
||||
* <p>We lock the reduce tasks for the MapReduce job. This is necessary because: a) App Engine tasks
|
||||
* might get double executed; and b) Cloud Storage file handles get committed on close <i>even if
|
||||
* our code throws an exception.</i>
|
||||
*
|
||||
* <p>For the Dataflow job we do not employ a lock because it is difficult to span a lock across
|
||||
* three subsequent transforms (save to GCS, roll forward cursor, enqueue next action). Instead, we
|
||||
* get around the issue by saving the deposit to a unique folder named after the job name so there
|
||||
* is no possibility of overwriting.
|
||||
*
|
||||
* <p>Deposits are generated serially for a given (watermark, mode) pair. A deposit is never started
|
||||
* beyond the cursor. Once a deposit is completed, its cursor is rolled forward transactionally.
|
||||
* Duplicate jobs may exist {@code <=cursor}. So a transaction will not bother changing the cursor
|
||||
* if it's already been rolled forward.
|
||||
*
|
||||
* <p>Enqueuing {@code RdeUploadAction} is also part of the cursor transaction. This is necessary
|
||||
* because the first thing the upload task does is check the staging cursor to verify it's been
|
||||
* completed, so we can't enqueue before we roll. We also can't enqueue after the roll, because then
|
||||
* if enqueuing fails, the upload might never be enqueued.
|
||||
* <p>Enqueuing {@code RdeUploadAction} or {@code BrdaCopyAction} is also part of the cursor
|
||||
* transaction. This is necessary because the first thing the upload task does is check the staging
|
||||
* cursor to verify it's been completed, so we can't enqueue before we roll. We also can't enqueue
|
||||
* after the roll, because then if enqueuing fails, the upload might never be enqueued.
|
||||
*
|
||||
* <h3>Determinism</h3>
|
||||
*
|
||||
* <p>The filename of an escrow deposit is deterministic for a given (TLD, watermark, {@linkplain
|
||||
* RdeMode mode}) triplet. Its generated content is deterministic in all the ways that we care
|
||||
* about. Its view of the database is strongly consistent.
|
||||
* about. In Cloud SQL its view of the database is automatically strongly consistent, because the
|
||||
* initial query for the history entry runs at the {@code READ_COMMITTED} transaction isolation
|
||||
* level.
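The filename determinism can be illustrated as follows (`RdeNamingUtils` holds the real logic; this standalone version assumes the standard series marker `S1`):

```java
// Illustrative reimplementation: the basename is a pure function of
// (tld, watermark, mode, revision), so regenerating a deposit yields the same name.
static String depositBasename(String tld, DateTime watermark, RdeMode mode, int revision) {
  return String.format(
      "%s_%s_%s_S1_R%d",
      tld, watermark.toString("yyyy-MM-dd"), Ascii.toLowerCase(mode.name()), revision);
}
// depositBasename("soy", DateTime.parse("2000-01-01TZ"), RdeMode.FULL, 0)
// -> "soy_2000-01-01_full_S1_R0"
```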
|
||||
*
|
||||
* <p>This is because:
|
||||
* <p>This is also true in Datastore because:
|
||||
*
|
||||
* <ol>
|
||||
* <li>{@code EppResource} queries are strongly consistent thanks to {@link EppResourceIndex}
|
||||
|
@ -226,6 +255,11 @@ public final class RdeStagingAction implements Runnable {
|
|||
@Inject MapreduceRunner mrRunner;
|
||||
@Inject @Config("projectId") String projectId;
|
||||
@Inject @Config("defaultJobRegion") String jobRegion;
|
||||
|
||||
@Inject
|
||||
@Config("highPerformanceMachineType")
|
||||
String machineType;
|
||||
|
||||
@Inject @Config("transactionCooldown") Duration transactionCooldown;
|
||||
@Inject @Config("beamStagingBucketUrl") String stagingBucketUrl;
|
||||
@Inject @Config("rdeBucket") String rdeBucket;
|
||||
|
@ -270,43 +304,65 @@ public final class RdeStagingAction implements Runnable {
|
|||
new NullInput<>(), EppResourceInputs.createEntityInput(EppResource.class)))
|
||||
.sendLinkToMapreduceConsole(response);
|
||||
} else {
|
||||
try {
|
||||
LaunchFlexTemplateParameter parameter =
|
||||
new LaunchFlexTemplateParameter()
|
||||
.setJobName(createJobName("rde", clock))
|
||||
.setContainerSpecGcsPath(
|
||||
String.format("%s/%s_metadata.json", stagingBucketUrl, PIPELINE_NAME))
|
||||
.setParameters(
|
||||
ImmutableMap.of(
|
||||
"pendings",
|
||||
RdePipeline.encodePendings(pendings),
|
||||
"validationMode",
|
||||
validationMode.name(),
|
||||
"rdeStagingBucket",
|
||||
rdeBucket,
|
||||
"stagingKey",
|
||||
BaseEncoding.base64Url().omitPadding().encode(stagingKeyBytes),
|
||||
"registryEnvironment",
|
||||
RegistryEnvironment.get().name()));
|
||||
LaunchFlexTemplateResponse launchResponse =
|
||||
dataflow
|
||||
.projects()
|
||||
.locations()
|
||||
.flexTemplates()
|
||||
.launch(
|
||||
projectId,
|
||||
jobRegion,
|
||||
new LaunchFlexTemplateRequest().setLaunchParameter(parameter))
|
||||
.execute();
|
||||
logger.atInfo().log("Got response: %s", launchResponse.getJob().toPrettyString());
|
||||
response.setStatus(SC_OK);
|
||||
response.setPayload(
|
||||
String.format("Launched RDE pipeline: %s", launchResponse.getJob().getId()));
|
||||
} catch (IOException e) {
|
||||
logger.atWarning().withCause(e).log("Pipeline Launch failed");
|
||||
response.setStatus(SC_INTERNAL_SERVER_ERROR);
|
||||
response.setPayload(String.format("Pipeline launch failed: %s", e.getMessage()));
|
||||
}
|
||||
ImmutableList.Builder<String> jobNameBuilder = new ImmutableList.Builder<>();
|
||||
pendings.values().stream()
|
||||
.collect(toImmutableSetMultimap(PendingDeposit::watermark, identity()))
|
||||
.asMap()
|
||||
.forEach(
|
||||
(watermark, pendingDeposits) -> {
|
||||
try {
|
||||
LaunchFlexTemplateParameter parameter =
|
||||
new LaunchFlexTemplateParameter()
|
||||
.setJobName(
|
||||
createJobName(
|
||||
String.format(
|
||||
"rde-%s", watermark.toString("yyyy-MM-dd't'HH-mm-ss'z'")),
|
||||
clock))
|
||||
.setContainerSpecGcsPath(
|
||||
String.format("%s/%s_metadata.json", stagingBucketUrl, PIPELINE_NAME))
|
||||
.setParameters(
|
||||
new ImmutableMap.Builder<String, String>()
|
||||
.put(
|
||||
"pendings",
|
||||
RdePipeline.encodePendingDeposits(
|
||||
ImmutableSet.copyOf(pendingDeposits)))
|
||||
.put("validationMode", validationMode.name())
|
||||
.put("rdeStagingBucket", rdeBucket)
|
||||
.put(
|
||||
"stagingKey",
|
||||
BaseEncoding.base64Url()
|
||||
.omitPadding()
|
||||
.encode(stagingKeyBytes))
|
||||
.put("registryEnvironment", RegistryEnvironment.get().name())
|
||||
.put("workerMachineType", machineType)
|
||||
// TODO (jianglai): Investigate turning off public IPs (for which
|
||||
// there is a quota) in order to increase the total number of
|
||||
// workers allowed (also under quota).
|
||||
// See:
|
||||
// https://cloud.google.com/dataflow/docs/guides/routes-firewall
|
||||
.put("usePublicIps", "true")
|
||||
.build());
|
||||
LaunchFlexTemplateResponse launchResponse =
|
||||
dataflow
|
||||
.projects()
|
||||
.locations()
|
||||
.flexTemplates()
|
||||
.launch(
|
||||
projectId,
|
||||
jobRegion,
|
||||
new LaunchFlexTemplateRequest().setLaunchParameter(parameter))
|
||||
.execute();
|
||||
logger.atInfo().log("Got response: %s", launchResponse.getJob().toPrettyString());
|
||||
jobNameBuilder.add(launchResponse.getJob().getId());
|
||||
} catch (IOException e) {
|
||||
logger.atWarning().withCause(e).log("Pipeline Launch failed");
|
||||
response.setStatus(SC_INTERNAL_SERVER_ERROR);
|
||||
response.setPayload(String.format("Pipeline launch failed: %s", e.getMessage()));
|
||||
}
|
||||
});
|
||||
response.setStatus(SC_OK);
|
||||
response.setPayload(
|
||||
String.format("Launched RDE pipeline: %s", Joiner.on(", ").join(jobNameBuilder.build())));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -349,7 +405,7 @@ public final class RdeStagingAction implements Runnable {
|
|||
throw new BadRequestException("Directory must not start with a slash");
|
||||
}
|
||||
String directoryWithTrailingSlash =
|
||||
directory.get().endsWith("/") ? directory.get() : (directory.get() + '/');
|
||||
directory.get().endsWith("/") ? directory.get() : directory.get() + '/';
|
||||
|
||||
if (modeStrings.isEmpty()) {
|
||||
throw new BadRequestException("Mode parameter required in manual operation");
|
||||
|
@ -381,7 +437,7 @@ public final class RdeStagingAction implements Runnable {
|
|||
}
|
||||
}
|
||||
|
||||
if (revision.isPresent() && (revision.get() < 0)) {
|
||||
if (revision.isPresent() && revision.get() < 0) {
|
||||
throw new BadRequestException("Revision must be greater than or equal to zero");
|
||||
}
|
||||
|
||||
|
@ -394,11 +450,7 @@ public final class RdeStagingAction implements Runnable {
|
|||
pendingsBuilder.put(
|
||||
tld,
|
||||
PendingDeposit.createInManualOperation(
|
||||
tld,
|
||||
watermark,
|
||||
mode,
|
||||
directoryWithTrailingSlash,
|
||||
revision.orElse(null)));
|
||||
tld, watermark, mode, directoryWithTrailingSlash, revision.orElse(null)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -43,6 +43,22 @@
|
|||
"regexes": [
|
||||
"[A-Za-z0-9\\-_]+"
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "workerMachineType",
|
||||
"label": "The GCE machine type for the dataflow job workers.",
|
||||
"helpText": "See https://cloud.google.com/dataflow/quotas#compute-engine-quotas for available machine types.",
|
||||
"regexes": [
|
||||
"[a-z0-9\\-]+"
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "usePublicIps",
|
||||
"label": "Whether the GCE workers are assigned public IPs",
|
||||
"helpText": "Public IPs have an associated cost and there's a quota per region on the total number of public IPs assigned at a given time. If the service only needs to access GCP APIs, it's better to not use public IP, but one needs to configure the network accordingly. See https://cloud.google.com/dataflow/docs/guides/routes-firewall.",
|
||||
"regexes": [
|
||||
"true|false"
|
||||
]
|
||||
}
|
||||
]
|
||||
}
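Both new parameters correspond to standard Dataflow worker-pool options, so the equivalent programmatic configuration is roughly as follows (values are examples only):

```java
// Sketch: what the two template parameters map to on the Beam options object.
DataflowPipelineWorkerPoolOptions options =
    PipelineOptionsFactory.as(DataflowPipelineWorkerPoolOptions.class);
options.setWorkerMachineType("n2-standard-4");  // example type; see the quota docs above
options.setUsePublicIps(false);  // requires Private Google Access on the worker subnet
```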
|
||||
|
|
|
@ -29,8 +29,10 @@ import com.google.api.services.dataflow.model.LaunchFlexTemplateRequest;
|
|||
import com.google.api.services.dataflow.model.LaunchFlexTemplateResponse;
|
||||
import google.registry.testing.FakeResponse;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.junit.jupiter.MockitoSettings;
|
||||
import org.mockito.quality.Strictness;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
/** Base class for all actions that launch a Dataflow Flex template. */
|
||||
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
|
||||
|
@ -42,8 +44,19 @@ public abstract class BeamActionTestBase {
|
|||
private Locations locations = mock(Locations.class);
|
||||
protected FlexTemplates templates = mock(FlexTemplates.class);
|
||||
protected Launch launch = mock(Launch.class);
|
||||
private LaunchFlexTemplateResponse launchResponse =
|
||||
new LaunchFlexTemplateResponse().setJob(new Job().setId("jobid"));
|
||||
private Answer<LaunchFlexTemplateResponse> answer =
|
||||
new Answer<LaunchFlexTemplateResponse>() {
|
||||
private int times = 0;
|
||||
|
||||
@Override
|
||||
public LaunchFlexTemplateResponse answer(InvocationOnMock invocation) throws Throwable {
|
||||
LaunchFlexTemplateResponse response =
|
||||
new LaunchFlexTemplateResponse()
|
||||
.setJob(new Job().setId("jobid" + (times == 0 ? "" : times.toString())));
|
||||
times++;
|
||||
return response;
|
||||
}
|
||||
};
|
||||
|
||||
@BeforeEach
|
||||
protected void beforeEach() throws Exception {
|
||||
|
@ -52,6 +65,6 @@ public abstract class BeamActionTestBase {
|
|||
when(locations.flexTemplates()).thenReturn(templates);
|
||||
when(templates.launch(anyString(), anyString(), any(LaunchFlexTemplateRequest.class)))
|
||||
.thenReturn(launch);
|
||||
when(launch.execute()).thenReturn(launchResponse);
|
||||
when(launch.execute()).thenAnswer(answer);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,8 +17,8 @@ package google.registry.beam.rde;
|
|||
import static com.google.common.base.Preconditions.checkState;
|
||||
import static com.google.common.collect.ImmutableSet.toImmutableSet;
|
||||
import static com.google.common.truth.Truth.assertThat;
|
||||
import static google.registry.beam.rde.RdePipeline.decodePendings;
|
||||
import static google.registry.beam.rde.RdePipeline.encodePendings;
|
||||
import static google.registry.beam.rde.RdePipeline.decodePendingDeposits;
|
||||
import static google.registry.beam.rde.RdePipeline.encodePendingDeposits;
|
||||
import static google.registry.model.common.Cursor.CursorType.RDE_STAGING;
|
||||
import static google.registry.model.rde.RdeMode.FULL;
|
||||
import static google.registry.model.rde.RdeMode.THIN;
|
||||
|
@ -27,25 +27,26 @@ import static google.registry.rde.RdeResourceType.CONTACT;
|
|||
import static google.registry.rde.RdeResourceType.DOMAIN;
|
||||
import static google.registry.rde.RdeResourceType.HOST;
|
||||
import static google.registry.rde.RdeResourceType.REGISTRAR;
|
||||
import static google.registry.testing.AppEngineExtension.loadInitialData;
|
||||
import static google.registry.testing.AppEngineExtension.makeRegistrar1;
|
||||
import static google.registry.testing.AppEngineExtension.makeRegistrar2;
|
||||
import static google.registry.testing.DatabaseHelper.createTld;
|
||||
import static google.registry.testing.DatabaseHelper.newHostResource;
|
||||
import static google.registry.testing.DatabaseHelper.insertSimpleResources;
|
||||
import static google.registry.testing.DatabaseHelper.newDomainBase;
|
||||
import static google.registry.testing.DatabaseHelper.persistActiveContact;
|
||||
import static google.registry.testing.DatabaseHelper.persistActiveDomain;
|
||||
import static google.registry.testing.DatabaseHelper.persistDeletedDomain;
|
||||
import static google.registry.testing.DatabaseHelper.persistActiveHost;
|
||||
import static google.registry.testing.DatabaseHelper.persistEppResource;
|
||||
import static google.registry.testing.DatabaseHelper.persistNewRegistrar;
|
||||
import static google.registry.testing.DatabaseHelper.persistResource;
|
||||
import static google.registry.util.ResourceUtils.readResourceUtf8;
|
||||
import static java.nio.charset.StandardCharsets.UTF_8;
|
||||
import static org.joda.time.Duration.standardDays;
|
||||
import static org.junit.Assert.assertThrows;
|
||||
|
||||
import com.google.cloud.storage.BlobId;
|
||||
import com.google.cloud.storage.contrib.nio.testing.LocalStorageHelper;
|
||||
import com.google.common.base.Splitter;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.ImmutableSetMultimap;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.Streams;
|
||||
import com.google.common.io.BaseEncoding;
|
||||
import google.registry.beam.TestPipelineExtension;
|
||||
|
@ -53,12 +54,26 @@ import google.registry.gcs.GcsUtils;
|
|||
import google.registry.keyring.api.PgpHelper;
|
||||
import google.registry.model.common.Cursor;
|
||||
import google.registry.model.common.Cursor.CursorType;
|
||||
import google.registry.model.contact.ContactBase;
|
||||
import google.registry.model.contact.ContactHistory;
|
||||
import google.registry.model.contact.ContactResource;
|
||||
import google.registry.model.domain.DesignatedContact;
|
||||
import google.registry.model.domain.DomainBase;
|
||||
import google.registry.model.domain.DomainContent;
|
||||
import google.registry.model.domain.DomainHistory;
|
||||
import google.registry.model.domain.Period;
|
||||
import google.registry.model.eppcommon.Trid;
|
||||
import google.registry.model.host.HostBase;
|
||||
import google.registry.model.host.HostHistory;
|
||||
import google.registry.model.host.HostResource;
|
||||
import google.registry.model.rde.RdeMode;
|
||||
import google.registry.model.rde.RdeRevision;
|
||||
import google.registry.model.rde.RdeRevision.RdeRevisionId;
|
||||
import google.registry.model.registrar.Registrar;
|
||||
import google.registry.model.registrar.Registrar.State;
|
||||
import google.registry.model.reporting.DomainTransactionRecord;
|
||||
import google.registry.model.reporting.DomainTransactionRecord.TransactionReportField;
|
||||
import google.registry.model.reporting.HistoryEntry;
|
||||
import google.registry.model.tld.Registry;
|
||||
import google.registry.persistence.VKey;
|
||||
import google.registry.persistence.transaction.JpaTestExtensions;
|
||||
|
@ -111,14 +126,11 @@ public class RdePipelineTest {
|
|||
// This is the default as-of time for the RDE/BRDA job.
|
||||
private final DateTime now = DateTime.parse("2000-01-01TZ");
|
||||
|
||||
private final ImmutableSetMultimap<String, PendingDeposit> pendings =
|
||||
ImmutableSetMultimap.of(
|
||||
"soy",
|
||||
PendingDeposit.create("soy", now, FULL, RDE_STAGING, standardDays(1)),
|
||||
"soy",
|
||||
PendingDeposit.create("soy", now, THIN, RDE_STAGING, standardDays(1)),
|
||||
"fun",
|
||||
PendingDeposit.create("fun", now, FULL, RDE_STAGING, standardDays(1)));
|
||||
private final ImmutableSet<PendingDeposit> pendings =
|
||||
ImmutableSet.of(
|
||||
PendingDeposit.create("soy", now, FULL, RDE_STAGING, Duration.standardDays(1)),
|
||||
PendingDeposit.create("soy", now, THIN, RDE_STAGING, Duration.standardDays(1)),
|
||||
PendingDeposit.create("fun", now, FULL, RDE_STAGING, Duration.standardDays(1)));
|
||||
|
||||
private final ImmutableList<DepositFragment> brdaFragments =
|
||||
ImmutableList.of(
|
||||
|
@ -129,8 +141,8 @@ public class RdePipelineTest {
|
|||
ImmutableList.of(
|
||||
DepositFragment.create(RdeResourceType.DOMAIN, "<rdeDomain:domain/>\n", ""),
|
||||
DepositFragment.create(RdeResourceType.REGISTRAR, "<rdeRegistrar:registrar/>\n", ""),
|
||||
DepositFragment.create(RdeResourceType.CONTACT, "<rdeContact:contact/>\n", ""),
|
||||
DepositFragment.create(RdeResourceType.HOST, "<rdeHost:host/>\n", ""));
|
||||
DepositFragment.create(CONTACT, "<rdeContact:contact/>\n", ""),
|
||||
DepositFragment.create(HOST, "<rdeHost:host/>\n", ""));
|
||||
|
||||
private final GcsUtils gcsUtils = new GcsUtils(LocalStorageHelper.getOptions());
|
||||
|
||||
|
@ -164,9 +176,68 @@ public class RdePipelineTest {
|
|||
|
||||
private RdePipeline rdePipeline;
|
||||
|
||||
private ContactHistory persistContactHistory(ContactBase contact) {
|
||||
return persistResource(
|
||||
new ContactHistory.Builder()
|
||||
.setType(HistoryEntry.Type.CONTACT_CREATE)
|
||||
.setXmlBytes("<xml></xml>".getBytes(UTF_8))
|
||||
.setModificationTime(clock.nowUtc())
|
||||
.setRegistrarId("TheRegistrar")
|
||||
.setTrid(Trid.create("ABC-123", "server-trid"))
|
||||
.setBySuperuser(false)
|
||||
.setReason("reason")
|
||||
.setRequestedByRegistrar(true)
|
||||
.setContact(contact)
|
||||
.setContactRepoId(contact.getRepoId())
|
||||
.build());
|
||||
}
|
||||
|
||||
private DomainHistory persistDomainHistory(DomainContent domain) {
|
||||
DomainTransactionRecord transactionRecord =
|
||||
new DomainTransactionRecord.Builder()
|
||||
.setTld("soy")
|
||||
.setReportingTime(clock.nowUtc())
|
||||
.setReportField(TransactionReportField.NET_ADDS_1_YR)
|
||||
.setReportAmount(1)
|
||||
.build();
|
||||
|
||||
return persistResource(
|
||||
new DomainHistory.Builder()
|
||||
.setType(HistoryEntry.Type.DOMAIN_CREATE)
|
||||
.setXmlBytes("<xml></xml>".getBytes(UTF_8))
|
||||
.setModificationTime(clock.nowUtc())
|
||||
.setRegistrarId("TheRegistrar")
|
||||
.setTrid(Trid.create("ABC-123", "server-trid"))
|
||||
.setBySuperuser(false)
|
||||
.setReason("reason")
|
||||
.setRequestedByRegistrar(true)
|
||||
.setDomain(domain)
|
||||
.setDomainRepoId(domain.getRepoId())
|
||||
.setDomainTransactionRecords(ImmutableSet.of(transactionRecord))
|
||||
.setOtherRegistrarId("otherClient")
|
||||
.setPeriod(Period.create(1, Period.Unit.YEARS))
|
||||
.build());
|
||||
}
|
||||
|
||||
private HostHistory persistHostHistory(HostBase hostBase) {
|
||||
return persistResource(
|
||||
new HostHistory.Builder()
|
||||
.setType(HistoryEntry.Type.HOST_CREATE)
|
||||
.setXmlBytes("<xml></xml>".getBytes(UTF_8))
|
||||
.setModificationTime(clock.nowUtc())
|
||||
.setRegistrarId("TheRegistrar")
|
||||
.setTrid(Trid.create("ABC-123", "server-trid"))
|
||||
.setBySuperuser(false)
|
||||
.setReason("reason")
|
||||
.setRequestedByRegistrar(true)
|
||||
.setHost(hostBase)
|
||||
.setHostRepoId(hostBase.getRepoId())
|
||||
.build());
|
||||
}
|
||||
|
||||
@BeforeEach
|
||||
void beforeEach() throws Exception {
|
||||
loadInitialData();
|
||||
insertSimpleResources(ImmutableList.of(makeRegistrar1(), makeRegistrar2()));
|
||||
|
||||
// Two real registrars have been created by insertSimpleResources(), named "New Registrar" and
|
||||
// "The Registrar". Create one included registrar (external_monitoring) and two excluded ones.
|
||||
|
@ -192,26 +263,89 @@ public class RdePipelineTest {
|
|||
tm().transact(
|
||||
() -> {
|
||||
tm().put(Cursor.create(CursorType.BRDA, now, Registry.get("soy")));
|
||||
tm().put(Cursor.create(CursorType.RDE_STAGING, now, Registry.get("soy")));
|
||||
tm().put(Cursor.create(RDE_STAGING, now, Registry.get("soy")));
|
||||
RdeRevision.saveRevision("soy", now, THIN, 0);
|
||||
RdeRevision.saveRevision("soy", now, FULL, 0);
|
||||
});
|
||||
|
||||
// Also persists a "contact1234" contact in the process.
|
||||
persistActiveDomain("hello.soy");
|
||||
persistActiveDomain("kitty.fun");
|
||||
// Should not appear anywhere.
|
||||
persistActiveDomain("lol.cat");
|
||||
persistDeletedDomain("deleted.soy", DateTime.parse("1999-12-30TZ"));
|
||||
// This contact is never referenced.
|
||||
persistContactHistory(persistActiveContact("contactX"));
|
||||
ContactResource contact1 = persistActiveContact("contact1234");
|
||||
persistContactHistory(contact1);
|
||||
ContactResource contact2 = persistActiveContact("contact456");
|
||||
persistContactHistory(contact2);
|
||||
|
||||
persistActiveContact("contact456");
|
||||
// This host is never referenced.
|
||||
persistHostHistory(persistActiveHost("ns0.domain.tld"));
|
||||
HostResource host1 = persistActiveHost("ns1.external.tld");
|
||||
persistHostHistory(host1);
|
||||
DomainBase helloDomain =
|
||||
persistEppResource(
|
||||
newDomainBase("hello.soy", contact1)
|
||||
.asBuilder()
|
||||
.addNameserver(host1.createVKey())
|
||||
.build());
|
||||
persistDomainHistory(helloDomain);
|
||||
persistHostHistory(persistActiveHost("not-used-subordinate.hello.soy"));
|
||||
HostResource host2 = persistActiveHost("ns1.hello.soy");
|
||||
persistHostHistory(host2);
|
||||
DomainBase kittyDomain =
|
||||
persistEppResource(
|
||||
newDomainBase("kitty.fun", contact2)
|
||||
.asBuilder()
|
||||
.addNameservers(ImmutableSet.of(host1.createVKey(), host2.createVKey()))
|
||||
.build());
|
||||
persistDomainHistory(kittyDomain);
|
||||
// Should not appear because the TLD is not included in a pending deposit.
|
||||
persistDomainHistory(persistEppResource(newDomainBase("lol.cat", contact1)));
|
||||
// To be deleted.
|
||||
DomainBase deletedDomain = persistActiveDomain("deleted.soy");
|
||||
persistDomainHistory(deletedDomain);
|
||||
|
||||
HostResource host = persistEppResource(newHostResource("old.host.test"));
|
||||
// Set the clock to 2000-01-02, the updated host should NOT show up in RDE.
|
||||
// Advance time by one millisecond so subsequent writes get a later modification time.
|
||||
clock.advanceOneMilli();
|
||||
persistDomainHistory(deletedDomain.asBuilder().setDeletionTime(clock.nowUtc()).build());
|
||||
// kitty.fun is renamed to cat.fun.
kittyDomain = kittyDomain.asBuilder().setDomainName("cat.fun").build();
|
||||
persistDomainHistory(kittyDomain);
|
||||
ContactResource contact3 = persistActiveContact("contact789");
|
||||
persistContactHistory(contact3);
|
||||
// This is a host subordinate to a domain in TLD .cat, which is not included in any pending
|
||||
// deposit. But it should still be included as a subordinate host in the pending deposit for .soy.
|
||||
HostResource host3 = persistActiveHost("ns1.lol.cat");
|
||||
persistHostHistory(host3);
|
||||
persistDomainHistory(
|
||||
helloDomain
|
||||
.asBuilder()
|
||||
.addContacts(
|
||||
ImmutableSet.of(
|
||||
DesignatedContact.create(DesignatedContact.Type.ADMIN, contact3.createVKey())))
|
||||
.addNameserver(host3.createVKey())
|
||||
.build());
|
||||
// contact456 is renamed to contactABC.
|
||||
persistContactHistory(contact2.asBuilder().setContactId("contactABC").build());
|
||||
// ns1.hello.soy is renamed to ns2.hello.soy
|
||||
persistHostHistory(host2.asBuilder().setHostName("ns2.hello.soy").build());
|
||||
|
||||
// Set the clock to 2000-01-02; any change hereafter should not show up in the
|
||||
// resulting deposit fragments.
|
||||
clock.advanceBy(Duration.standardDays(2));
|
||||
persistEppResource(host.asBuilder().setHostName("new.host.test").build());
|
||||
persistDomainHistory(kittyDomain.asBuilder().setDeletionTime(clock.nowUtc()).build());
|
||||
ContactResource futureContact = persistActiveContact("future-contact");
|
||||
persistContactHistory(futureContact);
|
||||
HostResource futureHost = persistActiveHost("ns1.future.tld");
|
||||
persistHostHistory(futureHost);
|
||||
persistDomainHistory(
|
||||
persistEppResource(
|
||||
newDomainBase("future.soy", futureContact)
|
||||
.asBuilder()
|
||||
.setNameservers(futureHost.createVKey())
|
||||
.build()));
|
||||
// contactABC is renamed to contactXYZ.
|
||||
persistContactHistory(contact2.asBuilder().setContactId("contactXYZ").build());
|
||||
// ns2.hello.soy is renamed to ns3.hello.soy
|
||||
persistHostHistory(host2.asBuilder().setHostName("ns3.hello.soy").build());
|
||||
|
||||
options.setPendings(encodePendings(pendings));
|
||||
options.setPendings(encodePendingDeposits(pendings));
|
||||
// The EPP resources created in tests do not have all their fields populated; using a STRICT
|
||||
// validation mode would result in a lot of warnings during marshalling.
|
||||
options.setValidationMode("LENIENT");
|
||||
|
@ -223,9 +357,22 @@ public class RdePipelineTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
void testSuccess_encodeAndDecodePendingsMap() throws Exception {
|
||||
String encodedString = encodePendings(pendings);
|
||||
assertThat(decodePendings(encodedString)).isEqualTo(pendings);
|
||||
void testSuccess_encodeAndDecodePendingDeposits() throws Exception {
|
||||
String encodedString = encodePendingDeposits(pendings);
|
||||
assertThat(decodePendingDeposits(encodedString)).isEqualTo(pendings);
|
||||
}
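A plausible shape for the codec under test, assuming Java serialization plus base64url (the real implementation lives in `RdePipeline`); the encoding must be URL-safe because it travels as a flat Flex-template parameter:

```java
// Sketch only: serialize the set and base64url-encode it, mirroring how the
// staging key is passed to the template.
static String encodePendingDeposits(ImmutableSet<PendingDeposit> pendings) throws IOException {
  ByteArrayOutputStream bos = new ByteArrayOutputStream();
  try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
    oos.writeObject(pendings);
  }
  return BaseEncoding.base64Url().omitPadding().encode(bos.toByteArray());
}
```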
|
||||
|
||||
@Test
|
||||
void testFailure_pendingDepositsWithDifferentWatermarks() throws Exception {
|
||||
options.setPendings(
|
||||
encodePendingDeposits(
|
||||
ImmutableSet.of(
|
||||
PendingDeposit.create("soy", now, FULL, RDE_STAGING, Duration.standardDays(1)),
|
||||
PendingDeposit.create(
|
||||
"soy", now.plusSeconds(1), THIN, RDE_STAGING, Duration.standardDays(1)))));
|
||||
assertThrows(
|
||||
IllegalArgumentException.class,
|
||||
() -> new RdePipeline(options, gcsUtils, cloudTasksHelper.getTestCloudTasksUtils()));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -243,36 +390,47 @@ public class RdePipelineTest {
|
|||
// The same registrars are attached to all the pending deposits.
|
||||
.containsExactly("New Registrar", "The Registrar", "external_monitoring");
|
||||
// Domain fragments.
|
||||
assertThat(
|
||||
getFragmentForType(kv, DOMAIN)
|
||||
.map(getXmlElement(DOMAIN_NAME_PATTERN))
|
||||
.anyMatch(
|
||||
domain ->
|
||||
// Deleted domain should not be included
|
||||
domain.equals("deleted.soy")
|
||||
// Only domains on the pending deposit's tld should
|
||||
// appear.
|
||||
|| !kv.getKey()
|
||||
.tld()
|
||||
.equals(
|
||||
Iterables.get(
|
||||
Splitter.on('.').split(domain), 1))))
|
||||
.isFalse();
|
||||
if (kv.getKey().tld().equals("soy")) {
|
||||
assertThat(
|
||||
getFragmentForType(kv, DOMAIN)
|
||||
.map(getXmlElement(DOMAIN_NAME_PATTERN))
|
||||
.collect(toImmutableSet()))
|
||||
.containsExactly("hello.soy");
|
||||
} else {
|
||||
assertThat(
|
||||
getFragmentForType(kv, DOMAIN)
|
||||
.map(getXmlElement(DOMAIN_NAME_PATTERN))
|
||||
.collect(toImmutableSet()))
|
||||
.containsExactly("cat.fun");
|
||||
}
|
||||
if (kv.getKey().mode().equals(FULL)) {
|
||||
// Contact fragments.
|
||||
assertThat(
|
||||
getFragmentForType(kv, CONTACT)
|
||||
.map(getXmlElement(CONTACT_ID_PATTERN))
|
||||
.collect(toImmutableSet()))
|
||||
// The same contacts are attached too all pending deposits.
|
||||
.containsExactly("contact1234", "contact456");
|
||||
// Host fragments.
|
||||
assertThat(
|
||||
getFragmentForType(kv, HOST)
|
||||
.map(getXmlElement(HOST_NAME_PATTERN))
|
||||
.collect(toImmutableSet()))
|
||||
// Should load the resource before update.
|
||||
.containsExactly("old.host.test");
|
||||
// Contact fragments for hello.soy.
|
||||
if (kv.getKey().tld().equals("soy")) {
|
||||
assertThat(
|
||||
getFragmentForType(kv, CONTACT)
|
||||
.map(getXmlElement(CONTACT_ID_PATTERN))
|
||||
.collect(toImmutableSet()))
|
||||
.containsExactly("contact1234", "contact789");
|
||||
// Host fragments for hello.soy.
|
||||
assertThat(
|
||||
getFragmentForType(kv, HOST)
|
||||
.map(getXmlElement(HOST_NAME_PATTERN))
|
||||
.collect(toImmutableSet()))
|
||||
.containsExactly("ns1.external.tld", "ns1.lol.cat");
|
||||
} else {
|
||||
// Contact fragments for cat.fun.
|
||||
assertThat(
|
||||
getFragmentForType(kv, CONTACT)
|
||||
.map(getXmlElement(CONTACT_ID_PATTERN))
|
||||
.collect(toImmutableSet()))
|
||||
.containsExactly("contactABC");
|
||||
// Host fragments for cat.fun.
|
||||
assertThat(
|
||||
getFragmentForType(kv, HOST)
|
||||
.map(getXmlElement(HOST_NAME_PATTERN))
|
||||
.collect(toImmutableSet()))
|
||||
.containsExactly("ns1.external.tld", "ns2.hello.soy");
|
||||
}
|
||||
} else {
|
||||
// BRDA deposits do not contain contacts or hosts.
|
||||
assertThat(
|
||||
|
@ -295,7 +453,7 @@ public class RdePipelineTest {
|
|||
PendingDeposit brdaKey =
|
||||
PendingDeposit.create("soy", now, THIN, CursorType.BRDA, Duration.standardDays(1));
|
||||
PendingDeposit rdeKey =
|
||||
PendingDeposit.create("soy", now, FULL, CursorType.RDE_STAGING, Duration.standardDays(1));
|
||||
PendingDeposit.create("soy", now, FULL, RDE_STAGING, Duration.standardDays(1));
|
||||
|
||||
verifyFiles(ImmutableMap.of(brdaKey, brdaFragments, rdeKey, rdeFragments), false);
|
||||
|
||||
|
@ -311,7 +469,7 @@ public class RdePipelineTest {
|
|||
.isEquivalentAccordingToCompareTo(now.plus(Duration.standardDays(1)));
|
||||
assertThat(loadRevision(now, THIN)).isEqualTo(1);
|
||||
|
||||
assertThat(loadCursorTime(CursorType.RDE_STAGING))
|
||||
assertThat(loadCursorTime(RDE_STAGING))
|
||||
.isEquivalentAccordingToCompareTo(now.plus(Duration.standardDays(1)));
|
||||
assertThat(loadRevision(now, FULL)).isEqualTo(1);
|
||||
cloudTasksHelper.assertTasksEnqueued(
|
||||
|
@ -350,7 +508,7 @@ public class RdePipelineTest {
|
|||
assertThat(loadCursorTime(CursorType.BRDA)).isEquivalentAccordingToCompareTo(now);
|
||||
assertThat(loadRevision(now, THIN)).isEqualTo(0);
|
||||
|
||||
assertThat(loadCursorTime(CursorType.RDE_STAGING)).isEquivalentAccordingToCompareTo(now);
|
||||
assertThat(loadCursorTime(RDE_STAGING)).isEquivalentAccordingToCompareTo(now);
|
||||
assertThat(loadRevision(now, FULL)).isEqualTo(0);
|
||||
cloudTasksHelper.assertNoTasksEnqueued("brda", "rde-upload");
|
||||
}
|
||||
|
|
|
@ -79,6 +79,7 @@ public class RdeStagingActionCloudSqlTest extends BeamActionTestBase {
|
|||
action.watermarks = ImmutableSet.of();
|
||||
action.revision = Optional.empty();
|
||||
action.dataflow = dataflow;
|
||||
action.machineType = "machine-type";
|
||||
}
|
||||
|
||||
@TestSqlOnly
|
||||
|
@ -156,7 +157,7 @@ public class RdeStagingActionCloudSqlTest extends BeamActionTestBase {
|
|||
}
|
||||
|
||||
@TestSqlOnly
|
||||
void testRun_afterTransactionCooldown_runsMapReduce() throws Exception {
|
||||
void testRun_afterTransactionCooldown_runsPipeline() throws Exception {
|
||||
createTldWithEscrowEnabled("lol");
|
||||
clock.setTo(DateTime.parse("2000-01-01T00:05:00Z"));
|
||||
action.transactionCooldown = Duration.standardMinutes(5);
|
||||
|
@ -241,18 +242,19 @@ public class RdeStagingActionCloudSqlTest extends BeamActionTestBase {
|
|||
}
|
||||
|
||||
@TestSqlOnly
|
||||
void testManualRun_validParameters_runsMapReduce() throws Exception {
|
||||
void testManualRun_validParameters_runsPipeline() throws Exception {
|
||||
createTldWithEscrowEnabled("lol");
|
||||
clock.setTo(DateTime.parse("2000-01-01TZ"));
|
||||
action.manual = true;
|
||||
action.directory = Optional.of("test/");
|
||||
action.modeStrings = ImmutableSet.of("full");
|
||||
action.tlds = ImmutableSet.of("lol");
|
||||
action.watermarks = ImmutableSet.of(DateTime.parse("2001-01-01TZ"));
|
||||
action.watermarks =
|
||||
ImmutableSet.of(DateTime.parse("1999-12-31TZ"), DateTime.parse("2001-01-01TZ"));
|
||||
action.run();
|
||||
assertThat(response.getStatus()).isEqualTo(200);
|
||||
assertThat(response.getPayload()).contains("Launched RDE pipeline: jobid");
|
||||
verify(templates, times(1))
|
||||
assertThat(response.getPayload()).contains("Launched RDE pipeline: jobid, jobid1");
|
||||
verify(templates, times(2))
|
||||
.launch(eq("projectId"), eq("jobRegion"), any(LaunchFlexTemplateRequest.class));
|
||||
}
|
||||
|
||||
|
|