5

celleryでアプリケーションを作ろうとしています。少数のワーカーで機能するはずであり、さまざまなワーカーがさまざまなキューから消費しています。私はこのようなものを持っています:

@celery.task
def task1():
    do_something()
    task2.delay()

@celery.task
def task2()
    do_something()

したがって、worker1 で実行されている task1 は、worker2 を消費するキューに送信する task2 を呼び出す必要があります。問題は、それが機能していないことです。AsyncResult の ID を受け取りましたが、このタスクの状態は常に PENDING です。Python コンソールから手動で task2 を呼び出すと、正常に動作します。たぶん私は何か間違ったことをしていて、あるタスクを別のタスクから実行することはできませんか? 後もう一つ。Worker1 は task1 を実行しており、task2 を消費していないキューに送信します - このキューからは worker2 のみを消費しています

4

1 に答える 1

0

これは、あなたが望むものを達成すると思う簡単な例です。

from celery import Celery
import random
import string

celery = Celery('two_q',backend='amqp',broker='amqp://guest@localhost//')

@celery.task
def generate_rand_string(n):
    # n = number of characters
    rand_str = "".join([random.choice(string.lowercase) for i in range(n)])
    #calls the second task and adds it to second queue
    reverse.apply_async((rand_str,),queue="q2")
    print rand_str
    return rand_str

@celery.task
def reverse(s):
    print s[::-1]
    return s[::-1]

generate_rand_string.apply_async((10,), queue="q1")

キューのリストを指定する -Q 引数で呼び出された場合

 celery worker --app=two_q -l info -Q q1,q2

次の出力が生成されます。

pawel@iqmxma82x7:~/py/celery$ celery worker --app=two_q -l info -Q q1,q2

 -------------- celery@iqmxma82x7 v3.0.23 (Chiastic Slide)
---- **** ----- 
--- * ***  * -- Linux-3.2.0-54-generic-pae-i686-with-Ubuntu-12.04-precise
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> broker:      amqp://guest@localhost:5672//
- ** ---------- .> app:         cel_group:0x9bfef8c
- ** ---------- .> concurrency: 4 (processes)
- *** --- * --- .> events:      OFF (enable -E to monitor this worker)
-- ******* ---- 
--- ***** ----- [queues]
 -------------- .> q1:          exchange:q1(direct) binding:q1
                .> q2:          exchange:q2(direct) binding:q2

[Tasks]
  . two_q.generate_rand_string
  . two_q.reverse

[2013-09-15 19:10:35,708: WARNING/MainProcess] celery@iqmxma82x7 ready.
[2013-09-15 19:10:35,716: INFO/MainProcess] consumer: Connected to amqp://guest@127.0.0.1:5672//.
[2013-09-15 19:10:40,731: INFO/MainProcess] Got task from broker: two_q.generate_rand_string[fa2ad56e-c66d-44a9-b908-2d95b2c9e5f3]
[2013-09-15 19:10:40,767: WARNING/PoolWorker-1] jjikjkepkc
[2013-09-15 19:10:40,768: INFO/MainProcess] Got task from broker: two_q.reverse[f52a8247-4674-4183-a826-d73cef1b64d4]
[2013-09-15 19:10:40,770: INFO/MainProcess] Task two_q.generate_rand_string[fa2ad56e-c66d-44a9-b908-2d95b2c9e5f3] succeeded in 0.0217289924622s: 'jjikjkepkc'
[2013-09-15 19:10:40,782: WARNING/PoolWorker-3] ckpekjkijj
[2013-09-15 19:10:40,801: INFO/MainProcess] Task two_q.reverse[f52a8247-4674-4183-a826-d73cef1b64d4] succeeded in 0.0195469856262s: 'ckpekjkijj'

2 つのキュー (q1、q2) と 2 つのワーカーを取得します。

-Q 引数なしで呼び出すか、キューを 1 つだけ指定して呼び出した場合の比較として、次のようにします。

celery worker --app=two_q -l info 

「逆」タスクは呼び出されません。これは、それが追加された q2 がセロリに認識されないためです。

それが役に立てば幸い。

于 2013-09-15T18:18:25.777 に答える