Add a Beam pipeline to generate RDE deposit (part 1) (#1219)

This is the first part of the RdeStagingAction SQL migration where the
mapper logic is implemented in Beam.

A few helper methods are added to convert the DomainContent, HostBase
and ContactBase to their respective terminal child classes. This is
necessary and possible because the child classes do not have extra
fields and the base classes exist only to be embedded to other entities
(such as the various HistoryEntry entities). The conversion is necessary
because most of our code expects the terminal classes, such as the
RdeMarshaller's various marshallXXX() methods. The alternative would be
to change all the call sites, which seems to be much more disruptive.

Unfortunately there is is no good way to do this conversion than just
creating a builder and setting every fields there is.

<!-- Reviewable:start -->
---
This change is [<img src="https://reviewable.io/review_button.svg" height="34" align="absmiddle" alt="Reviewable"/>](https://reviewable.io/reviews/google/nomulus/1219)
<!-- Reviewable:end -->
This commit is contained in:
Lai Jiang 2021-06-30 13:54:24 -04:00 committed by GitHub
parent bfe720ba1b
commit b068e459c2
23 changed files with 887 additions and 115 deletions

View file

@ -780,6 +780,10 @@ if (environment in ['alpha', 'crash']) {
mainClass: 'google.registry.beam.invoicing.InvoicingPipeline',
metaData: 'google/registry/beam/invoicing_pipeline_metadata.json'
],
[
mainClass: 'google.registry.beam.rde.RdePipeline',
metaData: 'google/registry/beam/rde_pipeline_metadata.json'
],
]
project.tasks.create("stage_beam_pipelines") {
doLast {

View file

@ -0,0 +1,260 @@
// Copyright 2021 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.beam.rde;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static google.registry.model.EppResourceUtils.loadAtPointInTimeAsync;
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSetMultimap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.BaseEncoding;
import google.registry.beam.common.RegistryJpaIO;
import google.registry.model.EppResource;
import google.registry.model.contact.ContactResource;
import google.registry.model.domain.DomainBase;
import google.registry.model.host.HostResource;
import google.registry.model.rde.RdeMode;
import google.registry.model.registrar.Registrar;
import google.registry.model.registrar.Registrar.Type;
import google.registry.persistence.VKey;
import google.registry.rde.DepositFragment;
import google.registry.rde.PendingDeposit;
import google.registry.rde.PendingDeposit.PendingDepositCoder;
import google.registry.rde.RdeFragmenter;
import google.registry.rde.RdeMarshaller;
import google.registry.xml.ValidationMode;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;
import javax.persistence.Entity;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.DateTime;
/**
* Definition of a Dataflow Flex template, which generates RDE/BRDA deposits.
*
* <p>To stage this template locally, run the {@code stage_beam_pipeline.sh} shell script.
*
* <p>Then, you can run the staged template via the API client library, gCloud or a raw REST call.
*
* @see <a href="https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates">Using
* Flex Templates</a>
*/
public class RdePipeline implements Serializable {
private final RdeMarshaller marshaller;
private final ImmutableSetMultimap<String, PendingDeposit> pendings;
// Registrars to be excluded from data escrow. Not including the sandbox-only OTE type so that
// if sneaks into production we would get an extra signal.
private static final ImmutableSet<Type> IGNORED_REGISTRAR_TYPES =
Sets.immutableEnumSet(Registrar.Type.MONITORING, Registrar.Type.TEST);
private static final String EPP_RESOURCE_QUERY =
"SELECT id FROM %entity% "
+ "WHERE COALESCE(creationClientId, '') NOT LIKE 'prober-%' "
+ "AND COALESCE(currentSponsorClientId, '') NOT LIKE 'prober-%' "
+ "AND COALESCE(lastEppUpdateClientId, '') NOT LIKE 'prober-%'";
public static String createEppResourceQuery(Class<? extends EppResource> clazz) {
return EPP_RESOURCE_QUERY.replace("%entity%", clazz.getAnnotation(Entity.class).name())
+ (clazz.equals(DomainBase.class) ? " AND tld in (:tlds)" : "");
}
RdePipeline(RdePipelineOptions options) throws IOException, ClassNotFoundException {
this.marshaller = new RdeMarshaller(ValidationMode.valueOf(options.getValidationMode()));
this.pendings = decodePendings(options.getPendings());
}
@VisibleForTesting
PipelineResult run(Pipeline pipeline) {
createFragments(pipeline);
return pipeline.run();
}
PipelineResult run() {
return run(Pipeline.create());
}
PCollection<KV<PendingDeposit, DepositFragment>> createFragments(Pipeline pipeline) {
PCollection<KV<PendingDeposit, DepositFragment>> fragments =
PCollectionList.of(processRegistrars(pipeline))
.and(processNonRegistrarEntities(pipeline, DomainBase.class))
.and(processNonRegistrarEntities(pipeline, ContactResource.class))
.and(processNonRegistrarEntities(pipeline, HostResource.class))
.apply(Flatten.pCollections())
.setCoder(
KvCoder.of(PendingDepositCoder.of(), SerializableCoder.of(DepositFragment.class)));
return fragments;
}
PCollection<KV<PendingDeposit, DepositFragment>> processRegistrars(Pipeline pipeline) {
return pipeline
.apply(
"Read all production Registrar entities",
RegistryJpaIO.read(
"SELECT clientIdentifier FROM Registrar WHERE type NOT IN (:types)",
ImmutableMap.of("types", IGNORED_REGISTRAR_TYPES),
String.class,
// TODO: consider adding coders for entities and pass them directly instead of using
// VKeys.
id -> VKey.createSql(Registrar.class, id)))
.apply(
"Marshal Registrar into DepositFragment",
FlatMapElements.into(
TypeDescriptors.kvs(
TypeDescriptor.of(PendingDeposit.class),
TypeDescriptor.of(DepositFragment.class)))
.via(
(VKey<Registrar> key) -> {
Registrar registrar = jpaTm().transact(() -> jpaTm().loadByKey(key));
DepositFragment fragment = marshaller.marshalRegistrar(registrar);
return pendings.values().stream()
.map(pending -> KV.of(pending, fragment))
.collect(toImmutableSet());
}));
}
@SuppressWarnings("deprecation") // Reshuffle is still recommended by Dataflow.
<T extends EppResource>
PCollection<KV<PendingDeposit, DepositFragment>> processNonRegistrarEntities(
Pipeline pipeline, Class<T> clazz) {
return createInputs(pipeline, clazz)
.apply("Marshal " + clazz.getSimpleName() + " into DepositFragment", mapToFragments(clazz))
.setCoder(KvCoder.of(PendingDepositCoder.of(), SerializableCoder.of(DepositFragment.class)))
.apply(
"Reshuffle KV<PendingDeposit, DepositFragment> of "
+ clazz.getSimpleName()
+ " to prevent fusion",
Reshuffle.of());
}
<T extends EppResource> PCollection<VKey<T>> createInputs(Pipeline pipeline, Class<T> clazz) {
return pipeline.apply(
"Read all production " + clazz.getSimpleName() + " entities",
RegistryJpaIO.read(
createEppResourceQuery(clazz),
clazz.equals(DomainBase.class)
? ImmutableMap.of("tlds", pendings.keySet())
: ImmutableMap.of(),
String.class,
// TODO: consider adding coders for entities and pass them directly instead of using
// VKeys.
x -> VKey.create(clazz, x)));
}
<T extends EppResource>
FlatMapElements<VKey<T>, KV<PendingDeposit, DepositFragment>> mapToFragments(Class<T> clazz) {
return FlatMapElements.into(
TypeDescriptors.kvs(
TypeDescriptor.of(PendingDeposit.class), TypeDescriptor.of(DepositFragment.class)))
.via(
(VKey<T> key) -> {
T resource = jpaTm().transact(() -> jpaTm().loadByKey(key));
// The set of all TLDs to which this resource should be emitted.
ImmutableSet<String> tlds =
clazz.equals(DomainBase.class)
? ImmutableSet.of(((DomainBase) resource).getTld())
: pendings.keySet();
// Get the set of all point-in-time watermarks we need, to minimize rewinding.
ImmutableSet<DateTime> dates =
tlds.stream()
.map(pendings::get)
.flatMap(ImmutableSet::stream)
.map(PendingDeposit::watermark)
.collect(toImmutableSet());
// Launch asynchronous fetches of point-in-time representations of resource.
ImmutableMap<DateTime, Supplier<EppResource>> resourceAtTimes =
ImmutableMap.copyOf(
Maps.asMap(dates, input -> loadAtPointInTimeAsync(resource, input)));
// Convert resource to an XML fragment for each watermark/mode pair lazily and cache
// the result.
RdeFragmenter fragmenter = new RdeFragmenter(resourceAtTimes, marshaller);
List<KV<PendingDeposit, DepositFragment>> results = new ArrayList<>();
for (String tld : tlds) {
for (PendingDeposit pending : pendings.get(tld)) {
// Hosts and contacts don't get included in BRDA deposits.
if (pending.mode() == RdeMode.THIN && !clazz.equals(DomainBase.class)) {
continue;
}
Optional<DepositFragment> fragment =
fragmenter.marshal(pending.watermark(), pending.mode());
fragment.ifPresent(
depositFragment -> results.add(KV.of(pending, depositFragment)));
}
}
return results;
});
}
/**
* Decodes the pipeline option extracted from the URL parameter sent by the pipeline launcher to
* the original TLD to pending deposit map.
*/
@SuppressWarnings("unchecked")
static ImmutableSetMultimap<String, PendingDeposit> decodePendings(String encodedPending)
throws IOException, ClassNotFoundException {
try (ObjectInputStream ois =
new ObjectInputStream(
new ByteArrayInputStream(
BaseEncoding.base64Url().omitPadding().decode(encodedPending)))) {
return (ImmutableSetMultimap<String, PendingDeposit>) ois.readObject();
}
}
/**
* Encodes the TLD to pending deposit map in an URL safe string that is sent to the pipeline
* worker by the pipeline launcher as a pipeline option.
*/
public static String encodePendings(ImmutableSetMultimap<String, PendingDeposit> pendings)
throws IOException {
try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
ObjectOutputStream oos = new ObjectOutputStream(baos);
oos.writeObject(pendings);
oos.flush();
return BaseEncoding.base64Url().omitPadding().encode(baos.toByteArray());
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException {
PipelineOptionsFactory.register(RdePipelineOptions.class);
RdePipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(RdePipelineOptions.class);
new RdePipeline(options).run();
}
}

View file

@ -0,0 +1,32 @@
// Copyright 2021 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.beam.rde;
import google.registry.beam.common.RegistryPipelineOptions;
import org.apache.beam.sdk.options.Description;
/** Custom options for running the spec11 pipeline. */
public interface RdePipelineOptions extends RegistryPipelineOptions {
@Description("The Base64-encoded serialized map of TLDs to PendingDeposit.")
String getPendings();
void setPendings(String value);
@Description("The validation mode (LENIENT|STRICT) that the RDE marshaller uses.")
String getValidationMode();
void setValidationMode(String value);
}

View file

@ -398,7 +398,7 @@ public final class EppResourceUtils {
.orElse(null);
if (resourceAtPointInTime == null) {
logger.atSevere().log(
"Couldn't load resource at % for key %s, falling back to resource %s.",
"Couldn't load resource at %s for key %s, falling back to resource %s.",
timestamp, resource.createVKey(), resource);
return resource;
}

View file

@ -110,7 +110,8 @@ public class ContactHistory extends HistoryEntry implements SqlEntity {
@Override
public Optional<? extends EppResource> getResourceAtPointInTime() {
return getContactBase();
return getContactBase()
.map(contactBase -> new ContactResource.Builder().copyFrom(contactBase).build());
}
@PostLoad

View file

@ -79,5 +79,26 @@ public class ContactResource extends ContactBase
private Builder(ContactResource instance) {
super(instance);
}
public Builder copyFrom(ContactBase contactBase) {
return this.setAuthInfo(contactBase.getAuthInfo())
.setContactId(contactBase.getContactId())
.setCreationClientId(contactBase.getCreationClientId())
.setCreationTime(contactBase.getCreationTime())
.setDeletionTime(contactBase.getDeletionTime())
.setDisclose(contactBase.getDisclose())
.setEmailAddress(contactBase.getEmailAddress())
.setFaxNumber(contactBase.getFaxNumber())
.setInternationalizedPostalInfo(contactBase.getInternationalizedPostalInfo())
.setLastTransferTime(contactBase.getLastTransferTime())
.setLastEppUpdateClientId(contactBase.getLastEppUpdateClientId())
.setLastEppUpdateTime(contactBase.getLastEppUpdateTime())
.setLocalizedPostalInfo(contactBase.getLocalizedPostalInfo())
.setPersistedCurrentSponsorClientId(contactBase.getPersistedCurrentSponsorClientId())
.setRepoId(contactBase.getRepoId())
.setStatusValues(contactBase.getStatusValues())
.setTransferData(contactBase.getTransferData())
.setVoiceNumber(contactBase.getVoiceNumber());
}
}
}

View file

@ -181,5 +181,34 @@ public class DomainBase extends DomainContent
Builder(DomainBase instance) {
super(instance);
}
public Builder copyFrom(DomainContent domainContent) {
return this.setAuthInfo(domainContent.getAuthInfo())
.setAutorenewPollMessage(domainContent.getAutorenewPollMessage())
.setAutorenewBillingEvent(domainContent.getAutorenewBillingEvent())
.setAutorenewEndTime(domainContent.getAutorenewEndTime())
.setContacts(domainContent.getContacts())
.setCreationClientId(domainContent.getCreationClientId())
.setCreationTime(domainContent.getCreationTime())
.setDomainName(domainContent.getDomainName())
.setDeletePollMessage(domainContent.getDeletePollMessage())
.setDsData(domainContent.getDsData())
.setDeletionTime(domainContent.getDeletionTime())
.setGracePeriods(domainContent.getGracePeriods())
.setIdnTableName(domainContent.getIdnTableName())
.setLastTransferTime(domainContent.getLastTransferTime())
.setLaunchNotice(domainContent.getLaunchNotice())
.setLastEppUpdateClientId(domainContent.getLastEppUpdateClientId())
.setLastEppUpdateTime(domainContent.getLastEppUpdateTime())
.setNameservers(domainContent.getNameservers())
.setPersistedCurrentSponsorClientId(domainContent.getPersistedCurrentSponsorClientId())
.setRegistrant(domainContent.getRegistrant())
.setRegistrationExpirationTime(domainContent.getRegistrationExpirationTime())
.setRepoId(domainContent.getRepoId())
.setSmdId(domainContent.getSmdId())
.setSubordinateHosts(domainContent.getSubordinateHosts())
.setStatusValues(domainContent.getStatusValues())
.setTransferData(domainContent.getTransferData());
}
}
}

View file

@ -252,11 +252,18 @@ public class DomainHistory extends HistoryEntry implements SqlEntity {
@Override
public Optional<? extends EppResource> getResourceAtPointInTime() {
return getDomainContent();
return getDomainContent()
.map(domainContent -> new DomainBase.Builder().copyFrom(domainContent).build());
}
@PostLoad
void postLoad() {
// TODO(b/188044616): Determine why Eager loading doesn't work here.
Hibernate.initialize(domainTransactionRecords);
Hibernate.initialize(nsHosts);
Hibernate.initialize(dsDataHistories);
Hibernate.initialize(gracePeriodHistories);
if (domainContent != null) {
domainContent.nsHosts = nullToEmptyImmutableCopy(nsHosts);
domainContent.gracePeriods =
@ -278,12 +285,6 @@ public class DomainHistory extends HistoryEntry implements SqlEntity {
}
}
}
// TODO(b/188044616): Determine why Eager loading doesn't work here.
Hibernate.initialize(domainTransactionRecords);
Hibernate.initialize(nsHosts);
Hibernate.initialize(dsDataHistories);
Hibernate.initialize(gracePeriodHistories);
}
// In Datastore, save as a HistoryEntry object regardless of this object's type

View file

@ -137,7 +137,7 @@ public class HostBase extends EppResource {
}
@Override
public Builder asBuilder() {
public Builder<? extends HostBase, ?> asBuilder() {
return new Builder<>(clone(this));
}

View file

@ -111,7 +111,7 @@ public class HostHistory extends HistoryEntry implements SqlEntity {
@Override
public Optional<? extends EppResource> getResourceAtPointInTime() {
return getHostBase();
return getHostBase().map(hostBase -> new HostResource.Builder().copyFrom(hostBase).build());
}
@PostLoad

View file

@ -63,5 +63,21 @@ public class HostResource extends HostBase
private Builder(HostResource instance) {
super(instance);
}
public Builder copyFrom(HostBase hostBase) {
return this.setCreationClientId(hostBase.getCreationClientId())
.setCreationTime(hostBase.getCreationTime())
.setDeletionTime(hostBase.getDeletionTime())
.setHostName(hostBase.getHostName())
.setInetAddresses(hostBase.getInetAddresses())
.setLastTransferTime(hostBase.getLastTransferTime())
.setLastSuperordinateChange(hostBase.getLastSuperordinateChange())
.setLastEppUpdateClientId(hostBase.getLastEppUpdateClientId())
.setLastEppUpdateTime(hostBase.getLastEppUpdateTime())
.setPersistedCurrentSponsorClientId(hostBase.getPersistedCurrentSponsorClientId())
.setRepoId(hostBase.getRepoId())
.setSuperordinateDomain(hostBase.getSuperordinateDomain())
.setStatusValues(hostBase.getStatusValues());
}
}
}

View file

@ -17,8 +17,17 @@ package google.registry.rde;
import com.google.auto.value.AutoValue;
import google.registry.model.common.Cursor.CursorType;
import google.registry.model.rde.RdeMode;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.BooleanCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.joda.time.DateTime;
import org.joda.time.Duration;
@ -69,17 +78,9 @@ public abstract class PendingDeposit implements Serializable {
@Nullable
public abstract Integer revision();
static PendingDeposit create(
public static PendingDeposit create(
String tld, DateTime watermark, RdeMode mode, CursorType cursor, Duration interval) {
return new AutoValue_PendingDeposit(
false,
tld,
watermark,
mode,
cursor,
interval,
null,
null);
return new AutoValue_PendingDeposit(false, tld, watermark, mode, cursor, interval, null, null);
}
static PendingDeposit createInManualOperation(
@ -89,15 +90,52 @@ public abstract class PendingDeposit implements Serializable {
String directoryWithTrailingSlash,
@Nullable Integer revision) {
return new AutoValue_PendingDeposit(
true,
tld,
watermark,
mode,
null,
null,
directoryWithTrailingSlash,
revision);
true, tld, watermark, mode, null, null, directoryWithTrailingSlash, revision);
}
PendingDeposit() {}
/**
* A deterministic coder for {@link PendingDeposit} used during a GroupBy transform.
*
* <p>We cannot use a {@link SerializableCoder} directly because it does not guarantee
* determinism, which is required by GroupBy.
*/
public static class PendingDepositCoder extends AtomicCoder<PendingDeposit> {
private PendingDepositCoder() {
super();
}
private static final PendingDepositCoder INSTANCE = new PendingDepositCoder();
public static PendingDepositCoder of() {
return INSTANCE;
}
@Override
public void encode(PendingDeposit value, OutputStream outStream) throws IOException {
BooleanCoder.of().encode(value.manual(), outStream);
StringUtf8Coder.of().encode(value.tld(), outStream);
SerializableCoder.of(DateTime.class).encode(value.watermark(), outStream);
SerializableCoder.of(RdeMode.class).encode(value.mode(), outStream);
NullableCoder.of(SerializableCoder.of(CursorType.class)).encode(value.cursor(), outStream);
NullableCoder.of(SerializableCoder.of(Duration.class)).encode(value.interval(), outStream);
NullableCoder.of(StringUtf8Coder.of()).encode(value.directoryWithTrailingSlash(), outStream);
NullableCoder.of(VarIntCoder.of()).encode(value.revision(), outStream);
}
@Override
public PendingDeposit decode(InputStream inStream) throws IOException {
return new AutoValue_PendingDeposit(
BooleanCoder.of().decode(inStream),
StringUtf8Coder.of().decode(inStream),
SerializableCoder.of(DateTime.class).decode(inStream),
SerializableCoder.of(RdeMode.class).decode(inStream),
NullableCoder.of(SerializableCoder.of(CursorType.class)).decode(inStream),
NullableCoder.of(SerializableCoder.of(Duration.class)).decode(inStream),
NullableCoder.of(StringUtf8Coder.of()).decode(inStream),
NullableCoder.of(VarIntCoder.of()).decode(inStream));
}
}
}

View file

@ -0,0 +1,109 @@
// Copyright 2021 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.rde;
import static google.registry.model.EppResourceUtils.loadAtPointInTime;
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableMap;
import google.registry.model.EppResource;
import google.registry.model.contact.ContactResource;
import google.registry.model.domain.DomainBase;
import google.registry.model.host.HostResource;
import google.registry.model.rde.RdeMode;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;
import org.joda.time.DateTime;
/** Loading cache that turns a resource into XML for the various points in time and modes. */
public class RdeFragmenter {
private final Map<WatermarkModePair, Optional<DepositFragment>> cache = new HashMap<>();
private final ImmutableMap<DateTime, Supplier<EppResource>> resourceAtTimes;
private final RdeMarshaller marshaller;
long cacheHits = 0;
long resourcesNotFound = 0;
long resourcesFound = 0;
public RdeFragmenter(
ImmutableMap<DateTime, Supplier<EppResource>> resourceAtTimes, RdeMarshaller marshaller) {
this.resourceAtTimes = resourceAtTimes;
this.marshaller = marshaller;
}
public Optional<DepositFragment> marshal(DateTime watermark, RdeMode mode) {
Optional<DepositFragment> result = cache.get(WatermarkModePair.create(watermark, mode));
if (result != null) {
cacheHits++;
return result;
}
EppResource resource = resourceAtTimes.get(watermark).get();
if (resource == null) {
result = Optional.empty();
cache.put(WatermarkModePair.create(watermark, RdeMode.FULL), result);
cache.put(WatermarkModePair.create(watermark, RdeMode.THIN), result);
resourcesNotFound++;
return result;
}
resourcesFound++;
if (resource instanceof DomainBase) {
result = Optional.of(marshaller.marshalDomain((DomainBase) resource, mode));
cache.put(WatermarkModePair.create(watermark, mode), result);
return result;
} else if (resource instanceof ContactResource) {
result = Optional.of(marshaller.marshalContact((ContactResource) resource));
cache.put(WatermarkModePair.create(watermark, RdeMode.FULL), result);
cache.put(WatermarkModePair.create(watermark, RdeMode.THIN), result);
return result;
} else if (resource instanceof HostResource) {
HostResource host = (HostResource) resource;
result =
Optional.of(
host.isSubordinate()
? marshaller.marshalSubordinateHost(
host,
// Note that loadAtPointInTime() does cloneProjectedAtTime(watermark) for
// us.
Objects.requireNonNull(
loadAtPointInTime(
tm().loadByKey(host.getSuperordinateDomain()), watermark)))
: marshaller.marshalExternalHost(host));
cache.put(WatermarkModePair.create(watermark, RdeMode.FULL), result);
cache.put(WatermarkModePair.create(watermark, RdeMode.THIN), result);
return result;
} else {
throw new IllegalStateException(
String.format(
"Resource %s of type %s cannot be converted to XML.",
resource, resource.getClass().getSimpleName()));
}
}
/** Map key for {@link RdeFragmenter} cache. */
@AutoValue
abstract static class WatermarkModePair {
abstract DateTime watermark();
abstract RdeMode mode();
static WatermarkModePair create(DateTime watermark, RdeMode mode) {
return new AutoValue_RdeFragmenter_WatermarkModePair(watermark, mode);
}
}
}

View file

@ -16,12 +16,10 @@ package google.registry.rde;
import static com.google.common.base.Strings.nullToEmpty;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static google.registry.model.EppResourceUtils.loadAtPointInTime;
import static google.registry.model.EppResourceUtils.loadAtPointInTimeAsync;
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
import com.google.appengine.tools.mapreduce.Mapper;
import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSetMultimap;
@ -34,9 +32,6 @@ import google.registry.model.host.HostResource;
import google.registry.model.rde.RdeMode;
import google.registry.model.registrar.Registrar;
import google.registry.xml.ValidationMode;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;
import org.joda.time.DateTime;
@ -129,7 +124,7 @@ public final class RdeStagingMapper extends Mapper<EppResource, PendingDeposit,
ImmutableMap.copyOf(Maps.asMap(dates, input -> loadAtPointInTimeAsync(resource, input)));
// Convert resource to an XML fragment for each watermark/mode pair lazily and cache the result.
Fragmenter fragmenter = new Fragmenter(resourceAtTimes);
RdeFragmenter fragmenter = new RdeFragmenter(resourceAtTimes, marshaller);
// Emit resource as an XML fragment for all TLDs and modes pending deposit.
long resourcesEmitted = 0;
@ -157,74 +152,4 @@ public final class RdeStagingMapper extends Mapper<EppResource, PendingDeposit,
// Avoid running out of memory.
tm().clearSessionCache();
}
/** Loading cache that turns a resource into XML for the various points in time and modes. */
private class Fragmenter {
private final Map<WatermarkModePair, Optional<DepositFragment>> cache = new HashMap<>();
private final ImmutableMap<DateTime, Supplier<EppResource>> resourceAtTimes;
long cacheHits = 0;
long resourcesNotFound = 0;
long resourcesFound = 0;
Fragmenter(ImmutableMap<DateTime, Supplier<EppResource>> resourceAtTimes) {
this.resourceAtTimes = resourceAtTimes;
}
Optional<DepositFragment> marshal(DateTime watermark, RdeMode mode) {
Optional<DepositFragment> result = cache.get(WatermarkModePair.create(watermark, mode));
if (result != null) {
cacheHits++;
return result;
}
EppResource resource = resourceAtTimes.get(watermark).get();
if (resource == null) {
result = Optional.empty();
cache.put(WatermarkModePair.create(watermark, RdeMode.FULL), result);
cache.put(WatermarkModePair.create(watermark, RdeMode.THIN), result);
resourcesNotFound++;
return result;
}
resourcesFound++;
if (resource instanceof DomainBase) {
result = Optional.of(marshaller.marshalDomain((DomainBase) resource, mode));
cache.put(WatermarkModePair.create(watermark, mode), result);
return result;
} else if (resource instanceof ContactResource) {
result = Optional.of(marshaller.marshalContact((ContactResource) resource));
cache.put(WatermarkModePair.create(watermark, RdeMode.FULL), result);
cache.put(WatermarkModePair.create(watermark, RdeMode.THIN), result);
return result;
} else if (resource instanceof HostResource) {
HostResource host = (HostResource) resource;
result =
Optional.of(
host.isSubordinate()
? marshaller.marshalSubordinateHost(
host,
// Note that loadAtPointInTime() does cloneProjectedAtTime(watermark) for
// us.
Objects.requireNonNull(
loadAtPointInTime(
tm().loadByKey(host.getSuperordinateDomain()), watermark)))
: marshaller.marshalExternalHost(host));
cache.put(WatermarkModePair.create(watermark, RdeMode.FULL), result);
cache.put(WatermarkModePair.create(watermark, RdeMode.THIN), result);
return result;
} else {
throw new AssertionError(resource.toString());
}
}
}
/** Map key for {@link Fragmenter} cache. */
@AutoValue
abstract static class WatermarkModePair {
abstract DateTime watermark();
abstract RdeMode mode();
static WatermarkModePair create(DateTime watermark, RdeMode mode) {
return new AutoValue_RdeStagingMapper_WatermarkModePair(watermark, mode);
}
}
}

View file

@ -0,0 +1,23 @@
{
"name": "RDE/BRDA Deposit Generation",
"description": "An Apache Beam pipeline generates RDE or BRDA deposits and deposits them to GCS with GhostRyde encryption.",
"parameters": [
{
"name": "pendings",
"label": "The pendings deposits to generate.",
"helpText": "A TLD to PendingDeposit map that is serialized and Base64 URL-safe encoded.",
"regexes": [
"A-Za-z0-9\\-_"
]
},
{
"name": "validationMode",
"label": "How strict the marshaller validates the given EPP resources.",
"helpText": "If set to LENIENT the marshaller will not warn about missing data on the EPP resources.",
"is_optional": true,
"regexes": [
"^STRICT|LENIENT$"
]
}
]
}

View file

@ -0,0 +1,257 @@
// Copyright 2021 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.beam.rde;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.truth.Truth.assertThat;
import static google.registry.beam.rde.RdePipeline.decodePendings;
import static google.registry.beam.rde.RdePipeline.encodePendings;
import static google.registry.model.common.Cursor.CursorType.RDE_STAGING;
import static google.registry.model.rde.RdeMode.FULL;
import static google.registry.model.rde.RdeMode.THIN;
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
import static google.registry.persistence.transaction.TransactionManagerFactory.setTm;
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
import static google.registry.rde.RdeResourceType.CONTACT;
import static google.registry.rde.RdeResourceType.DOMAIN;
import static google.registry.rde.RdeResourceType.HOST;
import static google.registry.rde.RdeResourceType.REGISTRAR;
import static google.registry.testing.AppEngineExtension.loadInitialData;
import static google.registry.testing.DatabaseHelper.createTld;
import static google.registry.testing.DatabaseHelper.newHostResource;
import static google.registry.testing.DatabaseHelper.persistActiveContact;
import static google.registry.testing.DatabaseHelper.persistActiveDomain;
import static google.registry.testing.DatabaseHelper.persistDeletedDomain;
import static google.registry.testing.DatabaseHelper.persistEppResource;
import static google.registry.testing.DatabaseHelper.persistNewRegistrar;
import static org.joda.time.Duration.standardDays;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSetMultimap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Streams;
import google.registry.beam.TestPipelineExtension;
import google.registry.model.host.HostResource;
import google.registry.model.registrar.Registrar;
import google.registry.model.registrar.Registrar.State;
import google.registry.persistence.transaction.JpaTestRules;
import google.registry.persistence.transaction.JpaTestRules.JpaIntegrationTestExtension;
import google.registry.persistence.transaction.TransactionManager;
import google.registry.rde.DepositFragment;
import google.registry.rde.PendingDeposit;
import google.registry.rde.RdeResourceType;
import google.registry.testing.DatastoreEntityExtension;
import google.registry.testing.FakeClock;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.values.KV;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
/** Unit tests for {@link RdePipeline}. */
public class RdePipelineTest {
private static final String REGISTRAR_NAME_PATTERN =
"<rdeRegistrar:name>(.*)</rdeRegistrar:name>";
private static final String DOMAIN_NAME_PATTERN = "<rdeDomain:name>(.*)</rdeDomain:name>";
private static final String CONTACT_ID_PATTERN = "<rdeContact:id>(.*)</rdeContact:id>";
private static final String HOST_NAME_PATTERN = "<rdeHost:name>(.*)</rdeHost:name>";
private static final ImmutableSetMultimap<String, PendingDeposit> PENDINGS =
ImmutableSetMultimap.of(
"pal",
PendingDeposit.create(
"pal", DateTime.parse("2000-01-01TZ"), FULL, RDE_STAGING, standardDays(1)),
"pal",
PendingDeposit.create(
"pal", DateTime.parse("2000-01-01TZ"), THIN, RDE_STAGING, standardDays(1)),
"fun",
PendingDeposit.create(
"fun", DateTime.parse("2000-01-01TZ"), FULL, RDE_STAGING, standardDays(1)));
// This is the default creation time for test data.
private final FakeClock clock = new FakeClock(DateTime.parse("1999-12-31TZ"));
// The pipeline runs in a different thread, which needs to be masqueraded as a GAE thread as well.
@RegisterExtension
@Order(Order.DEFAULT - 1)
final DatastoreEntityExtension datastore = new DatastoreEntityExtension().allThreads(true);
@RegisterExtension
final JpaIntegrationTestExtension database =
new JpaTestRules.Builder().withClock(clock).buildIntegrationTestRule();
@RegisterExtension
final TestPipelineExtension pipeline =
TestPipelineExtension.create().enableAbandonedNodeEnforcement(true);
private final RdePipelineOptions options =
PipelineOptionsFactory.create().as(RdePipelineOptions.class);
private RdePipeline rdePipeline;
private TransactionManager originalTm;
@BeforeEach
void beforeEach() throws Exception {
originalTm = tm();
setTm(jpaTm());
loadInitialData();
// Two real registrars have been created by loadInitialData(), named "New Registrar" and "The
// Registrar". Create one included registrar (external_monitoring) and two excluded ones.
Registrar monitoringRegistrar =
persistNewRegistrar("monitoring", "monitoring", Registrar.Type.MONITORING, null);
Registrar testRegistrar = persistNewRegistrar("test", "test", Registrar.Type.TEST, null);
Registrar externalMonitoringRegistrar =
persistNewRegistrar(
"externalmonitor", "external_monitoring", Registrar.Type.EXTERNAL_MONITORING, 9997L);
// Set Registrar states which are required for reporting.
tm().transact(
() ->
tm().putAll(
ImmutableList.of(
externalMonitoringRegistrar.asBuilder().setState(State.ACTIVE).build(),
testRegistrar.asBuilder().setState(State.ACTIVE).build(),
monitoringRegistrar.asBuilder().setState(State.ACTIVE).build())));
createTld("pal");
createTld("fun");
createTld("cat");
// Also persists a "contact1234" contact in the process.
persistActiveDomain("hello.pal");
persistActiveDomain("kitty.fun");
// Should not appear anywhere.
persistActiveDomain("lol.cat");
persistDeletedDomain("deleted.pal", DateTime.parse("1999-12-30TZ"));
persistActiveContact("contact456");
HostResource host = persistEppResource(newHostResource("old.host.test"));
// Set the clock to 2000-01-02, the updated host should NOT show up in RDE.
clock.advanceBy(Duration.standardDays(2));
persistEppResource(host.asBuilder().setHostName("new.host.test").build());
options.setPendings(encodePendings(PENDINGS));
// The EPP resources created in tests do not have all the fields populated, using a STRICT
// validation mode will result in a lot of warnings during marshalling.
options.setValidationMode("LENIENT");
rdePipeline = new RdePipeline(options);
}
@AfterEach
void afterEach() {
setTm(originalTm);
}
@Test
void testSuccess_encodeAndDecodePendingsMap() throws Exception {
String encodedString = encodePendings(PENDINGS);
assertThat(decodePendings(encodedString)).isEqualTo(PENDINGS);
}
@Test
void testSuccess_createFragments() {
PAssert.that(
rdePipeline
.createFragments(pipeline)
.apply("Group by PendingDeposit", GroupByKey.create()))
.satisfies(
kvs -> {
kvs.forEach(
kv -> {
// Registrar fragments.
assertThat(
getFragmentForType(kv, REGISTRAR)
.map(getXmlElement(REGISTRAR_NAME_PATTERN))
.collect(toImmutableSet()))
// The same registrars are attached to all the pending deposits.
.containsExactly("New Registrar", "The Registrar", "external_monitoring");
// Domain fragments.
assertThat(
getFragmentForType(kv, DOMAIN)
.map(getXmlElement(DOMAIN_NAME_PATTERN))
.anyMatch(
domain ->
// Deleted domain should not be included
domain.equals("deleted.pal")
// Only domains on the pending deposit's tld should
// appear.
|| !kv.getKey()
.tld()
.equals(
Iterables.get(
Splitter.on('.').split(domain), 1))))
.isFalse();
if (kv.getKey().mode().equals(FULL)) {
// Contact fragments.
assertThat(
getFragmentForType(kv, CONTACT)
.map(getXmlElement(CONTACT_ID_PATTERN))
.collect(toImmutableSet()))
// The same contacts are attached too all pending deposits.
.containsExactly("contact1234", "contact456");
// Host fragments.
assertThat(
getFragmentForType(kv, HOST)
.map(getXmlElement(HOST_NAME_PATTERN))
.collect(toImmutableSet()))
// Should load the resource before update.
.containsExactly("old.host.test");
} else {
// BRDA does not contain contact or hosts.
assertThat(
Streams.stream(kv.getValue())
.anyMatch(
fragment ->
fragment.type().equals(CONTACT)
|| fragment.type().equals(HOST)))
.isFalse();
}
});
return null;
});
pipeline.run().waitUntilFinish();
}
private static Function<DepositFragment, String> getXmlElement(String pattern) {
return (fragment) -> {
Matcher matcher = Pattern.compile(pattern).matcher(fragment.xml());
checkState(matcher.find(), "Missing %s in xml.", pattern);
return matcher.group(1);
};
}
private static Stream<DepositFragment> getFragmentForType(
KV<PendingDeposit, Iterable<DepositFragment>> kv, RdeResourceType type) {
return Streams.stream(kv.getValue()).filter(fragment -> fragment.type().equals(type));
}
}

View file

@ -145,7 +145,7 @@ class EppPointInTimeTest {
tm().clearSessionCache();
assertAboutImmutableObjects()
.that(loadAtPointInTime(latest, timeAtCreate.plusDays(1)))
.hasFieldsEqualTo(domainAfterCreate);
.isEqualExceptFields(domainAfterCreate, "updateTimestamp");
tm().clearSessionCache();
if (tm().isOfy()) {
@ -159,18 +159,18 @@ class EppPointInTimeTest {
// second update occurred one millisecond later.
assertAboutImmutableObjects()
.that(loadAtPointInTime(latest, timeAtFirstUpdate))
.hasFieldsEqualTo(domainAfterFirstUpdate);
.isEqualExceptFields(domainAfterFirstUpdate, "updateTimestamp");
}
tm().clearSessionCache();
assertAboutImmutableObjects()
.that(loadAtPointInTime(latest, timeAtSecondUpdate))
.hasFieldsEqualTo(domainAfterSecondUpdate);
.isEqualExceptFields(domainAfterSecondUpdate, "updateTimestamp");
tm().clearSessionCache();
assertAboutImmutableObjects()
.that(loadAtPointInTime(latest, timeAtSecondUpdate.plusDays(1)))
.hasFieldsEqualTo(domainAfterSecondUpdate);
.isEqualExceptFields(domainAfterSecondUpdate, "updateTimestamp");
// Deletion time has millisecond granularity due to isActive() check.
tm().clearSessionCache();

View file

@ -122,6 +122,13 @@ public class ContactResourceTest extends EntityTestCase {
contactResource = persistResource(cloneAndSetAutoTimestamps(originalContact));
}
@Test
void testContactBaseToContactResource() {
ImmutableObjectSubject.assertAboutImmutableObjects()
.that(new ContactResource.Builder().copyFrom(contactResource).build())
.isEqualExceptFields(contactResource, "updateTimestamp", "revisions");
}
@Test
void testCloudSqlPersistence_failWhenViolateForeignKeyConstraint() {
assertThrowForeignKeyViolation(() -> jpaTm().transact(() -> jpaTm().insert(originalContact)));

View file

@ -39,6 +39,7 @@ import com.google.common.collect.Streams;
import com.googlecode.objectify.Key;
import google.registry.model.EntityTestCase;
import google.registry.model.ImmutableObject;
import google.registry.model.ImmutableObjectSubject;
import google.registry.model.billing.BillingEvent;
import google.registry.model.billing.BillingEvent.Reason;
import google.registry.model.contact.ContactResource;
@ -171,6 +172,13 @@ public class DomainBaseTest extends EntityTestCase {
.build()));
}
@Test
void testDomainContentToDomainBase() {
ImmutableObjectSubject.assertAboutImmutableObjects()
.that(new DomainBase.Builder().copyFrom(domain).build())
.isEqualExceptFields(domain, "updateTimestamp", "revisions");
}
@Test
void testPersistence() {
// Note that this only verifies that the value stored under the foreign key is the same as that

View file

@ -31,6 +31,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.net.InetAddresses;
import google.registry.model.EntityTestCase;
import google.registry.model.ImmutableObjectSubject;
import google.registry.model.domain.DomainBase;
import google.registry.model.eppcommon.StatusValue;
import google.registry.model.eppcommon.Trid;
@ -89,6 +90,13 @@ class HostResourceTest extends EntityTestCase {
.build()));
}
@TestOfyAndSql
void testHostBaseToHostResource() {
ImmutableObjectSubject.assertAboutImmutableObjects()
.that(new HostResource.Builder().copyFrom(host).build())
.isEqualExceptFields(host, "updateTimestamp", "revisions");
}
@TestOfyAndSql
void testPersistence() {
HostResource newHost = host.asBuilder().setRepoId("NEWHOST").build();

View file

@ -1085,8 +1085,7 @@ public class DatabaseHelper {
.setClientId(resource.getCreationClientId())
.setType(getHistoryEntryType(resource))
.setModificationTime(tm().getTransactionTime())
.build()
.toChildHistoryEntity());
.build());
ofyTmOrDoNothing(
() -> tm().put(ForeignKeyIndex.create(resource, resource.getDeletionTime())));
});

