2

を使用して複数のプロセスがレコードを読み取る MongoDB コレクション (ジョブ キューとして使用) がありますfindAndModifyFindAndModifyフィールドが「false」であるレコードを検索し、active「true」に設定して、他のプロセスが同じレコードを読み取らないようにします。

問題は、ログを見ると、さまざまなプロセスが同じレコードを読み取っていることがわかります。これは、2 つのプロセスが同時にキューから読み取ったときに発生するようです。一度に 1 つのプロセスだけがコレクションから読み取るようにする方法はありますか?

Mongo 2.2.3 と pymongo 2.2 を使用しています。

どうもありがとう!

編集:問題のログは次のとおりです。

worker.3 2013-03-18 23:57:45,434 default-worker-3
project_name INFO キュー ジョブ: ジョブ ID: 5147a90f68e8fe0097002bdf

worker.3 2013-03-18 23:57:47,608 default-worker-3
project_name INFO 入力: 14497 ドキュメント

worker.2 2013-03-18 23:57:45,440 default-worker-2
project_name INFO キュー ジョブ: ジョブ ID: 5147a90f68e8fe0097002bdf

worker.2 2013-03-18 23:57:47,658 default-worker-2
project_name INFO 入力: 14497 ドキュメント

ご覧のとおり、worker.3 と worker.2 はキューから同じジョブを読み取ります (両方のワーカーで同じ mongodb ID を持っています)。

find_and_modify コマンド:

query = {"active": False}
try:
    return self.collection.find_and_modify(
            query=query,
            update={"$set": {"active": True}},
            upsert=False,
            sort={"added_on": 1},
            limit=1
        )
except Exception, exc:
    LOGGER.exception(exc)
4

3 に答える 3

4

非常に明確にさせてください.2つの異なるfindAndModifyコマンドがシナリオで同じドキュメントを返すことはできません。

ありえない。作業を実行するメソッドの最初の数行を次に示します。

    Lock::DBWrite lk( ns );
    Client::Context cx( ns );

    BSONObj doc;

    bool found = Helpers::findOne( ns.c_str() , queryOriginal , doc );

検索の前に WRITE ロックが取得される 122 行に注意してください。

https://github.com/mongodb/mongo/blob/master/src/mongo/db/commands/find_and_modify.cpp#L122

2 つのプロセスが同時に書き込みロックを保持することはできません。何か違うことが起こっている可能性が高いようです (複数のドキュメントが同じ id 値を持っている、find_and_modify を呼び出している関数によって同じドキュメントが返されて 2 つのスレッドに返されるなど、推測するに十分な情報がありません)。

FindAndModify は、実行中に排他的な書き込みロックを保持するアトミック コマンドです。私の提案は、何が起こっているのかについての誤った/不当な仮定に基づいてコードを変更するのではなく、ログが実際に何を示しているのかを理解することです.

于 2013-03-21T04:11:05.187 に答える
1

代わりに、「ロック」を 2 段階に分割してください。ロック タイムスタンプを持たない、またはタイムスタンプが期限切れの最初のオブジェクトをクエリし、新しいロックを設定する最初の更新レコード。次に、確立したロックデータを使用して同じオブジェクトを見つけます。

于 2013-03-19T16:40:09.723 に答える
1

まず、mongodb に対して単純なクエリを実行して、単一のジョブ レコードを取得します。

job = db.coll.find({query}).limit(1)

次に、ジョブ ID と場所を指定してレコードを更新しますactive=false

update_response = db.coll.update(
    {_id:job.id, active=false},
    {$set:{active:true}},
    false,
    false
)

ジョブが別のプロセスによって既に更新されている場合、 のクエリ制約により、更新は成功しませんactive=false。レコードが更新されたことを update_response で確認します。

if update_response['n'] > 0 and update_response['updatedExisting']==true:
    return job

更新が成功しなかった場合は、別のジョブを取得して再試行してください。

于 2013-03-19T17:18:24.113 に答える