23

いくつかのスレッドでメッセージを処理したいのですが、このコードの実行中にエラーが発生します:

from __future__ import with_statement
import pika
import sys
from pika.adapters.blocking_connection import BlockingConnection
from pika import connection, credentials
import time
import threading
import random
from pika.adapters.select_connection import SelectConnection
from pika.connection import Connection
import traceback


def doWork(body, args, channel):


    r = random.random()
    time.sleep(r * 10)
    try:        
        channel.basic_ack(delivery_tag=args.delivery_tag)

    except :
        traceback.print_exc()


auth = credentials.PlainCredentials(username="guest", password="guest")
params = connection.ConnectionParameters(host="localhost", credentials=auth)
conn = BlockingConnection(params)
channel = conn.channel()


while True:

    time.sleep(0.03)    
    try:

        method_frame, header_frame, body = channel.basic_get(queue="test_queue")
        if method_frame.NAME == 'Basic.GetEmpty':
            continue        

        t = threading.Thread(target=doWork, args=[body, method_frame, channel])
        t.setDaemon(True)
        t.start()

    except Exception, e:
        traceback.print_exc()
        continue

エラーの説明:

トレースバック (最新の呼び出しが最後):
  ファイル「C:\work\projects\mq\start.py」の 43 行目
    method_frame、header_frame、body = channel.basic_get(queue="test_queue")
  ファイル「C:\work\projects\mq\libs\pika\adapters\blocking_connection.py」、318 行目、basic_get 内
    self.basic_get_(self, self._on_basic_get, チケット, キュー, no_ack)
  ファイル「C:\work\projects\mq\libs\pika\channel.py」、469 行目、basic_get 内
    no_ack=no_ack))
  ファイル「C:\work\projects\mq\libs\pika\adapters\blocking_connection.py」、244 行目、send_method 内
    self.connection.process_data_events()
  ファイル「C:\work\projects\mq\libs\pika\adapters\blocking_connection.py」、94 行目、process_data_events 内
    self._handle_read()
  ファイル「C:\work\projects\mq\libs\pika\adapters\base_connection.py」、162 行目、_handle_read 内
    self._on_data_available(データ)
  ファイル「C:\work\projects\mq\libs\pika\connection.py」、589 行目、_on_data_available 内
    フレーム) # Args
  ファイル「C:\work\projects\mq\libs\pika\callback.py」、124 行目、処理中
    callback(*引数、**キーワード)
  ファイル「C:\work\projects\mq\libs\pika\adapters\blocking_connection.py」、269 行目、_on_remote_close
    frame.method.reply_text)
AMQPChannelError: (406、「PRECONDITION_FAILED - 不明な配信タグ 204」)

バージョン: pika 0.9.5、rabbitMQ 2.6.1

4

7 に答える 7

46

問題はおそらく、次のno_ack=Trueように設定していることです。

consumer_tag = channel.basic_consume(
    message_delivery_event,
    no_ack=True,
    queue=queue,
)

そして、メッセージを確認します:

channel.basic_ack(delivery_tag=args.delivery_tag)

承認するかどうかを選択し、正しい消費パラメーターを設定する必要があります。

于 2014-04-07T10:09:09.580 に答える
13

私にとっては、キューに ACK を送信しないことを伝え、それから ACK を送信しただけです。

例:間違っている:

channel.basic_consume(callback, queue=queue_name, no_ack=True)

そして、私のコールバックで:

def callback(ch, method, properties, body):
  # do stuff
  ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback, queue=queue_name, no_ack=False)

要点: 手動で ack する場合は、no_ack=False を設定します。

ドキュメントから:

no_ack: (bool) True に設定すると、自動確認モードが使用されます ( http://www.rabbitmq.com/confirms.htmlを参照) 。

于 2015-09-17T09:02:17.520 に答える
2

修正はありませんが、BlockingConnection アダプターを使用して発生することを確認できます。

channel.basic_recover() に応答して再配信されているメッセージを確認または拒否するときに、一貫して発生します。

pika 0.9.5、rabbitMQ 2.2.0、python 2.7、Erlang R14B01

私が実施している回避策は、常にdeliver_tag=0を指定することです

これは、あなたが ack/nacking しているメッセージが (ストリーム内で) 最後に読んだメッセージである場合にのみ機能すると思われます。私が書いているライブラリは、それぞれが独立して確認できるようにメッセージを抽象化しますが、これはこの解決策とは関係ありません。

これが修正されているか、ピカチームの誰かによって承認されているかどうかを確認できますか? または、それは RabbitMQ の問題でしょうか?

于 2012-02-23T19:22:01.957 に答える
0

RabbitMQを見た後- 新しいバージョンにアップグレードし、「PRECONDITION_FAILED 不明な配信タグ 1」を大量に取得しました

basic-consume を次のように変更しました。

    consumer_tag = channel.basic_consume(
        message_delivery_event,
        no_ack=True,
        queue=queue,
    )

これにより、メッセージの配信タグが指定されたときに、最初の (再配信されない) 確認応答で説明されているエラーが発生するという影響がありました。配信は、メッセージ配信のメソッド構造から抽出されました。

使用する

    channel.basic_ack(delivery_tag=0)

この場合もエラーを抑制します

http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/2011-July/013664.htmlを見ると、RabbitMQ の問題であるかのように見えます。

于 2012-02-23T19:44:40.290 に答える