diff --git a/java/google/registry/proxy/ProxyServer.java b/java/google/registry/proxy/ProxyServer.java index 417255b77..47b5d99ab 100644 --- a/java/google/registry/proxy/ProxyServer.java +++ b/java/google/registry/proxy/ProxyServer.java @@ -39,6 +39,7 @@ import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.Future; import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.JdkLoggerFactory; @@ -137,6 +138,20 @@ public class ProxyServer implements Runnable { if (outboundChannel != null) { ChannelFuture unusedChannelFuture2 = outboundChannel.close(); } + // If the frontend channel is closed and there are messages remaining in the + // buffer, we should make sure that they are released (if the messages are + // reference counted). + inboundChannel + .attr(RELAY_BUFFER_KEY) + .get() + .forEach( + msg -> { + // TODO (jianglai): do not log the message once retry behavior is + // confirmed. + logger.atWarning().log( + "Unfinished relay for connection %s: %s", inboundChannel, msg); + ReferenceCountUtil.release(msg); + }); }); } } diff --git a/java/google/registry/proxy/handler/BackendMetricsHandler.java b/java/google/registry/proxy/handler/BackendMetricsHandler.java index 640118aad..cb65203df 100644 --- a/java/google/registry/proxy/handler/BackendMetricsHandler.java +++ b/java/google/registry/proxy/handler/BackendMetricsHandler.java @@ -78,12 +78,12 @@ public class BackendMetricsHandler extends ChannelDuplexHandler { } @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { + public void channelRegistered(ChannelHandlerContext ctx) throws Exception { // Backend channel is always established after a frontend channel is connected, so this relayedChannel = ctx.channel().attr(RELAY_CHANNEL_KEY).get(); checkNotNull(relayedChannel, "No frontend channel found."); relayedProtocolName = relayedChannel.attr(PROTOCOL_KEY).get().name(); - super.channelActive(ctx); + super.channelRegistered(ctx); } @Override diff --git a/java/google/registry/proxy/handler/RelayHandler.java b/java/google/registry/proxy/handler/RelayHandler.java index 8dab948a7..e71d954e8 100644 --- a/java/google/registry/proxy/handler/RelayHandler.java +++ b/java/google/registry/proxy/handler/RelayHandler.java @@ -121,6 +121,10 @@ public class RelayHandler extends SimpleChannelInboundHandler { Queue relayBuffer = channel.attr(RELAY_BUFFER_KEY).get(); if (relayBuffer != null) { channel.attr(RELAY_BUFFER_KEY).get().add(msg); + } else { + // We are not going to retry, decrement a counter to allow the message to be + // freed by Netty, if the message is reference counted. + ReferenceCountUtil.release(msg); } ChannelFuture unusedFuture2 = relayChannel.close(); } else {