9

私は実際にこの質問に答えようとしました。Files.lines から取得した Stream<String> の行をスキップする方法 したがって、このコレクターは並行してうまく機能しないと思いました:

private static Collector<String, ?, List<String>> oddLines() {
    int[] counter = {1};
    return Collector.of(ArrayList::new,
            (l, line) -> {
                if (counter[0] % 2 == 1) l.add(line);
                counter[0]++;
            },
            (l1, l2) -> {
                l1.addAll(l2);
                return l1;
            });
}

しかし、それは機能します。

編集:実際には機能しませんでした。入力セットが小さすぎて並列処理をトリガーできないという事実にだまされました。コメントの議論を参照してください

次の2つの処刑案が頭に浮かんだので、うまくいかないと思いました。


1.counter配列はすべてのスレッド間で共有されます。

スレッド t1 が Stream の最初の要素を読み取るため、if 条件が満たされます。リストに最初の要素を追加します。その後、配列値を更新する前に実行が停止します。

ストリームの 4 番目の要素で開始したと言うスレッド t2 は、それをリストに追加します。したがって、不要な要素になってしまいます。

もちろん、このコレクターは機能しているように見えるので、そのようには機能しないと思います。とにかく、更新はアトミックではありません。


2. 各スレッドには、配列の独自のコピーがあります

この場合、更新にこれ以上の問題はありませんが、スレッド t2 がストリームの 4 番目の要素で開始されないことを妨げるものは何もありません。だから彼もそのようには働きません。


それで、それはまったく機能しないようです.それは私に質問をもたらします.コレクターはどのように並行して使用されますか?

誰かが基本的にどのように機能するのか、また並行して実行したときにコレクターが機能する理由を説明できますか?

どうもありがとうございました!

4

2 に答える 2

3

実はこのコレクターワークは偶然です。カスタム データ ソースでは機能しません。次の例を検討してください。

List<String> list = IntStream.range(0, 10).parallel().mapToObj(String::valueOf)
        .collect(oddLines());
System.out.println(list);

これにより、常に異なる結果が生成されます。本当の原因は、BufferedReader.lines()ストリームが少なくともjava.util.Spliterators.IteratorSpliterator.BATCH_UNIT1024 の行数で分割されているためです。行数が大幅に多い場合は、次の場合でも失敗する可能性がありBufferedReaderます。

String data = IntStream.range(0, 10000).mapToObj(String::valueOf)
    .collect(Collectors.joining("\n"));
List<String> list = new BufferedReader(new StringReader(data)).lines().parallel()
    .collect(oddLines());
list.stream().mapToInt(Integer::parseInt).filter(x -> x%2 != 0)
    .forEach(System.out::println);

コレクタが正常に動作していれば、何も出力されないはずです。しかし、時々それは印刷されます。

于 2015-05-11T15:56:28.897 に答える