Fix an issue with RDE (#1593)

For some inexplicable reason, the RDE beam pipeline in both sandbox and
production has been broken for the past week or so. Our investigations
revealed that during the CoGroupByKey stage, some repo ID -> revision ID
pairs were duplicated. This may be a problem with the Dataflow runtime
which somehow introduced the duplicate during reshuffling.

This PR attempts to fix the symptom only by deduping the revision IDs. We
will do some more investigation and possibly follow up with the Dataflow
team if we determine it is an upstream issue.

TESTED=deployed the pipeline and successfully ran sandbox RDE with it.
This commit is contained in:
Lai Jiang 2022-04-15 10:32:44 -04:00 committed by GitHub
parent a3b8ad4cfc
commit 98461c1875

View file

@ -15,6 +15,7 @@
package google.registry.beam.rde;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static google.registry.beam.rde.RdePipeline.TupleTags.DOMAIN_FRAGMENTS;
import static google.registry.beam.rde.RdePipeline.TupleTags.EXTERNAL_HOST_FRAGMENTS;
@ -28,10 +29,12 @@ import static google.registry.model.reporting.HistoryEntryDao.RESOURCE_TYPES_TO_
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
import static org.apache.beam.sdk.values.TypeDescriptors.kvs;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.common.collect.Streams;
import com.google.common.flogger.FluentLogger;
import com.google.common.io.BaseEncoding;
import dagger.BindsInstance;
import dagger.Component;
@ -198,6 +201,8 @@ public class RdePipeline implements Serializable {
HostHistory.class,
"hostBase");
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
@Inject
RdePipeline(RdePipelineOptions options, GcsUtils gcsUtils, CloudTasksUtils cloudTasksUtils) {
this.options = options;
@ -357,6 +362,33 @@ public class RdePipeline implements Serializable {
.setCoder(KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of()));
}
/**
 * Loads the EPP resource for a repo ID from a collection of revision IDs that is expected to
 * hold exactly one distinct value.
 *
 * <p>Exact duplicates of a single revision ID are tolerated (and logged at SEVERE level); the
 * resource is then loaded via the single-revision-ID overload of this method.
 *
 * @param historyEntryClazz the history entry class identifying the type of resource to load
 * @param repoId the repo ID of the resource
 * @param revisionIds the revision IDs associated with the repo ID
 * @return the resource loaded by the single-revision-ID overload
 * @throws IllegalStateException if {@code revisionIds} is empty, or if it contains more than one
 *     distinct revision ID
 */
private <T extends HistoryEntry> EppResource loadResourceByHistoryEntryId(
    Class<T> historyEntryClazz, String repoId, Iterable<Long> revisionIds) {
  ImmutableList<Long> ids = ImmutableList.copyOf(revisionIds);
  // The size should always be 1 because we are only getting one repo ID -> revision ID pair per
  // repo ID from the source transform (the JPA query in the method above). But for some reason
  // after CoGroupByKey (joining the revision IDs and the pending deposits on repo IDs), in
  // #removedUnreferencedResources, duplicate revision IDs are sometimes introduced. Here we
  // attempt to deduplicate the iterable. If it contains multiple revision IDs that are NOT the
  // same, we have a more serious problem as we cannot be sure which one to use. We should use the
  // highest revision ID, but we don't even know where it comes from, as the query should
  // definitively only give us one revision ID per repo ID. In this case we have to abort and
  // require manual intervention.
  if (ids.size() != 1) {
    // An empty collection is a different failure from conflicting IDs; report it as such rather
    // than letting it fall through to the "multiple unique revision IDs" check below.
    checkState(
        !ids.isEmpty(),
        "No revision ID found for %s repo ID %s",
        EPP_RESOURCE_FIELD_NAME.get(historyEntryClazz),
        repoId);
    ImmutableSet<Long> dedupedIds = ImmutableSet.copyOf(ids);
    checkState(
        dedupedIds.size() == 1,
        "Multiple unique revision IDs detected for %s repo ID %s: %s",
        EPP_RESOURCE_FIELD_NAME.get(historyEntryClazz),
        repoId,
        ids);
    // Only reached when every element is the same revision ID, i.e. pure duplication.
    logger.atSevere().log(
        "Duplicate revision IDs detected for %s repo ID %s: %s",
        EPP_RESOURCE_FIELD_NAME.get(historyEntryClazz), repoId, ids);
  }
  return loadResourceByHistoryEntryId(historyEntryClazz, repoId, ids.get(0));
}
private <T extends HistoryEntry> EppResource loadResourceByHistoryEntryId(
Class<T> historyEntryClazz, String repoId, long revisionId) {
try {
@ -516,7 +548,7 @@ public class RdePipeline implements Serializable {
loadResourceByHistoryEntryId(
ContactHistory.class,
kv.getKey(),
kv.getValue().getOnly(REVISION_ID));
kv.getValue().getAll(REVISION_ID));
DepositFragment fragment = marshaller.marshalContact(contact);
ImmutableSet<KV<PendingDeposit, DepositFragment>> fragments =
Streams.stream(kv.getValue().getAll(PENDING_DEPOSIT))
@ -549,8 +581,8 @@ public class RdePipeline implements Serializable {
loadResourceByHistoryEntryId(
HostHistory.class,
kv.getKey(),
kv.getValue().getOnly(REVISION_ID));
// When a host is subordinate, we need to find it's superordinate domain and
kv.getValue().getAll(REVISION_ID));
// When a host is subordinate, we need to find its superordinate domain and
// include it in the deposit as well.
if (host.isSubordinate()) {
subordinateHostCounter.inc();
@ -627,7 +659,7 @@ public class RdePipeline implements Serializable {
loadResourceByHistoryEntryId(
DomainHistory.class,
kv.getKey(),
kv.getValue().getOnly(REVISION_ID));
kv.getValue().getAll(REVISION_ID));
ImmutableSet.Builder<KV<PendingDeposit, DepositFragment>> results =
new ImmutableSet.Builder<>();
for (KV<String, CoGbkResult> hostToPendingDeposits :
@ -637,7 +669,7 @@ public class RdePipeline implements Serializable {
loadResourceByHistoryEntryId(
HostHistory.class,
hostToPendingDeposits.getKey(),
hostToPendingDeposits.getValue().getOnly(REVISION_ID));
hostToPendingDeposits.getValue().getAll(REVISION_ID));
DepositFragment fragment =
marshaller.marshalSubordinateHost(host, superordinateDomain);
Streams.stream(hostToPendingDeposits.getValue().getAll(PENDING_DEPOSIT))
@ -696,6 +728,7 @@ public class RdePipeline implements Serializable {
* CoGbkResult}s are used.
*/
protected abstract static class TupleTags {
protected static final TupleTag<KV<PendingDeposit, DepositFragment>> DOMAIN_FRAGMENTS =
new TupleTag<KV<PendingDeposit, DepositFragment>>() {};
@ -729,10 +762,12 @@ public class RdePipeline implements Serializable {
UtilsModule.class
})
interface RdePipelineComponent {
RdePipeline rdePipeline();
@Component.Builder
interface Builder {
@BindsInstance
Builder options(RdePipelineOptions options);