私のテストでは、複数のキューをラウンド ロビン スタイルで処理します。
このテスト コードを使用する場合:
from celery import task
import time
@task
def my_task(item_id):
time.sleep(0.5)
print('Processing item "%s"...' % item_id)
def add_items_to_queue(queue_name, items_count):
for i in xrange(0, items_count):
my_task.apply_async(('%s-%d' % (queue_name, i),), queue=queue_name)
add_items_to_queue('queue1', 10)
add_items_to_queue('queue2', 10)
add_items_to_queue('queue3', 5)
そして、(django-celeryを使用して)キューを開始します:
`manage.py celery worker -Q queue1,queue2,queue3`
以下を出力します。
Processing item "queue1-0"...
Processing item "queue3-0"...
Processing item "queue2-0"...
Processing item "queue1-1"...
Processing item "queue3-1"...
Processing item "queue2-1"...
Processing item "queue1-2"...
Processing item "queue3-2"...
Processing item "queue2-2"...
Processing item "queue1-3"...
Processing item "queue3-3"...
Processing item "queue2-3"...
Processing item "queue1-4"...
Processing item "queue3-4"...
Processing item "queue2-4"...
Processing item "queue1-5"...
Processing item "queue2-5"...
Processing item "queue1-6"...
Processing item "queue2-6"...
Processing item "queue1-7"...
Processing item "queue2-7"...
Processing item "queue1-8"...
Processing item "queue2-8"...
Processing item "queue1-9"...
Processing item "queue2-9"...
そのため、すべての queue1 タスクがqueue2 & 3 タスクの前に 公開されていたとしても、次の queue1 アイテムに進む前に各キューから 1 つのアイテムをプルします。
注: @WarLord が指摘したように、この正確な動作は がCELERYD_PREFETCH_MULTIPLIER
1 に設定されている場合にのみ機能します。1 より大きい場合、アイテムがキューからバッチでフェッチされることを意味します。したがって、PREFETCH_MULTIPLIER が 4 に設定された 4 つのプロセスがある場合、キューからすぐに 16 個のアイテムが取り出されることを意味するため、上記のように正確な出力は得られませんが、それでも大まかにラウンド ロビンに従います。 .