私は pyspark Dataframe を持っていますが、今では各行を反復処理し、mongoDB コレクションに挿入/更新したいと考えています。
#Did every required imports
#dataframe
+---+----+
|age|name|
+---+----+
| 30| c|
| 5| e|
| 6| f|
+---+----+
db = mongodbclient['mydatabase']
collection = db['mycollection']
#created below function to insert/update
def customFunction(row):
key = {'name':row.name}
data = dict(zip(columns,[row.x for x in columns]))
collection.update(key, data, {upsert:true})
#return a_flag #commented it as of now, a_flag can be 0 or 1
名前が mongoDB コレクション 'mycollection' に存在する場合、その行/レコードを更新する必要があり、そうでない場合はその新しいレコードを挿入します。
この関数をspark-dataframeにマップしようとすると、次のエラーが発生します
result = my_dataframe.rdd.map(customFunction)
#.....TypeError: can't pickle _thread.lock objects....
#AttributeError: 'TypeError' object has no attribute 'message'
誰かが「ここの関数および/または他の場所で何が間違っているか」を理解してください。または、このタイプのタスクに他の代替手段があるかどうかを提案してください。
基本的に各行を繰り返します(収集呼び出しなしでも可能ですか??)
そして、各行に関数を適用して、外部スパーク作業を実行します。
提案してください、前もって感謝します..:)
mongoDB の私のデータ
name age
a 1
b 2
c 3 #new update should make age as 30 and 2 more new recs should inserted