diff --git a/java/google/registry/monitoring/metrics/MetricExporter.java b/java/google/registry/monitoring/metrics/MetricExporter.java index b4fb237df..c81fd43fb 100644 --- a/java/google/registry/monitoring/metrics/MetricExporter.java +++ b/java/google/registry/monitoring/metrics/MetricExporter.java @@ -14,13 +14,18 @@ package google.registry.monitoring.metrics; +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; + import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.AbstractExecutionThreadService; +import java.io.IOException; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; +import java.util.logging.Level; import java.util.logging.Logger; /** @@ -49,11 +54,18 @@ class MetricExporter extends AbstractExecutionThreadService { logger.info("Started up MetricExporter"); while (isRunning()) { Optional>> batch = writeQueue.take(); + logger.info("Got a batch of points from the writeQueue"); if (batch.isPresent()) { - for (MetricPoint point : batch.get()) { - writer.write(point); + logger.info("Batch contains data, writing to MetricWriter"); + try { + for (MetricPoint point : batch.get()) { + writer.write(point); + } + writer.flush(); + } catch (IOException exception) { + logger.log( + Level.SEVERE, "Threw an exception while writing or flushing metrics", exception); } - writer.flush(); } else { logger.info("Received a poison pill, stopping now"); // An absent optional indicates that the Reporter wants this service to shut down. @@ -64,6 +76,21 @@ class MetricExporter extends AbstractExecutionThreadService { @Override protected Executor executor() { - return Executors.newSingleThreadExecutor(threadFactory); + final ExecutorService executor = Executors.newSingleThreadExecutor(threadFactory); + // Make sure the ExecutorService terminates when this service does. + addListener( + new Listener() { + @Override + public void terminated(State from) { + executor.shutdown(); + } + + @Override + public void failed(State from, Throwable failure) { + executor.shutdown(); + } + }, + directExecutor()); + return executor; } } diff --git a/java/google/registry/monitoring/metrics/MetricReporter.java b/java/google/registry/monitoring/metrics/MetricReporter.java index 1b197519f..741cf164c 100644 --- a/java/google/registry/monitoring/metrics/MetricReporter.java +++ b/java/google/registry/monitoring/metrics/MetricReporter.java @@ -121,12 +121,18 @@ public class MetricReporter extends AbstractScheduledService { // least once. runOneIteration(); + // Offer a poision pill to inform the exporter to stop. writeQueue.offer(Optional.>>absent()); try { - metricExporter.stopAsync().awaitTerminated(10, TimeUnit.SECONDS); + metricExporter.awaitTerminated(10, TimeUnit.SECONDS); logger.info("Shut down MetricExporter"); - } catch (TimeoutException timeoutException) { - logger.severe("Failed to shut down MetricExporter: " + timeoutException); + } catch (IllegalStateException exception) { + logger.log( + Level.SEVERE, + "Failed to shut down MetricExporter because it was FAILED", + metricExporter.failureCause()); + } catch (TimeoutException exception) { + logger.log(Level.SEVERE, "Failed to shut down MetricExporter within the timeout", exception); } } @@ -145,6 +151,7 @@ public class MetricReporter extends AbstractScheduledService { protected ScheduledExecutorService executor() { final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(threadFactory); + // Make sure the ExecutorService terminates when this service does. addListener( new Listener() { @Override @@ -162,15 +169,21 @@ public class MetricReporter extends AbstractScheduledService { } private void startMetricExporter() { - // Services in the FAILED state must be reconstructed, they can't be started - if (metricExporter.state() == State.FAILED) { - logger.log( - Level.SEVERE, - "MetricExporter died unexpectedly, restarting", - metricExporter.failureCause()); - this.metricExporter = new MetricExporter(writeQueue, metricWriter, threadFactory); + switch (metricExporter.state()) { + case NEW: + metricExporter.startAsync(); + break; + case FAILED: + logger.log( + Level.SEVERE, + "MetricExporter died unexpectedly, restarting", + metricExporter.failureCause()); + this.metricExporter = new MetricExporter(writeQueue, metricWriter, threadFactory); + this.metricExporter.startAsync(); + break; + default: + throw new IllegalStateException( + "MetricExporter not FAILED or NEW, should not be calling startMetricExporter"); } - - this.metricExporter.startAsync(); } }