5

ステートメントの背後にある合理性を理解しようとして いますブロッキングが絶対に必要な場合は、先物をブロックすることができます (推奨されませんが)

背後にあるアイデアForkJoinPoolは、操作をブロックしているプロセスに参加することです。これは、フューチャーとアクターのエグゼキューター コンテキストの主な実装です。結合をブロックするのに効果的です。

私は小さなベンチマークを書きましたが、この非常に単純なシナリオでは、古いスタイルの先物 (scala 2.9) が 2 倍高速であるように見えます。

@inline
  def futureResult[T](future: Future[T]) = Await.result(future, Duration.Inf)

  @inline
  def futureOld[T](body: => T)(implicit  ctx:ExecutionContext): () => T = {
    val f = future(body)
    () => futureResult(f)
  }

  def main(args: Array[String]) {
    @volatile

    var res = 0d
    CommonUtil.timer("res1") {
      (0 until 100000).foreach {  i =>
       val f1 = futureOld(math.exp(1))
        val f2 = futureOld(math.exp(2))
        val f3 = futureOld(math.exp(3))
        res = res + f1() + f2() + f3()
      }
    }
    println("res1 = "+res)
    res = 0

    res = 0
    CommonUtil.timer("res1") {
      (0 until 100000).foreach {  i =>
        val f1 = future(math.exp(1))
        val f2 = future(math.exp(2))
        val f3 = future(math.exp(3))
        val f4 = for(r1 <- f1; r2 <- f2 ; r3 <- f3) yield r1+r2+r3
        res = res + futureResult(f4)
      }
    }
    println("res2 = "+res)
  }



start:res1
res1 - 1.683 seconds
res1 = 3019287.4850644027
start:res1
res1 - 3.179 seconds
res2 = 3019287.485058338
4

2 に答える 2

10

Future のポイントのほとんどは、並列で簡単に実行できるノンブロッキングの並行コードを作成できることです。

OK、将来的には潜在的に長い関数をラップすると、すぐに返されるので、実際に興味があるまで戻り値について心配することを延期できます。しかし、値を気にするコードの部分が結果が実際に利用可能になるまでブロックするだけの場合、得られるのはコードを少し整理する方法だけです (そして、先物なしでそれを行うことができます-先物を使用してあなたのコードを整理することはコードのにおいになると思います)。future でラップされている関数がまったく自明でない限り、コードは他の式を評価するよりもブロックに多くの時間を費やすことになります。

一方、( onCompleteonSuccessなどを使用して) コールバックを登録し、そのコールバックに結果を処理するコードを配置すると、コードを編成して非常に効率的に実行し、適切にスケーリングすることができます。座って結果を待つのではなく、イベント駆動型になります。

あなたのベンチマークは前者のタイプですが、そこにはいくつかの小さな関数があるため、それらを順番に実行する場合と比較して並列に実行する場合で得られるものはほとんどありません。これは、Future の作成とアクセスのオーバーヘッドを主に評価していることを意味します。おめでとうございます: 状況によっては、2.9 先物は 2.10 よりも些細なことを行うのが速いことを示しました。

もう少し複雑で要求の厳しいものを試してください。つまり、あなたはほぼ即座に将来の値を要求しています! 少なくとも、100000 個の先物の配列を構築し、その結果を別のループで引き出すことができます。それは少し意味のある何かをテストすることになります。ああ、そしてiの値に基づいて何かを計算させます。

そこから先に進むことができます

  1. 結果を格納するオブジェクトを作成します。
  2. 結果をオブジェクトに挿入する各 Future にコールバックを登録します。
  3. n計算の開始

そして、すべてを要求したときに、実際の結果が得られるまでにかかる時間をベンチマークします。そのほうが意味深でしょう。

編集

ところで、あなたのベンチマークは、それ自体の条件と、先物の適切な使用に関する理解の両方で失敗しています。

まず、個々の将来の結果を取得するのにかかる時間を数えていますが、3 つの先物すべてが作成された後にresを評価するのにかかる実際の時間や、ループを反復するのにかかる合計時間はカウントしていません。また、あなたの数学的計算は非常に簡単なので、a) 理解のための 2 番目のテストと、b) 最初の 3 つの先物がラップされている 4 番目の先物で実際にペナルティをテストしている可能性があります。

