48

リスト内の各単語の出現回数をカウントする以下の簡単な例を参照してください。

Stream<String> words = Stream.of("a", "b", "a", "c");
Map<String, Integer> wordsCount = words.collect(toMap(s -> s, s -> 1,
                                                      (i, j) -> i + j));

最後に、wordsCountです{a=2, b=1, c=1}

しかし、私のストリームは非常に大きく、ジョブを並列化したいので、次のように書きます:

Map<String, Integer> wordsCount = words.parallel()
                                       .collect(toMap(s -> s, s -> 1,
                                                      (i, j) -> i + j));

wordsCountただし、それは単純なことに気付いたHashMapので、スレッドの安全性を確保するために同時実行マップを明示的に要求する必要があるかどうか疑問に思います。

Map<String, Integer> wordsCount = words.parallel()
                                       .collect(toConcurrentMap(s -> s, s -> 1,
                                                                (i, j) -> i + j));

非同時コレクターを並列ストリームで安全に使用できますか? または、並列ストリームから収集する場合は同時バージョンのみを使用する必要がありますか?

4

3 に答える 3

48

非同時コレクターを並列ストリームで安全に使用できますか? または、並列ストリームから収集する場合は同時バージョンのみを使用する必要がありますか?

collect並列ストリームの操作で非同時コレクターを使用しても安全です。

インターフェイスの仕様では、Collector半ダースの箇条書きのあるセクションで、これは次のとおりです。

非並行コレクターの場合、結果サプライヤー、アキュムレーター、またはコンバイナー関数から返される結果は、シリアルにスレッド制限する必要があります。これにより、コレクターが追加の同期を実装する必要なく、コレクションを並行して実行できます。リダクションの実装では、入力が適切に分割されていること、分割が分離して処理されていること、結合が累積の完了後にのみ行われることを管理する必要があります。

これは、Collectorsクラスによって提供されるさまざまな実装を並列ストリームで使用できることを意味しますが、これらの実装の一部は同時コレクターではない可能性があります。これは、実装する可能性のある独自の非並行コレクターにも当てはまります。コレクターがストリームソースに干渉しない、副作用がなく、順序に依存しないなどの条件で、並列ストリームで安全に使用できます。

また、java.util.stream パッケージ ドキュメントのMutable Reductionセクションを読むことをお勧めします。このセクションの真ん中には、並列化可能であると述べられているが、ArrayListスレッドセーフではない に結果を収集する例があります。

これが機能する方法は、非並行コレクターで終了する並列ストリームが、中間結果コレクションの異なるインスタンスで異なるスレッドが常に動作していることを確認することです。そのため、コレクターには、Supplierスレッドと同じ数の中間コレクションを作成する機能があり、各スレッドが独自に蓄積できるようになっています。中間結果がマージされる場合、それらはスレッド間で安全に受け渡され、任意の時点で 1 つのスレッドのみが中間結果のペアをマージします。

于 2014-03-12T20:17:24.910 に答える
25

すべてのコレクターは、仕様の規則に従っていれば、並列または順次に安全に実行できます。並列対応は、ここでの設計の重要な部分です。

並行コレクターと非並行コレクターの違いは、並列化へのアプローチに関係しています。

通常の (非並行) コレクタは、サブ結果をマージすることによって動作します。したがって、ソースはチャンクの束に分割され、各チャンクは結果コンテナー (リストやマップなど) に収集され、サブ結果はより大きな結果コンテナーにマージされます。これは安全で順序を保持しますが、キーによって 2 つのマップをマージするとコストがかかることが多いため、一部の種類のコンテナー (特にマップ) ではコストがかかる可能性があります。

並行コレクターは代わりに、挿入操作がスレッドセーフであることが保証されている 1 つの結果コンテナーを作成し、複数のスレッドからそのコンテナーに要素を爆破します。ConcurrentHashMap のような同時実行性の高い結果コンテナーを使用すると、このアプローチは通常の HashMap をマージするよりも優れたパフォーマンスを発揮する可能性があります。

したがって、並行コレクターは、通常のコレクターよりも厳密に最適化されています。そして、それらはコストなしでは提供されません。要素は多くのスレッドから吹き込まれているため、並行コレクターは通常、遭遇順序を保持できません。(しかし、多くの場合、単語カウント ヒストグラムを作成するときは気にしません。「foo」のどのインスタンスを最初にカウントしたかは気にしません。)

于 2014-04-29T16:36:32.743 に答える
12

非並行コレクションと非アトミック カウンターを並列ストリームで安全に使用できます。

Stream::collectのドキュメントを見ると、次の段落が見つかります。

と同様reduce(Object, BinaryOperator)に、追加の同期を必要とせずに収集操作を並列化できます。

メソッドStream::reduceの場合:

これは、ループ内で実行中の合計を単純に変更する場合に比べて、集計を実行するためのより遠回りの方法に見えるかもしれませんが、リダクション操作は、追加の同期を必要とせず、データ競合のリスクを大幅に削減して、より適切に並列化します。

これはちょっと意外かもしれません。ただし、並列ストリームはfork-join モデルに基づいていることに注意してください。つまり、同時実行は次のように機能します。

  • シーケンスをほぼ同じサイズの 2 つの部分に分割する
  • 各パーツを個別に処理する
  • 両方の部分の結果を収集し、それらを 1 つの結果に結合する

2 番目のステップでは、3 つのステップがサブシーケンスに再帰的に適用されます。

例でそれを明確にする必要があります。の

IntStream.range(0, 4)
    .parallel()
    .collect(Trace::new, Trace::accumulate, Trace::combine);

クラスTraceの唯一の目的は、コンストラクターとメソッドの呼び出しをログに記録することです。このステートメントを実行すると、次の行が出力されます。

thread:  9  /  operation: new
thread: 10  /  operation: new
thread: 10  /  operation: accumulate
thread:  1  /  operation: new
thread:  1  /  operation: accumulate
thread:  1  /  operation: combine
thread: 11  /  operation: new
thread: 11  /  operation: accumulate
thread:  9  /  operation: accumulate
thread:  9  /  operation: combine
thread:  9  /  operation: combine

4 つのTraceオブジェクトが作成され、accumulateが各オブジェクトに対して 1 回呼び出され、combineが 3 回使用されて 4 つのオブジェクトが 1 つに結合されていることがわかります。各オブジェクトは、一度に 1 つのスレッドのみがアクセスできます。これにより、コードはスレッドセーフになり、同じことがCollectors::toMapメソッドにも当てはまります。

于 2014-03-12T20:22:18.550 に答える