mirror of
https://github.com/google/nomulus.git
synced 2025-05-13 07:57:13 +02:00
Correctly retry relay of reference counted objects
It turns out in the edge case where a write occurs at the same moment that the relay connection is terminated, the current retry mechanism is not sufficient because it stores reference coutned objects whose internal buffers are already freed. ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=208738065
This commit is contained in:
parent
2e2898e17c
commit
4965478cce
1 changed files with 28 additions and 12 deletions
|
@ -26,6 +26,8 @@ import io.netty.handler.codec.http.FullHttpRequest;
|
|||
import io.netty.handler.codec.http.FullHttpResponse;
|
||||
import io.netty.util.Attribute;
|
||||
import io.netty.util.AttributeKey;
|
||||
import io.netty.util.ReferenceCountUtil;
|
||||
import io.netty.util.ReferenceCounted;
|
||||
import java.util.Deque;
|
||||
import java.util.Queue;
|
||||
import javax.inject.Inject;
|
||||
|
@ -84,6 +86,15 @@ public class RelayHandler<I> extends SimpleChannelInboundHandler<I> {
|
|||
|
||||
public static void writeToRelayChannel(
|
||||
Channel channel, Channel relayChannel, Object msg, boolean retry) {
|
||||
// If the message is reference counted, its internal buffer that holds the data will be freed by
|
||||
// Netty when the reference count reduce to zero. When this message is written to the relay
|
||||
// channel, regardless of whether it is successful or not, its reference count will be reduced
|
||||
// to zero and its buffer will be freed. After the buffer is freed, the message cannot be used
|
||||
// anymore, even if in Java's eye the object still exist, its content is gone. We increment a
|
||||
// count here so that the message can be retried, in case the relay is not successful.
|
||||
if (msg instanceof ReferenceCounted) {
|
||||
((ReferenceCounted) msg).retain();
|
||||
}
|
||||
ChannelFuture unusedFuture =
|
||||
relayChannel
|
||||
.writeAndFlush(msg)
|
||||
|
@ -106,23 +117,28 @@ public class RelayHandler<I> extends SimpleChannelInboundHandler<I> {
|
|||
// as long as the frontend channel is open. Otherwise, we are relaying from the
|
||||
// backend to the frontend, and this relay failure cannot be recovered from: we
|
||||
// should just kill the relay (frontend) channel, which in turn will kill the
|
||||
// backend channel. It is fine to just save the message object itself, not a
|
||||
// clone of it, because if the relay is not successful, its content is not read,
|
||||
// therefore its buffer is not cleared.
|
||||
// backend channel.
|
||||
Queue<Object> relayBuffer = channel.attr(RELAY_BUFFER_KEY).get();
|
||||
if (relayBuffer != null) {
|
||||
channel.attr(RELAY_BUFFER_KEY).get().add(msg);
|
||||
}
|
||||
ChannelFuture unusedFuture2 = relayChannel.close();
|
||||
} else if (retry) {
|
||||
// TODO (jianglai): do not log the message once retry behavior is confirmed.
|
||||
logger.atInfo().log(
|
||||
"Relay retry succeeded: %s --> %s\nINBOUND: %s\nOUTBOUND: %s\nsMESSAGE: %s",
|
||||
channel.attr(PROTOCOL_KEY).get().name(),
|
||||
relayChannel.attr(PROTOCOL_KEY).get().name(),
|
||||
channel,
|
||||
relayChannel,
|
||||
msg);
|
||||
} else {
|
||||
if (retry) {
|
||||
// TODO (jianglai): do not log the message once retry behavior is confirmed.
|
||||
logger.atInfo().log(
|
||||
"Relay retry succeeded: %s --> %s\nINBOUND: %s\nOUTBOUND: %s\n"
|
||||
+ "MESSAGE: %s",
|
||||
channel.attr(PROTOCOL_KEY).get().name(),
|
||||
relayChannel.attr(PROTOCOL_KEY).get().name(),
|
||||
channel,
|
||||
relayChannel,
|
||||
msg);
|
||||
}
|
||||
// If the write is successful, we know that no retry is needed. This function
|
||||
// will decrement the reference count if the message is reference counted,
|
||||
// allowing Netty to free the message's buffer.
|
||||
ReferenceCountUtil.release(msg);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue