0

次のように、計算を 8 つのスレッドに分割し、結果をファイルに書き込みます。

1a. 7 つのスレッドのそれぞれがその入力を処理し、その出力を独自の に書き込みますByteArrayOutputStream。ストリームが閉じると、スレッドoffers<Integer, ByteArrayOutputStream>aに変換され、 a (7 に初期化された) がConcurrentLinkedQueue呼び出されます。countDown()CountDownLatch

1b. 同時に、8 番目のスレッドが、次の反復で処理されるすべての入力データを読み取ります。このスレッドは、データの読み取りが終了awaitsしたときに実行されます。CountDownLatch

2a. がCountDownLatch0 になると、8 番目のスレッドが起動し、をソート キーとしてConcurrentinkedQueue使用しInteger<Integer, ByteArrayOutputStream>ソートし、キューを反復処理してバイト配列をファイルに追加します。(ソートせずに順番にリストをトラバースするより効率的な方法があるかもしれませんが、リストには 7 つの要素しかないため、ソート メソッドの実行時間は問題になりません。)

2b. 同時に、他の 7 つのスレッドは、8 番目のスレッドによって準備された入力を処理します。

** このプロセスは、すべてのデータが処理されるまでループします (通常、40 ~ 80 回の反復)。

各スレッドは、8 MB の同じサイズの入力チャンク (おそらく最後の反復を除く) を処理します。それぞれByteArrayOutputStreamに 1 ~ 4 MB が含まれており、出力サイズを事前に知ることはできません。通常、最も早く完了した CPU バウンド スレッドと最も遅く完了した CPU バウンド スレッドの実行時間は、互いに 20% 以内です。

すでにこのようなことをしている IO ライブラリ (または私が見逃した java.io または java.nio のメソッド) があるかどうか疑問に思っています。現在、8 番目のスレッド (IO スレッド) は約 75% アイドル状態です。しかし、この非効率性を軽減するために私が思いついた方法はどれも複雑すぎると思います (したがって、デッドロックやデータ競合を引き起こすという点でリスクが高すぎます)。たとえば、入力を 4 MB のチャンクに分割し、2 つのチャンクを 7 つの CPU バウンド スレッドに割り当て、1 つのチャンクを IO バウンド スレッドに割り当てると、理論的には IO スレッドのアイドル時間が 25% ( IO、4 mb チャンクで 50%、25% アイドル)、これは脆弱なソリューションであり、別の CPU に移植できない可能性があります (つまり、別の CPU では、ランタイムがCPU バウンド スレッドの 150%) - I'

4

2 に答える 2

1

非効率性は、スレッド 8 が処理する前に 7 つの出力すべてが完了するのを待つことにあります。キューを 1 つではなく 7 つ (ソーススレッドごとに 1 つ) 実行し、必要な順序で読み取る方がよいでしょう。そうすれば、最初のキューにデータがある場合、他の 6 つを待つ必要はなく、すぐに処理されます。キュー 2..6 についても同様です。スレッド 8 が最後のキューを終了すると、生成を開始できます。実際、特定のキューが生成を開始するのを待つ代わりに、それを実行している可能性もあります。

于 2013-05-16T03:43:17.513 に答える
0

アルゴリズムを次のように変更しました。

  1. 入力チャンクを CPU スレッドに直接割り当てるのではなくBlockingQueue、CPU スレッドpoll/takeが作業チャンクを取得する にチャンクを配置します。
  2. 出力はConcurrentSkipListMap<Integer, ByteArrayOutputStream>
  3. CPU スレッドは、キャンセルされるまで単純にループします。IO スレッドはConcurrentSkipListMap(を使用してfirstKey) をピークして、書き込まれるデータがあるかどうかを確認します (出力ストリームが順番に書き込まれることを確認するために、次のキーがどうあるべきかのカウンターを維持します)。BlockingQueueデータを追加する必要があるかどうかを確認します (その場合queue.size() < N、さらに N 個のチャンクを追加します。ここで、N は最初は 12 です)。いずれかまたは両方の IO タスクを実行した場合はループし、そうでない場合は からチャンクを処理してからBlockingQueueループします。

BlockingQueue入力全体が IO スレッドによってすでに処理されていない限り、 を空にすることはできません。空のキューは、しきい値queue.size() < Nを上げる必要があることを示します。このため、CPU スレッドのロジックは

while(!cancel) {
    try {
        Input input = queue.poll();
        if(input == null) {
            log.warn("Empty queue");
            input = queue.take();
        }
        process(input);
    } catch (InterruptedException ex) {
        cancel = true;
    }
}
于 2013-05-16T16:17:18.720 に答える