Automatic reconnect to GAE when the connection is dropped

The connection to GAE is not persistent and can drop. Reconnect when that happens, as long as the connection from the client is still active.

We need to consider the fact that while a reconnection is happening, the client may be sending requests that were relayed to the old connection and did not go through. In that case these requests are queued and will be retried when the new connection is available.

Since we are no longer tying the lifecycles of the two connections together, we cannot automatically terminate one when the other is terminated. We also need to explicitly control how the WHOIS connection is terminated, rather than depending on the HTTP Connection header.

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=207335498
This commit is contained in:
jianglai 2018-08-03 15:37:21 -07:00
parent 0b8e7498e0
commit 4ff77fb370
8 changed files with 203 additions and 79 deletions

View file

@ -15,7 +15,9 @@
package google.registry.proxy;
import static google.registry.proxy.Protocol.PROTOCOL_KEY;
import static google.registry.proxy.handler.RelayHandler.RELAY_BUFFER_KEY;
import static google.registry.proxy.handler.RelayHandler.RELAY_CHANNEL_KEY;
import static google.registry.proxy.handler.RelayHandler.writeToRelayChannel;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@ -40,7 +42,9 @@ import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.Future;
import io.netty.util.internal.logging.InternalLoggerFactory;
import io.netty.util.internal.logging.JdkLoggerFactory;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.inject.Provider;
@ -86,6 +90,7 @@ public class ProxyServer implements Runnable {
FrontendProtocol inboundProtocol =
(FrontendProtocol) inboundChannel.parent().attr(PROTOCOL_KEY).get();
inboundChannel.attr(PROTOCOL_KEY).set(inboundProtocol);
inboundChannel.attr(RELAY_BUFFER_KEY).set(new ArrayDeque<>());
addHandlers(inboundChannel.pipeline(), inboundProtocol.handlerProviders());
if (!inboundProtocol.hasBackend()) {
@ -114,30 +119,99 @@ public class ProxyServer implements Runnable {
// Outbound channel relays to inbound channel.
.attr(RELAY_CHANNEL_KEY, inboundChannel)
.attr(PROTOCOL_KEY, outboundProtocol);
ChannelFuture outboundChannelFuture =
bootstrap.connect(outboundProtocol.host(), outboundProtocol.port());
outboundChannelFuture.addListener(
(ChannelFuture future) -> {
if (future.isSuccess()) {
Channel outboundChannel = future.channel();
// Inbound channel relays to outbound channel.
inboundChannel.attr(RELAY_CHANNEL_KEY).set(outboundChannel);
// Outbound channel established successfully, inbound channel can start reading.
// This setter also calls channel.read() to request read operation.
inboundChannel.config().setAutoRead(true);
connectOutboundChannel(bootstrap, inboundProtocol, outboundProtocol, inboundChannel);
// If the inbound connection is closed, close its outbound relay connection as well. There
// is no way to recover from an inbound connection termination, as the connection can only
// be initiated by the client.
ChannelFuture unusedChannelFuture =
inboundChannel
.closeFuture()
.addListener(
(future) -> {
// Check if there's a relay connection. In case that the outbound connection
// is not successful, this attribute is not set.
Channel outboundChannel = inboundChannel.attr(RELAY_CHANNEL_KEY).get();
if (outboundChannel != null) {
ChannelFuture unusedChannelFuture2 = outboundChannel.close();
}
});
}
}
/**
 * Establishes an outbound relay channel and sets the relevant metadata on both channels.
 *
 * <p>This method also adds a listener that is called when the established outbound connection
 * is closed. The outbound connection to GAE is *not* guaranteed to persist. In case that the
 * outbound connection closes but the inbound connection is still active, the listener calls
 * this function again to re-establish another outbound connection. The metadata is also reset
 * so that the inbound channel knows to relay to the new outbound channel.
 *
 * @param bootstrap bootstrap used to open the connection to the outbound protocol's host and port
 * @param inboundProtocol protocol of the inbound (frontend) channel; its name is used in logs
 * @param outboundProtocol protocol of the outbound (backend) channel; supplies the host and port
 *     to connect to, and its name is used in logs
 * @param inboundChannel inbound channel whose relay channel attribute is pointed at the newly
 *     connected outbound channel, and which is closed if the connection attempt fails
 */
private static void connectOutboundChannel(
Bootstrap bootstrap,
FrontendProtocol inboundProtocol,
BackendProtocol outboundProtocol,
NioSocketChannel inboundChannel) {
ChannelFuture outboundChannelFuture =
bootstrap.connect(outboundProtocol.host(), outboundProtocol.port());
outboundChannelFuture.addListener(
(ChannelFuture future) -> {
if (future.isSuccess()) {
// Outbound connection is successful, now we can set the metadata to couple these two
// connections together.
Channel outboundChannel = future.channel();
// Inbound channel relays to outbound channel.
inboundChannel.attr(RELAY_CHANNEL_KEY).set(outboundChannel);
// Outbound channel established successfully, inbound channel can start reading.
// This setter also calls channel.read() to request read operation.
inboundChannel.config().setAutoRead(true);
logger.atInfo().log(
"Relay established: %s <-> %s\nFRONTEND: %s\nBACKEND: %s",
inboundProtocol.name(), outboundProtocol.name(), inboundChannel, outboundChannel);
// Now that we have a functional relay channel to the backend, if there are any
// buffered requests, send them off to the relay channel. We need to obtain a copy
// of the messages and clear the queue first, because if the relay is not successful,
// the message will be written back to the queue, causing an infinite loop.
Queue<Object> relayBuffer = inboundChannel.attr(RELAY_BUFFER_KEY).get();
Object[] messages = relayBuffer.toArray();
relayBuffer.clear();
for (Object msg : messages) {
writeToRelayChannel(inboundChannel, outboundChannel, msg);
logger.atInfo().log(
"Relay established: %s <-> %s\nFRONTEND: %s\nBACKEND: %s",
"Relay retried: %s <-> %s\nFRONTEND: %s\nBACKEND: %s",
inboundProtocol.name(),
outboundProtocol.name(),
inboundChannel,
outboundChannel);
} else {
logger.atSevere().withCause(future.cause()).log(
"Cannot connect to relay channel for %s protocol connection from %s.",
inboundProtocol.name(), inboundChannel.remoteAddress().getHostName());
}
});
}
// When this outbound connection is closed, try reconnecting if the inbound connection
// is still active.
// NOTE(review): the reconnect is attempted immediately from the close listener; no delay or
// retry limit is visible in this block -- confirm that is intended.
ChannelFuture unusedChannelFuture =
outboundChannel
.closeFuture()
.addListener(
(ChannelFuture future2) -> {
if (inboundChannel.isActive()) {
logger.atInfo().log(
"Relay interrupted: %s <-> %s\nFRONTEND: %s\nBACKEND: %s",
inboundProtocol.name(),
outboundProtocol.name(),
inboundChannel,
outboundChannel);
connectOutboundChannel(
bootstrap, inboundProtocol, outboundProtocol, inboundChannel);
}
});
} else {
// We cannot connect to GAE for unknown reasons, no relay can be done so drop the
// inbound connection as well.
logger.atSevere().withCause(future.cause()).log(
"Cannot connect to relay channel for %s channel: %s.",
inboundProtocol.name(), inboundChannel);
ChannelFuture unusedFuture = inboundChannel.close();
}
});
}
private static void addHandlers(

View file

@ -118,7 +118,8 @@ public class EppServiceHandler extends HttpsRelayServiceHandler {
"epp", sslClientCertificateHash, ctx.channel());
channelRead(ctx, Unpooled.wrappedBuffer(helloBytes));
} else {
logger.atSevere().withCause(promise.cause()).log("Cannot finish handshake.");
logger.atSevere().withCause(promise.cause()).log(
"Cannot finish handshake for channel %s", ctx.channel());
ChannelFuture unusedFuture = ctx.close();
}
});

