私は自分の処理パイプラインを実装するための最良の方法に取り組んでいます。
私のプロデューサーは、BlockingQueueに作業をフィードします。コンシューマー側では、キューをポーリングし、Runnableタスクで取得したものをラップして、ExecutorServiceに送信します。
while (!isStopping())
{
String work = workQueue.poll(1000L, TimeUnit.MILLISECONDS);
if (work == null)
{
break;
}
executorService.execute(new Worker(work)); // needs to block if no threads!
}
これは理想的ではありません。もちろん、ExecutorServiceには独自のキューがあるので、実際に起こっていることは、常に作業キューを完全に使い果たし、タスクキューをいっぱいにしていることです。タスクキューは、タスクが完了するとゆっくりと空になります。
プロデューサー側でタスクをキューに入れることができることはわかっていますが、実際にはそうはしません。作業キューの間接参照/分離がダム文字列であることが好きです。彼らに何が起こるかは、実際にはプロデューサーの仕事ではありません。プロデューサーにRunnableまたはCallableをキューに入れるように強制すると、抽象化、IMHOが壊れます。
ただし、共有ワークキューで現在の処理状態を表す必要があります。消費者が追いついていない場合は、生産者をブロックできるようにしたいと思います。
エグゼキュータを使いたいのですが、エグゼキュータのデザインと戦っているような気がします。Kool-adeを部分的に飲むことはできますか、それとも飲み込む必要がありますか?私はキューイングタスクに抵抗することに頭を悩ませていますか?(1タスクキューを使用するようにThreadPoolExecutorを設定し、そのexecuteメソッドをオーバーライドして、reject-on-queue-fullではなくブロックすることができると思いますが、それはひどい感じです。)
提案?