0

ファンアウト交換に接続された Kombu ConsumerMixin キューを閉じて、Consumer がアクティブでないときに Publisher からのデータを蓄積しないようにするにはどうすればよいですか?

Python 2.7 で Kombu 3.0.24 (RabbitMQ を使用) を使用しています。

以下は、2 つのクラスのコードです。これらを汎用クラスにして、ダイレクト キューや RPC のようなクエリ/応答に再利用できるようにしたいと考えています。

問題は、コンシューマを停止して再起動すると、古いデータがコンシューマ キューで待機していることです。これは、コンシューマーを停止するときにキューを削除する必要があるためだと思いますが、その方法がわかりません。ありがとう。

MessageConsumer.py

from kombu.mixins import ConsumerMixin
from kombu import Queue, Exchange, Connection
import logging

class MessageConsumer(ConsumerMixin):

    def __init__(self,
                 broker='amqp://',
                 exchange='mExchange',
                 queue = 'mQueue',
                 type='direct',
                 no_ack=False):
        self.connection = Connection(broker)
        self.mExchange = Exchange(exchange, type=type)
        self.mQueue = Queue(queue, self.mExchange)
        self.mQueue.no_ack = no_ack

    def get_consumers(self, Consumer, channel):
        return [Consumer(queues=self.mQueue,
                         accept=['json'],
                         callbacks=[self.process_task])]

    def process_task(self, body, message):
        logging.debug('RECEIVED: {}'.format(body))

    def stop(self):
        self.should_stop = True
        self.connection.release()


if __name__ == '__main__':

    mMessageConsumer = MessageConsumer(exchange='sensor_data',
                                       queue='rx1_queue',
                                       type='fanout',
                                       no_ack=True)
    try:
        mMessageConsumer.run()
    except KeyboardInterrupt:
        mMessageConsumer.stop()

MessagePublisher.py

from kombu import Queue, Exchange, Connection
from kombu.pools import producers
import logging

class MessagePublisher(object):

    def __init__(self,
                 broker='amqp://',
                 exchange='mExchange',
                 type='direct',
                 no_ack=False):
        self.connection = Connection(broker)
        self.mExchange = Exchange(exchange, type=type)

    def publish(self, message, serializer='json', compression=None):
        with producers[self.connection].acquire(block=True) as producer:
            producer.publish(message,
                             serializer=serializer,
                             compression=compression,
                             exchange=self.mExchange,
                             declare=[self.mExchange]
                             )

    def close(self):
        self.connection.release()

if __name__ == '__main__':
    mMessagePublisher = MessagePublisher(type='fanout',exchange='sensor_data')
    x=0
    while True:
        x += 1
        mMessagePublisher.publish(x)
    mMessagePublisher.close()

これをコーディングするためのより効率的な方法がある場合は、それを提案してください。グーグルで見つけた例のほとんどは古いバージョンの Kombu を使用しているため、3.0.24 での最適な実装を見つけるのは困難です。

4

1 に答える 1

1

解決策を見つけました。「exclusive=True」でキューを作成する必要がありました。次に、プログラムが使用している間だけキューが存在します。具体的には:

self.mQueue = Queue(queue, self.mExchange, exclusive=True)
于 2015-03-13T16:39:03.533 に答える