145

最初に大きなタスクを単純に N 個のサブタスクに分割し、それらを ( Executorsから) キャッシュされたスレッド プールに送信し、各タスクが完了するのを待つだけでなく、新しいfork/join フレームワークを使用する利点は何ですか? fork/join 抽象化を使用することで、問題が単純化されたり、何年も前から解決策がより効率的になったりすることがわかりません。

たとえば、チュートリアルの例の並列化されたぼかしアルゴリズムは、次のように実装できます。

public class Blur implements Runnable {
    private int[] mSource;
    private int mStart;
    private int mLength;
    private int[] mDestination;

    private int mBlurWidth = 15; // Processing window size, should be odd.

    public ForkBlur(int[] src, int start, int length, int[] dst) {
        mSource = src;
        mStart = start;
        mLength = length;
        mDestination = dst;
    }

    public void run() {
        computeDirectly();
    }

    protected void computeDirectly() {
        // As in the example, omitted for brevity
    }
}

最初に分割し、タスクをスレッド プールに送信します。

// source image pixels are in src
// destination image pixels are in dst
// threadPool is a (cached) thread pool

int maxSize = 100000; // analogous to F-J's "sThreshold"
List<Future> futures = new ArrayList<Future>();

// Send stuff to thread pool:
for (int i = 0; i < src.length; i+= maxSize) {
    int size = Math.min(maxSize, src.length - i);
    ForkBlur task = new ForkBlur(src, i, size, dst);
    Future f = threadPool.submit(task);
    futures.add(f);
}

// Wait for all sent tasks to complete:
for (Future future : futures) {
    future.get();
}

// Done!

タスクはスレッド プールのキューに移動し、ワーカー スレッドが使用可能になるとそこから実行されます。分割が十分に細分化されており (特に最後のタスクを待機する必要がないように)、スレッド プールに十分な (少なくとも N 個のプロセッサ) スレッドがある限り、計算全体が完了するまで、すべてのプロセッサがフル スピードで動作します。

何か不足していますか?fork/join フレームワークを使用することの付加価値は何ですか?

4

11 に答える 11

146

基本的な誤解は、フォーク/ジョインの例は仕事を盗むことを示しおらず、ある種の標準的な分割統治法のみを示しているということだと思います。

仕事を盗むことは次のようになります:労働者Bは彼の仕事を終えました。彼は親切な人なので、周りを見回すと、労働者Aがまだ非常に懸命に働いているのが見えます。彼は散歩して、「やあ、私はあなたに手を差し伸べることができます」と尋ねます。返信。「かっこいい、私は1000ユニットのこのタスクを持っています。これまでに、655を残して345を終了しました。番号673から1000で作業していただけませんか、346から672を実行します。」Bは「OK、早くパブに行けるように始めましょう」と言います。

ほら、実際の仕事を始めたときでさえ、労働者はお互いにコミュニケーションをとらなければなりません。これは、例に欠けている部分です。

一方、例は「下請け業者を使用する」のようなものだけを示しています。

労働者A:「ダン、私には1000単位の仕事があります。私には多すぎます。私は自分で500を行い、他の誰かに500を下請けします。」これは、大きなタスクがそれぞれ10ユニットの小さなパケットに分割されるまで続きます。これらは、利用可能なワーカーによって実行されます。しかし、1つのパケットが一種の毒薬であり、他のパケットよりもかなり長い時間がかかる場合、運が悪ければ、分割フェーズは終了します。

Fork / Joinとタスクを事前に分割することの唯一の違いは次のとおりです。事前に分割すると、最初から作業キューがいっぱいになります。例:1000ユニット、しきい値は10であるため、キューには100個のエントリがあります。これらのパケットはスレッドプールメンバーに配布されます。

