Celeryでキャンバス ジョブを実行した後にグラフを作成する方法に関するドキュメントがあります。ただし、ジョブを実行する前にグラフを生成したいと思います。
簡単なチェーンを作成したとします。
c = chain(add.s(1, 2), mul(4))
チェーンのグラフを生成するにはどうすればよいですか?
ありがとう、
ミキ
Celeryでキャンバス ジョブを実行した後にグラフを作成する方法に関するドキュメントがあります。ただし、ジョブを実行する前にグラフを生成したいと思います。
簡単なチェーンを作成したとします。
c = chain(add.s(1, 2), mul(4))
チェーンのグラフを生成するにはどうすればよいですか?
ミキ
私も全く同じ欲求を持っていました。ジョブを実行する前にグラフを生成します。だから私はそれに少し取り組んだ:)
セロリはそれを許可していないようです。その理由 (少なくともそれを実行しようとしたときに私が理解したこと) は、グラフでは各ノードに一意の名前を付ける必要があるためです。キャンバスが実行されると、この一意の名前はセロリの task_id になりますが、実行前はそのような区別を可能にするものは何もありません。
したがって、解決策は、このグラフを自分で生成し、もちろん各ノードを一意に識別することです (このため、カウンターが機能します)。
これがこの関数の仕事です:
# -*- coding: utf-8 -*-
from celery.canvas import chain, group, Signature
def analyze_canvas(canvas):
return _analyze_canvas(canvas)['dependencies']
def _analyze_canvas(canvas, previous=[], i=0):
dependencies = []
if isinstance(canvas, chain):
for t in canvas.tasks:
if not (isinstance(t, group) or isinstance(t, chain)):
n = str(t) + " - (" + str(i) + ")"
i += 1
dependencies.append((n, previous))
previous = [n]
else:
analysis = _analyze_canvas(t, previous, i)
dependencies.extend(analysis['dependencies'])
previous = analysis['previous']
elif isinstance(canvas, group):
new_previous = []
for t in canvas.tasks:
if not (isinstance(t, group) or isinstance(t, chain)):
n = str(t) + " - (" + str(i) + ")"
i += 1
dependencies.append((n, previous))
new_previous.append(n)
else:
analysis = _analyze_canvas(t, previous, i)
dependencies.extend(analysis['dependencies'])
new_previous = analysis['previous']
previous = new_previous
elif isinstance(canvas, Signature):
n = str(t) + " - (" + str(i) + ")"
i += 1
dependencies.append((n, previous))
previous = [n]
return {"dependencies": dependencies,
"previous": previous}
キャンバスの依存関係グラフを生成します。アイデアは、キャンバスの他のタスクを反復し、グループ/チェーン/署名を特定して適切な依存関係を生成することです。
この時点から、いくつかのセロリ ユーティリティを使用してドット ファイルを生成できます。ここに小さな使用例があります:
from celery_util import analyze_canvas
from celery.datastructures import DependencyGraph
from celery import Celery, group
app = Celery()
@app.task
def t1():
pass
@app.task
def t2():
pass
canvas = t1.si() | t2.si() | group(t1.si(), t1.si(), t2.si()) | t2.si()
d = analyze_canvas(canvas)
dg = DependencyGraph(it=d)
pipo = open("pipo.dot", "w+")
dg.to_dot(pipo)
この例では、ダミーのタスクを宣言し、きれいなキャンバスにチェーン/グループ化するだけです。私は celery util を使用DependencyGraph
して、オブジェクト表現と、メソッドで行うグラフをドットでダンプする機能を備えていますto_dot
。
そして美しい結果は次のとおりです。