8

DjangoとCeleryを使用していて、複数のキューへのルーティングを設定しようとしています。routing_keyタスクとexchange(タスクデコレータまたはを使用して)を指定するとapply_async()、タスクはブローカー(MySQLデータベースに接続しているKombu)に追加されません。

タスクデコレータでキュー名を指定すると(ルーティングキーが無視されることを意味します)、タスクは正常に機能します。ルーティング/交換の設定に問題があるようです。

問題が何であるかについて何か考えはありますか?

設定は次のとおりです。

settings.py

INSTALLED_APPS = (
    ...
    'kombu.transport.django',
    'djcelery',
)
BROKER_BACKEND = 'django'
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE = "tasks"
CELERY_DEFAULT_EXCHANGE_TYPE = "topic"
CELERY_DEFAULT_ROUTING_KEY = "task.default"
CELERY_QUEUES = {
    'default': {
        'binding_key':'task.#',
    },
    'i_tasks': {
        'binding_key':'important_task.#',
    },
}

tasks.py

from celery.task import task

@task(routing_key='important_task.update')
def my_important_task():
    try:
        ...
    except Exception as exc:
        my_important_task.retry(exc=exc)

タスクを開始します:

from tasks import my_important_task
my_important_task.delay()
4

1 に答える 1

48

Django ORM をブローカーとして使用しています。これは、宣言がメモリにのみ格納されることを意味します ( http://readthedocs.org/docs/kombu/en/latest/introduction.htmlにあるトランスポートの比較表を参照してください。 #輸送比較)

したがって、routing_key を使用important_task.updateしてこのタスクを適用すると、キューがまだ宣言されていないため、ルーティングできません。

これを行うとうまくいきます:

@task(queue="i_tasks", routing_key="important_tasks.update")
def important_task():
    print("IMPORTANT")

ただし、「トピック」交換を使用する必要があることを示すものは何もないため、自動ルーティング機能を使用する方がはるかに簡単です。自動ルーティングを使用するには、設定を削除するだけです:

  • CELERY_DEFAULT_QUEUE
  • CELERY_DEFAULT_EXCHANGE
  • CELERY_DEFAULT_EXCHANGE_TYPE
  • CELERY_DEFAULT_ROUTING_KEY
  • CELERY_QUEUES

そして、次のようにタスクを宣言します。

@task(queue="important")
def important_task():
    return "IMPORTANT"

次に、そのキューから消費するワーカーを開始します。

$ python manage.py celeryd -l info -Q important

celeryまたは、デフォルト ( ) キューとキューの両方から消費するにはimportant:

$ python manage.py celeryd -l info -Q celery,important

もう 1 つの良い方法は、キュー名をタスクにハードコーディングせず、CELERY_ROUTES代わりに使用することです。

@task
def important_task():
    return "DEFAULT"

次に、設定で:

CELERY_ROUTES = {"myapp.tasks.important_task": {"queue": "important"}}

それでもトピック交換の使用を主張する場合は、このルーターを追加して、タスクが最初に送信されたときにすべてのキューを自動的に宣言できます。

class PredeclareRouter(object):
    setup = False

    def route_for_task(self, *args, **kwargs):
        if self.setup:
            return
        self.setup = True
        from celery import current_app, VERSION as celery_version
        # will not connect anywhere when using the Django transport
        # because declarations happen in memory.
        with current_app.broker_connection() as conn:
            queues = current_app.amqp.queues
            channel = conn.default_channel
            if celery_version >= (2, 6):
                for queue in queues.itervalues():
                    queue(channel).declare()
            else:
                from kombu.common import entry_to_queue
                for name, opts in queues.iteritems():
                    entry_to_queue(name, **opts)(channel).declare()
CELERY_ROUTES = (PredeclareRouter(), )
于 2012-05-25T14:17:11.837 に答える