Fork / Joinはより複雑で、キュー内のパケット数を少なくしようとします。

  • ステップ1:(1 ... 1000)を含む1つのパケットをキューに入れます
  • ステップ2:1人のワーカーがパケット(1 ... 1000)をポップし、(1 ... 500)と(501 ... 1000)の2つのパケットに置き換えます。
  • ステップ3:1人のワーカーがパケット(500 ... 1000)をポップし、(500 ... 750)と(751 ... 1000)をプッシュします。
  • ステップn:スタックには次のパケットが含まれています:(1..500)、(500 ... 750)、(750 ... 875)...(991..1000)
  • ステップn+​​1:パケット(991..1000)がポップされ、実行されます
  • ステップn+​​2:パケット(981..990)がポップされて実行されます
  • ステップn+​​3:パケット(961..980)がポップされ、(961 ... 970)と(971..980)に分割されます。...。

ご覧のとおり、Fork / Joinではキューが小さく(例では6)、「分割」フェーズと「作業」フェーズがインターリーブされています。

もちろん、複数のワーカーがポップして同時にプッシュしている場合、相互作用はそれほど明確ではありません。

于 2011-10-28T11:49:29.280 に答える
28

n 個のビジー スレッドがすべて独立して 100% で動作している場合、Fork-Join (FJ) プール内の n 個のスレッドよりも優れています。しかし、それは決してうまくいきません。

問題を n 個の等しい部分に正確に分割できない場合があります。たとえそうしたとしても、スレッドのスケジューリングは公平ではありません。最も遅いスレッドを待つことになります。複数のタスクがある場合、それらはそれぞれ n-way 未満の並列処理で実行できますが (一般的にはより効率的です)、他のタスクが終了すると n-way になります。

では、問題を FJ サイズの断片に切り分けて、スレッド プールで処理してみませんか。典型的な FJ の使用法では、問題を小さな断片に切り分けます。これらをランダムな順序で行うには、ハードウェア レベルで多くの調整が必要です。オーバーヘッドは致命的です。FJ では、タスクはスレッドが後入れ先出し順 (LIFO/スタック) で読み取るキューに入れられ、ワーク スチール (通常はコア ワークで) は先入れ先出し (FIFO/"queue") で行われます。その結果、たとえ小さなチャンクに分割されていても、長い配列の処理は大部分が順番に実行できるようになります。(また、問題を 1 つのビッグバンで小さな均一なサイズのチャンクに分割するのが簡単ではない場合もあります。たとえば、バランスをとらずに何らかの形式の階層を扱うとします。)

結論: FJ を使用すると、不均一な状況でハードウェア スレッドをより効率的に使用できます。これは、複数のスレッドがある場合に常に発生します。

于 2011-10-28T08:45:59.967 に答える
23

スレッド プールと Fork/Join の最終的な目標は似ています。つまり、どちらも利用可能な CPU パワーを最大限に活用して、最大のスループットを実現したいと考えています。最大スループットとは、できるだけ多くのタスクを長期間で完了する必要があることを意味します。そのためには何が必要ですか?(以下では、計算タスクが不足していないと仮定します: 100% の CPU 使用率を達成するには、常に十分な計算タスクがあります。さらに、ハイパースレッディングの場合は、コアまたは仮想コアに「CPU」を同等に使用します)。

  1. 少なくとも、使用可能な CPU と同じ数のスレッドを実行する必要があります。これは、実行するスレッドが少なくなるとコアが未使用のままになるためです。
  2. 最大で、使用可能な CPU と同じ数のスレッドを実行する必要があります。より多くのスレッドを実行すると、別のスレッドに CPU を割り当てるスケジューラに追加の負荷が発生し、一部の CPU 時間が計算タスクではなくスケジューラに費やされるためです。

したがって、スループットを最大にするには、CPU とまったく同じ数のスレッドが必要であることがわかりました。Oracle のぼかしの例では、使用可能な CPU の数と同じ数のスレッドを持つ固定サイズのスレッド プールを使用するか、スレッド プールを使用することができます。違いはありません、あなたは正しいです!

