すばらしいライブラリ netty に感謝します。私は最新の 3.5.8 バージョンを使用しています。私のアプリケーション設計は以下の通りです
- エンド ユーザー (MyWebApp) からの要求を受け取る Web アプリケーションがあります。
- この Web アプリケーションは、受信したデータを変更して MyBean オブジェクトを構築し、処理のために MyServer (netty 上に構築) に接続します。
- MyServer はいくつかのビジネス ロジックを実行し、変更された MyBean オブジェクトを返します。
- MyWebApp はこれを受け取り、JSP 経由でエンド ユーザーに引き渡します
MyServer は正常に動作しており、リクエストを処理できます。
同時リクエストがある場合、クライアントがリクエストを遅らせるのを観察しました。多くのリクエストはミリ秒単位で処理されますが、ほとんどのリクエストが遅延する場合があります。遅延リクエストには 1 ~ 4 分かかる場合があります。しかし、私はこれらのリクエストがサーバーによって非常に遅く受信されるのを観察しましたが、すぐに処理されます. したがって、クライアントの実装を疑うことしかできません。「Channel」の接続プールを実装しようとしましたが、リクエストとレスポンスが一致しません。どちらの実装も私が行っているため、両側のコードを自由に変更できます。そのため、応答を処理した後でも接続を閉じないように、サーバーの messageReceived ソース コードを変更しました。要求を行ってチャネルを開き、応答を受け取った後にチャネルを閉じるのはクライアントです。
理想的には、1 つのチャネルまたはチャネルのプール、またはその他の実装を使用して、同時要求を処理したいと考えています。Netty はサーバーの実装には非常に簡単だと感じましたが、クライアントの実装はそのままでは私のような設計には適していません。これがnettyライブラリ自体の一部になることができれば、それは素晴らしいことです
どんなアイデアでも大歓迎です。
私のコードはクライアント側で以下の通りです
public class NioConnectorImpl implements ICoreConnector{
private String host;
private int port;
private static ChannelFactory channelFactory;
public NioConnectorImpl(String host, int port){
this.host = host;
this.port = port;
}
@Override
public MyBean processRequest(MyBean bean) {
return (MyBean) callMyServer(bean);
}
public Object callMyServer(Object requestObject) throws RuntimeException{
if(channelFactory==null){
System.out.println("createFactory with 3 Boss Threads and "+Runtime.getRuntime().availableProcessors() * 2+ " Child Threads");
channelFactory = new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool(),3, Runtime.getRuntime().availableProcessors() * 2);
}
ClientBootstrap bootstrap = new ClientBootstrap(channelFactory);
// Set up the event pipeline factory.
bootstrap.setPipelineFactory(new MyChannelPipelineFactory(requestObject));
//Connection timeOut for initial connection...
bootstrap.setOption("connectTimeoutMillis", 10000);
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("keepAlive", true);
ChannelFuture connectFuture = bootstrap.connect(new InetSocketAddress(host, port));
// Wait until the connection is made successfully.
Channel channel = connectFuture.awaitUninterruptibly().getChannel();
// Get the handler instance to retrieve the answer.
MyChannelHandler handler = (MyChannelHandler) channel.getPipeline().getLast();
Object object = handler.getObjectBean();
channel.close().awaitUninterruptibly();
//bootstrap.releaseExternalResources();
//channelFactory.releaseExternalResources();
return object;
}
@Override
public void safeReleaseResources() {
System.out.println("Releasing External Resources::::::::::::");
if(channelFactory!=null)
channelFactory.releaseExternalResources();
try {
Thread.sleep(5000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
MyChannelHandler コード
public class MyChannelHandler extends SimpleChannelUpstreamHandler{
private static final Logger logger = Logger.getLogger(MyChannelHandler.class.getName());
private final Object object;
final BlockingQueue<Object> answer = new LinkedBlockingQueue<Object>();
public MyChannelHandler(Object object) {
this.object= object;
}
public Object getObjectBean() {
boolean interrupted = false;
for (;;) {
try {
Object object = answer.take();
if (interrupted) {
Thread.currentThread().interrupt();
}
return object;
} catch (InterruptedException e) {
interrupted = true;
}
}
}
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
sendObject(e);
}
@Override
public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) {
if (e.getMessage() != null) {
// Offer the answer after closing the connection.
e.getChannel().close().addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
boolean offered = answer.offer((Object) e.getMessage());
assert offered;
}
});
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
logger.log(Level.WARNING,"Naga: Unexpected exception from downstream.",e.getCause());
e.getChannel().close();
}
private void sendObject(ChannelStateEvent e) {
Channel channel = e.getChannel();
if(channel.isWritable()) channel.write(object);
}
}
MyServer コード
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
Object object = MyServerController().handleRequest(e.getMessage(), e.getRemoteAddress().toString());
final ChannelFuture future = e.getChannel().write(object);
future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture f) {
Channel ch = future.getChannel();
//ch.close(); // expect client closes the connection
}
});
}