diff --git a/core/src/main/java/google/registry/beam/rde/RdePipeline.java b/core/src/main/java/google/registry/beam/rde/RdePipeline.java
index ced195ea5..5185228aa 100644
--- a/core/src/main/java/google/registry/beam/rde/RdePipeline.java
+++ b/core/src/main/java/google/registry/beam/rde/RdePipeline.java
@@ -15,6 +15,7 @@ package google.registry.beam.rde;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.collect.ImmutableSet.toImmutableSet;
 import static google.registry.beam.rde.RdePipeline.TupleTags.DOMAIN_FRAGMENTS;
 import static google.registry.beam.rde.RdePipeline.TupleTags.EXTERNAL_HOST_FRAGMENTS;
@@ -28,10 +29,12 @@ import static google.registry.model.reporting.HistoryEntryDao.RESOURCE_TYPES_TO_HISTORY_TYPES;
 import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
 import static org.apache.beam.sdk.values.TypeDescriptors.kvs;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 import com.google.common.collect.Streams;
+import com.google.common.flogger.FluentLogger;
 import com.google.common.io.BaseEncoding;
 import dagger.BindsInstance;
 import dagger.Component;
@@ -198,6 +201,8 @@ public class RdePipeline implements Serializable {
           HostHistory.class, "hostBase");
 
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
   @Inject
   RdePipeline(RdePipelineOptions options, GcsUtils gcsUtils, CloudTasksUtils cloudTasksUtils) {
     this.options = options;
@@ -357,6 +362,33 @@ public class RdePipeline implements Serializable {
         .setCoder(KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of()));
   }
 
+  private EppResource loadResourceByHistoryEntryId(
+      Class<? extends HistoryEntry> historyEntryClazz, String repoId, Iterable<Long> revisionIds) {
+    ImmutableList<Long> ids = ImmutableList.copyOf(revisionIds);
+    // The size should always be 1 because we are only getting one repo ID -> revision ID pair per
+    // repo ID from the source transform (the JPA query in the method above). But for some reason
+    // after CoGroupByKey (joining the revision IDs and the pending deposits on repo IDs), in
+    // #removeUnreferencedResources, duplicate revision IDs are sometimes introduced. Here we
+    // attempt to deduplicate the iterable. If it contains multiple revision IDs that are NOT the
+    // same, we have a more serious problem as we cannot be sure which one to use. We should use the
+    // highest revision ID, but we don't even know where it comes from, as the query should
+    // definitively only give us one revision ID per repo ID. In this case we have to abort and
+    // require manual intervention.
+    if (ids.size() != 1) {
+      ImmutableSet<Long> dedupedIds = ImmutableSet.copyOf(ids);
+      checkState(
+          dedupedIds.size() == 1,
+          "Multiple unique revision IDs detected for %s repo ID %s: %s",
+          EPP_RESOURCE_FIELD_NAME.get(historyEntryClazz),
+          repoId,
+          ids);
+      logger.atSevere().log(
+          "Duplicate revision IDs detected for %s repo ID %s: %s",
+          EPP_RESOURCE_FIELD_NAME.get(historyEntryClazz), repoId, ids);
+    }
+    return loadResourceByHistoryEntryId(historyEntryClazz, repoId, ids.get(0));
+  }
+
   private EppResource loadResourceByHistoryEntryId(
       Class<? extends HistoryEntry> historyEntryClazz, String repoId, long revisionId) {
     try {
@@ -516,7 +548,7 @@ public class RdePipeline implements Serializable {
                         loadResourceByHistoryEntryId(
                             ContactHistory.class,
                             kv.getKey(),
-                            kv.getValue().getOnly(REVISION_ID));
+                            kv.getValue().getAll(REVISION_ID));
                     DepositFragment fragment = marshaller.marshalContact(contact);
                     ImmutableSet<KV<PendingDeposit, DepositFragment>> fragments =
                         Streams.stream(kv.getValue().getAll(PENDING_DEPOSIT))
@@ -549,8 +581,8 @@ public class RdePipeline implements Serializable {
                         loadResourceByHistoryEntryId(
                             HostHistory.class,
                             kv.getKey(),
-                            kv.getValue().getOnly(REVISION_ID));
-                    // When a host is subordinate, we need to find it's superordinate domain and
+                            kv.getValue().getAll(REVISION_ID));
+                    // When a host is subordinate, we need to find its superordinate domain and
                     // include it in the deposit as well.
                     if (host.isSubordinate()) {
                       subordinateHostCounter.inc();
@@ -627,7 +659,7 @@ public class RdePipeline implements Serializable {
                         loadResourceByHistoryEntryId(
                             DomainHistory.class,
                             kv.getKey(),
-                            kv.getValue().getOnly(REVISION_ID));
+                            kv.getValue().getAll(REVISION_ID));
                     ImmutableSet.Builder<KV<PendingDeposit, DepositFragment>> results =
                         new ImmutableSet.Builder<>();
                     for (KV<String, CoGbkResult> hostToPendingDeposits :
@@ -637,7 +669,7 @@ public class RdePipeline implements Serializable {
                         loadResourceByHistoryEntryId(
                             HostHistory.class,
                             hostToPendingDeposits.getKey(),
-                            hostToPendingDeposits.getValue().getOnly(REVISION_ID));
+                            hostToPendingDeposits.getValue().getAll(REVISION_ID));
                     DepositFragment fragment =
                         marshaller.marshalSubordinateHost(host, superordinateDomain);
                     Streams.stream(hostToPendingDeposits.getValue().getAll(PENDING_DEPOSIT))
@@ -696,6 +728,7 @@ public class RdePipeline implements Serializable {
    * CoGbkResult}s are used.
    */
   protected abstract static class TupleTags {
+
     protected static final TupleTag<KV<PendingDeposit, DepositFragment>> DOMAIN_FRAGMENTS =
         new TupleTag<KV<PendingDeposit, DepositFragment>>() {};
@@ -729,10 +762,12 @@ public class RdePipeline implements Serializable {
         UtilsModule.class
       })
   interface RdePipelineComponent {
+
     RdePipeline rdePipeline();
 
     @Component.Builder
     interface Builder {
+
       @BindsInstance
       Builder options(RdePipelineOptions options);