私が直面している現在の問題は、非循環依存グラフで実行する必要があるタスクによってそれぞれ処理および更新する必要があるドキュメントが MongoDB コレクションにあることです。上流のタスクがドキュメントの処理に失敗した場合、そのドキュメントは前提条件の情報で更新されていないため、従属タスクはそのドキュメントを処理できません。
Airflow を使用する場合、次の 2 つの解決策があります。
ドキュメントごとに DAG をトリガーし、ドキュメント ID を で渡します
--conf
。これに関する問題は、これが Airflow の意図された使用方法ではないことです。スケジュールされたプロセスを実行することは決してなく、ドキュメントがコレクションにどのように表示されるかに基づいて、1 日あたり 1440 の Dagrun を作成します。その期間のコレクションで作成されたすべてのドキュメントを処理するために、その期間ごとに DAG を実行します。これは Airflow が期待される動作方法に従いますが、問題は、タスクが 1 つのドキュメントの処理に失敗した場合、依存するタスクが他のドキュメントを処理できないことです。また、ドキュメントがタスクによって処理されるのに他のドキュメントよりも時間がかかる場合、それらの他のドキュメントは、その単一のドキュメントが DAG を続行するのを待っています。
エアフローよりも良い方法はありますか? または、現在見ている 2 つの方法よりも Airflow でこれを処理するためのより良い方法はありますか?