Add counters to the RdeStaging mapreduce

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=191339016
This commit is contained in:
guyben 2018-04-02 13:30:46 -07:00 committed by Ben McIlwain
parent 3dff2ba4c7
commit 70efdc1cb7

View file

@ -25,7 +25,6 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSetMultimap; import com.google.common.collect.ImmutableSetMultimap;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Streams;
import com.googlecode.objectify.Result; import com.googlecode.objectify.Result;
import google.registry.model.EppResource; import google.registry.model.EppResource;
import google.registry.model.contact.ContactResource; import google.registry.model.contact.ContactResource;
@ -62,12 +61,15 @@ public final class RdeStagingMapper extends Mapper<EppResource, PendingDeposit,
// emitted from the mapper. Without this, a cursor might never advance because no EppResource // emitted from the mapper. Without this, a cursor might never advance because no EppResource
// entity exists at the watermark. // entity exists at the watermark.
if (resource == null) { if (resource == null) {
long registrarsEmitted = 0;
for (Registrar registrar : Registrar.loadAllCached()) { for (Registrar registrar : Registrar.loadAllCached()) {
DepositFragment fragment = marshaller.marshalRegistrar(registrar); DepositFragment fragment = marshaller.marshalRegistrar(registrar);
for (PendingDeposit pending : pendings.values()) { for (PendingDeposit pending : pendings.values()) {
emit(pending, fragment); emit(pending, fragment);
registrarsEmitted++;
} }
} }
getContext().incrementCounter("registrars emitted", registrarsEmitted);
return; return;
} }
@ -75,6 +77,7 @@ public final class RdeStagingMapper extends Mapper<EppResource, PendingDeposit,
if (!(resource instanceof ContactResource if (!(resource instanceof ContactResource
|| resource instanceof DomainResource || resource instanceof DomainResource
|| resource instanceof HostResource)) { || resource instanceof HostResource)) {
getContext().incrementCounter("polymorphic entities skipped");
return; return;
} }
@ -82,24 +85,31 @@ public final class RdeStagingMapper extends Mapper<EppResource, PendingDeposit,
if (nullToEmpty(resource.getCreationClientId()).startsWith("prober-") if (nullToEmpty(resource.getCreationClientId()).startsWith("prober-")
|| nullToEmpty(resource.getPersistedCurrentSponsorClientId()).startsWith("prober-") || nullToEmpty(resource.getPersistedCurrentSponsorClientId()).startsWith("prober-")
|| nullToEmpty(resource.getLastEppUpdateClientId()).startsWith("prober-")) { || nullToEmpty(resource.getLastEppUpdateClientId()).startsWith("prober-")) {
getContext().incrementCounter("prober data skipped");
return; return;
} }
// Contacts and hosts get emitted on all TLDs, even if domains don't reference them. // The set of all TLDs to which this resource should be emitted.
boolean shouldEmitOnAllTlds = !(resource instanceof DomainResource); ImmutableSet<String> tlds;
if (resource instanceof DomainResource) {
// Get the set of all TLDs to which this resource should be emitted. String tld = ((DomainResource) resource).getTld();
ImmutableSet<String> tlds = if (!pendings.containsKey(tld)) {
shouldEmitOnAllTlds getContext().incrementCounter("DomainResource of an unneeded TLD skipped");
? pendings.keySet() return;
: ImmutableSet.of(((DomainResource) resource).getTld()); }
getContext().incrementCounter("DomainResource instances");
tlds = ImmutableSet.of(tld);
} else {
getContext().incrementCounter("non-DomainResource instances");
// Contacts and hosts get emitted on all TLDs, even if domains don't reference them.
tlds = pendings.keySet();
}
// Get the set of all point-in-time watermarks we need, to minimize rewinding. // Get the set of all point-in-time watermarks we need, to minimize rewinding.
ImmutableSet<DateTime> dates = ImmutableSet<DateTime> dates =
Streams.stream( tlds.stream()
shouldEmitOnAllTlds .map(pendings::get)
? pendings.values() .flatMap(ImmutableSet::stream)
: pendings.get(((DomainResource) resource).getTld()))
.map(PendingDeposit::watermark) .map(PendingDeposit::watermark)
.collect(toImmutableSet()); .collect(toImmutableSet());
@ -111,6 +121,7 @@ public final class RdeStagingMapper extends Mapper<EppResource, PendingDeposit,
Fragmenter fragmenter = new Fragmenter(resourceAtTimes); Fragmenter fragmenter = new Fragmenter(resourceAtTimes);
// Emit resource as an XML fragment for all TLDs and modes pending deposit. // Emit resource as an XML fragment for all TLDs and modes pending deposit.
long resourcesEmitted = 0;
for (String tld : tlds) { for (String tld : tlds) {
for (PendingDeposit pending : pendings.get(tld)) { for (PendingDeposit pending : pendings.get(tld)) {
// Hosts and contacts don't get included in BRDA deposits. // Hosts and contacts don't get included in BRDA deposits.
@ -119,11 +130,18 @@ public final class RdeStagingMapper extends Mapper<EppResource, PendingDeposit,
|| resource instanceof HostResource)) { || resource instanceof HostResource)) {
continue; continue;
} }
fragmenter Optional<DepositFragment> fragment =
.marshal(pending.watermark(), pending.mode()) fragmenter.marshal(pending.watermark(), pending.mode());
.ifPresent(fragment -> emit(pending, fragment)); if (fragment.isPresent()) {
emit(pending, fragment.get());
resourcesEmitted++;
}
} }
} }
getContext().incrementCounter("resources emitted", resourcesEmitted);
getContext().incrementCounter("fragmenter cache hits", fragmenter.cacheHits);
getContext().incrementCounter("fragmenter resources not found", fragmenter.resourcesNotFound);
getContext().incrementCounter("fragmenter resources found", fragmenter.resourcesFound);
// Avoid running out of memory. // Avoid running out of memory.
ofy().clearSessionCache(); ofy().clearSessionCache();
@ -134,6 +152,10 @@ public final class RdeStagingMapper extends Mapper<EppResource, PendingDeposit,
private final Map<WatermarkModePair, Optional<DepositFragment>> cache = new HashMap<>(); private final Map<WatermarkModePair, Optional<DepositFragment>> cache = new HashMap<>();
private final ImmutableMap<DateTime, Result<EppResource>> resourceAtTimes; private final ImmutableMap<DateTime, Result<EppResource>> resourceAtTimes;
long cacheHits = 0;
long resourcesNotFound = 0;
long resourcesFound = 0;
Fragmenter(ImmutableMap<DateTime, Result<EppResource>> resourceAtTimes) { Fragmenter(ImmutableMap<DateTime, Result<EppResource>> resourceAtTimes) {
this.resourceAtTimes = resourceAtTimes; this.resourceAtTimes = resourceAtTimes;
} }
@ -141,6 +163,7 @@ public final class RdeStagingMapper extends Mapper<EppResource, PendingDeposit,
Optional<DepositFragment> marshal(DateTime watermark, RdeMode mode) { Optional<DepositFragment> marshal(DateTime watermark, RdeMode mode) {
Optional<DepositFragment> result = cache.get(WatermarkModePair.create(watermark, mode)); Optional<DepositFragment> result = cache.get(WatermarkModePair.create(watermark, mode));
if (result != null) { if (result != null) {
cacheHits++;
return result; return result;
} }
EppResource resource = resourceAtTimes.get(watermark).now(); EppResource resource = resourceAtTimes.get(watermark).now();
@ -148,8 +171,10 @@ public final class RdeStagingMapper extends Mapper<EppResource, PendingDeposit,
result = Optional.empty(); result = Optional.empty();
cache.put(WatermarkModePair.create(watermark, RdeMode.FULL), result); cache.put(WatermarkModePair.create(watermark, RdeMode.FULL), result);
cache.put(WatermarkModePair.create(watermark, RdeMode.THIN), result); cache.put(WatermarkModePair.create(watermark, RdeMode.THIN), result);
resourcesNotFound++;
return result; return result;
} }
resourcesFound++;
if (resource instanceof DomainResource) { if (resource instanceof DomainResource) {
result = Optional.of(marshaller.marshalDomain((DomainResource) resource, mode)); result = Optional.of(marshaller.marshalDomain((DomainResource) resource, mode));
cache.put(WatermarkModePair.create(watermark, mode), result); cache.put(WatermarkModePair.create(watermark, mode), result);