fold
ここで何が起こっているのかを理解するために、Spark の操作の定義を見てみましょう。PySpark を使用しているため、コードの Python バージョンを表示しますが、Scala バージョンはまったく同じ動作を示します ( GitHub でソースを参照することもできます)。
def fold(self, zeroValue, op):
"""
Aggregate the elements of each partition, and then the results for all
the partitions, using a given associative function and a neutral "zero
value."
The function C{op(t1, t2)} is allowed to modify C{t1} and return it
as its result value to avoid object allocation; however, it should not
modify C{t2}.
>>> from operator import add
>>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
15
"""
def func(iterator):
acc = zeroValue
for obj in iterator:
acc = op(obj, acc)
yield acc
vals = self.mapPartitions(func).collect()
return reduce(op, vals, zeroValue)
(比較については、 のScala 実装をRDD.fold
参照してください)。
Sparkfold
は、最初に各パーティションを折りたたみ、次に結果を折りたたむことによって動作します。問題は、空のパーティションがゼロ要素に折り畳まれることです。そのため、最後のドライバー側の折り畳みは、空でないパーティションごとに 1 つの値ではなく、すべてのパーティションに対して1 つの値を折り畳むことになります。これは、 の結果がパーティションの数に影響されることを意味します。fold
>>> sc.parallelize([1,25,8,4,2], 100).fold(0,lambda a,b:a+1 )
100
>>> sc.parallelize([1,25,8,4,2], 50).fold(0,lambda a,b:a+1 )
50
>>> sc.parallelize([1,25,8,4,2], 1).fold(0,lambda a,b:a+1 )
1
この最後のケースでは、単一のパーティションが正しい値に折り畳まれ、その値がドライバーでゼロ値と折り畳まれて 1 が生成されます。
Spark のfold()
操作では、fold 関数が結合的であるだけでなく交換可能である必要があるようです。実際には、シャッフルされたパーティション内の要素の順序が実行間で非決定的である可能性があるという事実など、この要件を課す Spark の他の場所があります ( SPARK-5750を参照)。
この問題を調査するために、Spark JIRA チケットをオープンしました: https://issues.apache.org/jira/browse/SPARK-6416。