current_app.send_task を使用している場合、Celery キューのルーティングに問題があります
2 つのワーカーがあります (各キューに 1 つずつ)
python manage.py celery worker -E -Q priority --concurrency=8 --loglevel=DEBUG
python manage.py celery worker -Q low --concurrency=8 -E -B --loglevel=DEBUG
celeryconfig.py ファイルで定義された 2 つのキューがあります。
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.core.exceptions import ImproperlyConfigured
from celery import Celery
from django.conf import settings
try:
app = Celery('proj', broker=getattr(settings, 'BROKER_URL', 'redis://'))
except ImproperlyConfigured:
app = Celery('proj', broker='redis://')
app.conf.update(
CELERY_TASK_SERIALIZER='json',
CELERY_ACCEPT_CONTENT=['json'],
CELERY_RESULT_SERIALIZER='json',
CELERY_RESULT_BACKEND='djcelery.backends.database:DatabaseBackend',
CELERY_DEFAULT_EXCHANGE='tasks',
CELERY_DEFAULT_EXCHANGE_TYPE='topic',
CELERY_DEFAULT_ROUTING_KEY='task.priority',
CELERY_QUEUES=(
Queue('priority',routing_key='priority.#'),
Queue('low', routing_key='low.#'),
),
CELERY_DEFAULT_EXCHANGE='priority',
CELERY_IMPORTS=('mymodule.tasks',)
CELERY_ENABLE_UTC = True
CELERY_TIMEZONE = 'UTC'
if __name__ == '__main__':
app.start()
タスクの定義では、decorator を使用してキューを明示します。
@task(name='mymodule.mytask', routing_key='low.mytask', queue='low')
def mytask():
# does something
pass
このタスクが以下を使用して実行される場合、このタスクは実際に低キューで実行されます。
from mymodule.tasks import mytask
mytask.delay()
ただし、次を使用して実行する場合はそうではありません: (デフォルトのキューで実行されます: "priority")
from celery import current_app
current_app.send_task('mymodule.mytask')
この後の方法でタスクが「低」キューにルーティングされないのはなぜでしょうか。
ps: 私は redis を使用しています。