Change metrics to keep track of when the metric value was first set

This CL also adds IncrementableMetric#reset() methods to allow resetting the
value and start timestamp of IncrementableMetrics.

This is necessary because some backends, like Stackdriver, use non-monotonic
changes in cumulative metric values to detect timeseries restarts. Tracking and
re-setting the start timestamp allows users to track mostly monotonic metrics
which may have non-monotonic discontinuities.

See https://cloud.google.com/monitoring/api/ref_v3/rest/v3/TimeSeries#Point for
more details.

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=130795229
This commit is contained in:
shikhman 2016-08-19 14:56:35 -07:00 committed by Ben McIlwain
parent b6eaba08eb
commit 91f8b6da38
7 changed files with 192 additions and 26 deletions

View file

@ -20,8 +20,12 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.AtomicLongMap; import com.google.common.util.concurrent.AtomicLongMap;
import com.google.common.util.concurrent.Striped;
import google.registry.monitoring.metrics.MetricSchema.Kind; import google.registry.monitoring.metrics.MetricSchema.Kind;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import javax.annotation.concurrent.ThreadSafe; import javax.annotation.concurrent.ThreadSafe;
import org.joda.time.Instant; import org.joda.time.Instant;
@ -29,17 +33,53 @@ import org.joda.time.Instant;
* A metric which stores Long values. It is stateful and can be changed in increments. * A metric which stores Long values. It is stateful and can be changed in increments.
* *
* <p>This metric is generally suitable for counters, such as requests served or errors generated. * <p>This metric is generally suitable for counters, such as requests served or errors generated.
*
* <p>The start of the {@link MetricPoint#interval()} of values of instances of this metric will be
* set to the time that the metric was first set or last {@link #reset()}.
*/ */
@ThreadSafe @ThreadSafe
public final class Counter extends AbstractMetric<Long> public final class Counter extends AbstractMetric<Long>
implements SettableMetric<Long>, IncrementableMetric { implements SettableMetric<Long>, IncrementableMetric {
/**
 * The below constants replicate the default initial capacity, load factor, and concurrency level
 * for {@link ConcurrentHashMap} as of Java SE 7. They are hardcoded here so that the concurrency
 * level in {@code valueLocks} below can be set identically.
 */
private static final int HASHMAP_INITIAL_CAPACITY = 16;
// Load factor handed to the ConcurrentHashMap constructor for valueStartTimestamps.
private static final float HASHMAP_LOAD_FACTOR = 0.75f;
// Used both as the map's concurrency level and as the number of stripes in valueLocks.
private static final int HASHMAP_CONCURRENCY_LEVEL = 16;
private static final String LABEL_COUNT_ERROR = private static final String LABEL_COUNT_ERROR =
"The count of labelValues must be equal to the underlying " "The count of labelValues must be equal to the underlying "
+ "MetricDescriptor's count of labels."; + "MetricDescriptor's count of labels.";
/**
* A map of the {@link Counter} values, with a list of label values as the keys.
*
* <p>This should be modified in a critical section with {@code valueStartTimestamps} so that the
* values are in sync.
*/
private final AtomicLongMap<ImmutableList<String>> values = AtomicLongMap.create(); private final AtomicLongMap<ImmutableList<String>> values = AtomicLongMap.create();
/**
 * A map of the {@link Instant} that each value was created, with a list of label values as the
 * keys. The start timestamp (as part of the {@link MetricPoint#interval()}) can be used by
 * implementations of {@link MetricWriter} to encode resets of monotonic counters.
 *
 * <p>An entry is added with {@code putIfAbsent} when a value is incremented or set, so later
 * increments keep the original start time; the reset methods overwrite the entry.
 */
private final ConcurrentHashMap<ImmutableList<String>, Instant> valueStartTimestamps =
new ConcurrentHashMap<>(
HASHMAP_INITIAL_CAPACITY, HASHMAP_LOAD_FACTOR, HASHMAP_CONCURRENCY_LEVEL);
/**
 * A fine-grained lock to ensure that {@code values} and {@code valueStartTimestamps} are modified
 * and read in a critical section. The initialization parameter is the concurrency level, set to
 * match the default concurrency level of {@link ConcurrentHashMap}.
 *
 * <p>The stripe for an operation is looked up by the list of label values; the reset of all
 * values acquires every stripe by index instead.
 *
 * @see Striped
 */
private final Striped<Lock> valueLocks = Striped.lock(HASHMAP_CONCURRENCY_LEVEL);
Counter( Counter(
String name, String name,
String description, String description,
@ -49,8 +89,16 @@ public final class Counter extends AbstractMetric<Long>
} }
@VisibleForTesting @VisibleForTesting
void incrementBy(long offset, ImmutableList<String> labelValues) { void incrementBy(long offset, Instant startTime, ImmutableList<String> labelValues) {
values.addAndGet(labelValues, offset); Lock lock = valueLocks.get(labelValues);
lock.lock();
try {
values.addAndGet(labelValues, offset);
valueStartTimestamps.putIfAbsent(labelValues, startTime);
} finally {
lock.unlock();
}
} }
@Override @Override
@ -58,14 +106,14 @@ public final class Counter extends AbstractMetric<Long>
checkArgument(labelValues.length == this.getMetricSchema().labels().size(), LABEL_COUNT_ERROR); checkArgument(labelValues.length == this.getMetricSchema().labels().size(), LABEL_COUNT_ERROR);
checkArgument(offset >= 0, "The offset provided must be non-negative"); checkArgument(offset >= 0, "The offset provided must be non-negative");
incrementBy(offset, ImmutableList.copyOf(labelValues)); incrementBy(offset, Instant.now(), ImmutableList.copyOf(labelValues));
} }
@Override @Override
public final void increment(String... labelValues) { public final void increment(String... labelValues) {
checkArgument(labelValues.length == this.getMetricSchema().labels().size(), LABEL_COUNT_ERROR); checkArgument(labelValues.length == this.getMetricSchema().labels().size(), LABEL_COUNT_ERROR);
incrementBy(1L, ImmutableList.copyOf(labelValues)); incrementBy(1L, Instant.now(), ImmutableList.copyOf(labelValues));
} }
/** /**
@ -83,23 +131,87 @@ public final class Counter extends AbstractMetric<Long>
} }
@VisibleForTesting @VisibleForTesting
final ImmutableList<MetricPoint<Long>> getTimestampedValues(Instant timestamp) { final ImmutableList<MetricPoint<Long>> getTimestampedValues(Instant endTimestamp) {
ImmutableList.Builder<MetricPoint<Long>> timestampedValues = new ImmutableList.Builder<>(); ImmutableList.Builder<MetricPoint<Long>> timestampedValues = new ImmutableList.Builder<>();
for (Entry<ImmutableList<String>, Long> entry : values.asMap().entrySet()) { for (Entry<ImmutableList<String>, Long> entry : values.asMap().entrySet()) {
timestampedValues.add(MetricPoint.create(this, entry.getKey(), timestamp, entry.getValue())); ImmutableList<String> labelValues = entry.getKey();
valueLocks.get(labelValues).lock();
Instant startTimestamp;
try {
startTimestamp = valueStartTimestamps.get(labelValues);
} finally {
valueLocks.get(labelValues).unlock();
}
timestampedValues.add(
MetricPoint.create(this, labelValues, startTimestamp, endTimestamp, entry.getValue()));
} }
return timestampedValues.build(); return timestampedValues.build();
} }
@VisibleForTesting @VisibleForTesting
final void set(Long value, ImmutableList<String> labelValues) { final void set(Long value, Instant startTime, ImmutableList<String> labelValues) {
this.values.put(labelValues, value); Lock lock = valueLocks.get(labelValues);
lock.lock();
try {
this.values.put(labelValues, value);
valueStartTimestamps.putIfAbsent(labelValues, startTime);
} finally {
lock.unlock();
}
} }
@Override @Override
public final void set(Long value, String... labelValues) { public final void set(Long value, String... labelValues) {
checkArgument(labelValues.length == this.getMetricSchema().labels().size(), LABEL_COUNT_ERROR); checkArgument(labelValues.length == this.getMetricSchema().labels().size(), LABEL_COUNT_ERROR);
set(value, ImmutableList.copyOf(labelValues)); set(value, Instant.now(), ImmutableList.copyOf(labelValues));
}
/**
 * Resets every existing value to zero and sets its start timestamp to {@code startTime}.
 *
 * <p>All lock stripes are held for the duration of the update so that every existing value ends
 * up with a consistent timestamp, without the possibility of interleaving with another reset
 * call. The stripes are released in a {@code finally} block so that a failure while updating
 * (for example a null {@code startTime} rejected by the timestamp map) cannot leave them locked.
 *
 * @param startTime the new start timestamp applied to every existing value
 */
@VisibleForTesting
final void reset(Instant startTime) {
  int stripeCount = valueLocks.size();
  // Number of stripes successfully locked so far; only these are released in the finally block.
  int acquired = 0;
  try {
    for (; acquired < stripeCount; acquired++) {
      valueLocks.getAt(acquired).lock();
    }
    // The key set is a live view of the values map, read here while all stripes are held.
    for (ImmutableList<String> labelValues : values.asMap().keySet()) {
      this.values.put(labelValues, 0L);
      this.valueStartTimestamps.put(labelValues, startTime);
    }
  } finally {
    // Release in reverse acquisition order.
    for (int i = acquired - 1; i >= 0; i--) {
      valueLocks.getAt(i).unlock();
    }
  }
}
/** Resets all existing values, using the current time as their new start timestamp. */
@Override
public final void reset() {
  Instant now = Instant.now();
  reset(now);
}
/**
 * Sets the value for {@code labelValues} to zero and replaces its start timestamp with {@code
 * startTime}, both under the lock stripe for those label values.
 */
@VisibleForTesting
final void reset(Instant startTime, ImmutableList<String> labelValues) {
  final Lock stripe = valueLocks.get(labelValues);
  stripe.lock();
  try {
    // Value first, then timestamp; both happen inside the same critical section.
    values.put(labelValues, 0L);
    valueStartTimestamps.put(labelValues, startTime);
  } finally {
    stripe.unlock();
  }
}
@Override
public final void reset(String... labelValues) {
checkArgument(labelValues.length == this.getMetricSchema().labels().size(), LABEL_COUNT_ERROR);
reset(Instant.now(), ImmutableList.copyOf(labelValues));
} }
} }

