From 98461c18752c995835e2d6360335d4d02ff95f67 Mon Sep 17 00:00:00 2001 From: Lai Jiang Date: Fri, 15 Apr 2022 10:32:44 -0400 Subject: [PATCH] Fix an issue with RDE (#1593) For some inexplicable reason, the RDE beam pipeline in both sandbox and production has been broken for the past week or so. Our investigations revealed that during the CoGroupByKey stage, some repo ID -> revision ID pairs were duplicated. This may be a problem with the Dataflow runtime which somehow introduced the duplicate during reshuffling. This PR attempts to fix the symptom only by deduping the revision IDs. We will do some more investigation and possibly follow up with the Dataflow team if we determine it is an upstream issue. TESTED=deployed the pipeline and successfully ran sandbox RDE with it. --- .../google/registry/beam/rde/RdePipeline.java | 45 ++++++++++++++++--- 1 file changed, 40 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/google/registry/beam/rde/RdePipeline.java b/core/src/main/java/google/registry/beam/rde/RdePipeline.java index ced195ea5..5185228aa 100644 --- a/core/src/main/java/google/registry/beam/rde/RdePipeline.java +++ b/core/src/main/java/google/registry/beam/rde/RdePipeline.java @@ -15,6 +15,7 @@ package google.registry.beam.rde; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.ImmutableSet.toImmutableSet; import static google.registry.beam.rde.RdePipeline.TupleTags.DOMAIN_FRAGMENTS; import static google.registry.beam.rde.RdePipeline.TupleTags.EXTERNAL_HOST_FRAGMENTS; @@ -28,10 +29,12 @@ import static google.registry.model.reporting.HistoryEntryDao.RESOURCE_TYPES_TO_ import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; import static org.apache.beam.sdk.values.TypeDescriptors.kvs; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import 
com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; import com.google.common.collect.Streams; +import com.google.common.flogger.FluentLogger; import com.google.common.io.BaseEncoding; import dagger.BindsInstance; import dagger.Component; @@ -198,6 +201,8 @@ public class RdePipeline implements Serializable { HostHistory.class, "hostBase"); + private static final FluentLogger logger = FluentLogger.forEnclosingClass(); + @Inject RdePipeline(RdePipelineOptions options, GcsUtils gcsUtils, CloudTasksUtils cloudTasksUtils) { this.options = options; @@ -357,6 +362,33 @@ public class RdePipeline implements Serializable { .setCoder(KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of())); } + private EppResource loadResourceByHistoryEntryId( + Class historyEntryClazz, String repoId, Iterable revisionIds) { + ImmutableList ids = ImmutableList.copyOf(revisionIds); + // The size should always be 1 because we are only getting one repo ID -> revision ID pair per + // repo ID from the source transform (the JPA query in the method above). But for some reason + // after CoGroupByKey (joining the revision IDs and the pending deposits on repo IDs), in + // #removedUnreferencedResources, duplicate revision IDs are sometimes introduced. Here we + // attempt to deduplicate the iterable. If it contains multiple revision IDs that are NOT the + // same, we have a more serious problem as we cannot be sure which one to use. We should use the + // highest revision ID, but we don't even know where it comes from, as the query should + // definitively only give us one revision ID per repo ID. In this case we have to abort and + // require manual intervention. 
+ if (ids.size() != 1) { + ImmutableSet dedupedIds = ImmutableSet.copyOf(ids); + checkState( + dedupedIds.size() == 1, + "Multiple unique revision IDs detected for %s repo ID %s: %s", + EPP_RESOURCE_FIELD_NAME.get(historyEntryClazz), + repoId, + ids); + logger.atSevere().log( + "Duplicate revision IDs detected for %s repo ID %s: %s", + EPP_RESOURCE_FIELD_NAME.get(historyEntryClazz), repoId, ids); + } + return loadResourceByHistoryEntryId(historyEntryClazz, repoId, ids.get(0)); + } + private EppResource loadResourceByHistoryEntryId( Class historyEntryClazz, String repoId, long revisionId) { try { @@ -516,7 +548,7 @@ public class RdePipeline implements Serializable { loadResourceByHistoryEntryId( ContactHistory.class, kv.getKey(), - kv.getValue().getOnly(REVISION_ID)); + kv.getValue().getAll(REVISION_ID)); DepositFragment fragment = marshaller.marshalContact(contact); ImmutableSet> fragments = Streams.stream(kv.getValue().getAll(PENDING_DEPOSIT)) @@ -549,8 +581,8 @@ public class RdePipeline implements Serializable { loadResourceByHistoryEntryId( HostHistory.class, kv.getKey(), - kv.getValue().getOnly(REVISION_ID)); - // When a host is subordinate, we need to find it's superordinate domain and + kv.getValue().getAll(REVISION_ID)); + // When a host is subordinate, we need to find its superordinate domain and // include it in the deposit as well. 
if (host.isSubordinate()) { subordinateHostCounter.inc(); @@ -627,7 +659,7 @@ public class RdePipeline implements Serializable { loadResourceByHistoryEntryId( DomainHistory.class, kv.getKey(), - kv.getValue().getOnly(REVISION_ID)); + kv.getValue().getAll(REVISION_ID)); ImmutableSet.Builder> results = new ImmutableSet.Builder<>(); for (KV hostToPendingDeposits : @@ -637,7 +669,7 @@ public class RdePipeline implements Serializable { loadResourceByHistoryEntryId( HostHistory.class, hostToPendingDeposits.getKey(), - hostToPendingDeposits.getValue().getOnly(REVISION_ID)); + hostToPendingDeposits.getValue().getAll(REVISION_ID)); DepositFragment fragment = marshaller.marshalSubordinateHost(host, superordinateDomain); Streams.stream(hostToPendingDeposits.getValue().getAll(PENDING_DEPOSIT)) @@ -696,6 +728,7 @@ public class RdePipeline implements Serializable { * CoGbkResult}s are used. */ protected abstract static class TupleTags { + protected static final TupleTag> DOMAIN_FRAGMENTS = new TupleTag>() {}; @@ -729,10 +762,12 @@ public class RdePipeline implements Serializable { UtilsModule.class }) interface RdePipelineComponent { + RdePipeline rdePipeline(); @Component.Builder interface Builder { + @BindsInstance Builder options(RdePipelineOptions options);