では、いつスレッド プールで問題が発生するのでしょうか? これは、スレッドが別のタスクの完了を待っているため、スレッドがブロックされた場合です。次の例を想定します。

class AbcAlgorithm implements Runnable {
    public void run() {
        Future<StepAResult> aFuture = threadPool.submit(new ATask());
        StepBResult bResult = stepB();
        StepAResult aResult = aFuture.get();
        stepC(aResult, bResult);
    }
}

ここに表示されているのは、3 つのステップ A、B、C で構成されるアルゴリズムです。A と B は互いに独立して実行できますが、ステップ C にはステップ A と B の結果が必要です。このアルゴリズムが行うことは、タスク A をサブミットすることです。スレッドプールを開き、タスク b を直接実行します。その後、スレッドはタスク A も完了するのを待ち、ステップ C に進みます。A と B が同時に完了した場合、すべて問題ありません。しかし、A が B よりも時間がかかる場合はどうなるでしょうか。これは、タスク A の性質が原因である可能性がありますが、最初に使用可能なタスク A のスレッドがなく、タスク A が待機する必要があるためです。(使用可能な CPU が 1 つしかなく、スレッドプールにスレッドが 1 つしかない場合、デッドロックが発生することさえありますが、今のところそれは重要ではありません)。ポイントは、タスク B を実行したばかりのスレッドがスレッド全体をブロックします。CPU と同じ数のスレッドがあり、1 つのスレッドがブロックされているため、1 つの CPU がアイドル状態であることを意味します。

Fork/Join はこの問題を解決します: fork/join フレームワークでは、同じアルゴリズムを次のように記述します。

class AbcAlgorithm implements Runnable {
    public void run() {
        ATask aTask = new ATask());
        aTask.fork();
        StepBResult bResult = stepB();
        StepAResult aResult = aTask.join();
        stepC(aResult, bResult);
    }
}

同じに見えますよね?ただし、手がかりは、それaTask.join がブロックされないことです。代わりに、ここでワークスティーリングの出番です。スレッドは、過去にフォークされた他のタスクを探して、それらを続行します。最初に、それ自体が fork したタスクが処理を開始したかどうかを確認します。したがって、A が別のスレッドによってまだ開始されていない場合は、次に A を実行します。それ以外の場合は、他のスレッドのキューをチェックして、それらの作業を盗みます。別のスレッドのこの他のタスクが完了すると、A が現在完了しているかどうかがチェックされます。上記のアルゴリズムである場合は、 を呼び出すことができますstepC。それ以外の場合は、スチールする別のタスクを探します。したがって、ブロック アクションに直面しても、プールの fork/join は 100% の CPU 使用率を達成できます

ただし、落とし穴があります。ワークスティーリングは s のjoin呼び出しでのみ可能ですForkJoinTask。別のスレッドの待機や I/O アクションの待機などの外部ブロッキング アクションに対しては実行できません。では、I/O の完了を待つのは一般的なタスクでしょうか? この場合、Fork/Join プールに追加のスレッドを追加できれば、ブロック アクションが完了するとすぐに再び停止することが 2 番目に良い方法です。s を使用している場合、ForkJoinPoolは実際にそれを行うことができますManagedBlocker

フィボナッチ

RecursiveTaskのJavaDoc には、Fork/Join を使用してフィボナッチ数を計算する例があります。従来の再帰的なソリューションについては、次を参照してください。

public static int fib(int n) {
    if (n <= 1) {
        return n;
    }
    return fib(n - 1) + fib(n - 2);
}

JavaDocs で説明されているように、これはフィボナッチ数を計算するためのかなりダンプ的な方法です。このアルゴリズムは O(2^n) の複雑さを持ちますが、より単純な方法が可能です。しかし、このアルゴリズムは非常にシンプルで理解しやすいので、私たちはそれを使い続けています。Fork/Join でこれをスピードアップしたいとしましょう。単純な実装は次のようになります。

class Fibonacci extends RecursiveTask<Long> {
    private final long n;