View file

@ -14,7 +14,7 @@
package google.registry.proxy.handler;
import static com.google.common.base.Preconditions.checkNotNull;
import static google.registry.proxy.Protocol.PROTOCOL_KEY;
import com.google.common.flogger.FluentLogger;
import io.netty.channel.Channel;
@ -25,6 +25,8 @@ import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import java.util.Deque;
import java.util.Queue;
import javax.inject.Inject;
/**
@ -33,6 +35,18 @@ import javax.inject.Inject;
*/
public class RelayHandler<I> extends SimpleChannelInboundHandler<I> {
/**
 * Attribute key for a queue that saves messages that failed to be relayed.
 *
 * <p>This queue is null for channels that should not retry on failure, i.e. backend channels.
 *
 * <p>This queue does not need to be synchronized because it is only accessed by the I/O thread of
 * the channel, or its relay channel. Since both channels use the same EventLoop, their I/O
 * activities are handled by the same thread.
 */
public static final AttributeKey<Deque<Object>> RELAY_BUFFER_KEY =
AttributeKey.valueOf("RELAY_BUFFER_KEY");
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
/** Key used to retrieve the relay channel from a {@link Channel}'s {@link Attribute}. */
@ -43,45 +57,52 @@ public class RelayHandler<I> extends SimpleChannelInboundHandler<I> {
super(clazz, false);
}
/**
 * Logs an exception raised during inbound I/O at severe level, then closes the connection.
 *
 * @param ctx context of this handler; identifies the channel in the log and is closed afterwards
 * @param cause the exception that was caught
 */
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  Channel failedChannel = ctx.channel();
  logger.atSevere().withCause(cause).log("Inbound exception caught for channel %s", failedChannel);
  // The close result is deliberately not awaited.
  ChannelFuture unusedFuture = ctx.close();
}
/**
 * Closes the relay channel, if one is set, when this channel becomes inactive, then forwards the
 * inactive event down the pipeline.
 *
 * @param ctx context of this handler; its channel's relay channel attribute is consulted
 */
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  Channel closedChannel = ctx.channel();
  Channel peerChannel = closedChannel.attr(RELAY_CHANNEL_KEY).get();
  boolean hasPeer = peerChannel != null;
  if (hasPeer) {
    // The close result is deliberately not awaited.
    ChannelFuture unusedFuture = peerChannel.close();
  }
  ctx.fireChannelInactive();
}
/**
 * Read message of type {@code I}, write it as-is into the relay channel.
 *
 * <p>The relay channel is looked up from this channel's {@code RELAY_CHANNEL_KEY} attribute.
 *
 * @param ctx context of this handler, used to obtain the current channel
 * @param msg the inbound message to relay
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, I msg) throws Exception {
Channel relayChannel = ctx.channel().attr(RELAY_CHANNEL_KEY).get();
checkNotNull(relayChannel, "Relay channel not specified for channel: %s", ctx.channel());
if (relayChannel.isActive()) {
// Relay channel is open, write to it.
ChannelFuture channelFuture = relayChannel.writeAndFlush(msg);
channelFuture.addListener(
future -> {
// Cannot write into relay channel, close this channel.
if (!future.isSuccess()) {
ChannelFuture unusedFuture = ctx.close();
}
});
Channel channel = ctx.channel();
Channel relayChannel = channel.attr(RELAY_CHANNEL_KEY).get();
// No relay channel is set on this channel: log the problem and close this channel.
if (relayChannel == null) {
logger.atSevere().log("Relay channel not specified for channel: %s", channel);
ChannelFuture unusedFuture = channel.close();
} else {
// Close this channel if the relay channel is closed.
ChannelFuture unusedFuture = ctx.close();
writeToRelayChannel(channel, relayChannel, msg);
}
}
/**
 * Writes {@code msg} to {@code relayChannel} and flushes it, handling a failed write
 * asynchronously.
 *
 * <p>On failure the cause is logged. If {@code channel} carries a relay buffer (looked up via
 * {@code RELAY_BUFFER_KEY}), the message is appended to that buffer so it can be retried later;
 * otherwise {@code relayChannel} is closed.
 *
 * @param channel the channel the message originated from
 * @param relayChannel the channel the message is written to
 * @param msg the message to relay
 */
