4

私が分析しているコードは、Netty NioDatagramChannelFactory で UDP サーバーを作成します。以下を使用してスレッドプールを作成します。

ExecutorService threadPool = Executors.newCachedThreadPool();

次に、データグラム チャネル、pipelineFactory & ブートストラップ:

int workerCount = 10;
DatagramChannelFactory datagramChannelFactory = new NioDatagramChannelFactory(threadPool, workerCount);
ChannelPipelineFactory pipelineFactory = new SNMPTrapsPipeLineFactory();

ConnectionlessBootstrap bootStrap = new ConnectionlessBootstrap(datagramChannelFactory);
bootStrap.setPipelineFactory(pipelineFactory);
bootStrap.bind(new InetSocketAddress(host, port));

pipelineFactory では、getPipeline() がカスタム ハンドラを追加します。

それが言われているように: UDPメッセージのマルチスレッド処理

受信したメッセージを処理するスレッドは 1 つだけです。ログでは、スレッド名は次のようにNew I/O datagram worker #1として表示されます。

2012-04-20 09:20:51,853 新しい I/O データグラム ワーカー #1'-'1 INFO [cemrshSNMPTrapsRequestHandler:42] messageReceived | 処理:V1TRAP[reqestID=0, ...]

ドキュメントとこのエントリを読みました: Netty を使用した UDP サーバーで失われた UDP 要求のロット

そして、それらのエントリに従ってコードを少し変更しました。これで、スレッド プールが次のように作成されます。

int corePoolSize = 5;
ExecutorService threadPool = new OrderedMemoryAwareThreadPoolExecutor(corePoolSize, 1048576, 1048576);

そして、パイプラインファクトリーと ExecutionHandler:

ExecutionHandler executionHandler = new ExecutionHandler(threadPool);
ChannelPipelineFactory pipelineFactory = new SNMPTrapsPipeLineFactory(executionHandler);

そして、 getPipeline() は、次のようにハンドラーを追加します。

public class SNMPTrapsPipeLineFactory implements ChannelPipelineFactory {

    private ExecutionHandler executionHandler = null;

    public SNMPTrapsPipeLineFactory(ExecutionHandler executionHandler) { 
        this.executionHandler = executionHandler;
    }

    @Override
    public ChannelPipeline getPipeline() throws Exception {

        ChannelPipeline pipeline = Channels.pipeline();
        pipeline.addFirst("ExecutorHandler", executionHandler);

        // Here the custom handlers are added
        pipeline.addLast( ... )
    }

現在、ログに 4 つの異なるスレッド名が記録されています。それらは、pool-2-thread-1pool-2-thread-2などとして表示されます...

例えば:

2012-05-09 09:12:19,589 プール 2 スレッド 1 情報 [cemrshSNMPTrapsRequestHandler:46] messageReceived | 処理:V1TRAP[reqestID=0, ...]

ただし、それらは同時に処理されません。messageReceived() の下の処理は、次のメッセージを処理するために次のスレッドで終了する必要があります。さまざまなクライアントからサーバーに大量のメッセージを送信しましたが、取得したログはインターレースされていません。また、messageReceived() 内で Thread.sleep() を試み、前のことを確認しました。

何か不足していますか?Netty で REAL マルチスレッド UDP サーバーを実現する方法はありますか? 複数のスレッドで messageReceived() を同時に実行するにはどうすればよいですか?

4

2 に答える 2

1

私の経験と UDP を使用した Netty の理解に基づいて、デコードのために UDP メッセージを処理するスレッドが 1 つしかないのが普通です。UDP はセッションレスであるため、1 つの UDP ポートでデータを受信して​​デコードできるスレッドは 1 つだけです。

データをデコードしてバッファまたは特定の Java オブジェクトにラップしたら、そのオブジェクトを処理するスレッドのプールに入れることができます (実行ハンドラ -> ビジネス ハンドラ)。次に、以前にデコードされたデータを実行ハンドラーに解放すると、UDP ポートの新しい今後のデータをデコードできます。

NioDatagramChannelFactory の作成時に指定できるプールのスレッドは、複数のポートでデータをリッスンする場合にのみ使用されます。ポートごとに 1 つのスレッドのみが意味を持ちます。そのコンストラクターで 100 個のワーカーを指定しても、1 つの UDP ポートを構成した場合は 1 つだけが使用されます。

于 2012-05-13T18:07:56.727 に答える
0

私が驚いたことの 1 つは、実行ハンドラーをパイプラインの最初に配置したことです。「アプリケーション」ハンドラーまでのパイプライン全体が、IOとデコードを実行する IO スレッドによって実行される必要があるという意図があると思います。

したがって、最初にすべての SNMPTrap デコード ハンドラーを追加し、実際の SNMPTrap がある場合は実行ハンドラーに渡され、トラップが実際のトラップのコンシューマーに渡されることを強くお勧めします。何か役に立つことをする。

@Override
public ChannelPipeline getPipeline() throws Exception {

    ChannelPipeline pipeline = Channels.pipeline(
         new SomethingSomethingDecoder(),
         new SNMPTrapDecoder(),
         executionHandler.
         snmpTrapConsumerHandler
    );
}

少なくとも、それがExecutionHandler javadoc に示されている方法であり、上記は私の解釈です。

于 2012-05-09T21:05:23.037 に答える