From c5c0051f5ef80955d5eb4f9c15f61a2b04c7d9a1 Mon Sep 17 00:00:00 2001 From: jianglai Date: Tue, 14 Aug 2018 19:32:53 -0700 Subject: [PATCH] Ensure that no reference counted objects leak memory The objects stored in the relay buffer may leak memory when they are no longer used. Alway remember to release their reference count in all cases. Also save the relay channel and its name in BackendMetricsHandler when the handler is registered. This is because when retrying a relay, the write is sent as soon as the channel is connected, and the channelActive function is not called yet. ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=208757730 --- java/google/registry/proxy/ProxyServer.java | 15 +++++++++++++++ .../proxy/handler/BackendMetricsHandler.java | 4 ++-- .../registry/proxy/handler/RelayHandler.java | 4 ++++ 3 files changed, 21 insertions(+), 2 deletions(-) diff --git a/java/google/registry/proxy/ProxyServer.java b/java/google/registry/proxy/ProxyServer.java index 417255b77..47b5d99ab 100644 --- a/java/google/registry/proxy/ProxyServer.java +++ b/java/google/registry/proxy/ProxyServer.java @@ -39,6 +39,7 @@ import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.Future; import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.JdkLoggerFactory; @@ -137,6 +138,20 @@ public class ProxyServer implements Runnable { if (outboundChannel != null) { ChannelFuture unusedChannelFuture2 = outboundChannel.close(); } + // If the frontend channel is closed and there are messages remaining in the + // buffer, we should make sure that they are released (if the messages are + // reference counted). + inboundChannel + .attr(RELAY_BUFFER_KEY) + .get() + .forEach( + msg -> { + // TODO (jianglai): do not log the message once retry behavior is + // confirmed. + logger.atWarning().log( + "Unfinished relay for connection %s: %s", inboundChannel, msg); + ReferenceCountUtil.release(msg); + }); }); } } diff --git a/java/google/registry/proxy/handler/BackendMetricsHandler.java b/java/google/registry/proxy/handler/BackendMetricsHandler.java index 640118aad..cb65203df 100644 --- a/java/google/registry/proxy/handler/BackendMetricsHandler.java +++ b/java/google/registry/proxy/handler/BackendMetricsHandler.java @@ -78,12 +78,12 @@ public class BackendMetricsHandler extends ChannelDuplexHandler { } @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { + public void channelRegistered(ChannelHandlerContext ctx) throws Exception { // Backend channel is always established after a frontend channel is connected, so this relayedChannel = ctx.channel().attr(RELAY_CHANNEL_KEY).get(); checkNotNull(relayedChannel, "No frontend channel found."); relayedProtocolName = relayedChannel.attr(PROTOCOL_KEY).get().name(); - super.channelActive(ctx); + super.channelRegistered(ctx); } @Override diff --git a/java/google/registry/proxy/handler/RelayHandler.java b/java/google/registry/proxy/handler/RelayHandler.java index 8dab948a7..e71d954e8 100644 --- a/java/google/registry/proxy/handler/RelayHandler.java +++ b/java/google/registry/proxy/handler/RelayHandler.java @@ -121,6 +121,10 @@ public class RelayHandler extends SimpleChannelInboundHandler { Queue relayBuffer = channel.attr(RELAY_BUFFER_KEY).get(); if (relayBuffer != null) { channel.attr(RELAY_BUFFER_KEY).get().add(msg); + } else { + // We are not going to retry, decrement a counter to allow the message to be + // freed by Netty, if the message is reference counted. + ReferenceCountUtil.release(msg); } ChannelFuture unusedFuture2 = relayChannel.close(); } else {