public static void writeToRelayChannel(Channel channel, Channel relayChannel, Object msg) {
  ChannelFuture unusedFuture =
      relayChannel
          .writeAndFlush(msg)
          .addListener(
              future -> {
                if (!future.isSuccess()) {
                  // Attach the failure cause so the log shows why the relay failed, as the
                  // other failure logs in this file do.
                  logger.atWarning().withCause(future.cause()).log(
                      "Relay failed: %s --> %s\nINBOUND: %s\nOUTBOUND: %s",
                      channel.attr(PROTOCOL_KEY).get().name(),
                      relayChannel.attr(PROTOCOL_KEY).get().name(),
                      channel,
                      relayChannel);
                  // If we cannot write to the relay channel and the originating channel has
                  // a relay buffer (i.e. we tried to relay the frontend to the backend), store
                  // the message in the buffer for retry later. Otherwise, we are relaying from
                  // the backend to the frontend, and this relay failure cannot be recovered
                  // from, so we should just kill the relay (frontend) channel, which in turn
                  // will kill the backend channel. We should not kill any backend channel while
                  // the frontend channel is open, because that will just trigger a reconnect.
                  // It is fine to just save the message object itself, not a clone of it,
                  // because if the relay is not successful, its content is not read, therefore
                  // its buffer is not cleared.
                  // NOTE(review): if msg is reference-counted, confirm it has not been released
                  // by the failed write before it is retried.
                  Queue<Object> relayBuffer = channel.attr(RELAY_BUFFER_KEY).get();
                  if (relayBuffer != null) {
                    relayBuffer.add(msg);
                  } else {
                    ChannelFuture unusedFuture2 = relayChannel.close();
                  }
                }
              });
}
/** Specialized {@link RelayHandler} that takes a {@link FullHttpRequest} as inbound payload. */
public static class FullHttpRequestRelayHandler extends RelayHandler<FullHttpRequest> {
@Inject

View file

@ -14,12 +14,17 @@
package google.registry.proxy.handler;
import static com.google.common.base.Preconditions.checkArgument;
import google.registry.proxy.metric.FrontendMetrics;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpResponse;
import java.util.function.Supplier;
/** Handler that processes WHOIS protocol logic. */
@ -44,11 +49,18 @@ public final class WhoisServiceHandler extends HttpsRelayServiceHandler {
FullHttpRequest request = super.decodeFullHttpRequest(byteBuf);
request
.headers()
// Close connection after a response is received, per RFC-3912
// https://tools.ietf.org/html/rfc3912
.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE)
.set(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.TEXT_PLAIN)
.set(HttpHeaderNames.ACCEPT, HttpHeaderValues.TEXT_PLAIN);
return request;
}
/**
 * Writes the outbound response and arranges for the connection to be closed once the write
 * completes.
 *
 * @param ctx context of this handler
 * @param msg outbound message; must be an {@link HttpResponse}
 * @param promise promise for the write, to which the close listener is attached
 * @throws IllegalArgumentException if {@code msg} is not an {@link HttpResponse}
 */
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
    throws Exception {
  boolean isHttpResponse = msg instanceof HttpResponse;
  checkArgument(isHttpResponse);
  // Per RFC-3912 (https://tools.ietf.org/html/rfc3912), the connection is closed after a
  // response is received.
  promise.addListener(ChannelFutureListener.CLOSE);
  super.write(ctx, msg, promise);
}
}