私は、マルチスレッド TCP サーバーと単一クライアントの永続的な接続に Netty を使用しています。クライアントは多数のバイナリ メッセージ (私の使用例では 10000) を送信し、メッセージごとに応答を受け取ることになっています。OrderedMemoryAwareThreadPoolExecutor をパイプラインに追加して、複数のスレッドでの DB 呼び出しの実行を処理しました。
メソッド messageReceived() で DB 呼び出しを実行する (または Thread.currentThread().sleep(50) でシミュレートする) と、すべてのイベントが単一のスレッドによって処理されます。
5 count of {main}
1 count of {New
10000 count of {pool-3-thread-4}
messageReceived() の単純な実装の場合、サーバーは予想どおり多くのエグゼキュータ スレッドを作成します。
ビジネス ロジック用に複数のスレッド エグゼキューターを取得するには、ExecutionHandler をどのように構成すればよいですか?
これが私のコードです:
public class MyServer {
public void run() {
OrderedMemoryAwareThreadPoolExecutor eventExecutor = new OrderedMemoryAwareThreadPoolExecutor(16, 1048576L, 1048576L, 1000, TimeUnit.MILLISECONDS, Executors.defaultThreadFactory());
ExecutionHandler executionHandler = new ExecutionHandler(eventExecutor);
bootstrap.setPipelineFactory(new ServerChannelPipelineFactory(executionHandler));
}
}
public class ServerChannelPipelineFactory implements ChannelPipelineFactory {
public ChannelPipeline getPipeline() throws Exception {
pipeline.addLast("encoder", new MyProtocolEncoder());
pipeline.addLast("decoder", new MyProtocolDecoder());
pipeline.addLast("executor", executionHandler);
pipeline.addLast("myHandler", new MyServerHandler(dataSource));
}
}
public class MyServerHandler extends SimpleChannelHandler {
public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) throws DBException {
// long running DB call simulation
try {
Thread.currentThread().sleep(50);
} catch (InterruptedException ex) {
}
// a simple message
final MyMessage answerMsg = new MyMessage();
if (e.getChannel().isWritable()) {
e.getChannel().write(answerMsg);
}
}
}