From 4965478cceecb6667c2a71a85d8acbc8bf8a73de Mon Sep 17 00:00:00 2001 From: jianglai Date: Tue, 14 Aug 2018 16:42:46 -0700 Subject: [PATCH] Correctly retry relay of reference counted objects It turns out in the edge case where a write occurs at the same moment that the relay connection is terminated, the current retry mechanism is not sufficient because it stores reference coutned objects whose internal buffers are already freed. ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=208738065 --- .../registry/proxy/handler/RelayHandler.java | 40 +++++++++++++------ 1 file changed, 28 insertions(+), 12 deletions(-) diff --git a/java/google/registry/proxy/handler/RelayHandler.java b/java/google/registry/proxy/handler/RelayHandler.java index dc4e7862c..8dab948a7 100644 --- a/java/google/registry/proxy/handler/RelayHandler.java +++ b/java/google/registry/proxy/handler/RelayHandler.java @@ -26,6 +26,8 @@ import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.util.Attribute; import io.netty.util.AttributeKey; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.ReferenceCounted; import java.util.Deque; import java.util.Queue; import javax.inject.Inject; @@ -84,6 +86,15 @@ public class RelayHandler extends SimpleChannelInboundHandler { public static void writeToRelayChannel( Channel channel, Channel relayChannel, Object msg, boolean retry) { + // If the message is reference counted, its internal buffer that holds the data will be freed by + // Netty when the reference count reduce to zero. When this message is written to the relay + // channel, regardless of whether it is successful or not, its reference count will be reduced + // to zero and its buffer will be freed. After the buffer is freed, the message cannot be used + // anymore, even if in Java's eye the object still exist, its content is gone. We increment a + // count here so that the message can be retried, in case the relay is not successful. + if (msg instanceof ReferenceCounted) { + ((ReferenceCounted) msg).retain(); + } ChannelFuture unusedFuture = relayChannel .writeAndFlush(msg) @@ -106,23 +117,28 @@ public class RelayHandler extends SimpleChannelInboundHandler { // as long as the frontend channel is open. Otherwise, we are relaying from the // backend to the frontend, and this relay failure cannot be recovered from: we // should just kill the relay (frontend) channel, which in turn will kill the - // backend channel. It is fine to just save the message object itself, not a - // clone of it, because if the relay is not successful, its content is not read, - // therefore its buffer is not cleared. + // backend channel. Queue relayBuffer = channel.attr(RELAY_BUFFER_KEY).get(); if (relayBuffer != null) { channel.attr(RELAY_BUFFER_KEY).get().add(msg); } ChannelFuture unusedFuture2 = relayChannel.close(); - } else if (retry) { - // TODO (jianglai): do not log the message once retry behavior is confirmed. - logger.atInfo().log( - "Relay retry succeeded: %s --> %s\nINBOUND: %s\nOUTBOUND: %s\nsMESSAGE: %s", - channel.attr(PROTOCOL_KEY).get().name(), - relayChannel.attr(PROTOCOL_KEY).get().name(), - channel, - relayChannel, - msg); + } else { + if (retry) { + // TODO (jianglai): do not log the message once retry behavior is confirmed. + logger.atInfo().log( + "Relay retry succeeded: %s --> %s\nINBOUND: %s\nOUTBOUND: %s\n" + + "MESSAGE: %s", + channel.attr(PROTOCOL_KEY).get().name(), + relayChannel.attr(PROTOCOL_KEY).get().name(), + channel, + relayChannel, + msg); + } + // If the write is successful, we know that no retry is needed. This function + // will decrement the reference count if the message is reference counted, + // allowing Netty to free the message's buffer. + ReferenceCountUtil.release(msg); } }); }