0

spark を使用して一部のデータを離散化しようとしています。

次の形式のデータがあります。

date           zip   amount
2013/04/02    04324  32.2
2013/04/01    23242  1.5
2013/04/02    99343  12

次に、次のコードがあります。

sampleTable = sqlCtx.inferSchema(columns)
sampleTable.registerAsTable("amounts")


exTable = sampleTable.map(lambda p: {"date":p.date,"zip":p.zip,"amount":p.amount}) 

次に、離散化する関数があります。

def discretize((key, data), cutoff=0.75):
    result = (data < np.percentile(index,cutoff))
    return result

この結果列を取得し、後で元のデータ セットと結合します。

このステートメントを使用してアクションを実行しようとしています:

exDiscretized = exTable.map(lambda x: (((dt.datetime.strptime(x.date,'%Y/%m/%d')).year, (dt.datetime.strptime(x.date,'%Y/%m/%d')).month), x.amount)).reduce(discretize).collect()

基本的に、((年、月)、行全体) のタプルが必要なので、月と年の組み合わせごとに 75 パーセンタイルを見つけることができます。

マップ部分を正常に動作させることができます。reduce 部分を取り除くと、コードが機能するようになります。

map と reduce の両方でステートメントを実行すると、次のエラーが発生します。

org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/lib/spark/python/pyspark/worker.py", line 79, in main
serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/lib/spark/python/pyspark/serializers.py", line 196, in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
  File "/opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/lib/spark/python/pyspark/serializers.py", line 127, in dump_stream
    for obj in iterator:
  File "/opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/lib/spark/python/pyspark/serializers.py", line 185, in _batched
for item in iterator:
  File "/opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/lib/spark/python/pyspark/rdd.py", line 715, in func
yield reduce(f, iterator, initial)
  File "<stdin>", line 2, in discretize
  File "/usr/local/lib/python2.7/dist-packages/numpy-1.9.1-py2.7-linux-x86_64.egg/numpy/lib/function_base.py", line 3051, in percentile
    q = array(q, dtype=np.float64, copy=True)
 ValueError: setting an array element with a sequence.

何が間違っているのかわかりません。おそらく、キーと値のペアを生成する方法と関係がありますか?

4

1 に答える 1

1

したがって、問題の根本は、reduce が意図したとおりに機能しないことにあると思います。単一のキーのすべてのデータをまとめたいので、関数 groupByKey が探しているものである可能性があります。次に例を示します。

input = sc.parallelize([("hi", 1), ("bye", 0), ("hi", 3)])
groupedInput = input.groupByKey()
def top(x):
     data = list(x)
     percentile = np.percentile(data, 0.70)
     return filter(lambda x: x >= percentile , data) 
modifiedGroupedInput = groupedInput.mapValues(top)
modifiedGroupedInput.collect()

結果:

[('bye', [0]), ('hi', [3])]

通常は reduceByKey を使用する方が適切ですが、各キーのすべての要素を同時に計算して計算する必要があるためです。

于 2015-02-06T00:09:27.603 に答える