0

私が書いた以下の SPARK コードを実行中にエラーが発生しました。キーに基づいてすべてのベクトルの合計を見つけようとしています。各入力行は key(integer) で始まり、次に 127 次元の単一ベクトルである 127 個の浮動小数点数、つまり各行はキーとベクトルで始まります。


from cStringIO import StringIO

class testing:
    def __str__(self):
        file_str = StringIO()
        for n in self.vector:
            file_str.write(str(n)) 
            file_str.write(" ")
        return file_str.getvalue()
    def __init__(self,txt="",initial=False):
        self.vector = [0.0]*128
        if len(txt)==0:
            return
        i=0
        for n in txt.split():
            if i<128:
                self.vector[i]=float(n)
                i = i+1
                continue
            self.filename=n
            break
def addVec(self,r):
    a = testing()
    for n in xrange(0,128):
        a.vector[n] = self.vector[n] + r.vector[n]
    return a

def InitializeAndReturnPair(string,first=False):
    vec = testing(string,first)
    return 1,vec


from pyspark import SparkConf, SparkContext
conf = (SparkConf()
         .setMaster("local")
         .setAppName("My app")
         .set("spark.executor.memory", "1g"))
sc = SparkContext(conf = conf)

inp = sc.textFile("input.txt")
output = inp.map(lambda s: InitializeAndReturnPair(s,True)).cache()
output.saveAsTextFile("output")
print output.reduceByKey(lambda a,b : a).collect()

ラインインの例input.txt

6.0 156.0 26.0 3.0 1.0 0.0 2.0 1.0 15.0 113.0 53.0 139.0 156.0 0.0 0.0 0.0 156.0 29.0 1.0 38.0 59.0 0.0 0.0 28.0 4.0 2.0 9.0 1.0 0.0 0.0 0.0 9.0 83.0 13.0 6.0 33.0 11.0 2.0 0.0 11.0 35.0 4.0 2.0 4.0 1.0 3.0 2.0 4.0 0.0 0.0 0.0 0.0 2.0 19.0 45.0 17.0 47.0 2.0 2.0 7.0 59.0 90.0 15.0 11.0 156.0 14.0 1.0 4.0 9.0 1.0 5.0 25.0 14.0 27.0 2.0 0.0 2.0 86.0 48.0 10.0 6.0 156.0 23.0 1.0 2.0 21.0 6.0 0.0 3.0 31.0 10.0 4.0 3.0 0.0 0.0 1.0 2.0

以下は私が得ているエラーです。このエラーは、コードの最後の行、つまりoutput.reduceByKey

エラーメッセージ - http://pastebin.com/0tqiiJQm

この問題へのアプローチ方法がよくわかりません。を使用してみましたMarshalSerializerが、同じ問題が発生しました。

- - - - - - - - - - - - - - - 答え - - - - - - - - - - -----------------

同じ質問に対してApacheユーザーリストから回答を得ました。基本的にクラスターで実行されるマッパー/リデューサーにはクラス定義がなく、別のモジュールでクラスを記述してアタッチし、SparkContext を使用して構成しながらクラスを渡す必要があります。

sc.addPyFile(os.path( HOMEDirectory + "module.py"))

私を助けてくれてありがとう。

4

1 に答える 1

0

スパークでうまく機能するnumpy配列を使用できます。

import numpy as np

def row_map(text):
    split_text = text.split()
    # create numpy array from elements besides the first element 
    # which is the key
    return split_text(0), np.array([float(v) for v in split_text[1:]])

from pyspark import SparkConf, SparkContext
conf = (SparkConf()
     .setMaster("local")
     .setAppName("My app")
     .set("spark.executor.memory", "1g"))
sc = SparkContext(conf = conf)

inp = sc.textFile("input.txt")
output = inp.map(row_map).cache()
#Below line is throwing error
print output.reduceByKey(lambda a,b : np.add(a,b)).collect()

より簡潔でpythonicです。

于 2014-10-26T21:43:23.330 に答える