Improve StackdriverWriter code

Add additional logic to handle cases when a MetricDescriptor is already defined
on the server, fix an NPE in related code, and add additional tests to ensure
that TimeSeries created from native MetricPoints are well-formed.

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=130555708
This commit is contained in:
shikhman 2016-08-17 13:24:18 -07:00 committed by Ben McIlwain
parent 9e810c4b36
commit 7a041d066f
2 changed files with 107 additions and 67 deletions

View file

@ -10,7 +10,7 @@ java_library(
srcs = glob(["*.java"]), srcs = glob(["*.java"]),
deps = [ deps = [
"//google/monitoring:monitoring_java_lib", "//google/monitoring:monitoring_java_lib",
"//java/com/google/api/client/json", "//java/com/google/api/client/googleapis/json",
"//java/com/google/common/annotations", "//java/com/google/common/annotations",
"//java/com/google/common/base", "//java/com/google/common/base",
"//java/com/google/common/collect", "//java/com/google/common/collect",

View file

@ -18,6 +18,7 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Preconditions.checkState;
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.services.monitoring.v3.Monitoring; import com.google.api.services.monitoring.v3.Monitoring;
import com.google.api.services.monitoring.v3.model.CreateTimeSeriesRequest; import com.google.api.services.monitoring.v3.model.CreateTimeSeriesRequest;
import com.google.api.services.monitoring.v3.model.LabelDescriptor; import com.google.api.services.monitoring.v3.model.LabelDescriptor;
@ -54,6 +55,8 @@ import org.joda.time.format.ISODateTimeFormat;
* <p>This class communicates with the API via HTTP. In order to increase throughput and minimize * <p>This class communicates with the API via HTTP. In order to increase throughput and minimize
* CPU, it buffers points to be written until it has {@code maxPointsPerRequest} points buffered or * CPU, it buffers points to be written until it has {@code maxPointsPerRequest} points buffered or
* until {@link #flush()} is called. * until {@link #flush()} is called.
*
* @see <a href="Stackdriver API Introduction">https://cloud.google.com/monitoring/api/v3/</a>
*/ */
// TODO(shikhman): add retry logic // TODO(shikhman): add retry logic
@NotThreadSafe @NotThreadSafe
@ -168,6 +171,98 @@ public class StackdriverWriter implements MetricWriter {
public <V> void write(google.registry.monitoring.metrics.MetricPoint<V> point) public <V> void write(google.registry.monitoring.metrics.MetricPoint<V> point)
throws IOException { throws IOException {
checkNotNull(point); checkNotNull(point);
TimeSeries timeSeries = getEncodedTimeSeries(point);
timeSeriesBuffer.add(timeSeries);
logger.fine(String.format("Enqueued metric %s for writing", timeSeries.getMetric().getType()));
if (timeSeriesBuffer.size() == maxPointsPerRequest) {
flush();
}
}
/** Flushes all buffered metric points to Stackdriver. This call is blocking. */
@Override
public void flush() throws IOException {
checkState(timeSeriesBuffer.size() <= 200, FLUSH_OVERFLOW_ERROR);
ImmutableList<TimeSeries> timeSeriesList = ImmutableList.copyOf(timeSeriesBuffer);
timeSeriesBuffer.clear();
CreateTimeSeriesRequest request = new CreateTimeSeriesRequest().setTimeSeries(timeSeriesList);
rateLimiter.acquire();
monitoringClient.projects().timeSeries().create(projectResource, request).execute();
for (TimeSeries timeSeries : timeSeriesList) {
pushedPoints.incrementBy(1, timeSeries.getMetricKind(), timeSeries.getValueType());
}
logger.info(String.format("Flushed %d metrics to Stackdriver", timeSeriesList.size()));
}
/**
* Registers a metric's {@link MetricDescriptor} with the Monitoring API.
*
* @param metric the metric to be registered.
* @see <a href="Stackdriver MetricDescriptor
* API">https://cloud.google.com/monitoring/api/ref_v3/rest/v3/projects.metricDescriptors</a>
*/
@VisibleForTesting
MetricDescriptor registerMetric(final google.registry.monitoring.metrics.Metric<?> metric)
throws IOException {
if (registeredDescriptors.containsKey(metric)) {
logger.fine(
String.format("Using existing metric descriptor %s", metric.getMetricSchema().name()));
return registeredDescriptors.get(metric);
}
MetricDescriptor descriptor = createMetricDescriptor(metric);
rateLimiter.acquire();
// We try to create a descriptor, but it may have been created already, so we re-fetch it on
// failure
try {
descriptor =
monitoringClient
.projects()
.metricDescriptors()
.create(projectResource, descriptor)
.execute();
logger.info(String.format("Registered new metric descriptor %s", descriptor.getType()));
} catch (GoogleJsonResponseException jsonException) {
// Not the error we were expecting, just give up
if (!jsonException.getStatusMessage().equals("ALREADY_EXISTS")) {
throw jsonException;
}
descriptor =
monitoringClient
.projects()
.metricDescriptors()
.get(projectResource + "/metricDescriptors/" + descriptor.getType())
.execute();
logger.info(
String.format("Fetched existing metric descriptor %s", metric.getMetricSchema().name()));
}
registeredDescriptors.put(metric, descriptor);
return descriptor;
}
/**
* Encodes a {@link MetricPoint} into a Stackdriver {@link TimeSeries}.
*
* <p>This method will register the underlying {@link google.registry.monitoring.metrics.Metric}
* as a Stackdriver {@link MetricDescriptor}.
*
* @see <a href="Stackdriver TimeSeries
* API">https://cloud.google.com/monitoring/api/ref_v3/rest/v3/TimeSeries</a>
*/
@VisibleForTesting
<V> TimeSeries getEncodedTimeSeries(google.registry.monitoring.metrics.MetricPoint<V> point)
throws IOException {
google.registry.monitoring.metrics.Metric<V> metric = point.metric(); google.registry.monitoring.metrics.Metric<V> metric = point.metric();
try { try {
checkArgument( checkArgument(
@ -208,6 +303,9 @@ public class StackdriverWriter implements MetricWriter {
.setValue(encodedValue); .setValue(encodedValue);
List<LabelDescriptor> encodedLabels = descriptor.getLabels(); List<LabelDescriptor> encodedLabels = descriptor.getLabels();
// The MetricDescriptors returned by the GCM API have null fields rather than empty lists
encodedLabels = encodedLabels == null ? ImmutableList.<LabelDescriptor>of() : encodedLabels;
ImmutableMap.Builder<String, String> labelValues = new ImmutableMap.Builder<>(); ImmutableMap.Builder<String, String> labelValues = new ImmutableMap.Builder<>();
int i = 0; int i = 0;
for (LabelDescriptor labelDescriptor : encodedLabels) { for (LabelDescriptor labelDescriptor : encodedLabels) {
@ -217,71 +315,13 @@ public class StackdriverWriter implements MetricWriter {
Metric encodedMetric = Metric encodedMetric =
new Metric().setType(descriptor.getType()).setLabels(labelValues.build()); new Metric().setType(descriptor.getType()).setLabels(labelValues.build());
timeSeriesBuffer.add( return new TimeSeries()
new TimeSeries()
.setMetric(encodedMetric) .setMetric(encodedMetric)
.setPoints(ImmutableList.of(encodedPoint)) .setPoints(ImmutableList.of(encodedPoint))
.setResource(monitoredResource) .setResource(monitoredResource)
// these two attributes are ignored by the API, we set them here to use elsewhere // these two attributes are ignored by the API, we set them here to use elsewhere
// for internal metrics. // for internal metrics.
.setMetricKind(descriptor.getMetricKind()) .setMetricKind(descriptor.getMetricKind())
.setValueType(descriptor.getValueType())); .setValueType(descriptor.getValueType());
logger.fine(String.format("Enqueued metric %s for writing", descriptor.getType()));
if (timeSeriesBuffer.size() == maxPointsPerRequest) {
flush();
}
}
/** Flushes all buffered metric points to Stackdriver. This call is blocking. */
@Override
public void flush() throws IOException {
checkState(timeSeriesBuffer.size() <= 200, FLUSH_OVERFLOW_ERROR);
ImmutableList<TimeSeries> timeSeriesList = ImmutableList.copyOf(timeSeriesBuffer);
timeSeriesBuffer.clear();
CreateTimeSeriesRequest request = new CreateTimeSeriesRequest().setTimeSeries(timeSeriesList);
rateLimiter.acquire();
monitoringClient.projects().timeSeries().create(projectResource, request).execute();
for (TimeSeries timeSeries : timeSeriesList) {
pushedPoints.incrementBy(1, timeSeries.getMetricKind(), timeSeries.getValueType());
}
logger.info(String.format("Flushed %d metrics to Stackdriver", timeSeriesList.size()));
}
/**
* Registers a metric's {@link MetricDescriptor} with the Monitoring API.
*
* @param metric the metric to be registered.
*/
@VisibleForTesting
MetricDescriptor registerMetric(final google.registry.monitoring.metrics.Metric<?> metric) {
if (registeredDescriptors.containsKey(metric)) {
logger.info(
String.format("Fetched existing metric descriptor %s", metric.getMetricSchema().name()));
return registeredDescriptors.get(metric);
}
MetricDescriptor descriptor = createMetricDescriptor(metric);
try {
rateLimiter.acquire();
descriptor =
monitoringClient
.projects()
.metricDescriptors()
.create(projectResource, descriptor)
.execute();
} catch (IOException e) {
throw new RuntimeException("Error creating a MetricDescriptor");
}
logger.info(String.format("Registered new metric descriptor %s", descriptor.getType()));
registeredDescriptors.put(metric, descriptor);
return descriptor;
} }
} }