0

私は、マルチスレッド TCP サーバーと単一クライアントの永続的な接続に Netty を使用しています。クライアントは多数のバイナリ メッセージ (私の使用例では 10000) を送信し、メッセージごとに応答を受け取ることになっています。OrderedMemoryAwareThreadPoolExecutor をパイプラインに追加して、複数のスレッドでの DB 呼び出しの実行を処理しました。

メソッド messageReceived() で DB 呼び出しを実行する (または Thread.currentThread().sleep(50) でシミュレートする) と、すべてのイベントが単一のスレッドによって処理されます。

    5 count of {main}
    1 count of {New
10000 count of {pool-3-thread-4}

messageReceived() の単純な実装の場合、サーバーは予想どおり多くのエグゼキュータ スレッドを作成します。

ビジネス ロジック用に複数のスレッド エグゼキューターを取得するには、ExecutionHandler をどのように構成すればよいですか?

これが私のコードです:

public class MyServer {

      public void run() {
            OrderedMemoryAwareThreadPoolExecutor eventExecutor = new OrderedMemoryAwareThreadPoolExecutor(16, 1048576L, 1048576L, 1000, TimeUnit.MILLISECONDS, Executors.defaultThreadFactory());  
            ExecutionHandler executionHandler = new ExecutionHandler(eventExecutor);        
            bootstrap.setPipelineFactory(new ServerChannelPipelineFactory(executionHandler));
      }
    }  



    public class ServerChannelPipelineFactory implements ChannelPipelineFactory {

      public ChannelPipeline getPipeline() throws Exception {

        pipeline.addLast("encoder", new MyProtocolEncoder());
        pipeline.addLast("decoder", new MyProtocolDecoder());
        pipeline.addLast("executor", executionHandler);
        pipeline.addLast("myHandler", new MyServerHandler(dataSource));

      }
    }

    public class MyServerHandler extends SimpleChannelHandler {

      public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) throws DBException {


          // long running DB call simulation
          try {
            Thread.currentThread().sleep(50);
          } catch (InterruptedException ex) {

          }  

          // a simple message  
          final MyMessage answerMsg = new MyMessage();
          if (e.getChannel().isWritable()) {
            e.getChannel().write(answerMsg);
          }  
      }      
    }
4

1 に答える 1