View file

@ -23,9 +23,9 @@ package google.registry.monitoring.metrics;
public interface IncrementableMetric extends Metric<Long> { public interface IncrementableMetric extends Metric<Long> {
/** /**
* Increments a metric by 1 for the given set of label values. * Increments a metric by 1 for the given label values.
* *
* Use this method rather than {@link IncrementableMetric#incrementBy(long, String...)} if the * <p>Use this method rather than {@link IncrementableMetric#incrementBy(long, String...)} if the
* increment value is zero, as it will be slightly more performant. * increment value is zero, as it will be slightly more performant.
* *
* <p>If the metric is undefined for given label values, it will be incremented from zero. * <p>If the metric is undefined for given label values, it will be incremented from zero.
@ -37,7 +37,7 @@ public interface IncrementableMetric extends Metric<Long> {
void increment(String... labelValues); void increment(String... labelValues);
/** /**
* Increments a metric by the given non-negative offset for the given set of label values. * Increments a metric by the given non-negative offset for the given label values.
* *
* <p>If the metric is undefined for given label values, it will be incremented from zero. * <p>If the metric is undefined for given label values, it will be incremented from zero.
* *
@ -48,4 +48,20 @@ public interface IncrementableMetric extends Metric<Long> {
* @throws IllegalArgumentException if the offset is negative. * @throws IllegalArgumentException if the offset is negative.
*/ */
void incrementBy(long offset, String... labelValues); void incrementBy(long offset, String... labelValues);
/**
 * Resets the value and start timestamp of the metric for the given label values.
 *
 * <p>This is useful if the counter is tracking a value that is reset as part of a retrying
 * transaction, for example.
 *
 * <p>In {@link Counter}, the value is set to zero and the start timestamp to the current time.
 *
 * @param labelValues the label values identifying the value to reset; {@link Counter} checks
 *     that their count equals the metric's count of labels
 */
void reset(String... labelValues);
/**
 * Atomically resets the value and start timestamp of the metric for all label values.
 *
 * <p>This is useful if the counter is tracking values that are reset as part of a retrying
 * transaction, for example.
 *
 * <p>In {@link Counter}, this affects the label values that already have a value; each is set
 * to zero with the current time as its new start timestamp.
 */
void reset();
} }

