5

Celery では、クエリから取得したアイテムごとに 1 つのサブタスクを実行するメイン タスクを実行しています。サブタスクは並行して実行する必要があります。UI には、合計で実行されたサブタスクの数を示す進行状況バーがあります。主なタスクの状態を更新して、進行状況バーに情報を提供しています。私の問題は、すべてのサブタスクをブローカーにプッシュした直後にメインタスクが終了したため、ブローカーの状態を更新できなくなったことです。すべてのサブタスクが完了するまで、メイン タスクを待機できることを望みます。出来ますか?他の解決策はありますか?これが私の疑似コードです(実際のコードはグローバルを使用しません;-))。

total = 0
done = 0

@task(ignore_result=True)
def copy_media(path):
    global total, done
    copy_media.update_state(state=STARTED, meta={'total': total, 'done': done})
    documents = Document.objects.all()
    total = documents.count()
    copy_media.update_state(state=STARTED, meta={'total': total, 'done': done})
    for document in documents:
        process_doc.delay(document, path, copy_media)

@task(ignore_result=True)
def process_doc(document, path, copy_media):
    global total, done
    # Do some stuff
    done += 1
    copy_media.update_state(state=STARTED, meta={'total': total, 'done': done})
4

3 に答える 3

3

を使用する方法を見つけましたTaskSet。しかし、サブタスクの結果を無視できないため、完全に満足しているわけではありません。process_docタスクの結果を無視すると、results.ready()常に return Falseresults.completed_count()常に 0 などを返します。コードは次のとおりです。

@task(ignore_result=True)
def copy_media(path):
    copy_media.update_state(state=STARTED, meta={'total': total, 'done': done})
    documents = Document.objects.all()
    total = documents.count()
    copy_media.update_state(state=STARTED, meta={'total': total, 'done': done})
    job = TaskSet(tasks=[process_doc.subtask((document, path))
                         for document in documents])
    results = job.apply_async()
    doc_name = ''
    while not results.ready():
        done = results.completed_count()
        if done:
            last = done - 1
            for idx in xrange(last, -1, -1):
                if results[idx].ready():
                    doc_name = results[idx].result
                    break
        copy_media.update_state(state=STARTED, meta={'total': total, 'done': done, 'doc-name': doc_name})
        time.sleep(0.25)

@task()
def process_doc(document, path):
    # Do some stuff
    return document
于 2012-04-06T21:30:37.510 に答える
0

memcached-backed キャッシングを使用して、完了したタスクの数を保存できます。アトミックインクリメント用のdjangoキャッシュAPIにもありcache.inrc、カウントの同時更新によって問題が発生しないようにします。

また、すべてのサブタスクが完了するまでメイン タスクの実行を保持することは、基本的にセロリ ワーカーの 1 つを長時間ブロックしているため、悪い考えです。celery が 1 つのワーカー プロセスで実行されると、無限のロックが発生します。

于 2012-04-06T21:37:15.647 に答える
-1

実行しているセロリのバージョンはわかりませんが、グループサブタスク (3.0 の新機能) を見ることができます。

于 2012-10-10T13:45:53.033 に答える