多段パイプラインがあります。各ステージは個別のスレッドで実行され、制限されたBlockingArrayQueuesを使用して通信が行われます。スループットを向上させるために、最も遅いステージをマルチスレッド化しようとしています。
質問:これに推奨される実装は何ですか?この実装をよりシンプルで読みやすくするライブラリはありますか?
入力キュー->ステージ1(4スレッド)->境界キュー->ステージ2
要件:
入力キューの作業単位は独立しています。
作業単位は厳密に順序付けられています。出力は入力と同じ順序である必要があります。
ステージ1はスロットルする必要があります-出力が特定のサイズを超えると停止する必要があります。
ステージ1の例外により、出力キューに「ポイズンピル」が発生し、ExecutorServiceが終了します。キューに入れられたタスクは、最善を尽くして破棄する必要があります。
**私の提案した実装:**
限られた数のスレッドでThreadPoolExecutorを使用することを考えています。
厳密な順序付けは、各作業単位のカウントダウンラッチで実施されます。スレッドは、前のワークユニットのラッチが0で、キューにスペースがある場合にのみ結果をプッシュできます。これは、出力キューに空きができるまでスレッドがブロックされるため、スロットルも処理します。
class WorkUnit {
CountDownLatch previousLatch;
CountDownLatch myLatch;
}
class MyRunnable extends Runnable {
public void run() {
//do work...
previousLatch.await();
ouputQueue.put( result );
myLatch.countDown();
}
}
例外処理は私が少し困惑しているところです。例外が発生した場合にshutdownNow()を呼び出すThreadPoolExecutor.afterExecute()をオーバーライドすることを考えています。
class MyThreadPoolExecutor extends ThreadPoolExecutor {
protected void afterExecute(Runnable r, Throwable t) {
if(t != null) {
//record exection, log, alert, etc
ouput.put(POISON_PILL);
shutdownNow();
}
}
}