google-nomulus/java/google/registry/proxy/handler/RelayHandler.java
jianglai c5c0051f5e Ensure that no reference counted objects leak memory
The objects stored in the relay buffer may leak memory when they are no longer used. Always remember to release their reference count in all cases.

Also save the relay channel and its name in BackendMetricsHandler when the handler is registered. This is because when retrying a relay, the write is sent as soon as the channel is connected, and the channelActive function is not called yet.

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=208757730
2018-08-20 14:06:24 -04:00

165 lines
7.6 KiB
Java

// Copyright 2017 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.proxy.handler;
import static google.registry.proxy.Protocol.PROTOCOL_KEY;
import com.google.common.flogger.FluentLogger;
import google.registry.proxy.handler.QuotaHandler.OverQuotaException;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import java.util.Deque;
import java.util.Queue;
import javax.inject.Inject;
/**
 * Receives inbound message of type {@code I}, and writes it to the {@code relayChannel} stored in
 * the inbound channel's attribute.
 *
 * <p>Automatic release of inbound messages is turned off in the constructor, so this handler is
 * responsible for releasing every message it reads on every code path: after a successful relay,
 * after a failed relay that will not be retried, and when no relay channel is configured.
 */
public class RelayHandler<I> extends SimpleChannelInboundHandler<I> {

  /**
   * A queue that saves messages that failed to be relayed.
   *
   * <p>This queue is null for channels that should not retry on failure, i. e. backend channels.
   *
   * <p>This queue does not need to be synchronised because it is only accessed by the I/O thread of
   * the channel, or its relay channel. Since both channels use the same EventLoop, their I/O
   * activities are handled by the same thread.
   */
  public static final AttributeKey<Deque<Object>> RELAY_BUFFER_KEY =
      AttributeKey.valueOf("RELAY_BUFFER_KEY");

  private static final FluentLogger logger = FluentLogger.forEnclosingClass();

  /** Key used to retrieve the relay channel from a {@link Channel}'s {@link Attribute}. */
  public static final AttributeKey<Channel> RELAY_CHANNEL_KEY =
      AttributeKey.valueOf("RELAY_CHANNEL");

  /**
   * Creates a handler that accepts inbound messages of the given type.
   *
   * @param clazz the type of inbound message this handler relays
   */
  public RelayHandler(Class<? extends I> clazz) {
    // Auto-release is disabled: the message must outlive channelRead0 so that the asynchronous
    // write listener can either release it or keep it for a retry.
    super(clazz, false);
  }

  /**
   * Read message of type {@code I}, write it as-is into the relay channel.
   *
   * <p>If the inbound channel has no relay channel attribute, the message is released and the
   * inbound channel is closed.
   */
  @Override
  protected void channelRead0(ChannelHandlerContext ctx, I msg) throws Exception {
    Channel channel = ctx.channel();
    Channel relayChannel = channel.attr(RELAY_CHANNEL_KEY).get();
    if (relayChannel == null) {
      logger.atSevere().log("Relay channel not specified for channel: %s", channel);
      // The message is neither relayed nor passed down the pipeline, and auto-release is off, so
      // it has to be released here or its buffer leaks.
      ReferenceCountUtil.release(msg);
      ChannelFuture unusedFuture = channel.close();
    } else {
      writeToRelayChannel(channel, relayChannel, msg, false);
    }
  }

