From b068e459c248371d07bf5322e929e53b7983be96 Mon Sep 17 00:00:00 2001 From: Lai Jiang Date: Wed, 30 Jun 2021 13:54:24 -0400 Subject: [PATCH] Add a Beam pipeline to generate RDE deposit (part 1) (#1219) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This is the first part of the RdeStagingAction SQL migration where the mapper logic is implemented in Beam. A few helper methods are added to convert the DomainContent, HostBase and ContactBase to their respective terminal child classes. This is necessary and possible because the child classes do not have extra fields and the base classes exist only to be embedded to other entities (such as the various HistoryEntry entities). The conversion is necessary because most of our code expects the terminal classes, such as the RdeMarshaller's various marshallXXX() methods. The alternative would be to change all the call sites, which seems to be much more disruptive. Unfortunately there is is no good way to do this conversion than just creating a builder and setting every fields there is. --- This change is [Reviewable](https://reviewable.io/reviews/google/nomulus/1219) --- core/build.gradle | 4 + .../google/registry/beam/rde/RdePipeline.java | 260 ++++++++++++++++++ .../registry/beam/rde/RdePipelineOptions.java | 32 +++ .../registry/model/EppResourceUtils.java | 2 +- .../model/contact/ContactHistory.java | 3 +- .../model/contact/ContactResource.java | 21 ++ .../registry/model/domain/DomainBase.java | 29 ++ .../registry/model/domain/DomainHistory.java | 15 +- .../google/registry/model/host/HostBase.java | 2 +- .../registry/model/host/HostHistory.java | 2 +- .../registry/model/host/HostResource.java | 16 ++ .../google/registry/rde/PendingDeposit.java | 74 +++-- .../google/registry/rde/RdeFragmenter.java | 109 ++++++++ .../google/registry/rde/RdeStagingMapper.java | 77 +----- .../registry/beam/rde_pipeline_metadata.json | 23 ++ .../registry/beam/rde/RdePipelineTest.java | 257 +++++++++++++++++ .../registry/flows/EppPointInTimeTest.java | 8 +- .../model/contact/ContactResourceTest.java | 7 + .../registry/model/domain/DomainBaseTest.java | 8 + .../registry/model/host/HostResourceTest.java | 8 + .../registry/testing/DatabaseHelper.java | 3 +- .../testing/DatastoreEntityExtension.java | 38 ++- release/cloudbuild-nomulus.yaml | 4 +- 23 files changed, 887 insertions(+), 115 deletions(-) create mode 100644 core/src/main/java/google/registry/beam/rde/RdePipeline.java create mode 100644 core/src/main/java/google/registry/beam/rde/RdePipelineOptions.java create mode 100644 core/src/main/java/google/registry/rde/RdeFragmenter.java create mode 100644 core/src/main/resources/google/registry/beam/rde_pipeline_metadata.json create mode 100644 core/src/test/java/google/registry/beam/rde/RdePipelineTest.java diff --git a/core/build.gradle b/core/build.gradle index 9cb919016..ca7809029 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -780,6 +780,10 @@ if (environment in ['alpha', 'crash']) { mainClass: 'google.registry.beam.invoicing.InvoicingPipeline', metaData: 'google/registry/beam/invoicing_pipeline_metadata.json' ], + [ + mainClass: 'google.registry.beam.rde.RdePipeline', + metaData: 'google/registry/beam/rde_pipeline_metadata.json' + ], ] project.tasks.create("stage_beam_pipelines") { doLast { diff --git a/core/src/main/java/google/registry/beam/rde/RdePipeline.java b/core/src/main/java/google/registry/beam/rde/RdePipeline.java new file mode 100644 index 000000000..c59f19eee --- /dev/null +++ b/core/src/main/java/google/registry/beam/rde/RdePipeline.java @@ -0,0 +1,260 @@ +// Copyright 2021 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.beam.rde; + +import static com.google.common.collect.ImmutableSet.toImmutableSet; +import static google.registry.model.EppResourceUtils.loadAtPointInTimeAsync; +import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.ImmutableSetMultimap; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.common.io.BaseEncoding; +import google.registry.beam.common.RegistryJpaIO; +import google.registry.model.EppResource; +import google.registry.model.contact.ContactResource; +import google.registry.model.domain.DomainBase; +import google.registry.model.host.HostResource; +import google.registry.model.rde.RdeMode; +import google.registry.model.registrar.Registrar; +import google.registry.model.registrar.Registrar.Type; +import google.registry.persistence.VKey; +import google.registry.rde.DepositFragment; +import google.registry.rde.PendingDeposit; +import google.registry.rde.PendingDeposit.PendingDepositCoder; +import google.registry.rde.RdeFragmenter; +import google.registry.rde.RdeMarshaller; +import google.registry.xml.ValidationMode; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.function.Supplier; +import javax.persistence.Entity; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.FlatMapElements; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.Reshuffle; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.TypeDescriptors; +import org.joda.time.DateTime; + +/** + * Definition of a Dataflow Flex template, which generates RDE/BRDA deposits. + * + *

To stage this template locally, run the {@code stage_beam_pipeline.sh} shell script. + * + *

Then, you can run the staged template via the API client library, gCloud or a raw REST call. + * + * @see Using + * Flex Templates + */ +public class RdePipeline implements Serializable { + + private final RdeMarshaller marshaller; + private final ImmutableSetMultimap pendings; + + // Registrars to be excluded from data escrow. Not including the sandbox-only OTE type so that + // if sneaks into production we would get an extra signal. + private static final ImmutableSet IGNORED_REGISTRAR_TYPES = + Sets.immutableEnumSet(Registrar.Type.MONITORING, Registrar.Type.TEST); + + private static final String EPP_RESOURCE_QUERY = + "SELECT id FROM %entity% " + + "WHERE COALESCE(creationClientId, '') NOT LIKE 'prober-%' " + + "AND COALESCE(currentSponsorClientId, '') NOT LIKE 'prober-%' " + + "AND COALESCE(lastEppUpdateClientId, '') NOT LIKE 'prober-%'"; + + public static String createEppResourceQuery(Class clazz) { + return EPP_RESOURCE_QUERY.replace("%entity%", clazz.getAnnotation(Entity.class).name()) + + (clazz.equals(DomainBase.class) ? " AND tld in (:tlds)" : ""); + } + + RdePipeline(RdePipelineOptions options) throws IOException, ClassNotFoundException { + this.marshaller = new RdeMarshaller(ValidationMode.valueOf(options.getValidationMode())); + this.pendings = decodePendings(options.getPendings()); + } + + @VisibleForTesting + PipelineResult run(Pipeline pipeline) { + createFragments(pipeline); + return pipeline.run(); + } + + PipelineResult run() { + return run(Pipeline.create()); + } + + PCollection> createFragments(Pipeline pipeline) { + PCollection> fragments = + PCollectionList.of(processRegistrars(pipeline)) + .and(processNonRegistrarEntities(pipeline, DomainBase.class)) + .and(processNonRegistrarEntities(pipeline, ContactResource.class)) + .and(processNonRegistrarEntities(pipeline, HostResource.class)) + .apply(Flatten.pCollections()) + .setCoder( + KvCoder.of(PendingDepositCoder.of(), SerializableCoder.of(DepositFragment.class))); + return fragments; + } + + PCollection> processRegistrars(Pipeline pipeline) { + return pipeline + .apply( + "Read all production Registrar entities", + RegistryJpaIO.read( + "SELECT clientIdentifier FROM Registrar WHERE type NOT IN (:types)", + ImmutableMap.of("types", IGNORED_REGISTRAR_TYPES), + String.class, + // TODO: consider adding coders for entities and pass them directly instead of using + // VKeys. + id -> VKey.createSql(Registrar.class, id))) + .apply( + "Marshal Registrar into DepositFragment", + FlatMapElements.into( + TypeDescriptors.kvs( + TypeDescriptor.of(PendingDeposit.class), + TypeDescriptor.of(DepositFragment.class))) + .via( + (VKey key) -> { + Registrar registrar = jpaTm().transact(() -> jpaTm().loadByKey(key)); + DepositFragment fragment = marshaller.marshalRegistrar(registrar); + return pendings.values().stream() + .map(pending -> KV.of(pending, fragment)) + .collect(toImmutableSet()); + })); + } + + @SuppressWarnings("deprecation") // Reshuffle is still recommended by Dataflow. + + PCollection> processNonRegistrarEntities( + Pipeline pipeline, Class clazz) { + return createInputs(pipeline, clazz) + .apply("Marshal " + clazz.getSimpleName() + " into DepositFragment", mapToFragments(clazz)) + .setCoder(KvCoder.of(PendingDepositCoder.of(), SerializableCoder.of(DepositFragment.class))) + .apply( + "Reshuffle KV of " + + clazz.getSimpleName() + + " to prevent fusion", + Reshuffle.of()); + } + + PCollection> createInputs(Pipeline pipeline, Class clazz) { + return pipeline.apply( + "Read all production " + clazz.getSimpleName() + " entities", + RegistryJpaIO.read( + createEppResourceQuery(clazz), + clazz.equals(DomainBase.class) + ? ImmutableMap.of("tlds", pendings.keySet()) + : ImmutableMap.of(), + String.class, + // TODO: consider adding coders for entities and pass them directly instead of using + // VKeys. + x -> VKey.create(clazz, x))); + } + + + FlatMapElements, KV> mapToFragments(Class clazz) { + return FlatMapElements.into( + TypeDescriptors.kvs( + TypeDescriptor.of(PendingDeposit.class), TypeDescriptor.of(DepositFragment.class))) + .via( + (VKey key) -> { + T resource = jpaTm().transact(() -> jpaTm().loadByKey(key)); + // The set of all TLDs to which this resource should be emitted. + ImmutableSet tlds = + clazz.equals(DomainBase.class) + ? ImmutableSet.of(((DomainBase) resource).getTld()) + : pendings.keySet(); + // Get the set of all point-in-time watermarks we need, to minimize rewinding. + ImmutableSet dates = + tlds.stream() + .map(pendings::get) + .flatMap(ImmutableSet::stream) + .map(PendingDeposit::watermark) + .collect(toImmutableSet()); + // Launch asynchronous fetches of point-in-time representations of resource. + ImmutableMap> resourceAtTimes = + ImmutableMap.copyOf( + Maps.asMap(dates, input -> loadAtPointInTimeAsync(resource, input))); + // Convert resource to an XML fragment for each watermark/mode pair lazily and cache + // the result. + RdeFragmenter fragmenter = new RdeFragmenter(resourceAtTimes, marshaller); + List> results = new ArrayList<>(); + for (String tld : tlds) { + for (PendingDeposit pending : pendings.get(tld)) { + // Hosts and contacts don't get included in BRDA deposits. + if (pending.mode() == RdeMode.THIN && !clazz.equals(DomainBase.class)) { + continue; + } + Optional fragment = + fragmenter.marshal(pending.watermark(), pending.mode()); + fragment.ifPresent( + depositFragment -> results.add(KV.of(pending, depositFragment))); + } + } + return results; + }); + } + + /** + * Decodes the pipeline option extracted from the URL parameter sent by the pipeline launcher to + * the original TLD to pending deposit map. + */ + @SuppressWarnings("unchecked") + static ImmutableSetMultimap decodePendings(String encodedPending) + throws IOException, ClassNotFoundException { + try (ObjectInputStream ois = + new ObjectInputStream( + new ByteArrayInputStream( + BaseEncoding.base64Url().omitPadding().decode(encodedPending)))) { + return (ImmutableSetMultimap) ois.readObject(); + } + } + + /** + * Encodes the TLD to pending deposit map in an URL safe string that is sent to the pipeline + * worker by the pipeline launcher as a pipeline option. + */ + public static String encodePendings(ImmutableSetMultimap pendings) + throws IOException { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + ObjectOutputStream oos = new ObjectOutputStream(baos); + oos.writeObject(pendings); + oos.flush(); + return BaseEncoding.base64Url().omitPadding().encode(baos.toByteArray()); + } + } + + public static void main(String[] args) throws IOException, ClassNotFoundException { + PipelineOptionsFactory.register(RdePipelineOptions.class); + RdePipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(RdePipelineOptions.class); + new RdePipeline(options).run(); + } +} diff --git a/core/src/main/java/google/registry/beam/rde/RdePipelineOptions.java b/core/src/main/java/google/registry/beam/rde/RdePipelineOptions.java new file mode 100644 index 000000000..d82de1ce8 --- /dev/null +++ b/core/src/main/java/google/registry/beam/rde/RdePipelineOptions.java @@ -0,0 +1,32 @@ +// Copyright 2021 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.beam.rde; + +import google.registry.beam.common.RegistryPipelineOptions; +import org.apache.beam.sdk.options.Description; + +/** Custom options for running the spec11 pipeline. */ +public interface RdePipelineOptions extends RegistryPipelineOptions { + + @Description("The Base64-encoded serialized map of TLDs to PendingDeposit.") + String getPendings(); + + void setPendings(String value); + + @Description("The validation mode (LENIENT|STRICT) that the RDE marshaller uses.") + String getValidationMode(); + + void setValidationMode(String value); +} diff --git a/core/src/main/java/google/registry/model/EppResourceUtils.java b/core/src/main/java/google/registry/model/EppResourceUtils.java index cc943bf6d..29a5dd7c3 100644 --- a/core/src/main/java/google/registry/model/EppResourceUtils.java +++ b/core/src/main/java/google/registry/model/EppResourceUtils.java @@ -398,7 +398,7 @@ public final class EppResourceUtils { .orElse(null); if (resourceAtPointInTime == null) { logger.atSevere().log( - "Couldn't load resource at % for key %s, falling back to resource %s.", + "Couldn't load resource at %s for key %s, falling back to resource %s.", timestamp, resource.createVKey(), resource); return resource; } diff --git a/core/src/main/java/google/registry/model/contact/ContactHistory.java b/core/src/main/java/google/registry/model/contact/ContactHistory.java index 4f5ac5cdb..57f5deadb 100644 --- a/core/src/main/java/google/registry/model/contact/ContactHistory.java +++ b/core/src/main/java/google/registry/model/contact/ContactHistory.java @@ -110,7 +110,8 @@ public class ContactHistory extends HistoryEntry implements SqlEntity { @Override public Optional getResourceAtPointInTime() { - return getContactBase(); + return getContactBase() + .map(contactBase -> new ContactResource.Builder().copyFrom(contactBase).build()); } @PostLoad diff --git a/core/src/main/java/google/registry/model/contact/ContactResource.java b/core/src/main/java/google/registry/model/contact/ContactResource.java index c41feea20..8258202dc 100644 --- a/core/src/main/java/google/registry/model/contact/ContactResource.java +++ b/core/src/main/java/google/registry/model/contact/ContactResource.java @@ -79,5 +79,26 @@ public class ContactResource extends ContactBase private Builder(ContactResource instance) { super(instance); } + + public Builder copyFrom(ContactBase contactBase) { + return this.setAuthInfo(contactBase.getAuthInfo()) + .setContactId(contactBase.getContactId()) + .setCreationClientId(contactBase.getCreationClientId()) + .setCreationTime(contactBase.getCreationTime()) + .setDeletionTime(contactBase.getDeletionTime()) + .setDisclose(contactBase.getDisclose()) + .setEmailAddress(contactBase.getEmailAddress()) + .setFaxNumber(contactBase.getFaxNumber()) + .setInternationalizedPostalInfo(contactBase.getInternationalizedPostalInfo()) + .setLastTransferTime(contactBase.getLastTransferTime()) + .setLastEppUpdateClientId(contactBase.getLastEppUpdateClientId()) + .setLastEppUpdateTime(contactBase.getLastEppUpdateTime()) + .setLocalizedPostalInfo(contactBase.getLocalizedPostalInfo()) + .setPersistedCurrentSponsorClientId(contactBase.getPersistedCurrentSponsorClientId()) + .setRepoId(contactBase.getRepoId()) + .setStatusValues(contactBase.getStatusValues()) + .setTransferData(contactBase.getTransferData()) + .setVoiceNumber(contactBase.getVoiceNumber()); + } } } diff --git a/core/src/main/java/google/registry/model/domain/DomainBase.java b/core/src/main/java/google/registry/model/domain/DomainBase.java index 633ddfde7..acdce32e7 100644 --- a/core/src/main/java/google/registry/model/domain/DomainBase.java +++ b/core/src/main/java/google/registry/model/domain/DomainBase.java @@ -181,5 +181,34 @@ public class DomainBase extends DomainContent Builder(DomainBase instance) { super(instance); } + + public Builder copyFrom(DomainContent domainContent) { + return this.setAuthInfo(domainContent.getAuthInfo()) + .setAutorenewPollMessage(domainContent.getAutorenewPollMessage()) + .setAutorenewBillingEvent(domainContent.getAutorenewBillingEvent()) + .setAutorenewEndTime(domainContent.getAutorenewEndTime()) + .setContacts(domainContent.getContacts()) + .setCreationClientId(domainContent.getCreationClientId()) + .setCreationTime(domainContent.getCreationTime()) + .setDomainName(domainContent.getDomainName()) + .setDeletePollMessage(domainContent.getDeletePollMessage()) + .setDsData(domainContent.getDsData()) + .setDeletionTime(domainContent.getDeletionTime()) + .setGracePeriods(domainContent.getGracePeriods()) + .setIdnTableName(domainContent.getIdnTableName()) + .setLastTransferTime(domainContent.getLastTransferTime()) + .setLaunchNotice(domainContent.getLaunchNotice()) + .setLastEppUpdateClientId(domainContent.getLastEppUpdateClientId()) + .setLastEppUpdateTime(domainContent.getLastEppUpdateTime()) + .setNameservers(domainContent.getNameservers()) + .setPersistedCurrentSponsorClientId(domainContent.getPersistedCurrentSponsorClientId()) + .setRegistrant(domainContent.getRegistrant()) + .setRegistrationExpirationTime(domainContent.getRegistrationExpirationTime()) + .setRepoId(domainContent.getRepoId()) + .setSmdId(domainContent.getSmdId()) + .setSubordinateHosts(domainContent.getSubordinateHosts()) + .setStatusValues(domainContent.getStatusValues()) + .setTransferData(domainContent.getTransferData()); + } } } diff --git a/core/src/main/java/google/registry/model/domain/DomainHistory.java b/core/src/main/java/google/registry/model/domain/DomainHistory.java index 9fb4721e8..68e841c89 100644 --- a/core/src/main/java/google/registry/model/domain/DomainHistory.java +++ b/core/src/main/java/google/registry/model/domain/DomainHistory.java @@ -252,11 +252,18 @@ public class DomainHistory extends HistoryEntry implements SqlEntity { @Override public Optional getResourceAtPointInTime() { - return getDomainContent(); + return getDomainContent() + .map(domainContent -> new DomainBase.Builder().copyFrom(domainContent).build()); } @PostLoad void postLoad() { + // TODO(b/188044616): Determine why Eager loading doesn't work here. + Hibernate.initialize(domainTransactionRecords); + Hibernate.initialize(nsHosts); + Hibernate.initialize(dsDataHistories); + Hibernate.initialize(gracePeriodHistories); + if (domainContent != null) { domainContent.nsHosts = nullToEmptyImmutableCopy(nsHosts); domainContent.gracePeriods = @@ -278,12 +285,6 @@ public class DomainHistory extends HistoryEntry implements SqlEntity { } } } - - // TODO(b/188044616): Determine why Eager loading doesn't work here. - Hibernate.initialize(domainTransactionRecords); - Hibernate.initialize(nsHosts); - Hibernate.initialize(dsDataHistories); - Hibernate.initialize(gracePeriodHistories); } // In Datastore, save as a HistoryEntry object regardless of this object's type diff --git a/core/src/main/java/google/registry/model/host/HostBase.java b/core/src/main/java/google/registry/model/host/HostBase.java index df4a41b5c..6b9f10b8e 100644 --- a/core/src/main/java/google/registry/model/host/HostBase.java +++ b/core/src/main/java/google/registry/model/host/HostBase.java @@ -137,7 +137,7 @@ public class HostBase extends EppResource { } @Override - public Builder asBuilder() { + public Builder asBuilder() { return new Builder<>(clone(this)); } diff --git a/core/src/main/java/google/registry/model/host/HostHistory.java b/core/src/main/java/google/registry/model/host/HostHistory.java index 1313d4a50..02d0661c3 100644 --- a/core/src/main/java/google/registry/model/host/HostHistory.java +++ b/core/src/main/java/google/registry/model/host/HostHistory.java @@ -111,7 +111,7 @@ public class HostHistory extends HistoryEntry implements SqlEntity { @Override public Optional getResourceAtPointInTime() { - return getHostBase(); + return getHostBase().map(hostBase -> new HostResource.Builder().copyFrom(hostBase).build()); } @PostLoad diff --git a/core/src/main/java/google/registry/model/host/HostResource.java b/core/src/main/java/google/registry/model/host/HostResource.java index 0338b8d1d..185b3219d 100644 --- a/core/src/main/java/google/registry/model/host/HostResource.java +++ b/core/src/main/java/google/registry/model/host/HostResource.java @@ -63,5 +63,21 @@ public class HostResource extends HostBase private Builder(HostResource instance) { super(instance); } + + public Builder copyFrom(HostBase hostBase) { + return this.setCreationClientId(hostBase.getCreationClientId()) + .setCreationTime(hostBase.getCreationTime()) + .setDeletionTime(hostBase.getDeletionTime()) + .setHostName(hostBase.getHostName()) + .setInetAddresses(hostBase.getInetAddresses()) + .setLastTransferTime(hostBase.getLastTransferTime()) + .setLastSuperordinateChange(hostBase.getLastSuperordinateChange()) + .setLastEppUpdateClientId(hostBase.getLastEppUpdateClientId()) + .setLastEppUpdateTime(hostBase.getLastEppUpdateTime()) + .setPersistedCurrentSponsorClientId(hostBase.getPersistedCurrentSponsorClientId()) + .setRepoId(hostBase.getRepoId()) + .setSuperordinateDomain(hostBase.getSuperordinateDomain()) + .setStatusValues(hostBase.getStatusValues()); + } } } diff --git a/core/src/main/java/google/registry/rde/PendingDeposit.java b/core/src/main/java/google/registry/rde/PendingDeposit.java index 9e40c24a6..c8bca8823 100644 --- a/core/src/main/java/google/registry/rde/PendingDeposit.java +++ b/core/src/main/java/google/registry/rde/PendingDeposit.java @@ -17,8 +17,17 @@ package google.registry.rde; import com.google.auto.value.AutoValue; import google.registry.model.common.Cursor.CursorType; import google.registry.model.rde.RdeMode; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.io.Serializable; import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.BooleanCoder; +import org.apache.beam.sdk.coders.NullableCoder; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarIntCoder; import org.joda.time.DateTime; import org.joda.time.Duration; @@ -69,17 +78,9 @@ public abstract class PendingDeposit implements Serializable { @Nullable public abstract Integer revision(); - static PendingDeposit create( + public static PendingDeposit create( String tld, DateTime watermark, RdeMode mode, CursorType cursor, Duration interval) { - return new AutoValue_PendingDeposit( - false, - tld, - watermark, - mode, - cursor, - interval, - null, - null); + return new AutoValue_PendingDeposit(false, tld, watermark, mode, cursor, interval, null, null); } static PendingDeposit createInManualOperation( @@ -89,15 +90,52 @@ public abstract class PendingDeposit implements Serializable { String directoryWithTrailingSlash, @Nullable Integer revision) { return new AutoValue_PendingDeposit( - true, - tld, - watermark, - mode, - null, - null, - directoryWithTrailingSlash, - revision); + true, tld, watermark, mode, null, null, directoryWithTrailingSlash, revision); } PendingDeposit() {} + + /** + * A deterministic coder for {@link PendingDeposit} used during a GroupBy transform. + * + *

We cannot use a {@link SerializableCoder} directly because it does not guarantee + * determinism, which is required by GroupBy. + */ + public static class PendingDepositCoder extends AtomicCoder { + + private PendingDepositCoder() { + super(); + } + + private static final PendingDepositCoder INSTANCE = new PendingDepositCoder(); + + public static PendingDepositCoder of() { + return INSTANCE; + } + + @Override + public void encode(PendingDeposit value, OutputStream outStream) throws IOException { + BooleanCoder.of().encode(value.manual(), outStream); + StringUtf8Coder.of().encode(value.tld(), outStream); + SerializableCoder.of(DateTime.class).encode(value.watermark(), outStream); + SerializableCoder.of(RdeMode.class).encode(value.mode(), outStream); + NullableCoder.of(SerializableCoder.of(CursorType.class)).encode(value.cursor(), outStream); + NullableCoder.of(SerializableCoder.of(Duration.class)).encode(value.interval(), outStream); + NullableCoder.of(StringUtf8Coder.of()).encode(value.directoryWithTrailingSlash(), outStream); + NullableCoder.of(VarIntCoder.of()).encode(value.revision(), outStream); + } + + @Override + public PendingDeposit decode(InputStream inStream) throws IOException { + return new AutoValue_PendingDeposit( + BooleanCoder.of().decode(inStream), + StringUtf8Coder.of().decode(inStream), + SerializableCoder.of(DateTime.class).decode(inStream), + SerializableCoder.of(RdeMode.class).decode(inStream), + NullableCoder.of(SerializableCoder.of(CursorType.class)).decode(inStream), + NullableCoder.of(SerializableCoder.of(Duration.class)).decode(inStream), + NullableCoder.of(StringUtf8Coder.of()).decode(inStream), + NullableCoder.of(VarIntCoder.of()).decode(inStream)); + } + } } diff --git a/core/src/main/java/google/registry/rde/RdeFragmenter.java b/core/src/main/java/google/registry/rde/RdeFragmenter.java new file mode 100644 index 000000000..660037cc3 --- /dev/null +++ b/core/src/main/java/google/registry/rde/RdeFragmenter.java @@ -0,0 +1,109 @@ +// Copyright 2021 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.rde; + +import static google.registry.model.EppResourceUtils.loadAtPointInTime; +import static google.registry.persistence.transaction.TransactionManagerFactory.tm; + +import com.google.auto.value.AutoValue; +import com.google.common.collect.ImmutableMap; +import google.registry.model.EppResource; +import google.registry.model.contact.ContactResource; +import google.registry.model.domain.DomainBase; +import google.registry.model.host.HostResource; +import google.registry.model.rde.RdeMode; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.function.Supplier; +import org.joda.time.DateTime; + +/** Loading cache that turns a resource into XML for the various points in time and modes. */ +public class RdeFragmenter { + private final Map> cache = new HashMap<>(); + private final ImmutableMap> resourceAtTimes; + private final RdeMarshaller marshaller; + + long cacheHits = 0; + long resourcesNotFound = 0; + long resourcesFound = 0; + + public RdeFragmenter( + ImmutableMap> resourceAtTimes, RdeMarshaller marshaller) { + this.resourceAtTimes = resourceAtTimes; + this.marshaller = marshaller; + } + + public Optional marshal(DateTime watermark, RdeMode mode) { + Optional result = cache.get(WatermarkModePair.create(watermark, mode)); + if (result != null) { + cacheHits++; + return result; + } + EppResource resource = resourceAtTimes.get(watermark).get(); + if (resource == null) { + result = Optional.empty(); + cache.put(WatermarkModePair.create(watermark, RdeMode.FULL), result); + cache.put(WatermarkModePair.create(watermark, RdeMode.THIN), result); + resourcesNotFound++; + return result; + } + resourcesFound++; + if (resource instanceof DomainBase) { + result = Optional.of(marshaller.marshalDomain((DomainBase) resource, mode)); + cache.put(WatermarkModePair.create(watermark, mode), result); + return result; + } else if (resource instanceof ContactResource) { + result = Optional.of(marshaller.marshalContact((ContactResource) resource)); + cache.put(WatermarkModePair.create(watermark, RdeMode.FULL), result); + cache.put(WatermarkModePair.create(watermark, RdeMode.THIN), result); + return result; + } else if (resource instanceof HostResource) { + HostResource host = (HostResource) resource; + result = + Optional.of( + host.isSubordinate() + ? marshaller.marshalSubordinateHost( + host, + // Note that loadAtPointInTime() does cloneProjectedAtTime(watermark) for + // us. + Objects.requireNonNull( + loadAtPointInTime( + tm().loadByKey(host.getSuperordinateDomain()), watermark))) + : marshaller.marshalExternalHost(host)); + cache.put(WatermarkModePair.create(watermark, RdeMode.FULL), result); + cache.put(WatermarkModePair.create(watermark, RdeMode.THIN), result); + return result; + } else { + throw new IllegalStateException( + String.format( + "Resource %s of type %s cannot be converted to XML.", + resource, resource.getClass().getSimpleName())); + } + } + + /** Map key for {@link RdeFragmenter} cache. */ + @AutoValue + abstract static class WatermarkModePair { + abstract DateTime watermark(); + + abstract RdeMode mode(); + + static WatermarkModePair create(DateTime watermark, RdeMode mode) { + return new AutoValue_RdeFragmenter_WatermarkModePair(watermark, mode); + } + } +} diff --git a/core/src/main/java/google/registry/rde/RdeStagingMapper.java b/core/src/main/java/google/registry/rde/RdeStagingMapper.java index ceecc40ea..a9c45aaf7 100644 --- a/core/src/main/java/google/registry/rde/RdeStagingMapper.java +++ b/core/src/main/java/google/registry/rde/RdeStagingMapper.java @@ -16,12 +16,10 @@ package google.registry.rde; import static com.google.common.base.Strings.nullToEmpty; import static com.google.common.collect.ImmutableSet.toImmutableSet; -import static google.registry.model.EppResourceUtils.loadAtPointInTime; import static google.registry.model.EppResourceUtils.loadAtPointInTimeAsync; import static google.registry.persistence.transaction.TransactionManagerFactory.tm; import com.google.appengine.tools.mapreduce.Mapper; -import com.google.auto.value.AutoValue; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSetMultimap; @@ -34,9 +32,6 @@ import google.registry.model.host.HostResource; import google.registry.model.rde.RdeMode; import google.registry.model.registrar.Registrar; import google.registry.xml.ValidationMode; -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.function.Supplier; import org.joda.time.DateTime; @@ -129,7 +124,7 @@ public final class RdeStagingMapper extends Mapper loadAtPointInTimeAsync(resource, input))); // Convert resource to an XML fragment for each watermark/mode pair lazily and cache the result. - Fragmenter fragmenter = new Fragmenter(resourceAtTimes); + RdeFragmenter fragmenter = new RdeFragmenter(resourceAtTimes, marshaller); // Emit resource as an XML fragment for all TLDs and modes pending deposit. long resourcesEmitted = 0; @@ -157,74 +152,4 @@ public final class RdeStagingMapper extends Mapper> cache = new HashMap<>(); - private final ImmutableMap> resourceAtTimes; - - long cacheHits = 0; - long resourcesNotFound = 0; - long resourcesFound = 0; - - Fragmenter(ImmutableMap> resourceAtTimes) { - this.resourceAtTimes = resourceAtTimes; - } - - Optional marshal(DateTime watermark, RdeMode mode) { - Optional result = cache.get(WatermarkModePair.create(watermark, mode)); - if (result != null) { - cacheHits++; - return result; - } - EppResource resource = resourceAtTimes.get(watermark).get(); - if (resource == null) { - result = Optional.empty(); - cache.put(WatermarkModePair.create(watermark, RdeMode.FULL), result); - cache.put(WatermarkModePair.create(watermark, RdeMode.THIN), result); - resourcesNotFound++; - return result; - } - resourcesFound++; - if (resource instanceof DomainBase) { - result = Optional.of(marshaller.marshalDomain((DomainBase) resource, mode)); - cache.put(WatermarkModePair.create(watermark, mode), result); - return result; - } else if (resource instanceof ContactResource) { - result = Optional.of(marshaller.marshalContact((ContactResource) resource)); - cache.put(WatermarkModePair.create(watermark, RdeMode.FULL), result); - cache.put(WatermarkModePair.create(watermark, RdeMode.THIN), result); - return result; - } else if (resource instanceof HostResource) { - HostResource host = (HostResource) resource; - result = - Optional.of( - host.isSubordinate() - ? marshaller.marshalSubordinateHost( - host, - // Note that loadAtPointInTime() does cloneProjectedAtTime(watermark) for - // us. - Objects.requireNonNull( - loadAtPointInTime( - tm().loadByKey(host.getSuperordinateDomain()), watermark))) - : marshaller.marshalExternalHost(host)); - cache.put(WatermarkModePair.create(watermark, RdeMode.FULL), result); - cache.put(WatermarkModePair.create(watermark, RdeMode.THIN), result); - return result; - } else { - throw new AssertionError(resource.toString()); - } - } - } - - /** Map key for {@link Fragmenter} cache. */ - @AutoValue - abstract static class WatermarkModePair { - abstract DateTime watermark(); - abstract RdeMode mode(); - - static WatermarkModePair create(DateTime watermark, RdeMode mode) { - return new AutoValue_RdeStagingMapper_WatermarkModePair(watermark, mode); - } - } } diff --git a/core/src/main/resources/google/registry/beam/rde_pipeline_metadata.json b/core/src/main/resources/google/registry/beam/rde_pipeline_metadata.json new file mode 100644 index 000000000..a88f69dd0 --- /dev/null +++ b/core/src/main/resources/google/registry/beam/rde_pipeline_metadata.json @@ -0,0 +1,23 @@ +{ + "name": "RDE/BRDA Deposit Generation", + "description": "An Apache Beam pipeline generates RDE or BRDA deposits and deposits them to GCS with GhostRyde encryption.", + "parameters": [ + { + "name": "pendings", + "label": "The pendings deposits to generate.", + "helpText": "A TLD to PendingDeposit map that is serialized and Base64 URL-safe encoded.", + "regexes": [ + "A-Za-z0-9\\-_" + ] + }, + { + "name": "validationMode", + "label": "How strict the marshaller validates the given EPP resources.", + "helpText": "If set to LENIENT the marshaller will not warn about missing data on the EPP resources.", + "is_optional": true, + "regexes": [ + "^STRICT|LENIENT$" + ] + } + ] +} diff --git a/core/src/test/java/google/registry/beam/rde/RdePipelineTest.java b/core/src/test/java/google/registry/beam/rde/RdePipelineTest.java new file mode 100644 index 000000000..2610ef685 --- /dev/null +++ b/core/src/test/java/google/registry/beam/rde/RdePipelineTest.java @@ -0,0 +1,257 @@ +// Copyright 2021 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.beam.rde; + +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.collect.ImmutableSet.toImmutableSet; +import static com.google.common.truth.Truth.assertThat; +import static google.registry.beam.rde.RdePipeline.decodePendings; +import static google.registry.beam.rde.RdePipeline.encodePendings; +import static google.registry.model.common.Cursor.CursorType.RDE_STAGING; +import static google.registry.model.rde.RdeMode.FULL; +import static google.registry.model.rde.RdeMode.THIN; +import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; +import static google.registry.persistence.transaction.TransactionManagerFactory.setTm; +import static google.registry.persistence.transaction.TransactionManagerFactory.tm; +import static google.registry.rde.RdeResourceType.CONTACT; +import static google.registry.rde.RdeResourceType.DOMAIN; +import static google.registry.rde.RdeResourceType.HOST; +import static google.registry.rde.RdeResourceType.REGISTRAR; +import static google.registry.testing.AppEngineExtension.loadInitialData; +import static google.registry.testing.DatabaseHelper.createTld; +import static google.registry.testing.DatabaseHelper.newHostResource; +import static google.registry.testing.DatabaseHelper.persistActiveContact; +import static google.registry.testing.DatabaseHelper.persistActiveDomain; +import static google.registry.testing.DatabaseHelper.persistDeletedDomain; +import static google.registry.testing.DatabaseHelper.persistEppResource; +import static google.registry.testing.DatabaseHelper.persistNewRegistrar; +import static org.joda.time.Duration.standardDays; + +import com.google.common.base.Splitter; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSetMultimap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Streams; +import google.registry.beam.TestPipelineExtension; +import google.registry.model.host.HostResource; +import google.registry.model.registrar.Registrar; +import google.registry.model.registrar.Registrar.State; +import google.registry.persistence.transaction.JpaTestRules; +import google.registry.persistence.transaction.JpaTestRules.JpaIntegrationTestExtension; +import google.registry.persistence.transaction.TransactionManager; +import google.registry.rde.DepositFragment; +import google.registry.rde.PendingDeposit; +import google.registry.rde.RdeResourceType; +import google.registry.testing.DatastoreEntityExtension; +import google.registry.testing.FakeClock; +import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Stream; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.values.KV; +import org.joda.time.DateTime; +import org.joda.time.Duration; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +/** Unit tests for {@link RdePipeline}. */ +public class RdePipelineTest { + + private static final String REGISTRAR_NAME_PATTERN = + "(.*)"; + + private static final String DOMAIN_NAME_PATTERN = "(.*)"; + + private static final String CONTACT_ID_PATTERN = "(.*)"; + + private static final String HOST_NAME_PATTERN = "(.*)"; + + private static final ImmutableSetMultimap PENDINGS = + ImmutableSetMultimap.of( + "pal", + PendingDeposit.create( + "pal", DateTime.parse("2000-01-01TZ"), FULL, RDE_STAGING, standardDays(1)), + "pal", + PendingDeposit.create( + "pal", DateTime.parse("2000-01-01TZ"), THIN, RDE_STAGING, standardDays(1)), + "fun", + PendingDeposit.create( + "fun", DateTime.parse("2000-01-01TZ"), FULL, RDE_STAGING, standardDays(1))); + + // This is the default creation time for test data. + private final FakeClock clock = new FakeClock(DateTime.parse("1999-12-31TZ")); + + // The pipeline runs in a different thread, which needs to be masqueraded as a GAE thread as well. + @RegisterExtension + @Order(Order.DEFAULT - 1) + final DatastoreEntityExtension datastore = new DatastoreEntityExtension().allThreads(true); + + @RegisterExtension + final JpaIntegrationTestExtension database = + new JpaTestRules.Builder().withClock(clock).buildIntegrationTestRule(); + + @RegisterExtension + final TestPipelineExtension pipeline = + TestPipelineExtension.create().enableAbandonedNodeEnforcement(true); + + private final RdePipelineOptions options = + PipelineOptionsFactory.create().as(RdePipelineOptions.class); + + private RdePipeline rdePipeline; + + private TransactionManager originalTm; + + @BeforeEach + void beforeEach() throws Exception { + originalTm = tm(); + setTm(jpaTm()); + loadInitialData(); + + // Two real registrars have been created by loadInitialData(), named "New Registrar" and "The + // Registrar". Create one included registrar (external_monitoring) and two excluded ones. + Registrar monitoringRegistrar = + persistNewRegistrar("monitoring", "monitoring", Registrar.Type.MONITORING, null); + Registrar testRegistrar = persistNewRegistrar("test", "test", Registrar.Type.TEST, null); + Registrar externalMonitoringRegistrar = + persistNewRegistrar( + "externalmonitor", "external_monitoring", Registrar.Type.EXTERNAL_MONITORING, 9997L); + // Set Registrar states which are required for reporting. + tm().transact( + () -> + tm().putAll( + ImmutableList.of( + externalMonitoringRegistrar.asBuilder().setState(State.ACTIVE).build(), + testRegistrar.asBuilder().setState(State.ACTIVE).build(), + monitoringRegistrar.asBuilder().setState(State.ACTIVE).build()))); + + createTld("pal"); + createTld("fun"); + createTld("cat"); + + // Also persists a "contact1234" contact in the process. + persistActiveDomain("hello.pal"); + persistActiveDomain("kitty.fun"); + // Should not appear anywhere. + persistActiveDomain("lol.cat"); + persistDeletedDomain("deleted.pal", DateTime.parse("1999-12-30TZ")); + + persistActiveContact("contact456"); + + HostResource host = persistEppResource(newHostResource("old.host.test")); + // Set the clock to 2000-01-02, the updated host should NOT show up in RDE. + clock.advanceBy(Duration.standardDays(2)); + persistEppResource(host.asBuilder().setHostName("new.host.test").build()); + + options.setPendings(encodePendings(PENDINGS)); + // The EPP resources created in tests do not have all the fields populated, using a STRICT + // validation mode will result in a lot of warnings during marshalling. + options.setValidationMode("LENIENT"); + rdePipeline = new RdePipeline(options); + } + + @AfterEach + void afterEach() { + setTm(originalTm); + } + + @Test + void testSuccess_encodeAndDecodePendingsMap() throws Exception { + String encodedString = encodePendings(PENDINGS); + assertThat(decodePendings(encodedString)).isEqualTo(PENDINGS); + } + + @Test + void testSuccess_createFragments() { + PAssert.that( + rdePipeline + .createFragments(pipeline) + .apply("Group by PendingDeposit", GroupByKey.create())) + .satisfies( + kvs -> { + kvs.forEach( + kv -> { + // Registrar fragments. + assertThat( + getFragmentForType(kv, REGISTRAR) + .map(getXmlElement(REGISTRAR_NAME_PATTERN)) + .collect(toImmutableSet())) + // The same registrars are attached to all the pending deposits. + .containsExactly("New Registrar", "The Registrar", "external_monitoring"); + // Domain fragments. + assertThat( + getFragmentForType(kv, DOMAIN) + .map(getXmlElement(DOMAIN_NAME_PATTERN)) + .anyMatch( + domain -> + // Deleted domain should not be included + domain.equals("deleted.pal") + // Only domains on the pending deposit's tld should + // appear. + || !kv.getKey() + .tld() + .equals( + Iterables.get( + Splitter.on('.').split(domain), 1)))) + .isFalse(); + if (kv.getKey().mode().equals(FULL)) { + // Contact fragments. + assertThat( + getFragmentForType(kv, CONTACT) + .map(getXmlElement(CONTACT_ID_PATTERN)) + .collect(toImmutableSet())) + // The same contacts are attached too all pending deposits. + .containsExactly("contact1234", "contact456"); + // Host fragments. + assertThat( + getFragmentForType(kv, HOST) + .map(getXmlElement(HOST_NAME_PATTERN)) + .collect(toImmutableSet())) + // Should load the resource before update. + .containsExactly("old.host.test"); + } else { + // BRDA does not contain contact or hosts. + assertThat( + Streams.stream(kv.getValue()) + .anyMatch( + fragment -> + fragment.type().equals(CONTACT) + || fragment.type().equals(HOST))) + .isFalse(); + } + }); + return null; + }); + pipeline.run().waitUntilFinish(); + } + + private static Function getXmlElement(String pattern) { + return (fragment) -> { + Matcher matcher = Pattern.compile(pattern).matcher(fragment.xml()); + checkState(matcher.find(), "Missing %s in xml.", pattern); + return matcher.group(1); + }; + } + + private static Stream getFragmentForType( + KV> kv, RdeResourceType type) { + return Streams.stream(kv.getValue()).filter(fragment -> fragment.type().equals(type)); + } +} diff --git a/core/src/test/java/google/registry/flows/EppPointInTimeTest.java b/core/src/test/java/google/registry/flows/EppPointInTimeTest.java index 81a7438cb..31b087746 100644 --- a/core/src/test/java/google/registry/flows/EppPointInTimeTest.java +++ b/core/src/test/java/google/registry/flows/EppPointInTimeTest.java @@ -145,7 +145,7 @@ class EppPointInTimeTest { tm().clearSessionCache(); assertAboutImmutableObjects() .that(loadAtPointInTime(latest, timeAtCreate.plusDays(1))) - .hasFieldsEqualTo(domainAfterCreate); + .isEqualExceptFields(domainAfterCreate, "updateTimestamp"); tm().clearSessionCache(); if (tm().isOfy()) { @@ -159,18 +159,18 @@ class EppPointInTimeTest { // second update occurred one millisecond later. assertAboutImmutableObjects() .that(loadAtPointInTime(latest, timeAtFirstUpdate)) - .hasFieldsEqualTo(domainAfterFirstUpdate); + .isEqualExceptFields(domainAfterFirstUpdate, "updateTimestamp"); } tm().clearSessionCache(); assertAboutImmutableObjects() .that(loadAtPointInTime(latest, timeAtSecondUpdate)) - .hasFieldsEqualTo(domainAfterSecondUpdate); + .isEqualExceptFields(domainAfterSecondUpdate, "updateTimestamp"); tm().clearSessionCache(); assertAboutImmutableObjects() .that(loadAtPointInTime(latest, timeAtSecondUpdate.plusDays(1))) - .hasFieldsEqualTo(domainAfterSecondUpdate); + .isEqualExceptFields(domainAfterSecondUpdate, "updateTimestamp"); // Deletion time has millisecond granularity due to isActive() check. tm().clearSessionCache(); diff --git a/core/src/test/java/google/registry/model/contact/ContactResourceTest.java b/core/src/test/java/google/registry/model/contact/ContactResourceTest.java index ebe10f94e..db34f5ea6 100644 --- a/core/src/test/java/google/registry/model/contact/ContactResourceTest.java +++ b/core/src/test/java/google/registry/model/contact/ContactResourceTest.java @@ -122,6 +122,13 @@ public class ContactResourceTest extends EntityTestCase { contactResource = persistResource(cloneAndSetAutoTimestamps(originalContact)); } + @Test + void testContactBaseToContactResource() { + ImmutableObjectSubject.assertAboutImmutableObjects() + .that(new ContactResource.Builder().copyFrom(contactResource).build()) + .isEqualExceptFields(contactResource, "updateTimestamp", "revisions"); + } + @Test void testCloudSqlPersistence_failWhenViolateForeignKeyConstraint() { assertThrowForeignKeyViolation(() -> jpaTm().transact(() -> jpaTm().insert(originalContact))); diff --git a/core/src/test/java/google/registry/model/domain/DomainBaseTest.java b/core/src/test/java/google/registry/model/domain/DomainBaseTest.java index 3ee6faa20..05b81ae8d 100644 --- a/core/src/test/java/google/registry/model/domain/DomainBaseTest.java +++ b/core/src/test/java/google/registry/model/domain/DomainBaseTest.java @@ -39,6 +39,7 @@ import com.google.common.collect.Streams; import com.googlecode.objectify.Key; import google.registry.model.EntityTestCase; import google.registry.model.ImmutableObject; +import google.registry.model.ImmutableObjectSubject; import google.registry.model.billing.BillingEvent; import google.registry.model.billing.BillingEvent.Reason; import google.registry.model.contact.ContactResource; @@ -171,6 +172,13 @@ public class DomainBaseTest extends EntityTestCase { .build())); } + @Test + void testDomainContentToDomainBase() { + ImmutableObjectSubject.assertAboutImmutableObjects() + .that(new DomainBase.Builder().copyFrom(domain).build()) + .isEqualExceptFields(domain, "updateTimestamp", "revisions"); + } + @Test void testPersistence() { // Note that this only verifies that the value stored under the foreign key is the same as that diff --git a/core/src/test/java/google/registry/model/host/HostResourceTest.java b/core/src/test/java/google/registry/model/host/HostResourceTest.java index f902eeae2..e9ec70cd2 100644 --- a/core/src/test/java/google/registry/model/host/HostResourceTest.java +++ b/core/src/test/java/google/registry/model/host/HostResourceTest.java @@ -31,6 +31,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.net.InetAddresses; import google.registry.model.EntityTestCase; +import google.registry.model.ImmutableObjectSubject; import google.registry.model.domain.DomainBase; import google.registry.model.eppcommon.StatusValue; import google.registry.model.eppcommon.Trid; @@ -89,6 +90,13 @@ class HostResourceTest extends EntityTestCase { .build())); } + @TestOfyAndSql + void testHostBaseToHostResource() { + ImmutableObjectSubject.assertAboutImmutableObjects() + .that(new HostResource.Builder().copyFrom(host).build()) + .isEqualExceptFields(host, "updateTimestamp", "revisions"); + } + @TestOfyAndSql void testPersistence() { HostResource newHost = host.asBuilder().setRepoId("NEWHOST").build(); diff --git a/core/src/test/java/google/registry/testing/DatabaseHelper.java b/core/src/test/java/google/registry/testing/DatabaseHelper.java index 4e68e9840..8bf82685b 100644 --- a/core/src/test/java/google/registry/testing/DatabaseHelper.java +++ b/core/src/test/java/google/registry/testing/DatabaseHelper.java @@ -1085,8 +1085,7 @@ public class DatabaseHelper { .setClientId(resource.getCreationClientId()) .setType(getHistoryEntryType(resource)) .setModificationTime(tm().getTransactionTime()) - .build() - .toChildHistoryEntity()); + .build()); ofyTmOrDoNothing( () -> tm().put(ForeignKeyIndex.create(resource, resource.getDeletionTime()))); }); diff --git a/core/src/test/java/google/registry/testing/DatastoreEntityExtension.java b/core/src/test/java/google/registry/testing/DatastoreEntityExtension.java index cf61d703e..f7fd131a0 100644 --- a/core/src/test/java/google/registry/testing/DatastoreEntityExtension.java +++ b/core/src/test/java/google/registry/testing/DatastoreEntityExtension.java @@ -18,6 +18,8 @@ import static google.registry.model.ofy.ObjectifyService.auditedOfy; import com.google.apphosting.api.ApiProxy; import com.google.apphosting.api.ApiProxy.Environment; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.util.Map; import org.junit.jupiter.api.extension.AfterEachCallback; import org.junit.jupiter.api.extension.BeforeEachCallback; @@ -25,7 +27,7 @@ import org.junit.jupiter.api.extension.ExtensionContext; import org.testcontainers.shaded.com.google.common.collect.ImmutableMap; /** - * Allows instantiation of Datastore {@code Entity entities} without the heavyweight {@code + * Allows instantiation of Datastore {@code Entity}s without the heavyweight {@link * AppEngineExtension}. * *

When an Ofy key is created, by calling the various Key.create() methods, whether the current @@ -44,6 +46,27 @@ public class DatastoreEntityExtension implements BeforeEachCallback, AfterEachCa private static final Environment PLACEHOLDER_ENV = new PlaceholderEnvironment(); + private boolean allThreads = false; + + /** + * Whether all threads should be masqueraded as GAE threads. + * + *

This is particularly useful when new threads are spawned during a test. For example when + * testing Beam pipelines, the test pipeline runs the transforms in separate threads than the test + * thread. If Ofy keys are created in transforms, this value needs to be set to true. + * + *

Warning: by setting this value to true, any thread spawned by the current JVM will be have + * the thread local property set to the placeholder value during the execution of the current + * test, including those running other tests in parallel. This may or may not cause an issue when + * other tests have {@link AppEngineExtension} registered, that creates a much more fully + * functional GAE test environment. Consider moving tests using this extension to {@code + * outcastTest} if necessary. + */ + public DatastoreEntityExtension allThreads(boolean value) { + allThreads = value; + return this; + } + @Override public void beforeEach(ExtensionContext context) { ApiProxy.setEnvironmentForCurrentThread(PLACEHOLDER_ENV); @@ -51,12 +74,21 @@ public class DatastoreEntityExtension implements BeforeEachCallback, AfterEachCa // will load the ObjectifyService class, whose static initialization block registers all Ofy // entities. auditedOfy(); + if (allThreads) { + ApiProxy.setEnvironmentFactory(() -> PLACEHOLDER_ENV); + } } @Override - public void afterEach(ExtensionContext context) { + public void afterEach(ExtensionContext context) + throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { // Clear the cached instance. - ApiProxy.setEnvironmentForCurrentThread(null); + ApiProxy.clearEnvironmentForCurrentThread(); + if (allThreads) { + Method method = ApiProxy.class.getDeclaredMethod("clearEnvironmentFactory"); + method.setAccessible(true); + method.invoke(null); + } } private static final class PlaceholderEnvironment implements Environment { diff --git a/release/cloudbuild-nomulus.yaml b/release/cloudbuild-nomulus.yaml index 5bf60e177..62b00a24f 100644 --- a/release/cloudbuild-nomulus.yaml +++ b/release/cloudbuild-nomulus.yaml @@ -93,7 +93,9 @@ steps: google.registry.beam.spec11.Spec11Pipeline \ google/registry/beam/spec11_pipeline_metadata.json \ google.registry.beam.invoicing.InvoicingPipeline \ - google/registry/beam/invoicing_pipeline_metadata.json + google/registry/beam/invoicing_pipeline_metadata.json \ + google.registry.beam.rde.RdePipeline \ + google/registry/beam/rde_pipeline_metadata.json # Tentatively build and publish Cloud SQL schema jar here, before schema release # process is finalized. Also publish nomulus:core jars that are needed for # server/schema compatibility tests.