3

このワークフローでタスクを処理できるバッチを作成する必要があります。

                                             | task 4
                                  | task 3 ->| task 4
                       | task 2 ->           | task 4
                           
                                  | task 3 ->| task 4
    input ->  task 1 ->
                       | task 2 -> ... 
  • タスク #1 は入力データを処理し、リストのリストを返します。
  • タスク #2 はタスク #1 からリストを受け取り、リストのリストも返します。
  • タスク #3 は、タスク #2 からリストを受け取り、リストのリストも返します。
  • タスク #4 は、タスク #4 からリストを受け取り、リスト内のデータを処理します。

たとえば、タスク #1 は を返します[[],[],[],[]]。これは、フローが 4 つのタスク #2 を並行して実行する必要があることを意味します。各タスク #2 が返されます[[],[],[]]。ここで、4x3 タスク #3 が必要です。その後、タスク #3 が返されます[[],[]]。最後に、フローは 4x3x2 のタスク #4 を実行する必要があります。

Prefect Flow を使用することは可能ですか? マッピング機能を使用してみましたが、複雑なワークフロー スキーマをサポートしていないようです (または、適切に使用していない可能性があります)。

with Flow('test') as flow:
   res1 = task1()
   res2 = task2.map(res1)
   res3 = task3.map(res2)
   res4 = task4.map(res3)

フロー task1 を実行すると、正しい数のリストが返されます。次に、フローは 4 つの task2 を作成し、それぞれが 3 つのリストのリストを返します。しかし、12 個の task3 を作成する代わりに、フローはそのうちの 4 つだけを作成します。各 task3 は、task2 からの 1 つのリストではなく、task1 で作成された 4 つのリストのリストを受け取ります。

このようなワークフローを作成する方法について何かアイデアはありますか?

4

0 に答える 0