3

これは非常に単純なことだと確信していますが、これに関連するものは何も見つかりませんでした。

私のコードは簡単です:

... 
stream = stream.map(mapper) 
stream = stream.reduceByKey(reducer) 
... 

出力は次のようになります。

... 
key1  value1 
key2  [value2, value3] 
key3  [[value4, value5], value6] 
... 

等々。そのため、フラットな値を取得することがあります (単一の場合)。場合によっては、非常に深いネストされたリスト (私の単純なテスト データでは 3 レベルの深さでした)。

「フラット」のようなものをソースから検索しようとしましたが、必要なものではない(私が理解しているように)flatMapメソッドしか見つかりませんでした。

これらのリストがネストされている理由がわかりません。私の推測では、それらは異なるプロセス (ワーカー?) によって処理され、平坦化せずに結合されたのです。

もちろん、そのリストを展開して平坦化するコードを Python で書くこともできます。しかし、これは通常の状況ではないと思います。ほぼすべての人が一定の出力を必要としていると思います。

itertools.chain は、最初に見つかった反復不可能な値で展開を停止します。つまり、まだコーディングが必要です (前の段落)。

では、PySpark のネイティブ メソッドを使用してリストをフラット化する方法は?

ありがとう

4

2 に答える 2

5

ここでの問題は、reduce 関数です。キーごとにreduceByKey、値のペアで reduce 関数を呼び出し、同じ型の結合された値を生成することを期待します。

たとえば、単語カウント操作を実行したいとします。(word, 1)まず、各単語をペアにマッピングし、次にreduceByKey(lambda x, y: x + y)各単語のカウントを合計します。(word, count)最後に、ペアの RDD が残ります。

PySpark API ドキュメントの例を次に示します。

>>> from operator import add
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.reduceByKey(add).collect())
[('a', 2), ('b', 1)]

あなたの例がうまくいかなかった理由を理解するために、reduce 関数が次のように適用されていると想像できます。

reduce(reduce(reduce(firstValue, secondValue), thirdValue), fourthValue) ...

reduce 関数に基づいて、groupByKey各キーをその値のリストでグループ化する組み込み操作を実装しようとしているように思えます。

また、reduce 関数の入力と出力の型が異なることを可能にするcombineByKeyの一般化を見てください (はに関して実装されています) 。reduceByKey()reduceByKeycombineByKey

于 2014-01-13T07:38:27.077 に答える