diff --git a/java/google/registry/proxy/handler/RelayHandler.java b/java/google/registry/proxy/handler/RelayHandler.java index dc4e7862c..8dab948a7 100644 --- a/java/google/registry/proxy/handler/RelayHandler.java +++ b/java/google/registry/proxy/handler/RelayHandler.java @@ -26,6 +26,8 @@ import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.util.Attribute; import io.netty.util.AttributeKey; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.ReferenceCounted; import java.util.Deque; import java.util.Queue; import javax.inject.Inject; @@ -84,6 +86,15 @@ public class RelayHandler extends SimpleChannelInboundHandler { public static void writeToRelayChannel( Channel channel, Channel relayChannel, Object msg, boolean retry) { + // If the message is reference counted, its internal buffer that holds the data will be freed by + // Netty when the reference count reduce to zero. When this message is written to the relay + // channel, regardless of whether it is successful or not, its reference count will be reduced + // to zero and its buffer will be freed. After the buffer is freed, the message cannot be used + // anymore, even if in Java's eye the object still exist, its content is gone. We increment a + // count here so that the message can be retried, in case the relay is not successful. + if (msg instanceof ReferenceCounted) { + ((ReferenceCounted) msg).retain(); + } ChannelFuture unusedFuture = relayChannel .writeAndFlush(msg) @@ -106,23 +117,28 @@ public class RelayHandler extends SimpleChannelInboundHandler { // as long as the frontend channel is open. Otherwise, we are relaying from the // backend to the frontend, and this relay failure cannot be recovered from: we // should just kill the relay (frontend) channel, which in turn will kill the - // backend channel. It is fine to just save the message object itself, not a - // clone of it, because if the relay is not successful, its content is not read, - // therefore its buffer is not cleared. + // backend channel. Queue relayBuffer = channel.attr(RELAY_BUFFER_KEY).get(); if (relayBuffer != null) { channel.attr(RELAY_BUFFER_KEY).get().add(msg); } ChannelFuture unusedFuture2 = relayChannel.close(); - } else if (retry) { - // TODO (jianglai): do not log the message once retry behavior is confirmed. - logger.atInfo().log( - "Relay retry succeeded: %s --> %s\nINBOUND: %s\nOUTBOUND: %s\nsMESSAGE: %s", - channel.attr(PROTOCOL_KEY).get().name(), - relayChannel.attr(PROTOCOL_KEY).get().name(), - channel, - relayChannel, - msg); + } else { + if (retry) { + // TODO (jianglai): do not log the message once retry behavior is confirmed. + logger.atInfo().log( + "Relay retry succeeded: %s --> %s\nINBOUND: %s\nOUTBOUND: %s\n" + + "MESSAGE: %s", + channel.attr(PROTOCOL_KEY).get().name(), + relayChannel.attr(PROTOCOL_KEY).get().name(), + channel, + relayChannel, + msg); + } + // If the write is successful, we know that no retry is needed. This function + // will decrement the reference count if the message is reference counted, + // allowing Netty to free the message's buffer. + ReferenceCountUtil.release(msg); } }); }