    Fibonacci(long n) {
        this.n = n;
    }

    public Long compute() {
        if (n <= 1) {
            return n;
        }
        Fibonacci f1 = new Fibonacci(n - 1);
        f1.fork();
        Fibonacci f2 = new Fibonacci(n - 2);
        return f2.compute() + f1.join();
   }
}

このタスクが分割されたステップは短すぎるため、これは恐ろしく実行されますが、フレームワークが一般的にどのようにうまく機能するかを見ることができます:結果。したがって、半分は別のスレッドで行われます。デッドロックを起こさずにスレッドプールで同じことを楽しんでください (可能ですが、それほど単純ではありません)。

完全を期すために:この再帰的アプローチを使用してフィボナッチ数を実際に計算したい場合は、最適化されたバージョンを次に示します。

class FibonacciBigSubtasks extends RecursiveTask<Long> {
    private final long n;

    FibonacciBigSubtasks(long n) {
        this.n = n;
    }

    public Long compute() {
        return fib(n);
    }

    private long fib(long n) {
        if (n <= 1) {
            return 1;
        }
        if (n > 10 && getSurplusQueuedTaskCount() < 2) {
            final FibonacciBigSubtasks f1 = new FibonacciBigSubtasks(n - 1);
            final FibonacciBigSubtasks f2 = new FibonacciBigSubtasks(n - 2);
            f1.fork();
            return f2.compute() + f1.join();
        } else {
            return fib(n - 1) + fib(n - 2);
        }
    }
}

が true の場合にのみサブタスクが分割されるため、これによりサブタスクが大幅に小さく保たれn > 10 && getSurplusQueuedTaskCount() < 2ます。つまり、100 をはるかに超える do へのメソッド呼び出しがあり ( n > 10)、すでに待機しているマン タスクはそれほど多くありません ( getSurplusQueuedTaskCount() < 2)。

私のコンピューター (4 コア (ハイパースレッディングを数える場合は 8)、Intel(R) Core(TM) i7-2720QM CPU @ 2.20GHz) ではfib(50)、従来のアプローチで 64 秒、Fork/Join アプローチでわずか 18 秒かかります。理論的に可能な限りではありませんが、かなり顕著なゲインです。

概要

  • はい、あなたの例では、フォーク/ジョインには従来のスレッドプールよりも利点がありません。
  • Fork/Join は、ブロッキングが関係している場合にパフォーマンスを大幅に向上させることができます
  • フォーク/ジョインはデッドロックの問題を回避します
于 2016-05-16T15:33:54.213 に答える
21

フォーク/ジョインは、ワーク スティーリングを実装するため、スレッド プールとは異なります。フォーク/ジョインから

他の ExecutorService と同様に、フォーク/ジョイン フレームワークはタスクをスレッド プール内のワーカー スレッドに分散します。fork/join フレームワークは、work-stealing アルゴリズムを使用するという点で異なります。やることがなくなったワーカー スレッドは、まだビジー状態の他のスレッドからタスクを盗むことができます。

2 つのスレッドと、それぞれ 1、1、5、6 秒かかる 4 つのタスク a、b、c、d があるとします。最初に、a と b がスレッド 1 に割り当てられ、c と d がスレッド 2 に割り当てられます。スレッド プールでは、これに 11 秒かかります。fork/join を使用すると、スレッド 1 が終了し、スレッド 2 から作業を盗むことができるため、タスク d はスレッド 1 によって実行されることになります。スレッド 1 は a、b、および d を実行し、スレッド 2 は c だけを実行します。全体の時間: 11 秒ではなく 8 秒。

編集:Joonasが指摘しているように、タスクは必ずしもスレッドに事前に割り当てられているわけではありません。fork/join の考え方は、スレッドがタスクを複数のサブピースに分割することを選択できるということです。したがって、上記を言い換えると:

