Pythonでspark 1.2.0を使用しています。
私の問題は、フィールドの値がゼロの場合、SQLクエリで、それを他の値に置き換える必要があることです。
1.4.0 では機能するが 1.2.0 では機能しない case /coalese を試しました
case when COALESCE("+fld+",0)=0 then "+str(numavgnumlst[0][lock])+" else "+fld+" end.
ただし、1.2.0の場合、マップで同じことをしようとしました
sc = SparkContext(appName="RunModelCCATTR")
sqlContext=SQLContext(sc)
sqlstr="select ..."
nonzerodf=sqlContext.sql(sqlstr)
.....
iifdatadf=nonzerodf.map(lambda candrow:replacezeroforrow(candrow,numavgnumlst))
....
def replacezeroforrow(rowfields,avgvalfields):
ind=0
lent=len(rowfields)
for rowfield in rowfields[4:lent]:
if rowfield==0:
rowfields[ind]=avgvalfields[ind]
ind=ind+1
return rowfields;
これはエラーをスローします
TypeError: 'Row' object does not support item assignment
spark 1.2.0 で目的を達成するために何ができるかわかりません。
助けてくれてありがとう、私はそれが今働いていると思います..列の順序が変更されたように見えることを除いて..しかし、それは問題ではないかもしれません。再度、感謝します
編集:
このアイデアは私を大いに助けてくれました。差し迫った問題を解決するには少し修正が必要でした。
def replacezeroforrow(rowfields,avgvalfields,dont_replace=[]):
rdict = rowfields.asDict()
return Row(dict([(k,avgvalfields[k] if v == 0 and k not in dont_replace else v ) for (k,v) in rdict.items()]))
「for」での構文エラーを回避するために、元のソリューションを変更しました。
メソッドの呼び出しは次のとおりです。
restrictdict=[FieldSet1,FieldSet2,FieldSet3,FieldSet4,modeldepvarcat[0]]
iifdatadf=nonzerodf.map(lambda candrow: replacezeroforrow(candrow,numavgnumlst[0].asDict(),restrictdict))
ただし、今 iifdatadf にアクセスしようとしていますが、
frstln= iifdatadf.first()
print frstln
次のエラーが発生しています
return "<Row(%s)>" % ", ".join(self)
TypeError: sequence item 0: expected string, dict found
助けていただければ幸いです。