From 70efdc1cb7989e13f24ba70ca7988a84a22c734d Mon Sep 17 00:00:00 2001 From: guyben Date: Mon, 2 Apr 2018 13:30:46 -0700 Subject: [PATCH] Add counters to the RdeStaging mapreduce ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=191339016 --- .../google/registry/rde/RdeStagingMapper.java | 57 +++++++++++++------ 1 file changed, 41 insertions(+), 16 deletions(-) diff --git a/java/google/registry/rde/RdeStagingMapper.java b/java/google/registry/rde/RdeStagingMapper.java index 8e9b41316..09e347b7a 100644 --- a/java/google/registry/rde/RdeStagingMapper.java +++ b/java/google/registry/rde/RdeStagingMapper.java @@ -25,7 +25,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSetMultimap; import com.google.common.collect.Maps; -import com.google.common.collect.Streams; import com.googlecode.objectify.Result; import google.registry.model.EppResource; import google.registry.model.contact.ContactResource; @@ -62,12 +61,15 @@ public final class RdeStagingMapper extends Mapper tlds = - shouldEmitOnAllTlds - ? pendings.keySet() - : ImmutableSet.of(((DomainResource) resource).getTld()); + // The set of all TLDs to which this resource should be emitted. + ImmutableSet tlds; + if (resource instanceof DomainResource) { + String tld = ((DomainResource) resource).getTld(); + if (!pendings.containsKey(tld)) { + getContext().incrementCounter("DomainResource of an unneeded TLD skipped"); + return; + } + getContext().incrementCounter("DomainResource instances"); + tlds = ImmutableSet.of(tld); + } else { + getContext().incrementCounter("non-DomainResource instances"); + // Contacts and hosts get emitted on all TLDs, even if domains don't reference them. + tlds = pendings.keySet(); + } // Get the set of all point-in-time watermarks we need, to minimize rewinding. ImmutableSet dates = - Streams.stream( - shouldEmitOnAllTlds - ? pendings.values() - : pendings.get(((DomainResource) resource).getTld())) + tlds.stream() + .map(pendings::get) + .flatMap(ImmutableSet::stream) .map(PendingDeposit::watermark) .collect(toImmutableSet()); @@ -111,6 +121,7 @@ public final class RdeStagingMapper extends Mapper emit(pending, fragment)); + Optional fragment = + fragmenter.marshal(pending.watermark(), pending.mode()); + if (fragment.isPresent()) { + emit(pending, fragment.get()); + resourcesEmitted++; + } } } + getContext().incrementCounter("resources emitted", resourcesEmitted); + getContext().incrementCounter("fragmenter cache hits", fragmenter.cacheHits); + getContext().incrementCounter("fragmenter resources not found", fragmenter.resourcesNotFound); + getContext().incrementCounter("fragmenter resources found", fragmenter.resourcesFound); // Avoid running out of memory. ofy().clearSessionCache(); @@ -134,6 +152,10 @@ public final class RdeStagingMapper extends Mapper> cache = new HashMap<>(); private final ImmutableMap> resourceAtTimes; + long cacheHits = 0; + long resourcesNotFound = 0; + long resourcesFound = 0; + Fragmenter(ImmutableMap> resourceAtTimes) { this.resourceAtTimes = resourceAtTimes; } @@ -141,6 +163,7 @@ public final class RdeStagingMapper extends Mapper marshal(DateTime watermark, RdeMode mode) { Optional result = cache.get(WatermarkModePair.create(watermark, mode)); if (result != null) { + cacheHits++; return result; } EppResource resource = resourceAtTimes.get(watermark).now(); @@ -148,8 +171,10 @@ public final class RdeStagingMapper extends Mapper