diff --git a/core/build.gradle b/core/build.gradle
index 9cb919016..ca7809029 100644
--- a/core/build.gradle
+++ b/core/build.gradle
@@ -780,6 +780,10 @@ if (environment in ['alpha', 'crash']) {
mainClass: 'google.registry.beam.invoicing.InvoicingPipeline',
metaData: 'google/registry/beam/invoicing_pipeline_metadata.json'
],
+ [
+ mainClass: 'google.registry.beam.rde.RdePipeline',
+ metaData: 'google/registry/beam/rde_pipeline_metadata.json'
+ ],
]
project.tasks.create("stage_beam_pipelines") {
doLast {
diff --git a/core/src/main/java/google/registry/beam/rde/RdePipeline.java b/core/src/main/java/google/registry/beam/rde/RdePipeline.java
new file mode 100644
index 000000000..c59f19eee
--- /dev/null
+++ b/core/src/main/java/google/registry/beam/rde/RdePipeline.java
@@ -0,0 +1,260 @@
+// Copyright 2021 The Nomulus Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package google.registry.beam.rde;
+
+import static com.google.common.collect.ImmutableSet.toImmutableSet;
+import static google.registry.model.EppResourceUtils.loadAtPointInTimeAsync;
+import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableSetMultimap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.io.BaseEncoding;
+import google.registry.beam.common.RegistryJpaIO;
+import google.registry.model.EppResource;
+import google.registry.model.contact.ContactResource;
+import google.registry.model.domain.DomainBase;
+import google.registry.model.host.HostResource;
+import google.registry.model.rde.RdeMode;
+import google.registry.model.registrar.Registrar;
+import google.registry.model.registrar.Registrar.Type;
+import google.registry.persistence.VKey;
+import google.registry.rde.DepositFragment;
+import google.registry.rde.PendingDeposit;
+import google.registry.rde.PendingDeposit.PendingDepositCoder;
+import google.registry.rde.RdeFragmenter;
+import google.registry.rde.RdeMarshaller;
+import google.registry.xml.ValidationMode;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Supplier;
+import javax.persistence.Entity;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.FlatMapElements;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.joda.time.DateTime;
+
+/**
+ * Definition of a Dataflow Flex template, which generates RDE/BRDA deposits.
+ *
+ *
To stage this template locally, run the {@code stage_beam_pipeline.sh} shell script.
+ *
+ *
Then, you can run the staged template via the API client library, gCloud or a raw REST call.
+ *
+ * @see Using
+ * Flex Templates
+ */
+public class RdePipeline implements Serializable {
+
+ private final RdeMarshaller marshaller;
+ private final ImmutableSetMultimap pendings;
+
+ // Registrars to be excluded from data escrow. Not including the sandbox-only OTE type so that
+ // if sneaks into production we would get an extra signal.
+ private static final ImmutableSet IGNORED_REGISTRAR_TYPES =
+ Sets.immutableEnumSet(Registrar.Type.MONITORING, Registrar.Type.TEST);
+
+ private static final String EPP_RESOURCE_QUERY =
+ "SELECT id FROM %entity% "
+ + "WHERE COALESCE(creationClientId, '') NOT LIKE 'prober-%' "
+ + "AND COALESCE(currentSponsorClientId, '') NOT LIKE 'prober-%' "
+ + "AND COALESCE(lastEppUpdateClientId, '') NOT LIKE 'prober-%'";
+
+ public static String createEppResourceQuery(Class extends EppResource> clazz) {
+ return EPP_RESOURCE_QUERY.replace("%entity%", clazz.getAnnotation(Entity.class).name())
+ + (clazz.equals(DomainBase.class) ? " AND tld in (:tlds)" : "");
+ }
+
+ RdePipeline(RdePipelineOptions options) throws IOException, ClassNotFoundException {
+ this.marshaller = new RdeMarshaller(ValidationMode.valueOf(options.getValidationMode()));
+ this.pendings = decodePendings(options.getPendings());
+ }
+
+ @VisibleForTesting
+ PipelineResult run(Pipeline pipeline) {
+ createFragments(pipeline);
+ return pipeline.run();
+ }
+
+ PipelineResult run() {
+ return run(Pipeline.create());
+ }
+
+ PCollection> createFragments(Pipeline pipeline) {
+ PCollection> fragments =
+ PCollectionList.of(processRegistrars(pipeline))
+ .and(processNonRegistrarEntities(pipeline, DomainBase.class))
+ .and(processNonRegistrarEntities(pipeline, ContactResource.class))
+ .and(processNonRegistrarEntities(pipeline, HostResource.class))
+ .apply(Flatten.pCollections())
+ .setCoder(
+ KvCoder.of(PendingDepositCoder.of(), SerializableCoder.of(DepositFragment.class)));
+ return fragments;
+ }
+
+ PCollection> processRegistrars(Pipeline pipeline) {
+ return pipeline
+ .apply(
+ "Read all production Registrar entities",
+ RegistryJpaIO.read(
+ "SELECT clientIdentifier FROM Registrar WHERE type NOT IN (:types)",
+ ImmutableMap.of("types", IGNORED_REGISTRAR_TYPES),
+ String.class,
+ // TODO: consider adding coders for entities and pass them directly instead of using
+ // VKeys.
+ id -> VKey.createSql(Registrar.class, id)))
+ .apply(
+ "Marshal Registrar into DepositFragment",
+ FlatMapElements.into(
+ TypeDescriptors.kvs(
+ TypeDescriptor.of(PendingDeposit.class),
+ TypeDescriptor.of(DepositFragment.class)))
+ .via(
+ (VKey key) -> {
+ Registrar registrar = jpaTm().transact(() -> jpaTm().loadByKey(key));
+ DepositFragment fragment = marshaller.marshalRegistrar(registrar);
+ return pendings.values().stream()
+ .map(pending -> KV.of(pending, fragment))
+ .collect(toImmutableSet());
+ }));
+ }
+
+ @SuppressWarnings("deprecation") // Reshuffle is still recommended by Dataflow.
+
+ PCollection> processNonRegistrarEntities(
+ Pipeline pipeline, Class clazz) {
+ return createInputs(pipeline, clazz)
+ .apply("Marshal " + clazz.getSimpleName() + " into DepositFragment", mapToFragments(clazz))
+ .setCoder(KvCoder.of(PendingDepositCoder.of(), SerializableCoder.of(DepositFragment.class)))
+ .apply(
+ "Reshuffle KV of "
+ + clazz.getSimpleName()
+ + " to prevent fusion",
+ Reshuffle.of());
+ }
+
+ PCollection> createInputs(Pipeline pipeline, Class clazz) {
+ return pipeline.apply(
+ "Read all production " + clazz.getSimpleName() + " entities",
+ RegistryJpaIO.read(
+ createEppResourceQuery(clazz),
+ clazz.equals(DomainBase.class)
+ ? ImmutableMap.of("tlds", pendings.keySet())
+ : ImmutableMap.of(),
+ String.class,
+ // TODO: consider adding coders for entities and pass them directly instead of using
+ // VKeys.
+ x -> VKey.create(clazz, x)));
+ }
+
+
+ FlatMapElements, KV> mapToFragments(Class clazz) {
+ return FlatMapElements.into(
+ TypeDescriptors.kvs(
+ TypeDescriptor.of(PendingDeposit.class), TypeDescriptor.of(DepositFragment.class)))
+ .via(
+ (VKey key) -> {
+ T resource = jpaTm().transact(() -> jpaTm().loadByKey(key));
+ // The set of all TLDs to which this resource should be emitted.
+ ImmutableSet tlds =
+ clazz.equals(DomainBase.class)
+ ? ImmutableSet.of(((DomainBase) resource).getTld())
+ : pendings.keySet();
+ // Get the set of all point-in-time watermarks we need, to minimize rewinding.
+ ImmutableSet dates =
+ tlds.stream()
+ .map(pendings::get)
+ .flatMap(ImmutableSet::stream)
+ .map(PendingDeposit::watermark)
+ .collect(toImmutableSet());
+ // Launch asynchronous fetches of point-in-time representations of resource.
+ ImmutableMap> resourceAtTimes =
+ ImmutableMap.copyOf(
+ Maps.asMap(dates, input -> loadAtPointInTimeAsync(resource, input)));
+ // Convert resource to an XML fragment for each watermark/mode pair lazily and cache
+ // the result.
+ RdeFragmenter fragmenter = new RdeFragmenter(resourceAtTimes, marshaller);
+ List> results = new ArrayList<>();
+ for (String tld : tlds) {
+ for (PendingDeposit pending : pendings.get(tld)) {
+ // Hosts and contacts don't get included in BRDA deposits.
+ if (pending.mode() == RdeMode.THIN && !clazz.equals(DomainBase.class)) {
+ continue;
+ }
+ Optional fragment =
+ fragmenter.marshal(pending.watermark(), pending.mode());
+ fragment.ifPresent(
+ depositFragment -> results.add(KV.of(pending, depositFragment)));
+ }
+ }
+ return results;
+ });
+ }
+
+ /**
+ * Decodes the pipeline option extracted from the URL parameter sent by the pipeline launcher to
+ * the original TLD to pending deposit map.
+ */
+ @SuppressWarnings("unchecked")
+ static ImmutableSetMultimap decodePendings(String encodedPending)
+ throws IOException, ClassNotFoundException {
+ try (ObjectInputStream ois =
+ new ObjectInputStream(
+ new ByteArrayInputStream(
+ BaseEncoding.base64Url().omitPadding().decode(encodedPending)))) {
+ return (ImmutableSetMultimap) ois.readObject();
+ }
+ }
+
+ /**
+ * Encodes the TLD to pending deposit map in an URL safe string that is sent to the pipeline
+ * worker by the pipeline launcher as a pipeline option.
+ */
+ public static String encodePendings(ImmutableSetMultimap pendings)
+ throws IOException {
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+ ObjectOutputStream oos = new ObjectOutputStream(baos);
+ oos.writeObject(pendings);
+ oos.flush();
+ return BaseEncoding.base64Url().omitPadding().encode(baos.toByteArray());
+ }
+ }
+
+ public static void main(String[] args) throws IOException, ClassNotFoundException {
+ PipelineOptionsFactory.register(RdePipelineOptions.class);
+ RdePipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(RdePipelineOptions.class);
+ new RdePipeline(options).run();
+ }
+}
diff --git a/core/src/main/java/google/registry/beam/rde/RdePipelineOptions.java b/core/src/main/java/google/registry/beam/rde/RdePipelineOptions.java
new file mode 100644
index 000000000..d82de1ce8
--- /dev/null
+++ b/core/src/main/java/google/registry/beam/rde/RdePipelineOptions.java
@@ -0,0 +1,32 @@
+// Copyright 2021 The Nomulus Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package google.registry.beam.rde;
+
+import google.registry.beam.common.RegistryPipelineOptions;
+import org.apache.beam.sdk.options.Description;
+
+/** Custom options for running the spec11 pipeline. */
+public interface RdePipelineOptions extends RegistryPipelineOptions {
+
+ @Description("The Base64-encoded serialized map of TLDs to PendingDeposit.")
+ String getPendings();
+
+ void setPendings(String value);
+
+ @Description("The validation mode (LENIENT|STRICT) that the RDE marshaller uses.")
+ String getValidationMode();
+
+ void setValidationMode(String value);
+}
diff --git a/core/src/main/java/google/registry/model/EppResourceUtils.java b/core/src/main/java/google/registry/model/EppResourceUtils.java
index cc943bf6d..29a5dd7c3 100644
--- a/core/src/main/java/google/registry/model/EppResourceUtils.java
+++ b/core/src/main/java/google/registry/model/EppResourceUtils.java
@@ -398,7 +398,7 @@ public final class EppResourceUtils {
.orElse(null);
if (resourceAtPointInTime == null) {
logger.atSevere().log(
- "Couldn't load resource at % for key %s, falling back to resource %s.",
+ "Couldn't load resource at %s for key %s, falling back to resource %s.",
timestamp, resource.createVKey(), resource);
return resource;
}
diff --git a/core/src/main/java/google/registry/model/contact/ContactHistory.java b/core/src/main/java/google/registry/model/contact/ContactHistory.java
index 4f5ac5cdb..57f5deadb 100644
--- a/core/src/main/java/google/registry/model/contact/ContactHistory.java
+++ b/core/src/main/java/google/registry/model/contact/ContactHistory.java
@@ -110,7 +110,8 @@ public class ContactHistory extends HistoryEntry implements SqlEntity {
@Override
public Optional extends EppResource> getResourceAtPointInTime() {
- return getContactBase();
+ return getContactBase()
+ .map(contactBase -> new ContactResource.Builder().copyFrom(contactBase).build());
}
@PostLoad
diff --git a/core/src/main/java/google/registry/model/contact/ContactResource.java b/core/src/main/java/google/registry/model/contact/ContactResource.java
index c41feea20..8258202dc 100644
--- a/core/src/main/java/google/registry/model/contact/ContactResource.java
+++ b/core/src/main/java/google/registry/model/contact/ContactResource.java
@@ -79,5 +79,26 @@ public class ContactResource extends ContactBase
private Builder(ContactResource instance) {
super(instance);
}
+
+ public Builder copyFrom(ContactBase contactBase) {
+ return this.setAuthInfo(contactBase.getAuthInfo())
+ .setContactId(contactBase.getContactId())
+ .setCreationClientId(contactBase.getCreationClientId())
+ .setCreationTime(contactBase.getCreationTime())
+ .setDeletionTime(contactBase.getDeletionTime())
+ .setDisclose(contactBase.getDisclose())
+ .setEmailAddress(contactBase.getEmailAddress())
+ .setFaxNumber(contactBase.getFaxNumber())
+ .setInternationalizedPostalInfo(contactBase.getInternationalizedPostalInfo())
+ .setLastTransferTime(contactBase.getLastTransferTime())
+ .setLastEppUpdateClientId(contactBase.getLastEppUpdateClientId())
+ .setLastEppUpdateTime(contactBase.getLastEppUpdateTime())
+ .setLocalizedPostalInfo(contactBase.getLocalizedPostalInfo())
+ .setPersistedCurrentSponsorClientId(contactBase.getPersistedCurrentSponsorClientId())
+ .setRepoId(contactBase.getRepoId())
+ .setStatusValues(contactBase.getStatusValues())
+ .setTransferData(contactBase.getTransferData())
+ .setVoiceNumber(contactBase.getVoiceNumber());
+ }
}
}
diff --git a/core/src/main/java/google/registry/model/domain/DomainBase.java b/core/src/main/java/google/registry/model/domain/DomainBase.java
index 633ddfde7..acdce32e7 100644
--- a/core/src/main/java/google/registry/model/domain/DomainBase.java
+++ b/core/src/main/java/google/registry/model/domain/DomainBase.java
@@ -181,5 +181,34 @@ public class DomainBase extends DomainContent
Builder(DomainBase instance) {
super(instance);
}
+
+ public Builder copyFrom(DomainContent domainContent) {
+ return this.setAuthInfo(domainContent.getAuthInfo())
+ .setAutorenewPollMessage(domainContent.getAutorenewPollMessage())
+ .setAutorenewBillingEvent(domainContent.getAutorenewBillingEvent())
+ .setAutorenewEndTime(domainContent.getAutorenewEndTime())
+ .setContacts(domainContent.getContacts())
+ .setCreationClientId(domainContent.getCreationClientId())
+ .setCreationTime(domainContent.getCreationTime())
+ .setDomainName(domainContent.getDomainName())
+ .setDeletePollMessage(domainContent.getDeletePollMessage())
+ .setDsData(domainContent.getDsData())
+ .setDeletionTime(domainContent.getDeletionTime())
+ .setGracePeriods(domainContent.getGracePeriods())
+ .setIdnTableName(domainContent.getIdnTableName())
+ .setLastTransferTime(domainContent.getLastTransferTime())
+ .setLaunchNotice(domainContent.getLaunchNotice())
+ .setLastEppUpdateClientId(domainContent.getLastEppUpdateClientId())
+ .setLastEppUpdateTime(domainContent.getLastEppUpdateTime())
+ .setNameservers(domainContent.getNameservers())
+ .setPersistedCurrentSponsorClientId(domainContent.getPersistedCurrentSponsorClientId())
+ .setRegistrant(domainContent.getRegistrant())
+ .setRegistrationExpirationTime(domainContent.getRegistrationExpirationTime())
+ .setRepoId(domainContent.getRepoId())
+ .setSmdId(domainContent.getSmdId())
+ .setSubordinateHosts(domainContent.getSubordinateHosts())
+ .setStatusValues(domainContent.getStatusValues())
+ .setTransferData(domainContent.getTransferData());
+ }
}
}
diff --git a/core/src/main/java/google/registry/model/domain/DomainHistory.java b/core/src/main/java/google/registry/model/domain/DomainHistory.java
index 9fb4721e8..68e841c89 100644
--- a/core/src/main/java/google/registry/model/domain/DomainHistory.java
+++ b/core/src/main/java/google/registry/model/domain/DomainHistory.java
@@ -252,11 +252,18 @@ public class DomainHistory extends HistoryEntry implements SqlEntity {
@Override
public Optional extends EppResource> getResourceAtPointInTime() {
- return getDomainContent();
+ return getDomainContent()
+ .map(domainContent -> new DomainBase.Builder().copyFrom(domainContent).build());
}
@PostLoad
void postLoad() {
+ // TODO(b/188044616): Determine why Eager loading doesn't work here.
+ Hibernate.initialize(domainTransactionRecords);
+ Hibernate.initialize(nsHosts);
+ Hibernate.initialize(dsDataHistories);
+ Hibernate.initialize(gracePeriodHistories);
+
if (domainContent != null) {
domainContent.nsHosts = nullToEmptyImmutableCopy(nsHosts);
domainContent.gracePeriods =
@@ -278,12 +285,6 @@ public class DomainHistory extends HistoryEntry implements SqlEntity {
}
}
}
-
- // TODO(b/188044616): Determine why Eager loading doesn't work here.
- Hibernate.initialize(domainTransactionRecords);
- Hibernate.initialize(nsHosts);
- Hibernate.initialize(dsDataHistories);
- Hibernate.initialize(gracePeriodHistories);
}
// In Datastore, save as a HistoryEntry object regardless of this object's type
diff --git a/core/src/main/java/google/registry/model/host/HostBase.java b/core/src/main/java/google/registry/model/host/HostBase.java
index df4a41b5c..6b9f10b8e 100644
--- a/core/src/main/java/google/registry/model/host/HostBase.java
+++ b/core/src/main/java/google/registry/model/host/HostBase.java
@@ -137,7 +137,7 @@ public class HostBase extends EppResource {
}
@Override
- public Builder asBuilder() {
+ public Builder extends HostBase, ?> asBuilder() {
return new Builder<>(clone(this));
}
diff --git a/core/src/main/java/google/registry/model/host/HostHistory.java b/core/src/main/java/google/registry/model/host/HostHistory.java
index 1313d4a50..02d0661c3 100644
--- a/core/src/main/java/google/registry/model/host/HostHistory.java
+++ b/core/src/main/java/google/registry/model/host/HostHistory.java
@@ -111,7 +111,7 @@ public class HostHistory extends HistoryEntry implements SqlEntity {
@Override
public Optional extends EppResource> getResourceAtPointInTime() {
- return getHostBase();
+ return getHostBase().map(hostBase -> new HostResource.Builder().copyFrom(hostBase).build());
}
@PostLoad
diff --git a/core/src/main/java/google/registry/model/host/HostResource.java b/core/src/main/java/google/registry/model/host/HostResource.java
index 0338b8d1d..185b3219d 100644
--- a/core/src/main/java/google/registry/model/host/HostResource.java
+++ b/core/src/main/java/google/registry/model/host/HostResource.java
@@ -63,5 +63,21 @@ public class HostResource extends HostBase
private Builder(HostResource instance) {
super(instance);
}
+
+ public Builder copyFrom(HostBase hostBase) {
+ return this.setCreationClientId(hostBase.getCreationClientId())
+ .setCreationTime(hostBase.getCreationTime())
+ .setDeletionTime(hostBase.getDeletionTime())
+ .setHostName(hostBase.getHostName())
+ .setInetAddresses(hostBase.getInetAddresses())
+ .setLastTransferTime(hostBase.getLastTransferTime())
+ .setLastSuperordinateChange(hostBase.getLastSuperordinateChange())
+ .setLastEppUpdateClientId(hostBase.getLastEppUpdateClientId())
+ .setLastEppUpdateTime(hostBase.getLastEppUpdateTime())
+ .setPersistedCurrentSponsorClientId(hostBase.getPersistedCurrentSponsorClientId())
+ .setRepoId(hostBase.getRepoId())
+ .setSuperordinateDomain(hostBase.getSuperordinateDomain())
+ .setStatusValues(hostBase.getStatusValues());
+ }
}
}
diff --git a/core/src/main/java/google/registry/rde/PendingDeposit.java b/core/src/main/java/google/registry/rde/PendingDeposit.java
index 9e40c24a6..c8bca8823 100644
--- a/core/src/main/java/google/registry/rde/PendingDeposit.java
+++ b/core/src/main/java/google/registry/rde/PendingDeposit.java
@@ -17,8 +17,17 @@ package google.registry.rde;
import com.google.auto.value.AutoValue;
import google.registry.model.common.Cursor.CursorType;
import google.registry.model.rde.RdeMode;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.io.Serializable;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.BooleanCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
import org.joda.time.DateTime;
import org.joda.time.Duration;
@@ -69,17 +78,9 @@ public abstract class PendingDeposit implements Serializable {
@Nullable
public abstract Integer revision();
- static PendingDeposit create(
+ public static PendingDeposit create(
String tld, DateTime watermark, RdeMode mode, CursorType cursor, Duration interval) {
- return new AutoValue_PendingDeposit(
- false,
- tld,
- watermark,
- mode,
- cursor,
- interval,
- null,
- null);
+ return new AutoValue_PendingDeposit(false, tld, watermark, mode, cursor, interval, null, null);
}
static PendingDeposit createInManualOperation(
@@ -89,15 +90,52 @@ public abstract class PendingDeposit implements Serializable {
String directoryWithTrailingSlash,
@Nullable Integer revision) {
return new AutoValue_PendingDeposit(
- true,
- tld,
- watermark,
- mode,
- null,
- null,
- directoryWithTrailingSlash,
- revision);
+ true, tld, watermark, mode, null, null, directoryWithTrailingSlash, revision);
}
PendingDeposit() {}
+
+ /**
+ * A deterministic coder for {@link PendingDeposit} used during a GroupBy transform.
+ *
+ * We cannot use a {@link SerializableCoder} directly because it does not guarantee
+ * determinism, which is required by GroupBy.
+ */
+ public static class PendingDepositCoder extends AtomicCoder {
+
+ private PendingDepositCoder() {
+ super();
+ }
+
+ private static final PendingDepositCoder INSTANCE = new PendingDepositCoder();
+
+ public static PendingDepositCoder of() {
+ return INSTANCE;
+ }
+
+ @Override
+ public void encode(PendingDeposit value, OutputStream outStream) throws IOException {
+ BooleanCoder.of().encode(value.manual(), outStream);
+ StringUtf8Coder.of().encode(value.tld(), outStream);
+ SerializableCoder.of(DateTime.class).encode(value.watermark(), outStream);
+ SerializableCoder.of(RdeMode.class).encode(value.mode(), outStream);
+ NullableCoder.of(SerializableCoder.of(CursorType.class)).encode(value.cursor(), outStream);
+ NullableCoder.of(SerializableCoder.of(Duration.class)).encode(value.interval(), outStream);
+ NullableCoder.of(StringUtf8Coder.of()).encode(value.directoryWithTrailingSlash(), outStream);
+ NullableCoder.of(VarIntCoder.of()).encode(value.revision(), outStream);
+ }
+
+ @Override
+ public PendingDeposit decode(InputStream inStream) throws IOException {
+ return new AutoValue_PendingDeposit(
+ BooleanCoder.of().decode(inStream),
+ StringUtf8Coder.of().decode(inStream),
+ SerializableCoder.of(DateTime.class).decode(inStream),
+ SerializableCoder.of(RdeMode.class).decode(inStream),
+ NullableCoder.of(SerializableCoder.of(CursorType.class)).decode(inStream),
+ NullableCoder.of(SerializableCoder.of(Duration.class)).decode(inStream),
+ NullableCoder.of(StringUtf8Coder.of()).decode(inStream),
+ NullableCoder.of(VarIntCoder.of()).decode(inStream));
+ }
+ }
}
diff --git a/core/src/main/java/google/registry/rde/RdeFragmenter.java b/core/src/main/java/google/registry/rde/RdeFragmenter.java
new file mode 100644
index 000000000..660037cc3
--- /dev/null
+++ b/core/src/main/java/google/registry/rde/RdeFragmenter.java
@@ -0,0 +1,109 @@
+// Copyright 2021 The Nomulus Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package google.registry.rde;
+
+import static google.registry.model.EppResourceUtils.loadAtPointInTime;
+import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.ImmutableMap;
+import google.registry.model.EppResource;
+import google.registry.model.contact.ContactResource;
+import google.registry.model.domain.DomainBase;
+import google.registry.model.host.HostResource;
+import google.registry.model.rde.RdeMode;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Supplier;
+import org.joda.time.DateTime;
+
+/** Loading cache that turns a resource into XML for the various points in time and modes. */
+public class RdeFragmenter {
+ private final Map> cache = new HashMap<>();
+ private final ImmutableMap> resourceAtTimes;
+ private final RdeMarshaller marshaller;
+
+ long cacheHits = 0;
+ long resourcesNotFound = 0;
+ long resourcesFound = 0;
+
+ public RdeFragmenter(
+ ImmutableMap> resourceAtTimes, RdeMarshaller marshaller) {
+ this.resourceAtTimes = resourceAtTimes;
+ this.marshaller = marshaller;
+ }
+
+ public Optional marshal(DateTime watermark, RdeMode mode) {
+ Optional result = cache.get(WatermarkModePair.create(watermark, mode));
+ if (result != null) {
+ cacheHits++;
+ return result;
+ }
+ EppResource resource = resourceAtTimes.get(watermark).get();
+ if (resource == null) {
+ result = Optional.empty();
+ cache.put(WatermarkModePair.create(watermark, RdeMode.FULL), result);
+ cache.put(WatermarkModePair.create(watermark, RdeMode.THIN), result);
+ resourcesNotFound++;
+ return result;
+ }
+ resourcesFound++;
+ if (resource instanceof DomainBase) {
+ result = Optional.of(marshaller.marshalDomain((DomainBase) resource, mode));
+ cache.put(WatermarkModePair.create(watermark, mode), result);
+ return result;
+ } else if (resource instanceof ContactResource) {
+ result = Optional.of(marshaller.marshalContact((ContactResource) resource));
+ cache.put(WatermarkModePair.create(watermark, RdeMode.FULL), result);
+ cache.put(WatermarkModePair.create(watermark, RdeMode.THIN), result);
+ return result;
+ } else if (resource instanceof HostResource) {
+ HostResource host = (HostResource) resource;
+ result =
+ Optional.of(
+ host.isSubordinate()
+ ? marshaller.marshalSubordinateHost(
+ host,
+ // Note that loadAtPointInTime() does cloneProjectedAtTime(watermark) for
+ // us.
+ Objects.requireNonNull(
+ loadAtPointInTime(
+ tm().loadByKey(host.getSuperordinateDomain()), watermark)))
+ : marshaller.marshalExternalHost(host));
+ cache.put(WatermarkModePair.create(watermark, RdeMode.FULL), result);
+ cache.put(WatermarkModePair.create(watermark, RdeMode.THIN), result);
+ return result;
+ } else {
+ throw new IllegalStateException(
+ String.format(
+ "Resource %s of type %s cannot be converted to XML.",
+ resource, resource.getClass().getSimpleName()));
+ }
+ }
+
+ /** Map key for {@link RdeFragmenter} cache. */
+ @AutoValue
+ abstract static class WatermarkModePair {
+ abstract DateTime watermark();
+
+ abstract RdeMode mode();
+
+ static WatermarkModePair create(DateTime watermark, RdeMode mode) {
+ return new AutoValue_RdeFragmenter_WatermarkModePair(watermark, mode);
+ }
+ }
+}
diff --git a/core/src/main/java/google/registry/rde/RdeStagingMapper.java b/core/src/main/java/google/registry/rde/RdeStagingMapper.java
index ceecc40ea..a9c45aaf7 100644
--- a/core/src/main/java/google/registry/rde/RdeStagingMapper.java
+++ b/core/src/main/java/google/registry/rde/RdeStagingMapper.java
@@ -16,12 +16,10 @@ package google.registry.rde;
import static com.google.common.base.Strings.nullToEmpty;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
-import static google.registry.model.EppResourceUtils.loadAtPointInTime;
import static google.registry.model.EppResourceUtils.loadAtPointInTimeAsync;
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
import com.google.appengine.tools.mapreduce.Mapper;
-import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSetMultimap;
@@ -34,9 +32,6 @@ import google.registry.model.host.HostResource;
import google.registry.model.rde.RdeMode;
import google.registry.model.registrar.Registrar;
import google.registry.xml.ValidationMode;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;
import org.joda.time.DateTime;
@@ -129,7 +124,7 @@ public final class RdeStagingMapper extends Mapper loadAtPointInTimeAsync(resource, input)));
// Convert resource to an XML fragment for each watermark/mode pair lazily and cache the result.
- Fragmenter fragmenter = new Fragmenter(resourceAtTimes);
+ RdeFragmenter fragmenter = new RdeFragmenter(resourceAtTimes, marshaller);
// Emit resource as an XML fragment for all TLDs and modes pending deposit.
long resourcesEmitted = 0;
@@ -157,74 +152,4 @@ public final class RdeStagingMapper extends Mapper> cache = new HashMap<>();
- private final ImmutableMap> resourceAtTimes;
-
- long cacheHits = 0;
- long resourcesNotFound = 0;
- long resourcesFound = 0;
-
- Fragmenter(ImmutableMap> resourceAtTimes) {
- this.resourceAtTimes = resourceAtTimes;
- }
-
- Optional marshal(DateTime watermark, RdeMode mode) {
- Optional result = cache.get(WatermarkModePair.create(watermark, mode));
- if (result != null) {
- cacheHits++;
- return result;
- }
- EppResource resource = resourceAtTimes.get(watermark).get();
- if (resource == null) {
- result = Optional.empty();
- cache.put(WatermarkModePair.create(watermark, RdeMode.FULL), result);
- cache.put(WatermarkModePair.create(watermark, RdeMode.THIN), result);
- resourcesNotFound++;
- return result;
- }
- resourcesFound++;
- if (resource instanceof DomainBase) {
- result = Optional.of(marshaller.marshalDomain((DomainBase) resource, mode));
- cache.put(WatermarkModePair.create(watermark, mode), result);
- return result;
- } else if (resource instanceof ContactResource) {
- result = Optional.of(marshaller.marshalContact((ContactResource) resource));
- cache.put(WatermarkModePair.create(watermark, RdeMode.FULL), result);
- cache.put(WatermarkModePair.create(watermark, RdeMode.THIN), result);
- return result;
- } else if (resource instanceof HostResource) {
- HostResource host = (HostResource) resource;
- result =
- Optional.of(
- host.isSubordinate()
- ? marshaller.marshalSubordinateHost(
- host,
- // Note that loadAtPointInTime() does cloneProjectedAtTime(watermark) for
- // us.
- Objects.requireNonNull(
- loadAtPointInTime(
- tm().loadByKey(host.getSuperordinateDomain()), watermark)))
- : marshaller.marshalExternalHost(host));
- cache.put(WatermarkModePair.create(watermark, RdeMode.FULL), result);
- cache.put(WatermarkModePair.create(watermark, RdeMode.THIN), result);
- return result;
- } else {
- throw new AssertionError(resource.toString());
- }
- }
- }
-
- /** Map key for {@link Fragmenter} cache. */
- @AutoValue
- abstract static class WatermarkModePair {
- abstract DateTime watermark();
- abstract RdeMode mode();
-
- static WatermarkModePair create(DateTime watermark, RdeMode mode) {
- return new AutoValue_RdeStagingMapper_WatermarkModePair(watermark, mode);
- }
- }
}
diff --git a/core/src/main/resources/google/registry/beam/rde_pipeline_metadata.json b/core/src/main/resources/google/registry/beam/rde_pipeline_metadata.json
new file mode 100644
index 000000000..a88f69dd0
--- /dev/null
+++ b/core/src/main/resources/google/registry/beam/rde_pipeline_metadata.json
@@ -0,0 +1,23 @@
+{
+ "name": "RDE/BRDA Deposit Generation",
+ "description": "An Apache Beam pipeline generates RDE or BRDA deposits and deposits them to GCS with GhostRyde encryption.",
+ "parameters": [
+ {
+ "name": "pendings",
+ "label": "The pendings deposits to generate.",
+ "helpText": "A TLD to PendingDeposit map that is serialized and Base64 URL-safe encoded.",
+ "regexes": [
+ "A-Za-z0-9\\-_"
+ ]
+ },
+ {
+ "name": "validationMode",
+ "label": "How strict the marshaller validates the given EPP resources.",
+ "helpText": "If set to LENIENT the marshaller will not warn about missing data on the EPP resources.",
+ "is_optional": true,
+ "regexes": [
+ "^STRICT|LENIENT$"
+ ]
+ }
+ ]
+}
diff --git a/core/src/test/java/google/registry/beam/rde/RdePipelineTest.java b/core/src/test/java/google/registry/beam/rde/RdePipelineTest.java
new file mode 100644
index 000000000..2610ef685
--- /dev/null
+++ b/core/src/test/java/google/registry/beam/rde/RdePipelineTest.java
@@ -0,0 +1,257 @@
+// Copyright 2021 The Nomulus Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package google.registry.beam.rde;
+
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.ImmutableSet.toImmutableSet;
+import static com.google.common.truth.Truth.assertThat;
+import static google.registry.beam.rde.RdePipeline.decodePendings;
+import static google.registry.beam.rde.RdePipeline.encodePendings;
+import static google.registry.model.common.Cursor.CursorType.RDE_STAGING;
+import static google.registry.model.rde.RdeMode.FULL;
+import static google.registry.model.rde.RdeMode.THIN;
+import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
+import static google.registry.persistence.transaction.TransactionManagerFactory.setTm;
+import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
+import static google.registry.rde.RdeResourceType.CONTACT;
+import static google.registry.rde.RdeResourceType.DOMAIN;
+import static google.registry.rde.RdeResourceType.HOST;
+import static google.registry.rde.RdeResourceType.REGISTRAR;
+import static google.registry.testing.AppEngineExtension.loadInitialData;
+import static google.registry.testing.DatabaseHelper.createTld;
+import static google.registry.testing.DatabaseHelper.newHostResource;
+import static google.registry.testing.DatabaseHelper.persistActiveContact;
+import static google.registry.testing.DatabaseHelper.persistActiveDomain;
+import static google.registry.testing.DatabaseHelper.persistDeletedDomain;
+import static google.registry.testing.DatabaseHelper.persistEppResource;
+import static google.registry.testing.DatabaseHelper.persistNewRegistrar;
+import static org.joda.time.Duration.standardDays;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSetMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Streams;
+import google.registry.beam.TestPipelineExtension;
+import google.registry.model.host.HostResource;
+import google.registry.model.registrar.Registrar;
+import google.registry.model.registrar.Registrar.State;
+import google.registry.persistence.transaction.JpaTestRules;
+import google.registry.persistence.transaction.JpaTestRules.JpaIntegrationTestExtension;
+import google.registry.persistence.transaction.TransactionManager;
+import google.registry.rde.DepositFragment;
+import google.registry.rde.PendingDeposit;
+import google.registry.rde.RdeResourceType;
+import google.registry.testing.DatastoreEntityExtension;
+import google.registry.testing.FakeClock;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Stream;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.values.KV;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+/** Unit tests for {@link RdePipeline}. */
+public class RdePipelineTest {
+
+ private static final String REGISTRAR_NAME_PATTERN =
+ "(.*)";
+
+ private static final String DOMAIN_NAME_PATTERN = "(.*)";
+
+ private static final String CONTACT_ID_PATTERN = "(.*)";
+
+ private static final String HOST_NAME_PATTERN = "(.*)";
+
+ private static final ImmutableSetMultimap PENDINGS =
+ ImmutableSetMultimap.of(
+ "pal",
+ PendingDeposit.create(
+ "pal", DateTime.parse("2000-01-01TZ"), FULL, RDE_STAGING, standardDays(1)),
+ "pal",
+ PendingDeposit.create(
+ "pal", DateTime.parse("2000-01-01TZ"), THIN, RDE_STAGING, standardDays(1)),
+ "fun",
+ PendingDeposit.create(
+ "fun", DateTime.parse("2000-01-01TZ"), FULL, RDE_STAGING, standardDays(1)));
+
+ // This is the default creation time for test data.
+ private final FakeClock clock = new FakeClock(DateTime.parse("1999-12-31TZ"));
+
+ // The pipeline runs in a different thread, which needs to be masqueraded as a GAE thread as well.
+ @RegisterExtension
+ @Order(Order.DEFAULT - 1)
+ final DatastoreEntityExtension datastore = new DatastoreEntityExtension().allThreads(true);
+
+ @RegisterExtension
+ final JpaIntegrationTestExtension database =
+ new JpaTestRules.Builder().withClock(clock).buildIntegrationTestRule();
+
+ @RegisterExtension
+ final TestPipelineExtension pipeline =
+ TestPipelineExtension.create().enableAbandonedNodeEnforcement(true);
+
+ private final RdePipelineOptions options =
+ PipelineOptionsFactory.create().as(RdePipelineOptions.class);
+
+ private RdePipeline rdePipeline;
+
+ private TransactionManager originalTm;
+
+ @BeforeEach
+ void beforeEach() throws Exception {
+ originalTm = tm();
+ setTm(jpaTm());
+ loadInitialData();
+
+ // Two real registrars have been created by loadInitialData(), named "New Registrar" and "The
+ // Registrar". Create one included registrar (external_monitoring) and two excluded ones.
+ Registrar monitoringRegistrar =
+ persistNewRegistrar("monitoring", "monitoring", Registrar.Type.MONITORING, null);
+ Registrar testRegistrar = persistNewRegistrar("test", "test", Registrar.Type.TEST, null);
+ Registrar externalMonitoringRegistrar =
+ persistNewRegistrar(
+ "externalmonitor", "external_monitoring", Registrar.Type.EXTERNAL_MONITORING, 9997L);
+ // Set Registrar states which are required for reporting.
+ tm().transact(
+ () ->
+ tm().putAll(
+ ImmutableList.of(
+ externalMonitoringRegistrar.asBuilder().setState(State.ACTIVE).build(),
+ testRegistrar.asBuilder().setState(State.ACTIVE).build(),
+ monitoringRegistrar.asBuilder().setState(State.ACTIVE).build())));
+
+ createTld("pal");
+ createTld("fun");
+ createTld("cat");
+
+ // Also persists a "contact1234" contact in the process.
+ persistActiveDomain("hello.pal");
+ persistActiveDomain("kitty.fun");
+ // Should not appear anywhere.
+ persistActiveDomain("lol.cat");
+ persistDeletedDomain("deleted.pal", DateTime.parse("1999-12-30TZ"));
+
+ persistActiveContact("contact456");
+
+ HostResource host = persistEppResource(newHostResource("old.host.test"));
+ // Set the clock to 2000-01-02, the updated host should NOT show up in RDE.
+ clock.advanceBy(Duration.standardDays(2));
+ persistEppResource(host.asBuilder().setHostName("new.host.test").build());
+
+ options.setPendings(encodePendings(PENDINGS));
+ // The EPP resources created in tests do not have all the fields populated, using a STRICT
+ // validation mode will result in a lot of warnings during marshalling.
+ options.setValidationMode("LENIENT");
+ rdePipeline = new RdePipeline(options);
+ }
+
+ @AfterEach
+ void afterEach() {
+ setTm(originalTm);
+ }
+
+ @Test
+ void testSuccess_encodeAndDecodePendingsMap() throws Exception {
+ String encodedString = encodePendings(PENDINGS);
+ assertThat(decodePendings(encodedString)).isEqualTo(PENDINGS);
+ }
+
+ @Test
+ void testSuccess_createFragments() {
+ PAssert.that(
+ rdePipeline
+ .createFragments(pipeline)
+ .apply("Group by PendingDeposit", GroupByKey.create()))
+ .satisfies(
+ kvs -> {
+ kvs.forEach(
+ kv -> {
+ // Registrar fragments.
+ assertThat(
+ getFragmentForType(kv, REGISTRAR)
+ .map(getXmlElement(REGISTRAR_NAME_PATTERN))
+ .collect(toImmutableSet()))
+ // The same registrars are attached to all the pending deposits.
+ .containsExactly("New Registrar", "The Registrar", "external_monitoring");
+ // Domain fragments.
+ assertThat(
+ getFragmentForType(kv, DOMAIN)
+ .map(getXmlElement(DOMAIN_NAME_PATTERN))
+ .anyMatch(
+ domain ->
+ // Deleted domain should not be included
+ domain.equals("deleted.pal")
+ // Only domains on the pending deposit's tld should
+ // appear.
+ || !kv.getKey()
+ .tld()
+ .equals(
+ Iterables.get(
+ Splitter.on('.').split(domain), 1))))
+ .isFalse();
+ if (kv.getKey().mode().equals(FULL)) {
+ // Contact fragments.
+ assertThat(
+ getFragmentForType(kv, CONTACT)
+ .map(getXmlElement(CONTACT_ID_PATTERN))
+ .collect(toImmutableSet()))
+ // The same contacts are attached too all pending deposits.
+ .containsExactly("contact1234", "contact456");
+ // Host fragments.
+ assertThat(
+ getFragmentForType(kv, HOST)
+ .map(getXmlElement(HOST_NAME_PATTERN))
+ .collect(toImmutableSet()))
+ // Should load the resource before update.
+ .containsExactly("old.host.test");
+ } else {
+ // BRDA does not contain contact or hosts.
+ assertThat(
+ Streams.stream(kv.getValue())
+ .anyMatch(
+ fragment ->
+ fragment.type().equals(CONTACT)
+ || fragment.type().equals(HOST)))
+ .isFalse();
+ }
+ });
+ return null;
+ });
+ pipeline.run().waitUntilFinish();
+ }
+
+ private static Function getXmlElement(String pattern) {
+ return (fragment) -> {
+ Matcher matcher = Pattern.compile(pattern).matcher(fragment.xml());
+ checkState(matcher.find(), "Missing %s in xml.", pattern);
+ return matcher.group(1);
+ };
+ }
+
+ private static Stream getFragmentForType(
+ KV> kv, RdeResourceType type) {
+ return Streams.stream(kv.getValue()).filter(fragment -> fragment.type().equals(type));
+ }
+}
diff --git a/core/src/test/java/google/registry/flows/EppPointInTimeTest.java b/core/src/test/java/google/registry/flows/EppPointInTimeTest.java
index 81a7438cb..31b087746 100644
--- a/core/src/test/java/google/registry/flows/EppPointInTimeTest.java
+++ b/core/src/test/java/google/registry/flows/EppPointInTimeTest.java
@@ -145,7 +145,7 @@ class EppPointInTimeTest {
tm().clearSessionCache();
assertAboutImmutableObjects()
.that(loadAtPointInTime(latest, timeAtCreate.plusDays(1)))
- .hasFieldsEqualTo(domainAfterCreate);
+ .isEqualExceptFields(domainAfterCreate, "updateTimestamp");
tm().clearSessionCache();
if (tm().isOfy()) {
@@ -159,18 +159,18 @@ class EppPointInTimeTest {
// second update occurred one millisecond later.
assertAboutImmutableObjects()
.that(loadAtPointInTime(latest, timeAtFirstUpdate))
- .hasFieldsEqualTo(domainAfterFirstUpdate);
+ .isEqualExceptFields(domainAfterFirstUpdate, "updateTimestamp");
}
tm().clearSessionCache();
assertAboutImmutableObjects()
.that(loadAtPointInTime(latest, timeAtSecondUpdate))
- .hasFieldsEqualTo(domainAfterSecondUpdate);
+ .isEqualExceptFields(domainAfterSecondUpdate, "updateTimestamp");
tm().clearSessionCache();
assertAboutImmutableObjects()
.that(loadAtPointInTime(latest, timeAtSecondUpdate.plusDays(1)))
- .hasFieldsEqualTo(domainAfterSecondUpdate);
+ .isEqualExceptFields(domainAfterSecondUpdate, "updateTimestamp");
// Deletion time has millisecond granularity due to isActive() check.
tm().clearSessionCache();
diff --git a/core/src/test/java/google/registry/model/contact/ContactResourceTest.java b/core/src/test/java/google/registry/model/contact/ContactResourceTest.java
index ebe10f94e..db34f5ea6 100644
--- a/core/src/test/java/google/registry/model/contact/ContactResourceTest.java
+++ b/core/src/test/java/google/registry/model/contact/ContactResourceTest.java
@@ -122,6 +122,13 @@ public class ContactResourceTest extends EntityTestCase {
contactResource = persistResource(cloneAndSetAutoTimestamps(originalContact));
}
+ @Test
+ void testContactBaseToContactResource() {
+ ImmutableObjectSubject.assertAboutImmutableObjects()
+ .that(new ContactResource.Builder().copyFrom(contactResource).build())
+ .isEqualExceptFields(contactResource, "updateTimestamp", "revisions");
+ }
+
@Test
void testCloudSqlPersistence_failWhenViolateForeignKeyConstraint() {
assertThrowForeignKeyViolation(() -> jpaTm().transact(() -> jpaTm().insert(originalContact)));
diff --git a/core/src/test/java/google/registry/model/domain/DomainBaseTest.java b/core/src/test/java/google/registry/model/domain/DomainBaseTest.java
index 3ee6faa20..05b81ae8d 100644
--- a/core/src/test/java/google/registry/model/domain/DomainBaseTest.java
+++ b/core/src/test/java/google/registry/model/domain/DomainBaseTest.java
@@ -39,6 +39,7 @@ import com.google.common.collect.Streams;
import com.googlecode.objectify.Key;
import google.registry.model.EntityTestCase;
import google.registry.model.ImmutableObject;
+import google.registry.model.ImmutableObjectSubject;
import google.registry.model.billing.BillingEvent;
import google.registry.model.billing.BillingEvent.Reason;
import google.registry.model.contact.ContactResource;
@@ -171,6 +172,13 @@ public class DomainBaseTest extends EntityTestCase {
.build()));
}
+ @Test
+ void testDomainContentToDomainBase() {
+ ImmutableObjectSubject.assertAboutImmutableObjects()
+ .that(new DomainBase.Builder().copyFrom(domain).build())
+ .isEqualExceptFields(domain, "updateTimestamp", "revisions");
+ }
+
@Test
void testPersistence() {
// Note that this only verifies that the value stored under the foreign key is the same as that
diff --git a/core/src/test/java/google/registry/model/host/HostResourceTest.java b/core/src/test/java/google/registry/model/host/HostResourceTest.java
index f902eeae2..e9ec70cd2 100644
--- a/core/src/test/java/google/registry/model/host/HostResourceTest.java
+++ b/core/src/test/java/google/registry/model/host/HostResourceTest.java
@@ -31,6 +31,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.net.InetAddresses;
import google.registry.model.EntityTestCase;
+import google.registry.model.ImmutableObjectSubject;
import google.registry.model.domain.DomainBase;
import google.registry.model.eppcommon.StatusValue;
import google.registry.model.eppcommon.Trid;
@@ -89,6 +90,13 @@ class HostResourceTest extends EntityTestCase {
.build()));
}
+ @TestOfyAndSql
+ void testHostBaseToHostResource() {
+ ImmutableObjectSubject.assertAboutImmutableObjects()
+ .that(new HostResource.Builder().copyFrom(host).build())
+ .isEqualExceptFields(host, "updateTimestamp", "revisions");
+ }
+
@TestOfyAndSql
void testPersistence() {
HostResource newHost = host.asBuilder().setRepoId("NEWHOST").build();
diff --git a/core/src/test/java/google/registry/testing/DatabaseHelper.java b/core/src/test/java/google/registry/testing/DatabaseHelper.java
index 4e68e9840..8bf82685b 100644
--- a/core/src/test/java/google/registry/testing/DatabaseHelper.java
+++ b/core/src/test/java/google/registry/testing/DatabaseHelper.java
@@ -1085,8 +1085,7 @@ public class DatabaseHelper {
.setClientId(resource.getCreationClientId())
.setType(getHistoryEntryType(resource))
.setModificationTime(tm().getTransactionTime())
- .build()
- .toChildHistoryEntity());
+ .build());
ofyTmOrDoNothing(
() -> tm().put(ForeignKeyIndex.create(resource, resource.getDeletionTime())));
});
diff --git a/core/src/test/java/google/registry/testing/DatastoreEntityExtension.java b/core/src/test/java/google/registry/testing/DatastoreEntityExtension.java
index cf61d703e..f7fd131a0 100644
--- a/core/src/test/java/google/registry/testing/DatastoreEntityExtension.java
+++ b/core/src/test/java/google/registry/testing/DatastoreEntityExtension.java
@@ -18,6 +18,8 @@ import static google.registry.model.ofy.ObjectifyService.auditedOfy;
import com.google.apphosting.api.ApiProxy;
import com.google.apphosting.api.ApiProxy.Environment;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.util.Map;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
@@ -25,7 +27,7 @@ import org.junit.jupiter.api.extension.ExtensionContext;
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
/**
- * Allows instantiation of Datastore {@code Entity entities} without the heavyweight {@code
+ * Allows instantiation of Datastore {@code Entity}s without the heavyweight {@link
* AppEngineExtension}.
*
* When an Ofy key is created, by calling the various Key.create() methods, whether the current
@@ -44,6 +46,27 @@ public class DatastoreEntityExtension implements BeforeEachCallback, AfterEachCa
private static final Environment PLACEHOLDER_ENV = new PlaceholderEnvironment();
+ private boolean allThreads = false;
+
+ /**
+ * Whether all threads should be masqueraded as GAE threads.
+ *
+ *
This is particularly useful when new threads are spawned during a test. For example when
+ * testing Beam pipelines, the test pipeline runs the transforms in separate threads than the test
+ * thread. If Ofy keys are created in transforms, this value needs to be set to true.
+ *
+ *
Warning: by setting this value to true, any thread spawned by the current JVM will be have
+ * the thread local property set to the placeholder value during the execution of the current
+ * test, including those running other tests in parallel. This may or may not cause an issue when
+ * other tests have {@link AppEngineExtension} registered, that creates a much more fully
+ * functional GAE test environment. Consider moving tests using this extension to {@code
+ * outcastTest} if necessary.
+ */
+ public DatastoreEntityExtension allThreads(boolean value) {
+ allThreads = value;
+ return this;
+ }
+
@Override
public void beforeEach(ExtensionContext context) {
ApiProxy.setEnvironmentForCurrentThread(PLACEHOLDER_ENV);
@@ -51,12 +74,21 @@ public class DatastoreEntityExtension implements BeforeEachCallback, AfterEachCa
// will load the ObjectifyService class, whose static initialization block registers all Ofy
// entities.
auditedOfy();
+ if (allThreads) {
+ ApiProxy.setEnvironmentFactory(() -> PLACEHOLDER_ENV);
+ }
}
@Override
- public void afterEach(ExtensionContext context) {
+ public void afterEach(ExtensionContext context)
+ throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
// Clear the cached instance.
- ApiProxy.setEnvironmentForCurrentThread(null);
+ ApiProxy.clearEnvironmentForCurrentThread();
+ if (allThreads) {
+ Method method = ApiProxy.class.getDeclaredMethod("clearEnvironmentFactory");
+ method.setAccessible(true);
+ method.invoke(null);
+ }
}
private static final class PlaceholderEnvironment implements Environment {
diff --git a/release/cloudbuild-nomulus.yaml b/release/cloudbuild-nomulus.yaml
index 5bf60e177..62b00a24f 100644
--- a/release/cloudbuild-nomulus.yaml
+++ b/release/cloudbuild-nomulus.yaml
@@ -93,7 +93,9 @@ steps:
google.registry.beam.spec11.Spec11Pipeline \
google/registry/beam/spec11_pipeline_metadata.json \
google.registry.beam.invoicing.InvoicingPipeline \
- google/registry/beam/invoicing_pipeline_metadata.json
+ google/registry/beam/invoicing_pipeline_metadata.json \
+ google.registry.beam.rde.RdePipeline \
+ google/registry/beam/rde_pipeline_metadata.json
# Tentatively build and publish Cloud SQL schema jar here, before schema release
# process is finalized. Also publish nomulus:core jars that are needed for
# server/schema compatibility tests.