1

current_app.send_task を使用している場合、Celery キューのルーティングに問題があります

2 つのワーカーがあります (各キューに 1 つずつ)

python manage.py celery worker -E -Q priority --concurrency=8 --loglevel=DEBUG
python manage.py celery worker -Q low --concurrency=8 -E -B --loglevel=DEBUG

celeryconfig.py ファイルで定義された 2 つのキューがあります。

# -*- coding: utf-8 -*-
from __future__ import unicode_literals

from django.core.exceptions import ImproperlyConfigured

from celery import Celery
from django.conf import settings

try:
    app = Celery('proj', broker=getattr(settings, 'BROKER_URL', 'redis://'))
except ImproperlyConfigured:
    app = Celery('proj', broker='redis://')

app.conf.update(
    CELERY_TASK_SERIALIZER='json',
    CELERY_ACCEPT_CONTENT=['json'],
    CELERY_RESULT_SERIALIZER='json',
    CELERY_RESULT_BACKEND='djcelery.backends.database:DatabaseBackend',
    CELERY_DEFAULT_EXCHANGE='tasks',
    CELERY_DEFAULT_EXCHANGE_TYPE='topic',
    CELERY_DEFAULT_ROUTING_KEY='task.priority',
    CELERY_QUEUES=(
        Queue('priority',routing_key='priority.#'),
        Queue('low', routing_key='low.#'),
    ),
    CELERY_DEFAULT_EXCHANGE='priority',
    CELERY_IMPORTS=('mymodule.tasks',)

CELERY_ENABLE_UTC = True
CELERY_TIMEZONE = 'UTC'

if __name__ == '__main__':
    app.start()

タスクの定義では、decorator を使用してキューを明示します。

@task(name='mymodule.mytask', routing_key='low.mytask', queue='low')
def mytask():
    # does something
    pass

このタスクが以下を使用して実行される場合、このタスクは実際に低キューで実行されます。

from mymodule.tasks import mytask
mytask.delay()

ただし、次を使用して実行する場合はそうではありません: (デフォルトのキューで実行されます: "priority")

from celery import current_app
current_app.send_task('mymodule.mytask')

この後の方法でタスクが「低」キューにルーティングされないのはなぜでしょうか。

ps: 私は redis を使用しています。

4

1 に答える 1