現在、セロリ ワーカーにカスタム コンシューマーを追加することは困難ですが、開発バージョン (3.1 になる予定) ではこれが変更されており、コンシューマー ブートステップのサポートが追加されています。
実装を終えたばかりなので、まだドキュメントはありませんが、例を次に示します。
from celery import Celery
from celery.bin import Option
from celery.bootsteps import ConsumerStep
from kombu import Consumer, Exchange, Queue
class CustomConsumer(ConsumerStep):
queue = Queue('custom', Exchange('custom'), routing_key='custom')
def __init__(self, c, enable_custom_consumer=False, **kwargs):
self.enable = self.enable_custom_consumer
def get_consumers(self, connection):
return [
Consumer(connection.channel(),
queues=[self.queue],
callbacks=[self.on_message]),
]
def on_message(self, body, message):
print('GOT MESSAGE: %r' % (body, ))
message.ack()
celery = Celery(broker='amqp://localhost//')
celery.steps['consumer'].add(CustomConsumer)
celery.user_options['worker'].add(
Option('--enable-custom-consumer', action='store_true',
help='Enable our custom consumer.'),
)
最終バージョンでは API が変更される可能性があることに注意してくださいget_consumer(connection)
。現在、コンシューマのチャネルは、接続が失われたときとシャットダウン時に閉じられますが、チャネルを手動で処理したい場合があります。その場合、ConsumerStep をカスタマイズするか、新しい StartStopStep を作成する可能性が常にあります。