私が書いた以下の SPARK コードを実行中にエラーが発生しました。キーに基づいてすべてのベクトルの合計を見つけようとしています。各入力行は key(integer) で始まり、次に 127 次元の単一ベクトルである 127 個の浮動小数点数、つまり各行はキーとベクトルで始まります。
from cStringIO import StringIO
class testing:
def __str__(self):
file_str = StringIO()
for n in self.vector:
file_str.write(str(n))
file_str.write(" ")
return file_str.getvalue()
def __init__(self,txt="",initial=False):
self.vector = [0.0]*128
if len(txt)==0:
return
i=0
for n in txt.split():
if i<128:
self.vector[i]=float(n)
i = i+1
continue
self.filename=n
break
def addVec(self,r):
a = testing()
for n in xrange(0,128):
a.vector[n] = self.vector[n] + r.vector[n]
return a
def InitializeAndReturnPair(string,first=False):
vec = testing(string,first)
return 1,vec
from pyspark import SparkConf, SparkContext
conf = (SparkConf()
.setMaster("local")
.setAppName("My app")
.set("spark.executor.memory", "1g"))
sc = SparkContext(conf = conf)
inp = sc.textFile("input.txt")
output = inp.map(lambda s: InitializeAndReturnPair(s,True)).cache()
output.saveAsTextFile("output")
print output.reduceByKey(lambda a,b : a).collect()
ラインインの例input.txt
6.0 156.0 26.0 3.0 1.0 0.0 2.0 1.0 15.0 113.0 53.0 139.0 156.0 0.0 0.0 0.0 156.0 29.0 1.0 38.0 59.0 0.0 0.0 28.0 4.0 2.0 9.0 1.0 0.0 0.0 0.0 9.0 83.0 13.0 6.0 33.0 11.0 2.0 0.0 11.0 35.0 4.0 2.0 4.0 1.0 3.0 2.0 4.0 0.0 0.0 0.0 0.0 2.0 19.0 45.0 17.0 47.0 2.0 2.0 7.0 59.0 90.0 15.0 11.0 156.0 14.0 1.0 4.0 9.0 1.0 5.0 25.0 14.0 27.0 2.0 0.0 2.0 86.0 48.0 10.0 6.0 156.0 23.0 1.0 2.0 21.0 6.0 0.0 3.0 31.0 10.0 4.0 3.0 0.0 0.0 1.0 2.0
以下は私が得ているエラーです。このエラーは、コードの最後の行、つまりoutput.reduceByKey
エラーメッセージ - http://pastebin.com/0tqiiJQm
この問題へのアプローチ方法がよくわかりません。を使用してみましたMarshalSerializer
が、同じ問題が発生しました。
- - - - - - - - - - - - - - - 答え - - - - - - - - - - -----------------
同じ質問に対してApacheユーザーリストから回答を得ました。基本的にクラスターで実行されるマッパー/リデューサーにはクラス定義がなく、別のモジュールでクラスを記述してアタッチし、SparkContext を使用して構成しながらクラスを渡す必要があります。
sc.addPyFile(os.path( HOMEDirectory + "module.py"))
私を助けてくれてありがとう。