4

Celeryでキャンバス ジョブを実行した後にグラフを作成する方法に関するドキュメントがあります。ただし、ジョブを実行するにグラフを生成したいと思います。

簡単なチェーンを作成したとします。

c = chain(add.s(1, 2), mul(4))

チェーンのグラフを生成するにはどうすればよいですか?

ありがとう、

ミキ

4

2 に答える 2

4

私も全く同じ欲求を持っていました。ジョブを実行する前にグラフを生成します。だから私はそれに少し取り組んだ:)

セロリはそれを許可していないようです。その理由 (少なくともそれを実行しようとしたときに私が理解したこと) は、グラフでは各ノードに一意の名前を付ける必要があるためです。キャンバスが実行されると、この一意の名前はセロリの task_id になりますが、実行前はそのような区別を可能にするものは何もありません。

したがって、解決策は、このグラフを自分で生成し、もちろん各ノードを一意に識別することです (このため、カウンターが機能します)。

これがこの関数の仕事です:

# -*- coding: utf-8 -*-
from celery.canvas import chain, group, Signature


def analyze_canvas(canvas):
    return _analyze_canvas(canvas)['dependencies']


def _analyze_canvas(canvas, previous=[], i=0):
    dependencies = []

    if isinstance(canvas, chain):
        for t in canvas.tasks:
            if not (isinstance(t, group) or isinstance(t, chain)):
                n = str(t) + " - (" + str(i) + ")"
                i += 1
                dependencies.append((n, previous))
                previous = [n]
            else:
                analysis = _analyze_canvas(t, previous, i)
                dependencies.extend(analysis['dependencies'])
                previous = analysis['previous']
    elif isinstance(canvas, group):
        new_previous = []
        for t in canvas.tasks:
            if not (isinstance(t, group) or isinstance(t, chain)):
                n = str(t) + " - (" + str(i) + ")"
                i += 1
                dependencies.append((n, previous))
                new_previous.append(n)
            else:
                analysis = _analyze_canvas(t, previous, i)
                dependencies.extend(analysis['dependencies'])
                new_previous = analysis['previous']
        previous = new_previous
    elif isinstance(canvas, Signature):
        n = str(t) + " - (" + str(i) + ")"
        i += 1
        dependencies.append((n, previous))
        previous = [n]
    return {"dependencies": dependencies,
            "previous": previous}

キャンバスの依存関係グラフを生成します。アイデアは、キャンバスの他のタスクを反復し、グループ/チェーン/署名を特定して適切な依存関係を生成することです。

この時点から、いくつかのセロリ ユーティリティを使用してドット ファイルを生成できます。ここに小さな使用例があります:

from celery_util import analyze_canvas
from celery.datastructures import DependencyGraph
from celery import Celery, group

app = Celery()

@app.task
def t1():
    pass

@app.task
def t2():
    pass

canvas = t1.si() | t2.si() | group(t1.si(), t1.si(), t2.si()) | t2.si()

d = analyze_canvas(canvas)
dg = DependencyGraph(it=d)
pipo = open("pipo.dot", "w+")
dg.to_dot(pipo)

この例では、ダミーのタスクを宣言し、きれいなキャンバスにチェーン/グループ化するだけです。私は celery util を使用DependencyGraphして、オブジェクト表現と、メソッドで行うグラフをドットでダンプする機能を備えていますto_dot

そして美しい結果は次のとおりです。 タスク グラフ

于 2015-03-17T17:23:01.207 に答える