
Following the Celery documentation, I defined two queues on my development machine.

My Celery settings:

CELERY_ALWAYS_EAGER = True
CELERY_TASK_RESULT_EXPIRES = 60  # 1 minute
CELERYD_CONCURRENCY = 2
CELERYD_MAX_TASKS_PER_CHILD = 4
CELERYD_PREFETCH_MULTIPLIER = 1
CELERY_CREATE_MISSING_QUEUES = True
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('feeds', Exchange('feeds'), routing_key='arena.social.tasks.#'),
)
CELERY_ROUTES = {
    'arena.social.tasks.Update': {
        'queue': 'fs_feeds',
    },
}

I opened two terminal windows in the project's virtualenv and ran the following commands:

terminal_1$ celery -A arena worker -Q default -B -l debug --purge -n default_worker
terminal_2$ celery -A arena worker -Q feeds -B -l debug --purge -n feeds_worker

What I got is that all tasks are being processed by both queues.

My goal is to have one queue that processes only the single task defined in CELERY_ROUTES, and the default queue that processes all other tasks.

I also ran the commands from this SO question twice: rabbitmqctl list_queues returns celery 0, and running rabbitmqctl list_bindings returns exchange celery queue celery []. Restarting the RabbitMQ server didn't change anything.


2 Answers


OK, so I figured it out. Following is my whole setup, settings, and how to run Celery, for those who might be wondering about the same thing as my question.

Settings

CELERY_TIMEZONE = TIME_ZONE
CELERY_ACCEPT_CONTENT = ['json', 'pickle']
CELERYD_CONCURRENCY = 2
CELERYD_MAX_TASKS_PER_CHILD = 4
CELERYD_PREFETCH_MULTIPLIER = 1

# celery queues setup
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
CELERY_DEFAULT_ROUTING_KEY = 'default'
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('feeds', Exchange('feeds'), routing_key='long_tasks'),
)
CELERY_ROUTES = {
    'arena.social.tasks.Update': {
        'queue': 'feeds',
        'routing_key': 'long_tasks',
    },
}
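The 'topic' exchange type set above routes on pattern matching: in a routing key pattern, `*` matches exactly one dot-separated word and `#` matches zero or more. This is not Celery or RabbitMQ code, just a small pure-Python sketch of those matching semantics, using the keys from the question and this answer:

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """AMQP topic-exchange matching: '*' = exactly one word, '#' = zero or more words."""
    def match(p, k):
        if not p:
            return not k
        if p[0] == "#":
            # '#' can absorb zero or more of the remaining words
            return any(match(p[1:], k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        return (p[0] == "*" or p[0] == k[0]) and match(p[1:], k[1:])
    return match(pattern.split("."), routing_key.split("."))

# The question's binding 'arena.social.tasks.#' matches any key under that prefix:
print(topic_matches("arena.social.tasks.#", "arena.social.tasks.Update"))  # True
# A plain key like 'long_tasks' only matches itself:
print(topic_matches("long_tasks", "long_tasks"))  # True
print(topic_matches("default", "long_tasks"))     # False
```

This is why the plain `long_tasks` routing key in the settings above is enough: it binds the feeds queue to exactly one key, instead of a wildcard prefix.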

How to run celery?

terminal - tab 1:

celery -A proj worker -Q default -l debug -n default_worker

This will start the first worker, which consumes tasks from the default queue. NOTE! -n default_worker is not a must for the first worker, but it is a must if you have any other Celery instances up and running. Setting -n worker_name is the same as --hostname=default@%h.

terminal - tab 2:

celery -A proj worker -Q feeds -l debug -n feeds_worker

This will start the second worker, which consumes tasks from the feeds queue. Notice -n feeds_worker. If you are running with -l debug (log level = debug), you will see that the two workers are syncing with each other.

terminal - tab 3:

celery -A proj beat -l debug

This will start beat, which dispatches tasks according to the schedule in your CELERYBEAT_SCHEDULE. I didn't have to change the task or the CELERYBEAT_SCHEDULE.

For example, this is how my CELERYBEAT_SCHEDULE looks for the task that should go to the feeds queue:

CELERYBEAT_SCHEDULE = {
    ...
    'update_feeds': {
        'task': 'arena.social.tasks.Update',
        'schedule': crontab(minute='*/6'),
    },
    ...
}

As you can see, there is no need to add 'options': {'routing_key': 'long_tasks'} or to specify which queue the task should go to. Also, if you were wondering why Update is capitalized, it's because it's a custom task, defined as a subclass of celery.Task.

Update Celery 5.0+

Celery has made a couple of changes since version 5; here is an updated setup for task routing.

How to create the queues?

Celery can create the queues automatically. This works perfectly for simple cases, where Celery's default routing values are fine.

Set task_create_missing_queues=True or, if you're using Django settings and namespacing all Celery config under the CELERY_ prefix, CELERY_TASK_CREATE_MISSING_QUEUES=True. Note that it is on by default.

Automatic scheduled task routing

After configuring celery app:

celery_app.conf.beat_schedule = {
  "some_scheduled_task": {
    "task": "module.path.some_task",
    "schedule": crontab(minute="*/10"),
    "options": {"queue": "queue1"}
  }
}

Automatic task routing

Celery app still has to be configured first and then:

app.conf.task_routes = {
  "module.path.task2": {"queue": "queue2"},
}
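The keys in task_routes can also be glob patterns, so a whole module's tasks can be sent to one queue. The lookup behaves roughly like fnmatch; here is a small pure-Python illustration of that matching (the wildcard route, task names, and queue names are made up for the example):

```python
from fnmatch import fnmatch

task_routes = {
    "module.path.task2": {"queue": "queue2"},
    "module.path.long.*": {"queue": "long_tasks"},  # hypothetical wildcard route
}

def pick_queue(task_name, routes, default="celery"):
    """Return the queue of the first route whose pattern matches the task name."""
    for pattern, options in routes.items():
        if fnmatch(task_name, pattern):
            return options["queue"]
    return default

print(pick_queue("module.path.task2", task_routes))         # queue2
print(pick_queue("module.path.long.cleanup", task_routes))  # long_tasks
print(pick_queue("module.path.other", task_routes))         # celery (default queue)
```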

Manual routing of tasks

If you want to route tasks dynamically, specify the queue when sending the task:

from module import task

def do_work(arg1, arg2):
    # do some work, then launch the task on an explicit queue
    task.apply_async(args=(arg1, arg2), queue="queue3")

More details re routing can be found here: https://docs.celeryproject.org/en/stable/userguide/routing.html

And regarding calling tasks here: https://docs.celeryproject.org/en/stable/userguide/calling.html

Answered 2014-04-22T13:59:55.043

In addition to the accepted answer, if anyone comes here and wonders why their settings don't work (as I did just now), here's why: the Celery documentation doesn't list the setting names correctly.

With Celery 5.0.5, the settings CELERY_DEFAULT_QUEUE, CELERY_QUEUES, and CELERY_ROUTES have to be named CELERY_TASK_DEFAULT_QUEUE, CELERY_TASK_QUEUES, and CELERY_TASK_ROUTES instead. These are the settings I tested, but I assume the same rule applies for the exchange and routing key as well.
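For reference, the renames from this answer as a small lookup table (pure Python; the helper function is just for illustration and only covers the three settings the answer actually tested):

```python
# Old (documented) name -> name that actually works in Celery 5.0.5,
# with the Django-style CELERY_ namespace prefix kept.
RENAMED_SETTINGS = {
    "CELERY_DEFAULT_QUEUE": "CELERY_TASK_DEFAULT_QUEUE",
    "CELERY_QUEUES": "CELERY_TASK_QUEUES",
    "CELERY_ROUTES": "CELERY_TASK_ROUTES",
}

def fix_setting_name(name):
    """Map an old-style setting name to the working one, if it was renamed."""
    return RENAMED_SETTINGS.get(name, name)

print(fix_setting_name("CELERY_QUEUES"))    # CELERY_TASK_QUEUES
print(fix_setting_name("CELERY_TIMEZONE"))  # CELERY_TIMEZONE (unchanged)
```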

Answered 2021-03-04T22:58:16.707