3

からのこの出力には驚きましたfold。何をしているのか想像できません。

折り畳みは で始まり、要素ごとに追加されるため、something.fold(0, lambda a,b: a+1)の要素数が返されると思います。something01

sc.parallelize([1,25,8,4,2]).fold(0,lambda a,b:a+1 )
8

私は Scala から来ました。そこでは、私が説明したように折り畳みが機能します。では、fold は pyspark でどのように機能するのでしょうか? ご意見ありがとうございます。

4

2 に答える 2

7

foldここで何が起こっているのかを理解するために、Spark の操作の定義を見てみましょう。PySpark を使用しているため、コードの Python バージョンを表示しますが、Scala バージョンはまったく同じ動作を示します ( GitHub でソースを参照することもできます)。

def fold(self, zeroValue, op):
    """
    Aggregate the elements of each partition, and then the results for all
    the partitions, using a given associative function and a neutral "zero
    value."
    The function C{op(t1, t2)} is allowed to modify C{t1} and return it
    as its result value to avoid object allocation; however, it should not
    modify C{t2}.
    >>> from operator import add
    >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
    15
    """
    def func(iterator):
        acc = zeroValue
        for obj in iterator:
            acc = op(obj, acc)
        yield acc
    vals = self.mapPartitions(func).collect()
    return reduce(op, vals, zeroValue)

(比較については、 のScala 実装をRDD.fold参照してください)。

Sparkfoldは、最初に各パーティションを折りたたみ、次に結果を折りたたむことによって動作します。問題は、空のパーティションがゼロ要素に折り畳まれることです。そのため、最後のドライバー側の折り畳みは、空でないパーティションごとに 1 つの値ではなく、すべてのパーティションに対して1 つの値を折り畳むことになります。これは、 の結果がパーティションの数に影響されることを意味します。fold

>>> sc.parallelize([1,25,8,4,2], 100).fold(0,lambda a,b:a+1 )
100
>>> sc.parallelize([1,25,8,4,2], 50).fold(0,lambda a,b:a+1 )
50
>>> sc.parallelize([1,25,8,4,2], 1).fold(0,lambda a,b:a+1 )
1

この最後のケースでは、単一のパーティションが正しい値に折り畳まれ、その値がドライバーでゼロ値と折り畳まれて 1 が生成されます。

Spark のfold()操作では、fold 関数が結合的であるだけでなく交換可能である必要があるようです。実際には、シャッフルされたパーティション内の要素の順序が実行間で非決定的である可能性があるという事実など、この要件を課す Spark の他の場所があります ( SPARK-5750を参照)。

この問題を調査するために、Spark JIRA チケットをオープンしました: https://issues.apache.org/jira/browse/SPARK-6416

于 2015-03-19T18:07:59.043 に答える