4

PySpark を介して Spark Streaming を調査しており、transform関数を使用しようとするとエラーが発生しtakeます。

ビアと結果sortByに対して正常に使用できます。DStreamtransformpprint

author_counts_sorted_dstream = author_counts_dstream.transform\
  (lambda foo:foo\
   .sortBy(lambda x:x[0].lower())\
   .sortBy(lambda x:x[1],ascending=False))
author_counts_sorted_dstream.pprint()

しかしtake、同じパターンに従って使用して試してみるpprintと:

top_five = author_counts_sorted_dstream.transform\
  (lambda rdd:rdd.take(5))
top_five.pprint()

仕事は失敗します

Py4JJavaError: An error occurred while calling o25.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "/usr/local/spark/python/pyspark/streaming/util.py", line 67, in call
    return r._jrdd
AttributeError: 'list' object has no attribute '_jrdd'

こちらのノートブックで完全なコードと出力を確認できます。

私は何を間違っていますか?

4

1 に答える 1

5

渡す関数はからにtransform変換する必要があります。のようなアクションを使用する場合は、結果を次のように変換する必要があります。RDDRDDtakeRDD

sc: SparkContext = ...

author_counts_sorted_dstream.transform(
  lambda rdd: sc.parallelize(rdd.take(5))
)

対照的RDD.sortByに、変換 (RDD を返す) が使用されるため、さらに並列化する必要はありません。

余談ですが、次の機能:

lambda foo: foo \
    .sortBy(lambda x:x[0].lower()) \
    .sortBy(lambda x:x[1], ascending=False)

あまり意味がありません。Spark はシャッフルでソートするため、安定していないことに注意してください。複数のフィールドで並べ替えたい場合は、次のような複合キーを使用する必要があります。

lambda x: (x[0].lower(), -x[1])
于 2017-01-05T12:40:29.710 に答える