From b45c2ca2ee6569b66e6e95f9214b9e8fab56758f Mon Sep 17 00:00:00 2001 From: shikhman Date: Tue, 16 Aug 2016 17:40:19 -0700 Subject: [PATCH] Wire in MetricReporter into the backend servlet to publish metrics ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=130468842 --- java/google/registry/config/ConfigModule.java | 48 ++++++++--- java/google/registry/module/backend/BUILD | 1 + .../module/backend/BackendComponent.java | 2 + .../module/backend/BackendServlet.java | 23 ++++++ .../monitoring/metrics/MetricExporter.java | 19 ++++- .../metrics/MetricRegistryImpl.java | 1 - .../monitoring/metrics/MetricReporter.java | 79 +++++++++++++++++-- .../whitebox/StackdriverModule.java | 48 +++++++++++ .../google/registry/module/backend/BUILD | 1 + .../module/backend/BackendServletTest.java | 6 ++ .../registry/testing/AppEngineRule.java | 11 ++- 11 files changed, 214 insertions(+), 25 deletions(-) diff --git a/java/google/registry/config/ConfigModule.java b/java/google/registry/config/ConfigModule.java index b0cc8f2ee..f938d1326 100644 --- a/java/google/registry/config/ConfigModule.java +++ b/java/google/registry/config/ConfigModule.java @@ -20,6 +20,7 @@ import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; import dagger.Module; import dagger.Provides; +import google.registry.config.ConfigModule.Config; import java.lang.annotation.Documented; import java.net.URI; import java.net.URL; @@ -480,17 +481,6 @@ public final class ConfigModule { return Duration.standardSeconds(30); } - /** - * Time interval between metric writes to Stackdriver. - * - * @see google.registry.monitoring.stackdriver.MonitoringComponent - */ - @Provides - @Config("metricWriteInterval") - public static Duration provideMetricWriteInterval() { - return Duration.standardSeconds(60); - } - /** Duration after watermark where we shouldn't deposit, because transactions might be pending. */ @Provides @Config("transactionCooldown") @@ -633,4 +623,40 @@ public final class ConfigModule { + "restrict or deny your access to the Whois database, and may modify these terms\n" + "at any time.\n"; } + + /** + * Maximum QPS for the Google Cloud Monitoring V3 (aka Stackdriver) API. The QPS limit can be + * adjusted by contacting Cloud Support. + * + * @see google.registry.monitoring.metrics.StackdriverWriter + */ + @Provides + @Config("stackdriverMaxQps") + public static int provideStackdriverMaxQps() { + return 30; + } + + /** + * Maximum number of points that can be sent to Stackdriver in a single TimeSeries.Create API + * call. + * + * @see google.registry.monitoring.metrics.StackdriverWriter + */ + @Provides + @Config("stackdriverMaxPointsPerRequest") + public static int provideStackdriverMaxPointsPerRequest() { + return 200; + } + + /** + * The reporting interval, for Metrics to be sent to a {@link + * google.registry.monitoring.metrics.MetricWriter}. + * + * @see google.registry.monitoring.metrics.MetricReporter + */ + @Provides + @Config("metricsWriteInterval") + public static Duration provideMetricsWriteInterval() { + return Duration.standardSeconds(60); + } } diff --git a/java/google/registry/module/backend/BUILD b/java/google/registry/module/backend/BUILD index 2faffe7d7..370e62b89 100644 --- a/java/google/registry/module/backend/BUILD +++ b/java/google/registry/module/backend/BUILD @@ -35,6 +35,7 @@ java_library( "//java/google/registry/keyring/api", "//java/google/registry/mapreduce", "//java/google/registry/model", + "//java/google/registry/monitoring/metrics", "//java/google/registry/monitoring/whitebox", "//java/google/registry/rde", "//java/google/registry/request", diff --git a/java/google/registry/module/backend/BackendComponent.java b/java/google/registry/module/backend/BackendComponent.java index f7096bc30..ac12306e7 100644 --- a/java/google/registry/module/backend/BackendComponent.java +++ b/java/google/registry/module/backend/BackendComponent.java @@ -26,6 +26,7 @@ import google.registry.groups.GroupsModule; import google.registry.groups.GroupssettingsModule; import google.registry.keyring.api.KeyModule; import google.registry.keyring.api.VoidKeyringModule; +import google.registry.monitoring.metrics.MetricReporter; import google.registry.monitoring.whitebox.StackdriverModule; import google.registry.rde.JSchModule; import google.registry.request.Modules.AppIdentityCredentialModule; @@ -71,4 +72,5 @@ import javax.inject.Singleton; }) interface BackendComponent { BackendRequestComponent startRequest(RequestModule requestModule); + MetricReporter metricReporter(); } diff --git a/java/google/registry/module/backend/BackendServlet.java b/java/google/registry/module/backend/BackendServlet.java index 7ffc18667..5266aad2c 100644 --- a/java/google/registry/module/backend/BackendServlet.java +++ b/java/google/registry/module/backend/BackendServlet.java @@ -18,11 +18,15 @@ import static java.util.Arrays.asList; import com.google.common.base.Function; import com.google.common.collect.FluentIterable; +import google.registry.monitoring.metrics.MetricReporter; import google.registry.request.RequestHandler; import google.registry.request.RequestModule; +import google.registry.util.FormattingLogger; import java.io.IOException; import java.lang.reflect.Method; import java.security.Security; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -32,6 +36,8 @@ import org.bouncycastle.jce.provider.BouncyCastleProvider; public final class BackendServlet extends HttpServlet { private static final BackendComponent component = DaggerBackendComponent.create(); + private static final MetricReporter metricReporter = component.metricReporter(); + private static final FormattingLogger logger = FormattingLogger.getLoggerForCallerClass(); private static final RequestHandler requestHandler = RequestHandler.create(BackendRequestComponent.class, FluentIterable @@ -46,6 +52,23 @@ public final class BackendServlet extends HttpServlet { @Override public void init() { Security.addProvider(new BouncyCastleProvider()); + + try { + metricReporter.startAsync().awaitRunning(10, TimeUnit.SECONDS); + logger.info("Started up MetricReporter"); + } catch (TimeoutException timeoutException) { + logger.severefmt("Failed to initialize MetricReporter: %s", timeoutException); + } + } + + @Override + public void destroy() { + try { + metricReporter.stopAsync().awaitTerminated(10, TimeUnit.SECONDS); + logger.info("Shut down MetricReporter"); + } catch (TimeoutException timeoutException) { + logger.severefmt("Failed to stop MetricReporter: %s", timeoutException); + } } @Override diff --git a/java/google/registry/monitoring/metrics/MetricExporter.java b/java/google/registry/monitoring/metrics/MetricExporter.java index 7a73c29f0..b4fb237df 100644 --- a/java/google/registry/monitoring/metrics/MetricExporter.java +++ b/java/google/registry/monitoring/metrics/MetricExporter.java @@ -18,6 +18,10 @@ import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.AbstractExecutionThreadService; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.logging.Logger; /** * Background service to asynchronously push bundles of {@link MetricPoint} instances to a {@link @@ -25,17 +29,24 @@ import java.util.concurrent.BlockingQueue; */ class MetricExporter extends AbstractExecutionThreadService { + private static final Logger logger = Logger.getLogger(MetricReporter.class.getName()); + private final BlockingQueue>>> writeQueue; private final MetricWriter writer; + private final ThreadFactory threadFactory; MetricExporter( - BlockingQueue>>> writeQueue, MetricWriter writer) { + BlockingQueue>>> writeQueue, + MetricWriter writer, + ThreadFactory threadFactory) { this.writeQueue = writeQueue; this.writer = writer; + this.threadFactory = threadFactory; } @Override protected void run() throws Exception { + logger.info("Started up MetricExporter"); while (isRunning()) { Optional>> batch = writeQueue.take(); if (batch.isPresent()) { @@ -44,9 +55,15 @@ class MetricExporter extends AbstractExecutionThreadService { } writer.flush(); } else { + logger.info("Received a poison pill, stopping now"); // An absent optional indicates that the Reporter wants this service to shut down. return; } } } + + @Override + protected Executor executor() { + return Executors.newSingleThreadExecutor(threadFactory); + } } diff --git a/java/google/registry/monitoring/metrics/MetricRegistryImpl.java b/java/google/registry/monitoring/metrics/MetricRegistryImpl.java index c47543558..097e183ec 100644 --- a/java/google/registry/monitoring/metrics/MetricRegistryImpl.java +++ b/java/google/registry/monitoring/metrics/MetricRegistryImpl.java @@ -81,7 +81,6 @@ public final class MetricRegistryImpl implements MetricRegistry { String description, String valueDisplayName, ImmutableSet labels) { - Counter metric = new Counter(name, description, valueDisplayName, labels); registerMetric(name, metric); logger.info("Registered new counter: " + name); diff --git a/java/google/registry/monitoring/metrics/MetricReporter.java b/java/google/registry/monitoring/metrics/MetricReporter.java index bff2a5f3e..a2cd96d1b 100644 --- a/java/google/registry/monitoring/metrics/MetricReporter.java +++ b/java/google/registry/monitoring/metrics/MetricReporter.java @@ -15,6 +15,7 @@ package google.registry.monitoring.metrics; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; @@ -22,8 +23,15 @@ import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.AbstractScheduledService; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.logging.Level; import java.util.logging.Logger; +import javax.inject.Inject; +import javax.inject.Named; /** * Engine to write metrics to a {@link MetricWriter} on a regular periodic basis. @@ -38,18 +46,26 @@ public class MetricReporter extends AbstractScheduledService { private final long writeInterval; private final MetricRegistry metricRegistry; private final BlockingQueue>>> writeQueue; - private final MetricExporter metricExporter; + private MetricExporter metricExporter; + private final MetricWriter metricWriter; + private final ThreadFactory threadFactory; /** * Returns a new MetricReporter. * * @param metricWriter {@link MetricWriter} implementation to write metrics to. * @param writeInterval time period between metric writes, in seconds. + * @param threadFactory factory to use when creating background threads. */ - public MetricReporter(MetricWriter metricWriter, long writeInterval) { + @Inject + public MetricReporter( + MetricWriter metricWriter, + @Named("metricsWriteInterval") long writeInterval, + @Named("metricsBackgroundThreadFactory") ThreadFactory threadFactory) { this( metricWriter, writeInterval, + threadFactory, MetricRegistryImpl.getDefault(), new ArrayBlockingQueue>>>(1000)); } @@ -58,19 +74,27 @@ public class MetricReporter extends AbstractScheduledService { MetricReporter( MetricWriter metricWriter, long writeInterval, + ThreadFactory threadFactory, MetricRegistry metricRegistry, BlockingQueue>>> writeQueue) { checkArgument(writeInterval > 0, "writeInterval must be greater than zero"); + this.metricWriter = metricWriter; this.writeInterval = writeInterval; + this.threadFactory = threadFactory; this.metricRegistry = metricRegistry; this.writeQueue = writeQueue; - this.metricExporter = new MetricExporter(writeQueue, metricWriter); + this.metricExporter = new MetricExporter(writeQueue, metricWriter, threadFactory); } @Override protected void runOneIteration() { logger.info("Running background metric push"); + + if (metricExporter.state() == State.FAILED) { + startMetricExporter(); + } + ImmutableList.Builder> points = new ImmutableList.Builder<>(); /* @@ -80,12 +104,12 @@ public class MetricReporter extends AbstractScheduledService { for (Metric metric : metricRegistry.getRegisteredMetrics()) { points.addAll(metric.getTimestampedValues()); logger.fine(String.format("Enqueued metric %s", metric)); - MetricMetrics.pushedPoints.incrementBy( - 1, metric.getMetricSchema().kind().name(), metric.getValueClass().toString()); + MetricMetrics.pushedPoints.incrementBy(1, + metric.getMetricSchema().kind().name(), metric.getValueClass().toString()); } if (!writeQueue.offer(Optional.of(points.build()))) { - logger.warning("writeQueue full, dropped a reporting interval of points"); + logger.severe("writeQueue full, dropped a reporting interval of points"); } MetricMetrics.pushIntervals.incrementBy(1); @@ -98,12 +122,17 @@ public class MetricReporter extends AbstractScheduledService { runOneIteration(); writeQueue.offer(Optional.>>absent()); - metricExporter.stopAsync().awaitTerminated(); + try { + metricExporter.stopAsync().awaitTerminated(10, TimeUnit.SECONDS); + logger.info("Shut down MetricExporter"); + } catch (TimeoutException timeoutException) { + logger.severe("Failed to shut down MetricExporter: " + timeoutException); + } } @Override protected void startUp() { - metricExporter.startAsync().awaitRunning(); + startMetricExporter(); } @Override @@ -111,4 +140,38 @@ public class MetricReporter extends AbstractScheduledService { // Start writing after waiting for one writeInterval. return Scheduler.newFixedDelaySchedule(writeInterval, writeInterval, TimeUnit.SECONDS); } + + @Override + protected ScheduledExecutorService executor() { + final ScheduledExecutorService executor = + Executors.newSingleThreadScheduledExecutor(threadFactory); + addListener( + new Listener() { + @Override + public void terminated(State from) { + executor.shutdown(); + } + + @Override + public void failed(State from, Throwable failure) { + executor.shutdown(); + } + }, + directExecutor()); + return executor; + } + + @VisibleForTesting + void startMetricExporter() { + // Services in the FAILED state must be reconstructed, they can't be started + if (metricExporter.state() == State.FAILED) { + logger.log( + Level.SEVERE, + "MetricExporter died unexpectedly, restarting", + metricExporter.failureCause()); + this.metricExporter = new MetricExporter(writeQueue, metricWriter, threadFactory); + } + + this.metricExporter.startAsync(); + } } diff --git a/java/google/registry/monitoring/whitebox/StackdriverModule.java b/java/google/registry/monitoring/whitebox/StackdriverModule.java index cfb112b4c..179eecd9d 100644 --- a/java/google/registry/monitoring/whitebox/StackdriverModule.java +++ b/java/google/registry/monitoring/whitebox/StackdriverModule.java @@ -19,11 +19,20 @@ import com.google.api.client.http.HttpTransport; import com.google.api.client.json.JsonFactory; import com.google.api.services.monitoring.v3.Monitoring; import com.google.api.services.monitoring.v3.MonitoringScopes; +import com.google.api.services.monitoring.v3.model.MonitoredResource; +import com.google.appengine.api.ThreadManager; +import com.google.appengine.api.modules.ModulesService; +import com.google.appengine.repackaged.com.google.common.collect.ImmutableMap; import com.google.common.base.Function; import dagger.Module; import dagger.Provides; import google.registry.config.ConfigModule.Config; +import google.registry.monitoring.metrics.MetricWriter; +import google.registry.monitoring.metrics.StackdriverWriter; import java.util.Set; +import java.util.concurrent.ThreadFactory; +import javax.inject.Named; +import org.joda.time.Duration; /** Dagger module for Google Stackdriver service connection objects. */ @Module @@ -39,4 +48,43 @@ public final class StackdriverModule { .setApplicationName(projectId) .build(); } + + @Provides + static MetricWriter provideMetricWriter( + Monitoring monitoringClient, + @Config("projectId") String projectId, + ModulesService modulesService, + @Config("stackdriverMaxQps") int maxQps, + @Config("stackdriverMaxPointsPerRequest") int maxPointsPerRequest) { + // The MonitoredResource for GAE apps lacks an instance_id field, so we encode it into the + // version_id field so that metrics from different instances don't interleave. + return new StackdriverWriter( + monitoringClient, + projectId, + new MonitoredResource() + .setType("gae_app") + .setLabels( + ImmutableMap.of( + "module_id", + modulesService.getCurrentModule(), + "version_id", + modulesService.getCurrentVersion() + + ":" + + modulesService.getCurrentInstanceId())), + maxQps, + maxPointsPerRequest); + } + + @Provides + @Named("metricsBackgroundThreadFactory") + static ThreadFactory provideThreadFactory() { + return ThreadManager.backgroundThreadFactory(); + } + + @Provides + @Named("metricsWriteInterval") + static long provideMetricsWriteInterval( + @Config("metricsWriteInterval") Duration metricsWriteInterval) { + return metricsWriteInterval.getStandardSeconds(); + } } diff --git a/javatests/google/registry/module/backend/BUILD b/javatests/google/registry/module/backend/BUILD index cb7733054..183e7e709 100644 --- a/javatests/google/registry/module/backend/BUILD +++ b/javatests/google/registry/module/backend/BUILD @@ -20,6 +20,7 @@ java_library( "//third_party/java/servlet/servlet_api", "//third_party/java/truth", "//java/google/registry/module/backend", + "//javatests/google/registry/testing", ], ) diff --git a/javatests/google/registry/module/backend/BackendServletTest.java b/javatests/google/registry/module/backend/BackendServletTest.java index 1db7f4db5..f53c018b2 100644 --- a/javatests/google/registry/module/backend/BackendServletTest.java +++ b/javatests/google/registry/module/backend/BackendServletTest.java @@ -18,8 +18,10 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import google.registry.testing.AppEngineRule; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -28,6 +30,10 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public class BackendServletTest { + @Rule + public final AppEngineRule appEngine = + AppEngineRule.builder().withDatastore().withLocalModules().build(); + private final HttpServletRequest req = mock(HttpServletRequest.class); private final HttpServletResponse rsp = mock(HttpServletResponse.class); diff --git a/javatests/google/registry/testing/AppEngineRule.java b/javatests/google/registry/testing/AppEngineRule.java index cec30494d..ec95cfcf2 100644 --- a/javatests/google/registry/testing/AppEngineRule.java +++ b/javatests/google/registry/testing/AppEngineRule.java @@ -269,10 +269,9 @@ public final class AppEngineRule extends ExternalResource { } if (withLocalModules) { configs.add(new LocalModulesServiceTestConfig() - .addDefaultModuleVersion() - .addAutomaticScalingModuleVersion("default", "v1") - .addAutomaticScalingModuleVersion("tools", "v1") - .addAutomaticScalingModuleVersion("backend", "v1")); + .addBasicScalingModuleVersion("default", "1", 1) + .addBasicScalingModuleVersion("tools", "1", 1) + .addBasicScalingModuleVersion("backend", "1", 1)); } if (withTaskQueue) { File queueFile = temporaryFolder.newFile("queue.xml"); @@ -307,6 +306,10 @@ public final class AppEngineRule extends ExternalResource { }); } + if (withLocalModules) { + helper.setEnvInstance("0"); + } + helper.setUp(); if (withDatastore) {