46

pymongoで一括アップサートを行うにはどうすればよいですか? 多数のエントリを更新したいのですが、一度に 1 つずつ実行すると非常に遅くなります。

ほぼ同じ質問に対する答えは次のとおりです: MongoDB での一括更新/アップサート?

受け入れられた答えは、実際には質問に答えません。インポート/エクスポートを行うための mongo CLI へのリンクを提供するだけです。

また、一括アップサートを行うことができない/ベストプラクティスではない理由を説明する人にもオープンですが、この種の問題に対する推奨される解決策は何かを説明してください.

4

6 に答える 6

47

pymongo の最新リリース (3.x 以降) は、サーバー リリースが一括操作をサポートしていない場合にダウングレードする一貫したインターフェイスで一括操作をラップします。これは、MongoDB が公式にサポートするドライバーで一貫しています。

そのため、代わりに別の適切な操作アクションbulk_write()を使用する代わりに使用することをお勧めします。UpdateOneそしてもちろん、特定のビルダーよりも自然言語リストを使用することをお勧めします

古い文書の直訳:

from pymongo import UpdateOne

operations = [
    UpdateOne({ "field1": 1},{ "$push": { "vals": 1 } },upsert=True),
    UpdateOne({ "field1": 1},{ "$push": { "vals": 2 } },upsert=True),
    UpdateOne({ "field1": 1},{ "$push": { "vals": 3 } },upsert=True)
]

result = collection.bulk_write(operations)

または、従来のドキュメント変換ループ:

import random
from pymongo import UpdateOne

random.seed()

operations = []

for doc in collection.find():
    # Set a random number on every document update
    operations.append(
        UpdateOne({ "_id": doc["_id"] },{ "$set": { "random": random.randint(0,10) } })
    )

    # Send once every 1000 in batch
    if ( len(operations) == 1000 ):
        collection.bulk_write(operations,ordered=False)
        operations = []

if ( len(operations) > 0 ):
    collection.bulk_write(operations,ordered=False)

返される結果にはBulkWriteResult、一致したドキュメントと更新されたドキュメントのカウンター、および_id発生した「アップサート」の戻り値が含まれます。

一括操作配列のサイズについては、少し誤解があります。サーバーに送信される実際の要求は、16MB BSON 制限を超えることはできません。これは、BSON 形式を使用しているサーバーに送信される「要求」にもその制限が適用されるためです。

ただし、実際の操作は 1000 のバッチでのみ送信および処理されるため、作成できるリクエスト配列のサイズはこれで決まりません。唯一の実際の制限は、これらの 1000 の操作命令自体が実際には 16MB を超える BSON ドキュメントを作成しないことです。これは確かにかなり難しい注文です。

一括メソッドの一般的な概念は、一度に多くのものを送信し、1 つのサーバー応答のみを処理する結果として、「トラフィックが少ない」ことです。すべての更新リクエストに付随するオーバーヘッドを削減することで、多くの時間を節約できます。

于 2016-03-25T03:51:51.070 に答える
32

MongoDB 2.6 以降では、一括操作がサポートされています。これには、一括挿入、アップサート、更新などが含まれます。これのポイントは、レコードごとの操作を行う際のラウンドトリップ遅延による遅延を削減/排除することです (「ドキュメントごと」が正しい)。

それで、これはどのように機能しますか?私が取り組んでいるのは Python の例です。

>>> import pymongo
>>> pymongo.version
'2.7rc0'

この機能を使用するには、「一括」オブジェクトを作成し、それにドキュメントを追加してから、execute を呼び出すと、すべての更新が一度に送信されます。警告: 収集された操作の BSON サイズ (bsonsize の合計) は、ドキュメント サイズの制限である 16 MB を超えることはできません。もちろん、操作の数は大幅に変動する可能性があり、マイレージは変動する可能性があります。

Bulk アップサート操作の Pymongo での例:

import pymongo
conn = pymongo.MongoClient('myserver', 8839)
db = conn['mydbname']
coll = db.myCollection
bulkop = coll.initialize_ordered_bulk_op()
retval = bulkop.find({'field1':1}).upsert().update({'$push':{'vals':1}})
retval = bulkop.find({'field1':1}).upsert().update({'$push':{'vals':2}})
retval = bulkop.find({'field1':1}).upsert().update({'$push':{'vals':3}})
retval = bulkop.execute()

これは不可欠な方法です。詳細については、次を参照してください。

http://api.mongodb.org/python/2.7rc1/examples/bulk.html

編集:- Python ドライバーのバージョン 3.5 以降、initialize_ordered_bulk_op は非推奨です。代わりに bulk_write() を使用してください。[ http://api.mongodb.com/python/current/api/pymongo/collection.html#pymongo.collection.Collection.bulk_write ]

于 2014-03-25T00:11:09.290 に答える
1

答えは同じです。一括アップサートはサポートされていません。

于 2011-03-13T21:45:31.227 に答える
1

multi=True を使用して、クエリ仕様に一致するすべてのドキュメントを更新できます。

ここには、コマンドのバッチを思い通りに実行することに関するバグがあります。

于 2011-03-14T18:15:35.193 に答える
0

Python 3.5+、motor および asyncio による最速の一括更新:

import asyncio
import datetime
import logging
import random
import time

import motor.motor_asyncio
import pymongo.errors


async def execute_bulk(bulk):
    try:
        await bulk.execute()
    except pymongo.errors.BulkWriteError as err:
        logging.error(err.details)


async def main():
    cnt = 0
    bulk = db.initialize_unordered_bulk_op()
    tasks = []
    async for document in db.find({}, {}, no_cursor_timeout=True):
        cnt += 1
        bulk.find({'_id': document['_id']}).update({'$set': {"random": random.randint(0,10)}})
        if not cnt % 1000:
            task = asyncio.ensure_future(execute_bulk(bulk))
            tasks.append(task)
            bulk = db.initialize_unordered_bulk_op()
    if cnt % 1000:
        task = asyncio.ensure_future(bulk.execute(bulk))
        tasks.append(task)
    logging.info('%s processed', cnt)
    await asyncio.gather(*tasks)


logging.basicConfig(level='INFO')    
db = motor.motor_asyncio.AsyncIOMotorClient()['database']['collection']
start_time = time.time()
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    execution_time = time.time() - start_time
    logging.info('Execution time: %s', datetime.timedelta(seconds=execution_time))
于 2016-12-07T17:41:56.563 に答える