このワークフローでタスクを処理できるバッチを作成する必要があります。
| task 4
| task 3 ->| task 4
| task 2 -> | task 4
| task 3 ->| task 4
input -> task 1 ->
| task 2 -> ...
- タスク #1 は入力データを処理し、リストのリストを返します。
- タスク #2 はタスク #1 からリストを受け取り、リストのリストも返します。
- タスク #3 は、タスク #2 からリストを受け取り、リストのリストも返します。
- タスク #4 は、タスク #4 からリストを受け取り、リスト内のデータを処理します。
たとえば、タスク #1 は を返します[[],[],[],[]]
。これは、フローが 4 つのタスク #2 を並行して実行する必要があることを意味します。各タスク #2 が返されます[[],[],[]]
。ここで、4x3 タスク #3 が必要です。その後、タスク #3 が返されます[[],[]]
。最後に、フローは 4x3x2 のタスク #4 を実行する必要があります。
Prefect Flow を使用することは可能ですか? マッピング機能を使用してみましたが、複雑なワークフロー スキーマをサポートしていないようです (または、適切に使用していない可能性があります)。
with Flow('test') as flow:
res1 = task1()
res2 = task2.map(res1)
res3 = task3.map(res2)
res4 = task4.map(res3)
フロー task1 を実行すると、正しい数のリストが返されます。次に、フローは 4 つの task2 を作成し、それぞれが 3 つのリストのリストを返します。しかし、12 個の task3 を作成する代わりに、フローはそのうちの 4 つだけを作成します。各 task3 は、task2 からの 1 つのリストではなく、task1 で作成された 4 つのリストのリストを受け取ります。
このようなワークフローを作成する方法について何かアイデアはありますか?