0

まだ開いていないチャネルに書き込もうとしているため、次のコードで java.nio.channels.NotYetConnectedException が発生します。

基本的に、私が持っているのはチャンネルプールで、空きがある場合は書き込み用のチャンネルを取得し、利用できない場合は新しいチャンネルを作成します。私の問題は、新しいチャネルを作成するときに、接続を呼び出したときにチャネルが書き込みの準備ができておらず、スレッドをブロックしたくないため、戻る前に接続が開くのを待ちたくないことです。これを行う最善の方法は何ですか?また、チャネルを取得/返すための私のロジックは有効ですか? 以下のコードを参照してください。

次のような単純な接続プールがあります。

private static class ChannelPool {
    private final ClientBootstrap cb;
    private Set<Channel> activeChannels = new HashSet<Channel>();
    private Deque<Channel> freeChannels = new ArrayDeque<Channel>();
    public ChannelPool() {
        ChannelFactory clientFactory =
                new NioClientSocketChannelFactory(
                        Executors.newCachedThreadPool(),
                        Executors.newCachedThreadPool());
        cb = new ClientBootstrap(clientFactory);
        cb.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                return Channels.pipeline(
                        new HttpRequestEncoder(),
                        new HttpResponseDecoder(),
                        new ResponseHandler());
            }
        });
    }

    private Channel newChannel() {
        ChannelFuture cf;
        synchronized (cb) {
            cf = cb.connect(new InetSocketAddress("localhost", 18080));
        }
        final Channel ret = cf.getChannel();
        ret.getCloseFuture().addListener(new ChannelFutureListener() {

            @Override
            public void operationComplete(ChannelFuture arg0) throws Exception {
                System.out.println("channel closed?");
                synchronized (activeChannels) {
                    activeChannels.remove(ret);
                }
            }
        });
        synchronized (activeChannels) {
            activeChannels.add(ret);
        }
        System.out.println("returning new channel");
        return ret;
    }

    public Channel getFreeChannel() {
        synchronized (freeChannels) {
            while (!freeChannels.isEmpty()) {
                Channel ch = freeChannels.pollFirst();
                if (ch.isOpen()) {
                    return ch;
                }
            }
        }
        return newChannel();
    }

    public void returnChannel(Channel ch) {
        synchronized (freeChannels) {
            freeChannels.addLast(ch);
        }
    }
}

次のように、ハンドラー内でこれを使用しようとしています。

private static class RequestHandler extends SimpleChannelHandler {

    @Override
    public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) {
        final HttpRequest request = (HttpRequest) e.getMessage();
        Channel proxyChannel = pool.getFreeChannel();
        proxyToClient.put(proxyChannel, e.getChannel());
        proxyChannel.write(request);
    }
}
4

1 に答える 1