第 2 に、これらの合計が全体の使用時間にほぼ比例するようになる唯一の理由は、まさにここに同時実行性がないからです。

私はあなたを殴ろうとしているわけではありません。ベンチマークのこれらの欠陥が問題を明らかにするのに役立つだけです. さまざまな先物実装のパフォーマンスの適切なベンチマークには、非常に慎重な検討が必要です。

于 2013-09-10T09:31:05.963 に答える
6

ForkJoinTaskレポートの Java7 ドキュメント:

ForkJoinTask は、Future の軽量形式です。ForkJoinTasks の効率は、純粋な関数を計算する計算タスクまたは純粋に分離されたオブジェクトを操作する計算タスクとしての使用目的を反映した一連の制限 (部分的にのみ静的に強制可能) に由来します。主な調整メカニズムは、非同期実行を調整する fork() と、タスクの結果が計算されるまで先に進まない join() です。計算は、同期されたメソッドまたはブロックを回避する必要があり、他のタスクに参加すること、または fork/join スケジューリングと連携するためにアドバタイズされている Phaser などのシンクロナイザーを使用することを除いて、他のブロッキング同期を最小限に抑える必要があります。タスクはブロッキング IO も実行しないでください。理想的には、他の実行中のタスクによってアクセスされる変数とは完全に独立した変数にアクセスする必要があります。共有出力ストリームの使用など、これらの制限の軽微な違反は、実際には許容できる場合がありますが、頻繁に使用するとパフォーマンスが低下する可能性があり、IO またはその他の外部同期を待機していないスレッドの数が使い果たされた場合に無期限に停止する可能性があります。この使用制限は、IOExceptions などのチェック済み例外のスローを許可しないことによって部分的に適用されます。ただし、計算では未チェックの例外が発生する可能性があり、これらの例外は、それらに参加しようとする呼び出し元に再スローされます。これらの例外には、内部タスク キューの割り当ての失敗など、内部リソースの枯渇に起因する RejectedExecutionException がさらに含まれる場合があります。再スローされた例外は通常の例外と同じように動作しますが、可能な場合は、計算を開始したスレッドと実際に例外に遭遇したスレッドの両方のスタック トレース (ex.printStackTrace() を使用して表示されるなど) が含まれます。最小限、後者のみ。

Doug Lea のJSR166 (JDK8 を対象とする)のメンテナンス リポジトリは、次のように拡張されています。

