28

次のようなものがあるとき

group1 = group(task1.si(), task1.si(), task1.si())
group2 = group(task2.si(), task2.si(), task2.si())

workflow = chain(group1, group2, task3.si())

直感的な解釈では、タスク 3 はグループ 2 のすべてのタスクが終了した後にのみ実行する必要があります。

実際には、タスク 3 は group1 が開始されたがまだ完了していない間に実行されます。

私は何を間違っていますか?

4

2 に答える 2

24

つまり、セロリでは 2 つのグループを連鎖させることはできません。
これは、タスクで連鎖されたグループが自動的にコードになるためだと思われます
--> Celery docs: http://docs.celeryproject.org/en/latest/userguide/canvas.html

グループを別のタスクと一緒にチェーンすると、自動的にコードにアップグレードされます。

グループは親タスクを返します。2 つのグループを連鎖させると、最初のグループが完了すると、コードがコールバック「タスク」を開始するのではないかと思います。この「タスク」は、実際には 2 番目のグループの「親タスク」であると思われます。さらに、この親タスクは、グループ内のすべてのサブタスクの開始が完了するとすぐに完了し、その結果、2 番目のグループの後の次の項目が実行されるのではないかと考えています。

これを示すために、いくつかのサンプル コードを示します。実行中の celery インスタンスが既に存在している必要があります。

# celery_experiment.py

from celery import task, group, chain, chord
from celery.signals import task_sent, task_postrun, task_prerun

import time
import logging

import random
random.seed()

logging.basicConfig(level=logging.DEBUG)

### HANDLERS ###    
@task_prerun.connect()
def task_starting_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, **kwds):    
    try:
        logging.info('[%s] starting' % kwargs['id'])
    except KeyError:
        pass

@task_postrun.connect()
def task_finished_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, retval=None, state=None, **kwds):
    try:    
        logging.info('[%s] finished' % kwargs['id'])
    except KeyError:
        pass


def random_sleep(id):
    slp = random.randint(1, 3)
    logging.info('[%s] sleep for %ssecs' % (id, slp))
    time.sleep(slp)

@task()
def thing(id):
    logging.info('[%s] begin' % id)
    random_sleep(id)
    logging.info('[%s] end' % id)


def exec_exp():
    st = thing.si(id='st')
    st_arr = [thing.si(id='st_arr1_a'), thing.si(id='st_arr1_b'), thing.si(id='st_arr1_c'),]
    st_arr2 = [thing.si(id='st_arr2_a'), thing.si(id='st_arr2_b'),]
    st2 = thing.si(id='st2')
    st3 = thing.si(id='st3')
    st4 = thing.si(id='st4')

    grp1 = group(st_arr)
    grp2 = group(st_arr2)

    # chn can chain two groups together because they are seperated by a single subtask
    chn = (st | grp1 | st2 | grp2 | st3 | st4)

    # in chn2 you can't chain two groups together. what will happen is st3 will start before grp2 finishes
    #chn2 = (st | st2 | grp1 | grp2 | st3 |  st4)

    r = chn()
    #r2 = chn2()
于 2013-02-28T22:48:59.127 に答える
19

セロリにも同じ問題があり、最初のステップが「百万のタスクを生成する」ワークフローを作ろうとしています。グループのグループ、サブタスクを試してみましたが、最終的にステップ 1 が終了する前にステップ 2 が開始されました。

簡単に言えば、和音と愚かなフィニッシャーを使用して解決策を見つけたかもしれません。

@celery.task
def chordfinisher( *args, **kwargs ):
  return "OK"

大したことはありませんが、これにより次のことが可能になります。

tasks = []
for id in ids:
    tasks.append( mytask.si( id ) )
step1 = chord( group( tasks ), chordfinisher.si() )

step2 = ...

workflow = chain( step1, step2 )

もともと、サブタスクに step1 を入れたかったのですが、同じ理由で、グループを呼び出すアクションが終了し、タスクが終了したと見なされ、ワー​​クフローが続行されます...

誰かがもっと良いものを持っているなら、私は興味があります!

于 2013-09-26T02:51:05.783 に答える