0

Netty 4 を使用してかなり単純なサーバーを作成しました。数千の接続を処理するようにスケールアップすることができましたが、最大 40 スレッドを超えることはありません。

サーバースレッドの数は一定のまま (Java VisualVM 経由)

テストするために、数千の接続を作成するテスト クライアントも作成しました。残念ながら、これは接続を作成するのと同じ数のスレッドを作成します。クライアントのスレッドを最小限に抑えることを望んでいました。私はこれについて多くの投稿を見てきました。多くの例は、単一接続のセットアップを示しています。 これこれは、クライアント間で NioEventLoopGroup を共有することを示しています。限られた数の nioEventLoopGroup を取得していますが、他の場所で接続ごとにスレッドを取得しています。パイプラインで意図的にスレッドを作成しているのではなく、何ができるかわかりません。

クライアントスレッドの数は、接続数とともに増加します (Java VisualVM 経由)

スレッド (Java VisualVM 経由)

これは、クライアント コードのセットアップの一部です。これまでの調査に基づいて、一定のスレッド数を維持する必要があるようです。クライアント接続ごとのスレッドを防ぐために私がしなければならないことはありますか?

主要

final EventLoopGroup group = new NioEventLoopGroup();

for (int i=0; i<100; i++)){
    MockClient client = new MockClient(i, group);
    client.connect();
}

モッククライアント

public class MockClient implements Runnable {

    private final EventLoopGroup group;

    private int identity;

    public MockClient(int identity, final EventLoopGroup group) {
        this.identity = identity;
        this.group = group;
    }

    @Override
    public void run() {
        try {
            connect();
        } catch (Exception e) {}
    }

    public void connect() throws Exception{

        Bootstrap b = new Bootstrap();
        b.group(group)
         .channel(NioSocketChannel.class)
         .handler(new MockClientInitializer(identity, this));

        final Runnable that = this;
        // Start the connection attempt
        b.connect(config.getHost(), config.getPort()).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    Channel ch = future.sync().channel();
                } else {
                    //if the server is down, try again in a few seconds
                    future.channel().eventLoop().schedule(that, 15, TimeUnit.SECONDS); 
                }
            }
        });
    }
}
4

1 に答える 1

1

これまで何度も経験したことですが、問題を詳細に説明すると、さらに考えさせられ、問題に出くわしました。何千もの Netty クライアントを作成する際に他の誰かが同じ問題に遭遇した場合に備えて、ここで提供したいと思います。

パイプラインには、クライアント接続の再起動をシミュレートするタイムアウト タスクを作成するパスが 1 つあります。接続ごとにスレッドが存在するまで、サーバーから「再起動」シグナルを受信するたびに (頻繁に発生します)、接続ごとに余分なスレッドを作成していたのは、このタイマー タスクであることが判明しました。

ハンドラ

private final HashedWheelTimer timer;

@Override
protected void channelRead0(ChannelHandlerContext ctx, Packet msg) throws Exception {

    Packet packet = reboot();

    ChannelFutureListener closeHandler = new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            RebootTimeoutTask timeoutTask = new RebootTimeoutTask(identity, client);
            timer.newTimeout(timeoutTask, SECONDS_FOR_REBOOT, TimeUnit.SECONDS);
        }
    };

    ctx.writeAndFlush(packet).addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
                future.channel().close().addListener(closeHandler);
            } else {
                future.channel().close();
            }
        }
    });

}

タイムアウトタスク

public class RebootTimeoutTask implements TimerTask {

    public RebootTimeoutTask(...) {...}

    @Override
    public void run(Timeout timeout) throws Exception {
        client.connect(identity);
    }

}
于 2014-06-13T00:17:22.877 に答える