4

Netty/Java を使用して、他の多くの TCP エンドポイントにリクエストを転送する TCP プロキシを作成しようとしています。

例えば:

                     /--> SERVER A 
Client A --> PROXY --
                     \--> SERVER B 

がプロキシ経由で TCP コマンドを送信すると、プロキシは とClient Aへの 2 つの TCP 接続を開き、 から送信された要求を両方に同時にプロキシします。Server AServer BClient A

Client A後で別のコマンドを送信すると、理論的には、プロキシは以前に 2 つの接続をプールにキャッシュしているため、2 つの新しい接続を再度開くことなく、2 つのサーバーに要求をプロキシします。

応答処理に関しては、次の 2 つのオプションが必要です。

  • に対する 2 つの応答を順番に表示しClient Aます。
  • または、応答を完全に無視します。

接続が失われたり閉じられたりした場合、プロキシはそれを自動的に再作成し、接続プールに戻すことができるはずです。

私は Netty の例を見てChannelGroup、接続プールを処理するために使用しようとしましたが、成功しませんでした。また、以下のコードでは、最初のリクエストを送信した後、プロキシが機能しなくなります。任意のヒント?

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOption;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.util.LinkedList;
import java.util.List;

public class TcpProxyHandler extends ChannelInboundHandlerAdapter {

    private static List<String> hosts = new LinkedList<>();
    private static List<String> connected = new LinkedList<>();

    static {
        hosts.add("127.0.0.1:10000");
        hosts.add("127.0.0.1:20000");
    }

    static final ChannelGroup channels = new DefaultChannelGroup(
            GlobalEventExecutor.INSTANCE);

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        final Channel inboundChannel = ctx.channel();

        for (String host : hosts) {
            if (!connected.contains(host)) {
                String address = host.split(":")[0];
                int port = Integer.parseInt(host.split(":")[1]);
                Channel outboundChannel = ConnectionPool.getConnection(address,
                        port);
                if (outboundChannel == null) {
                    Bootstrap b = new Bootstrap();
                    b.group(inboundChannel.eventLoop())
                            .channel(ctx.channel().getClass())
                            .handler(new TcpProxyBackendHandler(inboundChannel))
                            .option(ChannelOption.AUTO_READ, false);
                    ChannelFuture f = b.connect(address, port);
                    outboundChannel = f.channel();
                    f.addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future)
                                throws Exception {
                            if (future.isSuccess()) {
                                // connection complete start to read first data
                                inboundChannel.read();
                            } else {
                                // Close the connection if the connection
                                // attempt
                                // has failed.
                                inboundChannel.close();
                            }
                        }
                    });

                    channels.add(outboundChannel);
                    connected.add(host);
                    System.out.println("Connected to " + host);
                }
            }

        }

    }

    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg)
            throws Exception {
        channels.flushAndWrite(msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    static void closeOnFlush(Channel ch) {
        if (ch.isActive()) {
            ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(
                    ChannelFutureListener.CLOSE);
        }
    }

    static class TcpProxyBackendHandler extends ChannelInboundHandlerAdapter {

        private final Channel inboundChannel;

        public TcpProxyBackendHandler(Channel inboundChannel) {
            this.inboundChannel = inboundChannel;
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            ctx.read();
            ctx.write(Unpooled.EMPTY_BUFFER);
        }

        @Override
        public void channelRead(final ChannelHandlerContext ctx, Object msg)
                throws Exception {
            inboundChannel.writeAndFlush(msg).addListener(
                    new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future)
                                throws Exception {
                            if (future.isSuccess()) {
                                ctx.channel().read();
                            } else {
                                future.channel().close();
                            }
                        }
                    });
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            TcpProxyHandler.closeOnFlush(inboundChannel);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            cause.printStackTrace();
            TcpProxyHandler.closeOnFlush(ctx.channel());
        }

    }

}
4

2 に答える 2

0

まだ試していない場合は、再度 AUTO_READ を有効にして、手動による read() の呼び出しを削除してください。自動読み取りをfalseに設定してサーバーを初期化することもできます。これも変更してみてください。

于 2015-04-19T20:41:01.977 に答える