Netty/Java を使用して、他の多くの TCP エンドポイントにリクエストを転送する TCP プロキシを作成しようとしています。
例えば:
/--> SERVER A
Client A --> PROXY --
\--> SERVER B
がプロキシ経由で TCP コマンドを送信すると、プロキシは とClient A
への 2 つの TCP 接続を開き、 から送信された要求を両方に同時にプロキシします。Server A
Server B
Client A
Client A
後で別のコマンドを送信すると、理論的には、プロキシは以前に 2 つの接続をプールにキャッシュしているため、2 つの新しい接続を再度開くことなく、2 つのサーバーに要求をプロキシします。
応答処理に関しては、次の 2 つのオプションが必要です。
- に対する 2 つの応答を順番に表示し
Client A
ます。 - または、応答を完全に無視します。
接続が失われたり閉じられたりした場合、プロキシはそれを自動的に再作成し、接続プールに戻すことができるはずです。
私は Netty の例を見てChannelGroup
、接続プールを処理するために使用しようとしましたが、成功しませんでした。また、以下のコードでは、最初のリクエストを送信した後、プロキシが機能しなくなります。任意のヒント?
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOption;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.util.LinkedList;
import java.util.List;
public class TcpProxyHandler extends ChannelInboundHandlerAdapter {
private static List<String> hosts = new LinkedList<>();
private static List<String> connected = new LinkedList<>();
static {
hosts.add("127.0.0.1:10000");
hosts.add("127.0.0.1:20000");
}
static final ChannelGroup channels = new DefaultChannelGroup(
GlobalEventExecutor.INSTANCE);
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
final Channel inboundChannel = ctx.channel();
for (String host : hosts) {
if (!connected.contains(host)) {
String address = host.split(":")[0];
int port = Integer.parseInt(host.split(":")[1]);
Channel outboundChannel = ConnectionPool.getConnection(address,
port);
if (outboundChannel == null) {
Bootstrap b = new Bootstrap();
b.group(inboundChannel.eventLoop())
.channel(ctx.channel().getClass())
.handler(new TcpProxyBackendHandler(inboundChannel))
.option(ChannelOption.AUTO_READ, false);
ChannelFuture f = b.connect(address, port);
outboundChannel = f.channel();
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future)
throws Exception {
if (future.isSuccess()) {
// connection complete start to read first data
inboundChannel.read();
} else {
// Close the connection if the connection
// attempt
// has failed.
inboundChannel.close();
}
}
});
channels.add(outboundChannel);
connected.add(host);
System.out.println("Connected to " + host);
}
}
}
}
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg)
throws Exception {
channels.flushAndWrite(msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
cause.printStackTrace();
ctx.close();
}
static void closeOnFlush(Channel ch) {
if (ch.isActive()) {
ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(
ChannelFutureListener.CLOSE);
}
}
static class TcpProxyBackendHandler extends ChannelInboundHandlerAdapter {
private final Channel inboundChannel;
public TcpProxyBackendHandler(Channel inboundChannel) {
this.inboundChannel = inboundChannel;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.read();
ctx.write(Unpooled.EMPTY_BUFFER);
}
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg)
throws Exception {
inboundChannel.writeAndFlush(msg).addListener(
new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future)
throws Exception {
if (future.isSuccess()) {
ctx.channel().read();
} else {
future.channel().close();
}
}
});
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
TcpProxyHandler.closeOnFlush(inboundChannel);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
cause.printStackTrace();
TcpProxyHandler.closeOnFlush(ctx.channel());
}
}
}