1

新しい SO ドキュメンテーション プロジェクトのために Kombu をドキュメント化していたときに、この問題に遭遇しました。

Consumer Mixinの次の Kombu コードを考えてみます。

from kombu import Connection, Queue
from kombu.mixins import ConsumerMixin
from kombu.exceptions import MessageStateError
import datetime

# Send a message to the 'test_queue' queue
with Connection('amqp://guest:guest@localhost:5672//') as conn:
    with conn.SimpleQueue(name='test_queue') as queue:
        queue.put('String message sent to the queue')


# Callback functions
def print_upper(body, message):
    print body.upper()
    message.ack()    

def print_lower(body, message):
    print body.lower()
    message.ack()


# Attach the callback function to a queue consumer 
class Worker(ConsumerMixin):
    def __init__(self, connection):
        self.connection = connection

    def get_consumers(self, Consumer, channel):
        return [
            Consumer(queues=Queue('test_queue'), callbacks=[print_even_characters, print_odd_characters]),
        ]

# Start the worker
with Connection('amqp://guest:guest@localhost:5672//') as conn:
    worker = Worker(conn)
    worker.run()

コードは次のように失敗します。

kombu.exceptions.MessageStateError: Message already acknowledged with state: ACK

print_even_characters()メッセージがとで 2 回 ACK されたためprint_odd_characters()です。

機能する簡単な解決策は、最後のコールバック関数のみに ACK を送信することですが、他のキューまたは接続で同じ関数を使用したい場合、モジュール性が損なわれます。

複数のコールバック関数に送信されるキューに入れられた Kombu メッセージを ACK する方法は?

4

1 に答える 1

1

ソリューション

1 - チェック中message.acknowledged

このmessage.acknowledgedフラグは、メッセージがすでに ACK されているかどうかをチェックします。

def print_upper(body, message):
    print body.upper()
    if not message.acknowledged: 
        message.ack()


def print_lower(body, message):
    print body.lower()
    if not message.acknowledged: 
        message.ack()

長所: 読みやすく、短い。

短所: Python EAFP 慣用句を破ります。

2 - 例外のキャッチ

def print_upper(body, message):
    print body.upper()
    try:
        message.ack()
    except MessageStateError:
        pass


def print_lower(body, message):
    print body.lower()
    try:
        message.ack()
    except MessageStateError:
        pass

長所:読みやすく、Pythonic です。

短所:少し長い - コールバックごとに 4 行のボイラープレート コード。

3 - 最後のコールバックの ACKing

ドキュメントは、コールバックが順番に呼び出されることを保証します。したがって、単純.ack()に最後のコールバックのみを実行できます。

def print_upper(body, message):
    print body.upper()


def print_lower(body, message):
    print body.lower()
    message.ack()

長所:短く、読みやすく、ボイラープレート コードがありません。

短所:モジュラーではない: 最後のコールバックが常に最後でない限り、コールバックを別のキューで使用することはできません。この暗黙の仮定により、呼び出し元のコードが壊れる可能性があります。

これは、コールバック関数をWorkerクラスに移動することで解決できます。一部のモジュール性を放棄します。これらの関数は外部から呼び出されませんが、安全性と可読性は向上します。

概要

1 と 2 の違いは、単にスタイルの問題です。

解決策 3 は、実行順序が重要な場合、およびすべてのコールバックを正常に通過する前にメッセージに ACK を送信する必要がないかどうかを選択する必要があります。

1 つまたは複数のコールバックが失敗した場合でも、メッセージを常に ACK する必要がある場合は、1 または 2 を選択する必要があります。

他にも可能な設計があることに注意してください。この回答は、ワーカーの外部にあるコールバック関数を参照しています。

于 2016-08-22T05:55:05.570 に答える