特定の処理を別のスレッドに分離できるようにする必要があります。特定のメッセージのみが、この追加の処理を必要とします。これには、ネイティブ コールとリモート リソースが含まれます。したがって、私は自分のスレッドプール/キューを使用してこの処理を厳密に制御できるようにしたいと考えています。
私の最初の実装では、ChannelHandler からアクセスできる ThreadPoolExecutor を使用します。ハンドラーが特別な処理が必要なメッセージを特定したら、処理のためにワーカーをエグゼキューターに送信します。注、その時点で ExecutionHandler の拡張とフィルタリングに関する投稿を見ましたが、それでも同じ問題が発生すると思います。
問題は、ワーカー プロセスが完了した後、パイプラインで定義された ExecutionHandler/ThreadPool でメッセージを続行したいということです。したがって、「ワーカー」スレッドを解放します。ここでもワーカー スレッドプールには、キュー サイズ、最大スレッド数などに関して厳しい制限があります。
以下は私の Worker (私の ChannleHandler の内部クラス) です。下部に向かって、パイプラインで使用される ExecutionHandler を取得し、handleUpstream メソッドを呼び出します。私にはうまくいくようですが、私が考慮していない隠れた問題があるかどうか疑問に思っています...
public class EncryptionWorker implements Runnable
{
private ChannelHandlerContext _ctx;
private MessageEvent _e;
public EncryptionWorker(ChannelHandlerContext ctx, MessageEvent e)
{
_ctx = ctx;
_e = e;
}
@Override
public void run()
{
Object m = _e.getMessage();
if ( !( m instanceof AESSLMessageCtx ) )
{
LOGGER.error( "Invalid Message sent to EncryptionWorker.");
return;
}
AESSLMessageCtx msgContext = (AESSLMessageCtx) m;
LOGGER.info( "EncryptionWorker initiated, id: " + msgContext.getIdentifier());
// TODO - Process the Message.
try
{
// Encryption needs to be isolated from main pipeline.
// Only occurs for special messages; could block for
// a period of time; do not want it affecting normal
// traffic.
//.......
Thread.sleep( 2000 );
}
catch (Exception ex)
{
LOGGER.warn( "Exception processing encrypted message, id: " + msgContext.getIdentifier(), ex );
sendError( _ctx.getChannel(), msgContext, TxnMessageCtx.ErrorReason.SYSLEVEL_ERROR );
return;
}
// Continue processing message as normal....
// Do not want to do "_ctx.sendUpstream( _e )"
// because it continues to use my EncryptionWorker Thread.
// I would rather do something like the following in order to
// get it back into service pipeline Executor Threadpool.
// Get ExecutionHandler being used in pipeline.
ChannelHandler ch = _ctx.getPipeline().get( "executor" );
if (ch == null || !(ch instanceof ExecutionHandler))
{
LOGGER.error( "ExecutionHandler not found.");
return;
}
ExecutionHandler execHandler = (ExecutionHandler) ch;
try
{
// Use ExecutionHandler Executor to continue processing
execHandler.handleUpstream( _ctx, _e );
}
catch (Exception ex)
{
LOGGER.error( "Exception initiating upstream handling after encryption processing, tpdu: " + txnContext.getIdentifier(), ex );
sendError( _ctx.getChannel(), msgContext, TxnMessageCtx.ErrorReason.SYSLEVEL_ERROR );
return;
}
return;
}
}