新しい SO ドキュメンテーション プロジェクトのために Kombu をドキュメント化していたときに、この問題に遭遇しました。
Consumer Mixinの次の Kombu コードを考えてみます。
from kombu import Connection, Queue
from kombu.mixins import ConsumerMixin
from kombu.exceptions import MessageStateError
import datetime
# Send a message to the 'test_queue' queue
with Connection('amqp://guest:guest@localhost:5672//') as conn:
with conn.SimpleQueue(name='test_queue') as queue:
queue.put('String message sent to the queue')
# Callback functions
def print_upper(body, message):
print body.upper()
message.ack()
def print_lower(body, message):
print body.lower()
message.ack()
# Attach the callback function to a queue consumer
class Worker(ConsumerMixin):
def __init__(self, connection):
self.connection = connection
def get_consumers(self, Consumer, channel):
return [
Consumer(queues=Queue('test_queue'), callbacks=[print_even_characters, print_odd_characters]),
]
# Start the worker
with Connection('amqp://guest:guest@localhost:5672//') as conn:
worker = Worker(conn)
worker.run()
コードは次のように失敗します。
kombu.exceptions.MessageStateError: Message already acknowledged with state: ACK
print_even_characters()
メッセージがとで 2 回 ACK されたためprint_odd_characters()
です。
機能する簡単な解決策は、最後のコールバック関数のみに ACK を送信することですが、他のキューまたは接続で同じ関数を使用したい場合、モジュール性が損なわれます。
複数のコールバック関数に送信されるキューに入れられた Kombu メッセージを ACK する方法は?