View file

@ -24,8 +24,8 @@ import com.google.common.collect.ImmutableList;
public interface Metric<V> { public interface Metric<V> {
/** /**
* Returns the list of the latest {@link MetricPoint} instances for every label-value combination * Returns the latest {@link MetricPoint} instances for every label-value combination tracked for
* tracked for this metric. * this metric.
*/ */
ImmutableList<MetricPoint<V>> getTimestampedValues(); ImmutableList<MetricPoint<V>> getTimestampedValues();

View file

@ -19,10 +19,11 @@ import static com.google.common.base.Preconditions.checkArgument;
import com.google.auto.value.AutoValue; import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import org.joda.time.Instant; import org.joda.time.Instant;
import org.joda.time.Interval;
/** /**
* Value type class to store a point-in-time snapshot of a {@link Metric} value for a given label * Value type class to store a snapshot of a {@link Metric} value for a given label value tuple and
* value tuple. * time {@link Interval}.
*/ */
@AutoValue @AutoValue
public abstract class MetricPoint<V> { public abstract class MetricPoint<V> {
@ -34,21 +35,43 @@ public abstract class MetricPoint<V> {
MetricPoint() {} MetricPoint() {}
/** /**
* Returns a new {@link MetricPoint}. Callers should insure that the count of {@code labelValues} * Returns a new {@link MetricPoint} representing a value at a specific {@link Instant}.
* matches the count of labels for the given metric. *
* <p>Callers should ensure that the count of {@code labelValues} matches the count of labels for
* the given metric.
*/ */
static <V> MetricPoint<V> create( static <V> MetricPoint<V> create(
Metric<V> metric, ImmutableList<String> labelValues, Instant timestamp, V value) { Metric<V> metric, ImmutableList<String> labelValues, Instant timestamp, V value) {
checkArgument( checkArgument(
labelValues.size() == metric.getMetricSchema().labels().size(), LABEL_COUNT_ERROR); labelValues.size() == metric.getMetricSchema().labels().size(), LABEL_COUNT_ERROR);
return new AutoValue_MetricPoint<>(metric, labelValues, timestamp, value); return new AutoValue_MetricPoint<>(
metric, labelValues, new Interval(timestamp, timestamp), value);
}
/**
* Returns a new {@link MetricPoint} representing a value over an {@link Interval} from {@code
* startTime} to {@code endTime}.
*
* <p>Callers should ensure that the count of {@code labelValues} matches the count of labels for
* the given metric.
*/
static <V> MetricPoint<V> create(
Metric<V> metric,
ImmutableList<String> labelValues,
Instant startTime,
Instant endTime,
V value) {
checkArgument(
labelValues.size() == metric.getMetricSchema().labels().size(), LABEL_COUNT_ERROR);
return new AutoValue_MetricPoint<>(
metric, labelValues, new Interval(startTime, endTime), value);
} }
public abstract Metric<V> metric(); public abstract Metric<V> metric();
public abstract ImmutableList<String> labelValues(); public abstract ImmutableList<String> labelValues();
public abstract Instant timestamp(); public abstract Interval interval();
public abstract V value(); public abstract V value();
} }

View file

@ -46,6 +46,7 @@ import java.util.logging.Logger;
import javax.annotation.concurrent.NotThreadSafe; import javax.annotation.concurrent.NotThreadSafe;
import javax.inject.Inject; import javax.inject.Inject;
import javax.inject.Named; import javax.inject.Named;
import org.joda.time.Interval;
import org.joda.time.format.DateTimeFormatter; import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat; import org.joda.time.format.ISODateTimeFormat;
@ -251,6 +252,12 @@ public class StackdriverWriter implements MetricWriter {
return descriptor; return descriptor;
} }
/**
 * Converts a Joda {@link Interval} into a Stackdriver {@link TimeInterval}, formatting the start
 * and end of the interval with {@code DATETIME_FORMATTER}.
 */
private static TimeInterval encodeTimeInterval(Interval nativeInterval) {
  String formattedEnd = DATETIME_FORMATTER.print(nativeInterval.getEnd());
  String formattedStart = DATETIME_FORMATTER.print(nativeInterval.getStart());
  TimeInterval encoded = new TimeInterval();
  encoded.setEndTime(formattedEnd);
  encoded.setStartTime(formattedStart);
  return encoded;
}
/** /**
* Encodes a {@link MetricPoint} into a Stackdriver {@link TimeSeries}. * Encodes a {@link MetricPoint} into a Stackdriver {@link TimeSeries}.
* *
@ -298,9 +305,7 @@ public class StackdriverWriter implements MetricWriter {
} }
Point encodedPoint = Point encodedPoint =
new Point() new Point().setInterval(encodeTimeInterval(point.interval())).setValue(encodedValue);
.setInterval(new TimeInterval().setEndTime(DATETIME_FORMATTER.print(point.timestamp())))
.setValue(encodedValue);
List<LabelDescriptor> encodedLabels = descriptor.getLabels(); List<LabelDescriptor> encodedLabels = descriptor.getLabels();
// The MetricDescriptors returned by the GCM API have null fields rather than empty lists // The MetricDescriptors returned by the GCM API have null fields rather than empty lists

View file

@ -32,7 +32,11 @@ import org.joda.time.Instant;
* <p>The values are stored and set over time. This metric is generally suitable for state * <p>The values are stored and set over time. This metric is generally suitable for state
* indicators, such as indicating that a server is in a RUNNING state or in a STOPPED state. * indicators, such as indicating that a server is in a RUNNING state or in a STOPPED state.
* *
* <p>See {@link Counter} for a subclass which is suitable for incremental values. * <p>See {@link Counter} for a subclass which is suitable for stateful incremental values.
*
* <p>The {@link MetricPoint#interval()} of values of instances of this metric will always have a
* start time equal to the end time, since the metric value represents a point-in-time snapshot with
* no relationship to prior values.
*/ */
@ThreadSafe @ThreadSafe
public class StoredMetric<V> extends AbstractMetric<V> implements SettableMetric<V> { public class StoredMetric<V> extends AbstractMetric<V> implements SettableMetric<V> {
@ -82,7 +86,8 @@ public class StoredMetric<V> extends AbstractMetric<V> implements SettableMetric
final ImmutableList<MetricPoint<V>> getTimestampedValues(Instant timestamp) { final ImmutableList<MetricPoint<V>> getTimestampedValues(Instant timestamp) {
ImmutableList.Builder<MetricPoint<V>> timestampedValues = new Builder<>(); ImmutableList.Builder<MetricPoint<V>> timestampedValues = new Builder<>();
for (Entry<ImmutableList<String>, V> entry : values.entrySet()) { for (Entry<ImmutableList<String>, V> entry : values.entrySet()) {
timestampedValues.add(MetricPoint.create(this, entry.getKey(), timestamp, entry.getValue())); timestampedValues.add(
MetricPoint.create(this, entry.getKey(), timestamp, timestamp, entry.getValue()));
} }
return timestampedValues.build(); return timestampedValues.build();

View file

@ -29,6 +29,10 @@ import org.joda.time.Instant;
* *
* <p>This pattern works well for gauge-like metrics, such as CPU usage, memory usage, and file * <p>This pattern works well for gauge-like metrics, such as CPU usage, memory usage, and file
* descriptor counts. * descriptor counts.
*
* <p>The {@link MetricPoint#interval()} of values of instances of this metric will always have a
* start time equal to the end time, since the metric value represents a point-in-time snapshot with
* no relationship to prior values.
*/ */
@ThreadSafe @ThreadSafe
public final class VirtualMetric<V> extends AbstractMetric<V> { public final class VirtualMetric<V> extends AbstractMetric<V> {
@ -77,7 +81,8 @@ public final class VirtualMetric<V> extends AbstractMetric<V> {
ImmutableList.Builder<MetricPoint<V>> metricPoints = ImmutableList.builder(); ImmutableList.Builder<MetricPoint<V>> metricPoints = ImmutableList.builder();
for (Entry<ImmutableList<String>, V> entry : values.entrySet()) { for (Entry<ImmutableList<String>, V> entry : values.entrySet()) {
metricPoints.add(MetricPoint.create(this, entry.getKey(), timestamp, entry.getValue())); metricPoints.add(
MetricPoint.create(this, entry.getKey(), timestamp, timestamp, entry.getValue()));
} }
cardinality = values.size(); cardinality = values.size();