Improve service handling and add unit tests for threading behavior in MetricExporter

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=130790170
This commit is contained in:
shikhman 2016-08-19 14:13:28 -07:00 committed by Ben McIlwain
parent 6915e35800
commit b6eaba08eb
2 changed files with 56 additions and 16 deletions

View file

@ -14,13 +14,18 @@
package google.registry.monitoring.metrics;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
@ -49,11 +54,18 @@ class MetricExporter extends AbstractExecutionThreadService {
logger.info("Started up MetricExporter");
while (isRunning()) {
Optional<ImmutableList<MetricPoint<?>>> batch = writeQueue.take();
logger.info("Got a batch of points from the writeQueue");
if (batch.isPresent()) {
logger.info("Batch contains data, writing to MetricWriter");
try {
for (MetricPoint<?> point : batch.get()) {
writer.write(point);
}
writer.flush();
} catch (IOException exception) {
logger.log(
Level.SEVERE, "Threw an exception while writing or flushing metrics", exception);
}
} else {
logger.info("Received a poison pill, stopping now");
// An absent optional indicates that the Reporter wants this service to shut down.
@ -64,6 +76,21 @@ class MetricExporter extends AbstractExecutionThreadService {
@Override
protected Executor executor() {
return Executors.newSingleThreadExecutor(threadFactory);
final ExecutorService executor = Executors.newSingleThreadExecutor(threadFactory);
// Make sure the ExecutorService terminates when this service does.
addListener(
new Listener() {
@Override
public void terminated(State from) {
executor.shutdown();
}
@Override
public void failed(State from, Throwable failure) {
executor.shutdown();
}
},
directExecutor());
return executor;
}
}

View file

@ -121,12 +121,18 @@ public class MetricReporter extends AbstractScheduledService {
// least once.
runOneIteration();
// Offer a poision pill to inform the exporter to stop.
writeQueue.offer(Optional.<ImmutableList<MetricPoint<?>>>absent());
try {
metricExporter.stopAsync().awaitTerminated(10, TimeUnit.SECONDS);
metricExporter.awaitTerminated(10, TimeUnit.SECONDS);
logger.info("Shut down MetricExporter");
} catch (TimeoutException timeoutException) {
logger.severe("Failed to shut down MetricExporter: " + timeoutException);
} catch (IllegalStateException exception) {
logger.log(
Level.SEVERE,
"Failed to shut down MetricExporter because it was FAILED",
metricExporter.failureCause());
} catch (TimeoutException exception) {
logger.log(Level.SEVERE, "Failed to shut down MetricExporter within the timeout", exception);
}
}
@ -145,6 +151,7 @@ public class MetricReporter extends AbstractScheduledService {
protected ScheduledExecutorService executor() {
final ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor(threadFactory);
// Make sure the ExecutorService terminates when this service does.
addListener(
new Listener() {
@Override
@ -162,15 +169,21 @@ public class MetricReporter extends AbstractScheduledService {
}
private void startMetricExporter() {
// Services in the FAILED state must be reconstructed, they can't be started
if (metricExporter.state() == State.FAILED) {
switch (metricExporter.state()) {
case NEW:
metricExporter.startAsync();
break;
case FAILED:
logger.log(
Level.SEVERE,
"MetricExporter died unexpectedly, restarting",
metricExporter.failureCause());
this.metricExporter = new MetricExporter(writeQueue, metricWriter, threadFactory);
}
this.metricExporter.startAsync();
break;
default:
throw new IllegalStateException(
"MetricExporter not FAILED or NEW, should not be calling startMetricExporter");
}
}
}