それぞれ 2 秒と 11 秒かかる 2 つのタスク (ab) と (cd) があります。スレッド 1 は ab の実行を開始し、それを 2 つのサブタスク a と b に分割します。スレッド 2 と同様に、2 つのサブタスク c と d に分割されます。スレッド 1 が a と b を完了すると、スレッド 2 から d を盗むことができます。

于 2011-10-28T09:46:46.120 に答える
14

上記の誰もが、仕事を盗むことによって利益が得られるのは正しいですが、その理由をさらに詳しく説明します。

主な利点は、ワーカー スレッド間の効率的な調整です。作業を分割して再組み立てする必要があり、調整が必要です。上記の AH の回答からわかるように、各スレッドには独自の作業リストがあります。このリストの重要なプロパティは、並べ替えられていることです (大きなタスクが上部に、小さなタスクが下部に表示されます)。各スレッドは、そのリストの一番下にあるタスクを実行し、他のスレッド リストの一番上にあるタスクを盗みます。

この結果は次のとおりです。

  • タスク リストの先頭と末尾を個別に同期できるため、リストの競合が減少します。
  • 作業の重要なサブツリーは同じスレッドによって分割および再構築されるため、これらのサブツリーにはスレッド間の調整は必要ありません。
  • スレッドが作業を盗むとき、それは大きなピースを取り、それを独自のリストに分割します
  • ワークスチールとは、プロセスの最後までスレッドがほぼ完全に使用されることを意味します。

スレッド プールを使用する他のほとんどの分割統治方式では、スレッド間の通信と調整がさらに必要になります。

于 2012-06-22T14:11:47.843 に答える
8

もう1つの重要な違いは、FJを使用すると、複数の複雑な「参加」フェーズを実行できることです。http://faculty.ycp.edu/~dhovemey/spring2011/cs365/lecture/lecture18.htmlからのマージソートを検討してください。この作業を事前に分割するには、オーケストレーションが多すぎる必要があります。たとえば、次のことを行う必要があります。

  • 第1四半期を並べ替える
  • 第2四半期を並べ替える
  • 最初の2四半期をマージします
  • 第3四半期を並べ替える
  • 第4四半期を並べ替える
  • 最後の2四半期をマージします
  • 2つの半分をマージします

それらに関係するマージなどの前にソートを実行する必要があることをどのように指定しますか。

私は、アイテムのリストごとに特定のことを行うための最善の方法を検討してきました。リストを事前に分割して、標準のThreadPoolを使用すると思います。FJは、作業を十分に独立したタスクに事前に分割できないが、それらの間で独立しているタスクに再帰的に分割できる場合に最も役立つようです(たとえば、半分を並べ替えることは独立していますが、2つの並べ替えられた半分を並べ替えられた全体にマージすることはできません)。

于 2012-09-05T16:23:28.590 に答える
6

F/J には、高価なマージ操作がある場合にも明確な利点があります。ツリー構造に分割されるため、線形スレッド分割による n 個のマージとは対照的に、log2(n) 個のマージのみを行います。(これは、スレッドと同じ数のプロセッサを持っているという理論上の仮定ですが、それでも利点があります) 宿題のために、各インデックスの値を合計して、数千の 2D 配列 (すべて同じ次元) をマージする必要がありました。fork join と P プロセッサでは、P が無限大に近づくにつれて、時間は log2(n) に近づきます。

1 2 3 .. 7 3 1 .... 8 5 4
4 5 6 + 2 4 3 => 6 9 9
7 8 9 .. 1 1 0 .... 8 9 9

于 2012-12-03T04:59:19.690 に答える
3

クローラーのようなアプリケーションでの ForkJoin のパフォーマンスには驚かれることでしょう。ここから学ぶのに最適なチュートリアルです。

Fork/Join のロジックは非常に単純です。(1) 各大きなタスクを小さなタスクに分離 (フォーク) します。(2) 各タスクを個別のスレッドで処理します (必要に応じて、それらをさらに小さなタスクに分割します)。(3) 結果を結合します。

于 2015-10-19T15:27:59.037 に答える