2

私は pyspark Dataframe を持っていますが、今では各行を反復処理し、mongoDB コレクションに挿入/更新したいと考えています。

 #Did every required imports 
 #dataframe
 +---+----+
 |age|name|
 +---+----+
 | 30|   c|
 |  5|   e|
 |  6|   f|
 +---+----+
  db = mongodbclient['mydatabase']
  collection = db['mycollection']
  #created below function to insert/update
  def customFunction(row):
      key = {'name':row.name}
      data = dict(zip(columns,[row.x for x in columns]))
      collection.update(key, data, {upsert:true})
      #return a_flag #commented it as of now, a_flag can be 0 or 1

名前が mongoDB コレクション 'mycollection' に存在する場合、その行/レコードを更新する必要があり、そうでない場合はその新しいレコードを挿入します。

この関数をspark-dataframeにマップしようとすると、次のエラーが発生します

 result = my_dataframe.rdd.map(customFunction)
 #.....TypeError: can't pickle _thread.lock objects....
 #AttributeError: 'TypeError' object has no attribute 'message'

誰かが「ここの関数および/または他の場所で何が間違っているか」を理解してください。または、このタイプのタスクに他の代替手段があるかどうかを提案してください。

基本的に各行を繰り返します(収集呼び出しなしでも可能ですか??)

そして、各行に関数を適用して、外部スパーク作業を実行します。

提案してください、前もって感謝します..:)

mongoDB の私のデータ

name  age
 a    1
 b    2
 c    3 #new update should make age as 30 and 2 more new recs should inserted
4

2 に答える 2

1

MongoDB で 50 万件のレコードをアップサートする場合、これを処理するにはおそらく一括モードの方が効率的です。mongoDB 内でリクエストを実行すると、spark で実際に行うこと (リクエストを作成するだけ) に比べてはるかに多くの電力が必要になり、これを並行して実行しても、mongo 側で不安定になる可能性があります (「反復」アプローチよりも遅くなります)。

次のコードを試すことができます。を使用しないためcollect()、ドライバーのメモリ効率が高くなります。

bulk = collection.initialize_unordered_bulk_op()
for row in rdd.toLocalIterator():
    key = {'name':row.name}
    data = dict(zip(columns,[row.x for x in columns]))
    bulk.update(key, data, {upsert:true})

print(bulk.execute())
于 2017-09-07T18:16:52.440 に答える