10

いくつかの Java アプリケーションが送信ログ メッセージをさまざまなチャネルの文字列 JSON オブジェクトとして使用している既存の RabbitMQ 展開があります。Celery を使用してこれらのメッセージを消費し、さまざまな場所 (DB、Hadoop など) に書き込みたいと考えています。

Celery は、RabbitMQ メッセージのプロデューサーとコンシューマーの両方になるように設計されていることがわかります。これは、これらのメッセージが配信されるメカニズムを隠そうとするためです。Celery に別のアプリによって作成されたメッセージを消費させ、到着時にジョブを実行させる方法はありますか?

4

1 に答える 1

15

現在、セロリ ワーカーにカスタム コンシューマーを追加することは困難ですが、開発バージョン (3.1 になる予定) ではこれが変更されており、コンシューマー ブートステップのサポートが追加されています。

実装を終えたばかりなので、まだドキュメントはありませんが、例を次に示します。

from celery import Celery
from celery.bin import Option
from celery.bootsteps import ConsumerStep
from kombu import Consumer, Exchange, Queue

class CustomConsumer(ConsumerStep):
   queue = Queue('custom', Exchange('custom'), routing_key='custom')

   def __init__(self, c, enable_custom_consumer=False, **kwargs):
       self.enable = self.enable_custom_consumer

   def get_consumers(self, connection):
       return [
           Consumer(connection.channel(),
               queues=[self.queue],
               callbacks=[self.on_message]),
       ]

   def on_message(self, body, message):
       print('GOT MESSAGE: %r' % (body, ))
       message.ack()


celery = Celery(broker='amqp://localhost//')
celery.steps['consumer'].add(CustomConsumer)
celery.user_options['worker'].add(
    Option('--enable-custom-consumer', action='store_true',
           help='Enable our custom consumer.'),
)

最終バージョンでは API が変更される可能性があることに注意してくださいget_consumer(connection)。現在、コンシューマのチャネルは、接続が失われたときとシャットダウン時に閉じられますが、チャネルを手動で処理したい場合があります。その場合、ConsumerStep をカスタマイズするか、新しい StartStopStep を作成する可能性が常にあります。

于 2012-10-02T12:09:28.570 に答える