ForkJoinTask は、Future の軽量形式です。ForkJoinTasks の効率性は、純粋な関数を計算する計算タスクまたは純粋に分離されたオブジェクトを操作する計算タスクとしての主な用途を反映した一連の制限 (部分的にのみ静的に強制可能) に由来します。主な調整メカニズムは、非同期実行を調整する fork() と、タスクの結果が計算されるまで先に進まない join() です。計算は、理想的には、同期されたメソッドまたはブロックを回避する必要があり、他のタスクへの参加や、フォーク/ジョイン スケジューリングと連携するように宣伝されている Phaser などのシンクロナイザーの使用とは別に、他のブロック同期を最小限に抑える必要があります。また、分割可能なタスクはブロッキング I/O を実行すべきではありません。理想的には、他の実行中のタスクによってアクセスされる変数とは完全に独立した変数にアクセスする必要があります。これらのガイドラインは、IOExceptions などのチェック済み例外のスローを許可しないことで、緩やかに適用されます。ただし、計算では未チェックの例外が発生する可能性があり、これらの例外は、それらに参加しようとする呼び出し元に再スローされます。これらの例外には、内部タスク キューの割り当ての失敗など、内部リソースの枯渇に起因する RejectedExecutionException がさらに含まれる場合があります。再スローされた例外は通常の例外と同じように動作しますが、可能な場合は、計算を開始したスレッドと実際に例外に遭遇したスレッドの両方のスタック トレース (ex.printStackTrace() を使用して表示されるなど) が含まれます。最小限、後者のみ。これらのガイドラインは、IOExceptions などのチェック済み例外のスローを許可しないことで、緩やかに適用されます。ただし、計算では未チェックの例外が発生する可能性があり、これらの例外は、それらに参加しようとする呼び出し元に再スローされます。これらの例外には、内部タスク キューの割り当ての失敗など、内部リソースの枯渇に起因する RejectedExecutionException がさらに含まれる場合があります。再スローされた例外は通常の例外と同じように動作しますが、可能な場合は、計算を開始したスレッドと実際に例外に遭遇したスレッドの両方のスタック トレース (ex.printStackTrace() を使用して表示されるなど) が含まれます。最小限、後者のみ。これらのガイドラインは、IOExceptions などのチェック済み例外のスローを許可しないことで、緩やかに適用されます。ただし、計算では未チェックの例外が発生する可能性があり、これらの例外は、それらに参加しようとする呼び出し元に再スローされます。これらの例外には、内部タスク キューの割り当ての失敗など、内部リソースの枯渇に起因する RejectedExecutionException がさらに含まれる場合があります。再スローされた例外は通常の例外と同じように動作しますが、可能な場合は、計算を開始したスレッドと実際に例外に遭遇したスレッドの両方のスタック トレース (ex.printStackTrace() を使用して表示されるなど) が含まれます。最小限、後者のみ。計算はまだチェックされていない例外に遭遇する可能性があり、それらに参加しようとする呼び出し元に再スローされます。これらの例外には、内部タスク キューの割り当ての失敗など、内部リソースの枯渇に起因する RejectedExecutionException がさらに含まれる場合があります。再スローされた例外は通常の例外と同じように動作しますが、可能な場合は、計算を開始したスレッドと実際に例外に遭遇したスレッドの両方のスタック トレース (ex.printStackTrace() を使用して表示されるなど) が含まれます。最小限、後者のみ。計算はまだチェックされていない例外に遭遇する可能性があり、それらに参加しようとする呼び出し元に再スローされます。これらの例外には、内部タスク キューの割り当ての失敗など、内部リソースの枯渇に起因する RejectedExecutionException がさらに含まれる場合があります。再スローされた例外は通常の例外と同じように動作しますが、可能な場合は、計算を開始したスレッドと実際に例外に遭遇したスレッドの両方のスタック トレース (ex.printStackTrace() を使用して表示されるなど) が含まれます。最小限、後者のみ。再スローされた例外は通常の例外と同じように動作しますが、可能な場合は、計算を開始したスレッドと実際に例外に遭遇したスレッドの両方のスタック トレース (ex.printStackTrace() を使用して表示されるなど) が含まれます。最小限、後者のみ。再スローされた例外は通常の例外と同じように動作しますが、可能な場合は、計算を開始したスレッドと実際に例外に遭遇したスレッドの両方のスタック トレース (ex.printStackTrace() を使用して表示されるなど) が含まれます。最小限、後者のみ。

ブロックする可能性のある ForkJoinTasks を定義して使用することは可能ですが、実行するにはさらに 3 つの考慮事項が必要です。決して結合されないイベント スタイルの非同期タスク (たとえば、CountedCompleter をサブクラス化するタスク) は、多くの場合、このカテゴリに分類されます。(2) リソースへの影響を最小限に抑えるために、タスクは小さくする必要があります。理想的には、(おそらく) ブロッキング アクションのみを実行します。(3) ForkJoinPool.ManagedBlocker API が使用されていない場合、またはブロックされる可能性のあるタスクの数がプールの ForkJoinPool.getParallelism() レベルよりも少ないことがわかっている場合を除き、プールは、進行または良好なパフォーマンスを確保するために十分なスレッドが使用可能であることを保証できません。 .

tl;dr;

fork-join で参照される「ブロッキング ジョイン」操作は、タスク内で何らかの「ブロッキング コード」を呼び出すことと混同しないでください。

1 つ目は、多くの独立したタスク (独立したスレッドではない) を調整して、個々の結果を収集し、全体的な結果を評価することです。

2 つ目は、単一のタスク内で長時間ブロックする可能性のある操作の呼び出しに関するものです。たとえば、ネットワーク経由の IO 操作、DB クエリ、ファイル システムへのアクセス、グローバルに同期されたオブジェクトまたはメソッドへのアクセスなどです。

2 番目の種類のブロッキングは、ScalaFuturesForkJoinTasksその両方では推奨されません。主なリスクは、スレッドプールが使い果たされ、キューで待機しているタスクを完了できず、使用可能なすべてのスレッドが操作のブロックで待機中であることです。

于 2013-09-10T15:25:38.393 に答える