diff --git a/java/google/registry/monitoring/metrics/BUILD b/java/google/registry/monitoring/metrics/BUILD index 494fba08d..4abbbe9ed 100644 --- a/java/google/registry/monitoring/metrics/BUILD +++ b/java/google/registry/monitoring/metrics/BUILD @@ -10,7 +10,7 @@ java_library( srcs = glob(["*.java"]), deps = [ "//google/monitoring:monitoring_java_lib", - "//java/com/google/api/client/json", + "//java/com/google/api/client/googleapis/json", "//java/com/google/common/annotations", "//java/com/google/common/base", "//java/com/google/common/collect", diff --git a/java/google/registry/monitoring/metrics/StackdriverWriter.java b/java/google/registry/monitoring/metrics/StackdriverWriter.java index 8ec0575b9..bd2264009 100644 --- a/java/google/registry/monitoring/metrics/StackdriverWriter.java +++ b/java/google/registry/monitoring/metrics/StackdriverWriter.java @@ -18,6 +18,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; +import com.google.api.client.googleapis.json.GoogleJsonResponseException; import com.google.api.services.monitoring.v3.Monitoring; import com.google.api.services.monitoring.v3.model.CreateTimeSeriesRequest; import com.google.api.services.monitoring.v3.model.LabelDescriptor; @@ -54,6 +55,8 @@ import org.joda.time.format.ISODateTimeFormat; *

This class communicates with the API via HTTP. In order to increase throughput and minimize * CPU, it buffers points to be written until it has {@code maxPointsPerRequest} points buffered or * until {@link #flush()} is called. + * + * @see https://cloud.google.com/monitoring/api/v3/ */ // TODO(shikhman): add retry logic @NotThreadSafe @@ -168,6 +171,98 @@ public class StackdriverWriter implements MetricWriter { public void write(google.registry.monitoring.metrics.MetricPoint point) throws IOException { checkNotNull(point); + + TimeSeries timeSeries = getEncodedTimeSeries(point); + timeSeriesBuffer.add(timeSeries); + + logger.fine(String.format("Enqueued metric %s for writing", timeSeries.getMetric().getType())); + if (timeSeriesBuffer.size() == maxPointsPerRequest) { + flush(); + } + } + + /** Flushes all buffered metric points to Stackdriver. This call is blocking. */ + @Override + public void flush() throws IOException { + checkState(timeSeriesBuffer.size() <= 200, FLUSH_OVERFLOW_ERROR); + + ImmutableList timeSeriesList = ImmutableList.copyOf(timeSeriesBuffer); + timeSeriesBuffer.clear(); + + CreateTimeSeriesRequest request = new CreateTimeSeriesRequest().setTimeSeries(timeSeriesList); + + rateLimiter.acquire(); + monitoringClient.projects().timeSeries().create(projectResource, request).execute(); + + for (TimeSeries timeSeries : timeSeriesList) { + pushedPoints.incrementBy(1, timeSeries.getMetricKind(), timeSeries.getValueType()); + } + logger.info(String.format("Flushed %d metrics to Stackdriver", timeSeriesList.size())); + } + + /** + * Registers a metric's {@link MetricDescriptor} with the Monitoring API. + * + * @param metric the metric to be registered. + * @see https://cloud.google.com/monitoring/api/ref_v3/rest/v3/projects.metricDescriptors + */ + @VisibleForTesting + MetricDescriptor registerMetric(final google.registry.monitoring.metrics.Metric metric) + throws IOException { + if (registeredDescriptors.containsKey(metric)) { + logger.fine( + String.format("Using existing metric descriptor %s", metric.getMetricSchema().name())); + return registeredDescriptors.get(metric); + } + + MetricDescriptor descriptor = createMetricDescriptor(metric); + + rateLimiter.acquire(); + // We try to create a descriptor, but it may have been created already, so we re-fetch it on + // failure + try { + descriptor = + monitoringClient + .projects() + .metricDescriptors() + .create(projectResource, descriptor) + .execute(); + logger.info(String.format("Registered new metric descriptor %s", descriptor.getType())); + } catch (GoogleJsonResponseException jsonException) { + // Not the error we were expecting, just give up + if (!jsonException.getStatusMessage().equals("ALREADY_EXISTS")) { + throw jsonException; + } + + descriptor = + monitoringClient + .projects() + .metricDescriptors() + .get(projectResource + "/metricDescriptors/" + descriptor.getType()) + .execute(); + + logger.info( + String.format("Fetched existing metric descriptor %s", metric.getMetricSchema().name())); + } + + registeredDescriptors.put(metric, descriptor); + + return descriptor; + } + + /** + * Encodes a {@link MetricPoint} into a Stackdriver {@link TimeSeries}. + * + *

This method will register the underlying {@link google.registry.monitoring.metrics.Metric} + * as a Stackdriver {@link MetricDescriptor}. + * + * @see https://cloud.google.com/monitoring/api/ref_v3/rest/v3/TimeSeries + */ + @VisibleForTesting + TimeSeries getEncodedTimeSeries(google.registry.monitoring.metrics.MetricPoint point) + throws IOException { google.registry.monitoring.metrics.Metric metric = point.metric(); try { checkArgument( @@ -208,6 +303,9 @@ public class StackdriverWriter implements MetricWriter { .setValue(encodedValue); List encodedLabels = descriptor.getLabels(); + // The MetricDescriptors returned by the GCM API have null fields rather than empty lists + encodedLabels = encodedLabels == null ? ImmutableList.of() : encodedLabels; + ImmutableMap.Builder labelValues = new ImmutableMap.Builder<>(); int i = 0; for (LabelDescriptor labelDescriptor : encodedLabels) { @@ -217,71 +315,13 @@ public class StackdriverWriter implements MetricWriter { Metric encodedMetric = new Metric().setType(descriptor.getType()).setLabels(labelValues.build()); - timeSeriesBuffer.add( - new TimeSeries() - .setMetric(encodedMetric) - .setPoints(ImmutableList.of(encodedPoint)) - .setResource(monitoredResource) - // these two attributes are ignored by the API, we set them here to use elsewhere - // for internal metrics. - .setMetricKind(descriptor.getMetricKind()) - .setValueType(descriptor.getValueType())); - - logger.fine(String.format("Enqueued metric %s for writing", descriptor.getType())); - if (timeSeriesBuffer.size() == maxPointsPerRequest) { - flush(); - } - } - - /** Flushes all buffered metric points to Stackdriver. This call is blocking. */ - @Override - public void flush() throws IOException { - checkState(timeSeriesBuffer.size() <= 200, FLUSH_OVERFLOW_ERROR); - - ImmutableList timeSeriesList = ImmutableList.copyOf(timeSeriesBuffer); - timeSeriesBuffer.clear(); - - CreateTimeSeriesRequest request = new CreateTimeSeriesRequest().setTimeSeries(timeSeriesList); - - rateLimiter.acquire(); - monitoringClient.projects().timeSeries().create(projectResource, request).execute(); - - for (TimeSeries timeSeries : timeSeriesList) { - pushedPoints.incrementBy(1, timeSeries.getMetricKind(), timeSeries.getValueType()); - } - logger.info(String.format("Flushed %d metrics to Stackdriver", timeSeriesList.size())); - } - - /** - * Registers a metric's {@link MetricDescriptor} with the Monitoring API. - * - * @param metric the metric to be registered. - */ - @VisibleForTesting - MetricDescriptor registerMetric(final google.registry.monitoring.metrics.Metric metric) { - if (registeredDescriptors.containsKey(metric)) { - logger.info( - String.format("Fetched existing metric descriptor %s", metric.getMetricSchema().name())); - return registeredDescriptors.get(metric); - } - - MetricDescriptor descriptor = createMetricDescriptor(metric); - - try { - rateLimiter.acquire(); - descriptor = - monitoringClient - .projects() - .metricDescriptors() - .create(projectResource, descriptor) - .execute(); - } catch (IOException e) { - throw new RuntimeException("Error creating a MetricDescriptor"); - } - - logger.info(String.format("Registered new metric descriptor %s", descriptor.getType())); - registeredDescriptors.put(metric, descriptor); - - return descriptor; + return new TimeSeries() + .setMetric(encodedMetric) + .setPoints(ImmutableList.of(encodedPoint)) + .setResource(monitoredResource) + // these two attributes are ignored by the API, we set them here to use elsewhere + // for internal metrics. + .setMetricKind(descriptor.getMetricKind()) + .setValueType(descriptor.getValueType()); } }