2

ブロードキャスト タスクを作成しようとしましたが、呼び出しごとに 1 人のワーカーしか受信しません。助けてくれませんか?(私はrabbitmqとnode-celeryを使用しています)

default_exchange = Exchange('celery', type='direct')
celery.conf.update(
    CELERY_RESULT_BACKEND = "amqp",
    CELERY_RESULT_SERIALIZER='json',
    CELERY_QUEUES = (
        Queue('celery', default_exchange, routing_key='celery'),
        Broadcast('broadcast_tasks'),
    ),
    CELERY_ROUTES = (
        {'my_tasks.sample_broadcast_task': {
            'queue': 'broadcast_tasks',
            }},
        {'my_tasks.sample_normal_task': {
            'queue': 'celery',
            'exchange': 'celery',
            'exchange_type': 'direct',
            'routing_key': 'celery',
            }}
    ),
    )

次の構成もテストしましたが、機能しません。

celery.conf.update(
    CELERY_RESULT_BACKEND = "amqp",
    CELERY_RESULT_SERIALIZER='json',
    CELERY_QUEUES=(
        Queue('celery', Exchange('celery'), routing_key='celery'),
        Broadcast('broadcast'),
    ),
    )
@celery.task(ignore_result=True, queue='broadcast',
             options=dict(queue='broadcast'))
def sample_broadcast_task():
    print "test"

編集

-Qブロードキャストを追加してワーカーの実行方法を変更した後、次のエラーに直面しました:

PreconditionFailed: Exchange.declare: (406) PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'broadcast' in vhost '/': received 'direct' but current is 'fanout'
4

3 に答える 3

2

多くの多くのことを試した後、私は最終的に解決策を見つけます。これは私にとってはうまくいきます。(セロリ 3.1.24 (Cipater) および Python 2.7.12 )

ワーカー - tasks.py :

from celery import Celery
import celery_config
from kombu.common import Broadcast, Queue, Exchange

app = Celery()
app.config_from_object(sysadmin_celery_config)

@app.task
def print_prout(x):
    print x
    return x

ワーカー - celery_config.py :

# coding=utf-8
from kombu.common import Broadcast, Queue, Exchange


BROKER_URL = 'amqp://login:pass@172.17.0.1//'
CELERY_RESULT_BACKEND = 'redis://:login@172.17.0.1'


CELERY_TIMEZONE = 'Europe/Paris'
CELERY_ENABLE_UTC = True

CELERY_TASK_SERIALIZER = 'pickle'
CELERY_RESULT_SERIALIZER = 'pickle'
CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']
CELERY_DISABLE_RATE_LIMITS = True
CELERY_ALWAYS_EAGER = False

CELERY_QUEUES = (Broadcast('broadcast_tasks'), )

ワーカーは次のように起動しました:

celery -A celery_worker.tasks worker --loglevel=info --concurrency=1 -n worker_name_1

クライアント上 (私にとっては別の Docker コンテナー)。

from celery import Celery
from celery_worker import tasks

result = tasks.print_prout.apply_async(['prout'], queue='broadcast_tasks')

print result.get()

次のステップは、すべてのワーカーから返された結果を取得して表示する方法です。「print result.get()」は、最後のワーカーの結果のみを返すようです。明らかではないようです ( Celery ブロードキャストですべてのワーカーから結果を返すようにします) 。

于 2016-10-14T08:14:21.100 に答える
1

あなたの説明によると:

ブロードキャスト タスクを作成しようとしましたが、呼び出しごとに 1 人のワーカーしか受信しません

直接型交換を使用している可能性があります。

于 2016-01-05T06:07:53.790 に答える