diff --git a/java/google/registry/proxy/ProxyServer.java b/java/google/registry/proxy/ProxyServer.java index a02a4c94e..81cda3920 100644 --- a/java/google/registry/proxy/ProxyServer.java +++ b/java/google/registry/proxy/ProxyServer.java @@ -15,7 +15,9 @@ package google.registry.proxy; import static google.registry.proxy.Protocol.PROTOCOL_KEY; +import static google.registry.proxy.handler.RelayHandler.RELAY_BUFFER_KEY; import static google.registry.proxy.handler.RelayHandler.RELAY_CHANNEL_KEY; +import static google.registry.proxy.handler.RelayHandler.writeToRelayChannel; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -40,7 +42,9 @@ import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.concurrent.Future; import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.JdkLoggerFactory; +import java.util.ArrayDeque; import java.util.HashMap; +import java.util.Queue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import javax.inject.Provider; @@ -86,6 +90,7 @@ public class ProxyServer implements Runnable { FrontendProtocol inboundProtocol = (FrontendProtocol) inboundChannel.parent().attr(PROTOCOL_KEY).get(); inboundChannel.attr(PROTOCOL_KEY).set(inboundProtocol); + inboundChannel.attr(RELAY_BUFFER_KEY).set(new ArrayDeque<>()); addHandlers(inboundChannel.pipeline(), inboundProtocol.handlerProviders()); if (!inboundProtocol.hasBackend()) { @@ -114,30 +119,99 @@ public class ProxyServer implements Runnable { // Outbound channel relays to inbound channel. .attr(RELAY_CHANNEL_KEY, inboundChannel) .attr(PROTOCOL_KEY, outboundProtocol); - ChannelFuture outboundChannelFuture = - bootstrap.connect(outboundProtocol.host(), outboundProtocol.port()); - outboundChannelFuture.addListener( - (ChannelFuture future) -> { - if (future.isSuccess()) { - Channel outboundChannel = future.channel(); - // Inbound channel relays to outbound channel. - inboundChannel.attr(RELAY_CHANNEL_KEY).set(outboundChannel); - // Outbound channel established successfully, inbound channel can start reading. - // This setter also calls channel.read() to request read operation. - inboundChannel.config().setAutoRead(true); + + connectOutboundChannel(bootstrap, inboundProtocol, outboundProtocol, inboundChannel); + // If the inbound connection is closed, close its outbound relay connection as well. There + // is no way to recover from an inbound connection termination, as the connection can only + // be initiated by the client. + ChannelFuture unusedChannelFuture = + inboundChannel + .closeFuture() + .addListener( + (future) -> { + // Check if there's a relay connection. In case that the outbound connection + // is not successful, this attribute is not set. + Channel outboundChannel = inboundChannel.attr(RELAY_CHANNEL_KEY).get(); + if (outboundChannel != null) { + ChannelFuture unusedChannelFuture2 = outboundChannel.close(); + } + }); + } + } + + /** + * Establishes an outbound relay channel and sets the relevant metadata on both channels. + * + *

This method also adds a listener that is called when the established outbound connection + * is closed. The outbound connection to GAE is *not* guaranteed to persist. In case that the + * outbound connection closes but the inbound connection is still active, the listener calls + * this function again to re-establish another outbound connection. The metadata is also reset + * so that the inbound channel knows to relay to the new outbound channel. + */ + private static void connectOutboundChannel( + Bootstrap bootstrap, + FrontendProtocol inboundProtocol, + BackendProtocol outboundProtocol, + NioSocketChannel inboundChannel) { + ChannelFuture outboundChannelFuture = + bootstrap.connect(outboundProtocol.host(), outboundProtocol.port()); + outboundChannelFuture.addListener( + (ChannelFuture future) -> { + if (future.isSuccess()) { + // Outbound connection is successful, now we can set the metadata to couple these two + // connections together. + Channel outboundChannel = future.channel(); + // Inbound channel relays to outbound channel. + inboundChannel.attr(RELAY_CHANNEL_KEY).set(outboundChannel); + // Outbound channel established successfully, inbound channel can start reading. + // This setter also calls channel.read() to request read operation. + inboundChannel.config().setAutoRead(true); + logger.atInfo().log( + "Relay established: %s <-> %s\nFRONTEND: %s\nBACKEND: %s", + inboundProtocol.name(), outboundProtocol.name(), inboundChannel, outboundChannel); + // Now that we have a functional relay channel to the backend, if there's any + // buffered requests, send them off to the relay channel. We need to obtain a copy + // of the messages and clear the queue first, because if the relay is not successful, + // the message will be written back to the queue, causing an infinite loop. + Queue relayBuffer = inboundChannel.attr(RELAY_BUFFER_KEY).get(); + Object[] messages = relayBuffer.toArray(); + relayBuffer.clear(); + for (Object msg : messages) { + writeToRelayChannel(inboundChannel, outboundChannel, msg); logger.atInfo().log( - "Relay established: %s <-> %s\nFRONTEND: %s\nBACKEND: %s", + "Relay retried: %s <-> %s\nFRONTEND: %s\nBACKEND: %s", inboundProtocol.name(), outboundProtocol.name(), inboundChannel, outboundChannel); - } else { - logger.atSevere().withCause(future.cause()).log( - "Cannot connect to relay channel for %s protocol connection from %s.", - inboundProtocol.name(), inboundChannel.remoteAddress().getHostName()); } - }); - } + // When this outbound connection is closed, try reconnecting if the inbound connection + // is still active. + ChannelFuture unusedChannelFuture = + outboundChannel + .closeFuture() + .addListener( + (ChannelFuture future2) -> { + if (inboundChannel.isActive()) { + logger.atInfo().log( + "Relay interrupted: %s <-> %s\nFRONTEND: %s\nBACKEND: %s", + inboundProtocol.name(), + outboundProtocol.name(), + inboundChannel, + outboundChannel); + connectOutboundChannel( + bootstrap, inboundProtocol, outboundProtocol, inboundChannel); + } + }); + } else { + // We cannot connect to GAE for unknown reasons, no relay can be done so drop the + // inbound connection as well. + logger.atSevere().withCause(future.cause()).log( + "Cannot connect to relay channel for %s channel: %s.", + inboundProtocol.name(), inboundChannel); + ChannelFuture unusedFuture = inboundChannel.close(); + } + }); } private static void addHandlers( diff --git a/java/google/registry/proxy/handler/EppServiceHandler.java b/java/google/registry/proxy/handler/EppServiceHandler.java index 10318cfd8..15b6f9a06 100644 --- a/java/google/registry/proxy/handler/EppServiceHandler.java +++ b/java/google/registry/proxy/handler/EppServiceHandler.java @@ -118,7 +118,8 @@ public class EppServiceHandler extends HttpsRelayServiceHandler { "epp", sslClientCertificateHash, ctx.channel()); channelRead(ctx, Unpooled.wrappedBuffer(helloBytes)); } else { - logger.atSevere().withCause(promise.cause()).log("Cannot finish handshake."); + logger.atSevere().withCause(promise.cause()).log( + "Cannot finish handshake for channel %s", ctx.channel()); ChannelFuture unusedFuture = ctx.close(); } }); diff --git a/java/google/registry/proxy/handler/RelayHandler.java b/java/google/registry/proxy/handler/RelayHandler.java index 2193b348d..e47ecdc6d 100644 --- a/java/google/registry/proxy/handler/RelayHandler.java +++ b/java/google/registry/proxy/handler/RelayHandler.java @@ -14,7 +14,7 @@ package google.registry.proxy.handler; -import static com.google.common.base.Preconditions.checkNotNull; +import static google.registry.proxy.Protocol.PROTOCOL_KEY; import com.google.common.flogger.FluentLogger; import io.netty.channel.Channel; @@ -25,6 +25,8 @@ import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.util.Attribute; import io.netty.util.AttributeKey; +import java.util.Deque; +import java.util.Queue; import javax.inject.Inject; /** @@ -33,6 +35,18 @@ import javax.inject.Inject; */ public class RelayHandler extends SimpleChannelInboundHandler { + /** + * A queue that saves messages that failed to be relayed. + * + *

This queue is null for channels that should not retry on failure, i. e. backend channels. + * + *

This queue does not need to be synchronised because it is only accessed by the I/O thread of + * the channel, or its relay channel. Since both channels use the same EventLoop, their I/O + * activities are handled by the same thread. + */ + public static final AttributeKey> RELAY_BUFFER_KEY = + AttributeKey.valueOf("RELAY_BUFFER_KEY"); + private static final FluentLogger logger = FluentLogger.forEnclosingClass(); /** Key used to retrieve the relay channel from a {@link Channel}'s {@link Attribute}. */ @@ -43,45 +57,52 @@ public class RelayHandler extends SimpleChannelInboundHandler { super(clazz, false); } - /** Terminate connection when an exception is caught during inbound IO. */ - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - logger.atSevere().withCause(cause).log( - "Inbound exception caught for channel %s", ctx.channel()); - ChannelFuture unusedFuture = ctx.close(); - } - - /** Close relay channel if this channel is closed. */ - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - Channel relayChannel = ctx.channel().attr(RELAY_CHANNEL_KEY).get(); - if (relayChannel != null) { - ChannelFuture unusedFuture = relayChannel.close(); - } - ctx.fireChannelInactive(); - } - /** Read message of type {@code I}, write it as-is into the relay channel. */ @Override protected void channelRead0(ChannelHandlerContext ctx, I msg) throws Exception { - Channel relayChannel = ctx.channel().attr(RELAY_CHANNEL_KEY).get(); - checkNotNull(relayChannel, "Relay channel not specified for channel: %s", ctx.channel()); - if (relayChannel.isActive()) { - // Relay channel is open, write to it. - ChannelFuture channelFuture = relayChannel.writeAndFlush(msg); - channelFuture.addListener( - future -> { - // Cannot write into relay channel, close this channel. - if (!future.isSuccess()) { - ChannelFuture unusedFuture = ctx.close(); - } - }); + Channel channel = ctx.channel(); + Channel relayChannel = channel.attr(RELAY_CHANNEL_KEY).get(); + if (relayChannel == null) { + logger.atSevere().log("Relay channel not specified for channel: %s", channel); + ChannelFuture unusedFuture = channel.close(); } else { - // close this channel if the relay channel is closed. - ChannelFuture unusedFuture = ctx.close(); + writeToRelayChannel(channel, relayChannel, msg); } } + public static void writeToRelayChannel(Channel channel, Channel relayChannel, Object msg) { + ChannelFuture unusedFuture = + relayChannel + .writeAndFlush(msg) + .addListener( + future -> { + if (!future.isSuccess()) { + logger.atWarning().log( + "Relay failed: %s --> %s\nINBOUND: %s\nOUTBOUND: %s", + channel.attr(PROTOCOL_KEY).get().name(), + relayChannel.attr(PROTOCOL_KEY).get().name(), + channel, + relayChannel); + // If we cannot write to the relay channel and the originating channel has + // a relay buffer (i. e. we tried to relay the frontend to the backend), store + // the message in the buffer for retry later. Otherwise, we are relaying from + // the backend to the frontend, and this relay failure cannot be recovered + // from, we should just kill the relay (frontend) channel, which in turn will + // kill the backend channel. We should not kill any backend channel while the + // the frontend channel is open, because that will just trigger a reconnect. + // It is fine to just save the message object itself, not a clone of it, + // because if the relay is not successful, its content is not read, therefore + // its buffer is not cleared. + Queue relayBuffer = channel.attr(RELAY_BUFFER_KEY).get(); + if (relayBuffer != null) { + channel.attr(RELAY_BUFFER_KEY).get().add(msg); + } else { + ChannelFuture unusedFuture2 = relayChannel.close(); + } + } + }); + } + /** Specialized {@link RelayHandler} that takes a {@link FullHttpRequest} as inbound payload. */ public static class FullHttpRequestRelayHandler extends RelayHandler { @Inject diff --git a/java/google/registry/proxy/handler/WhoisServiceHandler.java b/java/google/registry/proxy/handler/WhoisServiceHandler.java index f0e8a042a..cf134bf9f 100644 --- a/java/google/registry/proxy/handler/WhoisServiceHandler.java +++ b/java/google/registry/proxy/handler/WhoisServiceHandler.java @@ -14,12 +14,17 @@ package google.registry.proxy.handler; +import static com.google.common.base.Preconditions.checkArgument; + import google.registry.proxy.metric.FrontendMetrics; import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpHeaderValues; +import io.netty.handler.codec.http.HttpResponse; import java.util.function.Supplier; /** Handler that processes WHOIS protocol logic. */ @@ -44,11 +49,18 @@ public final class WhoisServiceHandler extends HttpsRelayServiceHandler { FullHttpRequest request = super.decodeFullHttpRequest(byteBuf); request .headers() - // Close connection after a response is received, per RFC-3912 - // https://tools.ietf.org/html/rfc3912 - .set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE) .set(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.TEXT_PLAIN) .set(HttpHeaderNames.ACCEPT, HttpHeaderValues.TEXT_PLAIN); return request; } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) + throws Exception { + // Close connection after a response is received, per RFC-3912 + // https://tools.ietf.org/html/rfc3912 + checkArgument(msg instanceof HttpResponse); + promise.addListener(ChannelFutureListener.CLOSE); + super.write(ctx, msg, promise); + } } diff --git a/javatests/google/registry/proxy/TestUtils.java b/javatests/google/registry/proxy/TestUtils.java index c62e442dd..820b79e3e 100644 --- a/javatests/google/registry/proxy/TestUtils.java +++ b/javatests/google/registry/proxy/TestUtils.java @@ -79,7 +79,6 @@ public class TestUtils { FullHttpRequest request = makeHttpPostRequest(content, host, path); request .headers() - .set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE) .set(HttpHeaderNames.AUTHORIZATION, "Bearer " + accessToken) .set(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.TEXT_PLAIN) .set(HttpHeaderNames.ACCEPT, HttpHeaderValues.TEXT_PLAIN); diff --git a/javatests/google/registry/proxy/WhoisProtocolModuleTest.java b/javatests/google/registry/proxy/WhoisProtocolModuleTest.java index 395d9d826..b90bdbb07 100644 --- a/javatests/google/registry/proxy/WhoisProtocolModuleTest.java +++ b/javatests/google/registry/proxy/WhoisProtocolModuleTest.java @@ -17,6 +17,7 @@ package google.registry.proxy; import static com.google.common.truth.Truth.assertThat; import static google.registry.proxy.TestUtils.makeWhoisHttpRequest; import static google.registry.proxy.TestUtils.makeWhoisHttpResponse; +import static google.registry.testing.JUnitBackports.assertThrows; import static java.nio.charset.StandardCharsets.US_ASCII; import static java.util.stream.Collectors.joining; import static org.junit.Assert.fail; @@ -26,6 +27,7 @@ import io.netty.buffer.Unpooled; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpResponseStatus; +import java.nio.channels.ClosedChannelException; import java.util.stream.Stream; import org.junit.Test; import org.junit.runner.RunWith; @@ -53,7 +55,7 @@ public class WhoisProtocolModuleTest extends ProtocolModuleTest { PROXY_CONFIG.whois.relayHost, PROXY_CONFIG.whois.relayPath, TestModule.provideFakeAccessToken().get()); - assertThat(expectedRequest).isEqualTo(actualRequest); + assertThat(actualRequest).isEqualTo(expectedRequest); assertThat(channel.isActive()).isTrue(); // Nothing more to read. assertThat((Object) channel.readInbound()).isNull(); @@ -88,7 +90,7 @@ public class WhoisProtocolModuleTest extends ProtocolModuleTest { PROXY_CONFIG.whois.relayHost, PROXY_CONFIG.whois.relayPath, TestModule.provideFakeAccessToken().get()); - assertThat(expectedRequest1).isEqualTo(actualRequest1); + assertThat(actualRequest1).isEqualTo(expectedRequest1); // No more message at this point. assertThat((Object) channel.readInbound()).isNull(); // More inbound bytes, but no newline. @@ -102,7 +104,7 @@ public class WhoisProtocolModuleTest extends ProtocolModuleTest { PROXY_CONFIG.whois.relayHost, PROXY_CONFIG.whois.relayPath, TestModule.provideFakeAccessToken().get()); - assertThat(expectedRequest2).isEqualTo(actualRequest2); + assertThat(actualRequest2).isEqualTo(expectedRequest2); // The third message is not complete yet. assertThat(channel.isActive()).isTrue(); assertThat((Object) channel.readInbound()).isNull(); @@ -126,26 +128,23 @@ public class WhoisProtocolModuleTest extends ProtocolModuleTest { assertThat(channel.writeOutbound(response)).isTrue(); ByteBuf outputBuffer = channel.readOutbound(); assertThat(outputBuffer.toString(US_ASCII)).isEqualTo(outputString); - assertThat(channel.isActive()).isTrue(); + assertThat(channel.isActive()).isFalse(); // Nothing more to write. assertThat((Object) channel.readOutbound()).isNull(); } @Test - public void testSuccess_parseMultipleOutboundHttpResponse() { + public void testFailure_parseOnlyFirstFromMultipleOutboundHttpResponse() { String outputString1 = "line1\r\nline2\r\n"; String outputString2 = "line3\r\nline4\r\nline5\r\n"; FullHttpResponse response1 = makeWhoisHttpResponse(outputString1, HttpResponseStatus.OK); FullHttpResponse response2 = makeWhoisHttpResponse(outputString2, HttpResponseStatus.OK); - assertThat(channel.writeOutbound(response1, response2)).isTrue(); + assertThrows(ClosedChannelException.class, () -> channel.writeOutbound(response1, response2)); // First Http response parsed ByteBuf outputBuffer1 = channel.readOutbound(); assertThat(outputBuffer1.toString(US_ASCII)).isEqualTo(outputString1); - // Second Http response parsed - ByteBuf outputBuffer2 = channel.readOutbound(); - assertThat(outputBuffer2.toString(US_ASCII)).isEqualTo(outputString2); - assertThat(channel.isActive()).isTrue(); - // Nothing more to write. + // Second Http response not parsed because the connection is closed. + assertThat(channel.isActive()).isFalse(); assertThat((Object) channel.readOutbound()).isNull(); } diff --git a/javatests/google/registry/proxy/handler/RelayHandlerTest.java b/javatests/google/registry/proxy/handler/RelayHandlerTest.java index 4f98bd05e..a59420bcd 100644 --- a/javatests/google/registry/proxy/handler/RelayHandlerTest.java +++ b/javatests/google/registry/proxy/handler/RelayHandlerTest.java @@ -15,10 +15,16 @@ package google.registry.proxy.handler; import static com.google.common.truth.Truth.assertThat; +import static google.registry.proxy.Protocol.PROTOCOL_KEY; +import static google.registry.proxy.handler.RelayHandler.RELAY_BUFFER_KEY; import static google.registry.proxy.handler.RelayHandler.RELAY_CHANNEL_KEY; -import io.netty.channel.ChannelFuture; +import com.google.common.collect.ImmutableList; +import google.registry.proxy.Protocol; +import google.registry.proxy.Protocol.BackendProtocol; +import google.registry.proxy.Protocol.FrontendProtocol; import io.netty.channel.embedded.EmbeddedChannel; +import java.util.ArrayDeque; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -35,10 +41,27 @@ public class RelayHandlerTest { private final RelayHandler relayHandler = new RelayHandler<>(ExpectedType.class); private final EmbeddedChannel inboundChannel = new EmbeddedChannel(relayHandler); private final EmbeddedChannel outboundChannel = new EmbeddedChannel(); + private final FrontendProtocol frontendProtocol = + Protocol.frontendBuilder() + .port(0) + .name("FRONTEND") + .handlerProviders(ImmutableList.of()) + .relayProtocol( + Protocol.backendBuilder() + .host("host.invalid") + .port(0) + .name("BACKEND") + .handlerProviders(ImmutableList.of()) + .build()) + .build(); + private final BackendProtocol backendProtocol = frontendProtocol.relayProtocol(); @Before public void setUp() { inboundChannel.attr(RELAY_CHANNEL_KEY).set(outboundChannel); + inboundChannel.attr(RELAY_BUFFER_KEY).set(new ArrayDeque<>()); + inboundChannel.attr(PROTOCOL_KEY).set(frontendProtocol); + outboundChannel.attr(PROTOCOL_KEY).set(backendProtocol); } @Test @@ -62,21 +85,16 @@ public class RelayHandlerTest { } @Test - public void testSuccess_disconnectIfRelayIsUnsuccessful() { + public void testSuccess_outboundClosed_enqueueBuffer() { ExpectedType inboundMessage = new ExpectedType(); - // Outbound channel is closed. + // Outbound channel (backend) is closed. outboundChannel.finish(); assertThat(inboundChannel.writeInbound(inboundMessage)).isFalse(); ExpectedType relayedMessage = outboundChannel.readOutbound(); assertThat(relayedMessage).isNull(); - // Inbound channel is closed as well. - assertThat(inboundChannel.isActive()).isFalse(); - } - - @Test - public void testSuccess_disconnectRelayChannelIfInactive() { - ChannelFuture unusedFuture = inboundChannel.close(); - assertThat(outboundChannel.isActive()).isFalse(); + // Inbound channel (frontend) should stay open. + assertThat(inboundChannel.isActive()).isTrue(); + assertThat(inboundChannel.attr(RELAY_BUFFER_KEY).get()).containsExactly(inboundMessage); } @Test diff --git a/javatests/google/registry/proxy/handler/WhoisServiceHandlerTest.java b/javatests/google/registry/proxy/handler/WhoisServiceHandlerTest.java index 92567e69e..191dbbcdf 100644 --- a/javatests/google/registry/proxy/handler/WhoisServiceHandlerTest.java +++ b/javatests/google/registry/proxy/handler/WhoisServiceHandlerTest.java @@ -108,7 +108,7 @@ public class WhoisServiceHandlerTest { assertThat(parsedBuffer.toString(US_ASCII)).isEqualTo(outputString); // The channel is still open, and nothing else is to be written to it. assertThat((Object) channel.readOutbound()).isNull(); - assertThat(channel.isActive()).isTrue(); + assertThat(channel.isActive()).isFalse(); } @Test