1

多段パイプラインがあります。各ステージは個別のスレッドで実行され、制限されたBlockingArrayQueuesを使用して通信が行われます。スループットを向上させるために、最も遅いステージをマルチスレッド化しようとしています。

質問:これに推奨される実装は何ですか?この実装をよりシンプルで読みやすくするライブラリはありますか?

入力キュー->ステージ1(4スレッド)->境界キュー->ステージ2

要件

  • 入力キューの作業単位は独立しています。

  • 作業単位は厳密に順序付けられています。出力は入力と同じ順序である必要があります。

  • ステージ1はスロットルする必要があります-出力が特定のサイズを超えると停止する必要があります。

  • ステージ1の例外により、出力キューに「ポイズンピル」が発生し、ExecutorServiceが終了します。キューに入れられたタスクは、最善を尽くして破棄する必要があります。

**私の提案した実装:**

限られた数のスレッドでThreadPoolExecutorを使用することを考えています。

厳密な順序付けは、各作業単位のカウントダウンラッチで実施されます。スレッドは、前のワークユニットのラッチが0で、キューにスペースがある場合にのみ結果をプッシュできます。これは、出力キューに空きができるまでスレッドがブロックされるため、スロットルも処理します。

class WorkUnit {
   CountDownLatch previousLatch;
   CountDownLatch myLatch;
}

class MyRunnable extends Runnable {
   public void run() {
       //do work...
       previousLatch.await();
       ouputQueue.put( result );
       myLatch.countDown();
   }
}

例外処理は私が少し困惑しているところです。例外が発生した場合にshutdownNow()を呼び出すThreadPoolExecutor.afterExecute()をオーバーライドすることを考えています。

class MyThreadPoolExecutor extends ThreadPoolExecutor {
      protected void afterExecute(Runnable r, Throwable t) {
           if(t != null) {
               //record exection, log, alert, etc
               ouput.put(POISON_PILL);
               shutdownNow();
           }
      }
}
4

1 に答える 1

0

ExecutorService を使用するには、CountDownLatch.countDown()またはのようなブロック操作を使用せずに、完全に非同期の設計が必要BlockingQueue.take()です。Runnableアクション B がいくつかのイベントを待機する必要がある場合、そのイベントは、 を に送信することによってアクション B を開始する必要がありExecutorServiceます。

あなたの場合、キューの代わりにカスタム クラスを作成する必要があります。このクラスは、メッセージを受け入れて、それらを内部に保存するか、いくつかのルール (実行中の Stage1 タスクの数を制限するなど) に従って、Stage1 または Stage2 を実装するタスクを送信する必要があります。

注文に関しては、前のタスクの参照をシリアル番号に置き換えてください。

于 2013-01-12T09:25:49.060 に答える