View file

@ -18,6 +18,8 @@ import static google.registry.model.ofy.ObjectifyService.auditedOfy;
import com.google.apphosting.api.ApiProxy;
import com.google.apphosting.api.ApiProxy.Environment;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
@ -25,7 +27,7 @@ import org.junit.jupiter.api.extension.ExtensionContext;
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
/**
* Allows instantiation of Datastore {@code Entity entities} without the heavyweight {@code
* Allows instantiation of Datastore {@code Entity}s without the heavyweight {@link
* AppEngineExtension}.
*
* <p>When an Ofy key is created, by calling the various Key.create() methods, whether the current
@ -44,6 +46,27 @@ public class DatastoreEntityExtension implements BeforeEachCallback, AfterEachCa
private static final Environment PLACEHOLDER_ENV = new PlaceholderEnvironment();
private boolean allThreads = false;
/**
* Whether all threads should be masqueraded as GAE threads.
*
* <p>This is particularly useful when new threads are spawned during a test. For example when
* testing Beam pipelines, the test pipeline runs the transforms in separate threads than the test
* thread. If Ofy keys are created in transforms, this value needs to be set to true.
*
* <p>Warning: by setting this value to true, any thread spawned by the current JVM will be have
* the thread local property set to the placeholder value during the execution of the current
* test, including those running other tests in parallel. This may or may not cause an issue when
* other tests have {@link AppEngineExtension} registered, that creates a much more fully
* functional GAE test environment. Consider moving tests using this extension to {@code
* outcastTest} if necessary.
*/
public DatastoreEntityExtension allThreads(boolean value) {
allThreads = value;
return this;
}
@Override
public void beforeEach(ExtensionContext context) {
ApiProxy.setEnvironmentForCurrentThread(PLACEHOLDER_ENV);
@ -51,12 +74,21 @@ public class DatastoreEntityExtension implements BeforeEachCallback, AfterEachCa
// will load the ObjectifyService class, whose static initialization block registers all Ofy
// entities.
auditedOfy();
if (allThreads) {
ApiProxy.setEnvironmentFactory(() -> PLACEHOLDER_ENV);
}
}
@Override
public void afterEach(ExtensionContext context) {
public void afterEach(ExtensionContext context)
throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
// Clear the cached instance.
ApiProxy.setEnvironmentForCurrentThread(null);
ApiProxy.clearEnvironmentForCurrentThread();
if (allThreads) {
Method method = ApiProxy.class.getDeclaredMethod("clearEnvironmentFactory");
method.setAccessible(true);
method.invoke(null);
}
}
private static final class PlaceholderEnvironment implements Environment {

View file

@ -93,7 +93,9 @@ steps:
google.registry.beam.spec11.Spec11Pipeline \
google/registry/beam/spec11_pipeline_metadata.json \
google.registry.beam.invoicing.InvoicingPipeline \
google/registry/beam/invoicing_pipeline_metadata.json
google/registry/beam/invoicing_pipeline_metadata.json \
google.registry.beam.rde.RdePipeline \
google/registry/beam/rde_pipeline_metadata.json
# Tentatively build and publish Cloud SQL schema jar here, before schema release
# process is finalized. Also publish nomulus:core jars that are needed for
# server/schema compatibility tests.