  /**
   * Closes the channel on {@link OverQuotaException}; any other exception is forwarded to the next
   * handler in the pipeline.
   */
  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    if (cause instanceof OverQuotaException) {
      logger.atWarning().withCause(cause).log(
          "Channel %s closed due to quota exceeded", ctx.channel());
      ChannelFuture unusedFuture = ctx.close();
    } else {
      ctx.fireExceptionCaught(cause);
    }
  }

  /**
   * Writes {@code msg} to {@code relayChannel} and settles its reference count once the write
   * completes.
   *
   * <p>On success the message is released. On failure the message is stored in the originating
   * channel's relay buffer if it has one, otherwise it is released; in both failure cases the relay
   * channel is closed.
   *
   * @param channel the channel the message originated from
   * @param relayChannel the channel the message is written to
   * @param msg the message to relay
   * @param retry whether this write is a retry of a previously failed relay; only affects logging
   */
  public static void writeToRelayChannel(
      Channel channel, Channel relayChannel, Object msg, boolean retry) {
    // If the message is reference counted, its internal buffer that holds the data will be freed by
    // Netty when the reference count drops to zero. When this message is written to the relay
    // channel, regardless of whether it is successful or not, its reference count will be reduced
    // to zero and its buffer will be freed. After the buffer is freed, the message cannot be used
    // anymore, even if in Java's eye the object still exist, its content is gone. We increment a
    // count here so that the message can be retried, in case the relay is not successful.
    if (msg instanceof ReferenceCounted) {
      ((ReferenceCounted) msg).retain();
    }
    ChannelFuture unusedFuture =
        relayChannel
            .writeAndFlush(msg)
            .addListener(
                future -> {
                  if (!future.isSuccess()) {
                    // TODO (jianglai): do not log the message once retry behavior is confirmed.
                    logger.atWarning().withCause(future.cause()).log(
                        "Relay failed: %s --> %s\nINBOUND: %s\nOUTBOUND: %s\nMESSAGE: %s",
                        channel.attr(PROTOCOL_KEY).get().name(),
                        relayChannel.attr(PROTOCOL_KEY).get().name(),
                        channel,
                        relayChannel,
                        msg);
                    // If we cannot write to the relay channel and the originating channel has
                    // a relay buffer (i. e. we tried to relay the frontend to the backend), store
                    // the message in the buffer for retry later. The relay channel (backend) should
                    // be killed (if it is not already dead, usually the relay is unsuccessful
                    // because the connection is closed), and a new backend channel will re-connect
                    // as long as the frontend channel is open. Otherwise, we are relaying from the
                    // backend to the frontend, and this relay failure cannot be recovered from: we
                    // should just kill the relay (frontend) channel, which in turn will kill the
                    // backend channel.
                    Queue<Object> relayBuffer = channel.attr(RELAY_BUFFER_KEY).get();
                    if (relayBuffer != null) {
                      // The buffer now owns the extra reference taken above.
                      relayBuffer.add(msg);
                    } else {
                      // We are not going to retry, decrement a counter to allow the message to be
                      // freed by Netty, if the message is reference counted.
                      ReferenceCountUtil.release(msg);
                    }
                    ChannelFuture unusedFuture2 = relayChannel.close();
                  } else {
                    if (retry) {
                      // TODO (jianglai): do not log the message once retry behavior is confirmed.
                      logger.atInfo().log(
                          "Relay retry succeeded: %s --> %s\nINBOUND: %s\nOUTBOUND: %s\n"
                              + "MESSAGE: %s",
                          channel.attr(PROTOCOL_KEY).get().name(),
                          relayChannel.attr(PROTOCOL_KEY).get().name(),
                          channel,
                          relayChannel,
                          msg);
                    }
                    // If the write is successful, we know that no retry is needed. This function
                    // will decrement the reference count if the message is reference counted,
                    // allowing Netty to free the message's buffer.
                    ReferenceCountUtil.release(msg);
                  }
                });
  }

  /** Specialized {@link RelayHandler} that takes a {@link FullHttpRequest} as inbound payload. */
  public static class FullHttpRequestRelayHandler extends RelayHandler<FullHttpRequest> {
    @Inject
    public FullHttpRequestRelayHandler() {
      super(FullHttpRequest.class);
    }
  }

  /** Specialized {@link RelayHandler} that takes a {@link FullHttpResponse} as inbound payload. */
  public static class FullHttpResponseRelayHandler extends RelayHandler<FullHttpResponse> {
    @Inject
    public FullHttpResponseRelayHandler() {
      super(FullHttpResponse.class);
    }
  }
}