1

基本的なデフォルト情報と初期情報を使用して DB にスタブ オブジェクトを作成し、Rabbit MQ にメッセージをドロップして、このスタブ オブジェクトのデータを入力するために必要な IO の重労働を行う Django アプリケーションがあります。Rabbit MQ からのメッセージを処理するコマンド ライン スクリプトとして実行される個別の rabbit MQ コンシューマー (daemontools によってラップされているため、ダウンしない) があります。

ここで何が起こるか - ときどき、メッセージが戻ってきて (このメッセージにはスタブ オブジェクト ID があります)、StubObject.objects.get(pk=message['ID']) を使用してこのオブジェクトを読み取ろうとすると、一致するクエリが取得されます。例外が存在しません (通常、メッセージが返されるまでに約 30 秒かかります)。ただし、クエリが存在しないという例外が発生したときにDBをチェックインすると、オブジェクトのデータは間違いなくそこにあります。これは、メッセージを処理する約 5 回に 1 回発生します。なぜこれが起こるのかはわかりませんが、これを解決しようとしています. 時々しか失敗しないのは特に奇妙です。何か案は?

connection.queries を印刷しようとしましたが、出力が印刷されませんでした。

保存しようとしているうさぎの MQ コンシューマーは、標準の django python ファイルの外部で実行されるため、ORM にアクセスできるようにするために、大量のものをインポートする必要があります (おそらく問題がこれを行う方法にあるのかどうかはわかりません)。以下のコード:

#!/usr/bin/env python
import sys
import os
sys.path.append('/myproject')
os.environ['DJANGO_SETTINGS_MODULE'] = 'myproject.settings'

from django.db import connection
from django.core import serializers
from django.core.management import setup_environ
from register_obj.models import Obj
from myproject import settings
import datetime
import json
import pika

setup_environ(settings)

conn = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = conn.channel()

print '[*] Waiting for responses. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print "[x] Received %r" % (body,)
    objDict = json.loads(body)
        print objDict
    try:
        obj = Obj.objects.get(pk=objDict["id"])
        print connection.queries
    except Exception, e:
        print datetime.datetime.now()
        print "Error occurred, could not find obj by ID - "
        print objDict["id"]
        print e
        print '################Query##############'
        print connection.queries

    obj.updateDict(objDict)

    # Save the obj data
    obj.save()

channel.basic_consume(callback,
        queue='myQueue',
        no_ack=True)

channel.start_consuming()
4

1 に答える 1

1

通常、これはトランザクションをコミットする Django アプリとキュー コンシューマとの間の競合状態が原因です。

ほとんどの場合、トランザクション内の各リクエストを自動的にラップするように設定された Django アプリを実行しています。つまり、トランザクションを開始し、すべてのデータベース操作を実行し、リクエストが処理されてブラウザに送り返されるとコミットします (または何かが例外をスローした場合はロールバックします)。そこのどこかにスタブ オブジェクトを作成し、そのスタブの ID を使用してタスクをキューに入れます。問題は、キューが空でコンシューマがすぐにタスクを取得するときに発生します。一方、Django 側では残りのリクエストを完了するまで少し遅延があり (バイトをネットワーク経由でブラウザに送信して接続を閉じるだけでもしばらく時間がかかる場合があります)、トランザクションはまだ開いています。トランザクションが閉じられるまで、データベース内のそのオブジェクトの行は、コンシューマーなどの他のトランザクションで使用できません。

解決策は、タスクをキューに入れ、キューに入れる前にコミットするビューの手動トランザクション処理に切り替えることです。

次のようになります。

from django.django.db import transaction

@transaction.commit_manually
def some_view(request):
    try:
        # do some work...
        stub = Obj.objects.create(...)
    except:
        transaction.rollback()
        raise
    else:
        transaction.commit()
        add_task_to_queue(obj_id=stub.id)
        # finish serving request
        ...

もちろん、手動でトランザクションを処理している場合は、開いているトランザクションを常にコミットまたはロールバックしていることに十分注意する必要があります。

于 2013-02-16T04:36:01.567 に答える