5

私はで働いていapache airflow 1.8.0ます。

これは、ジョブを実行したときbackfillの出力です。

[2017-04-13 09:42:55,857] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.runme_1 2017-04-13 13:43:00     [scheduled]>
[2017-04-13 09:42:55,857] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.runme_1 2017-04-13 13:43:00 [scheduled]>
[2017-04-13 09:42:55,857] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:45:00 [scheduled]>
[2017-04-13 09:42:55,858] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:45:00 [scheduled]>
[2017-04-13 09:42:55,858] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:43:00 [scheduled]>
[2017-04-13 09:42:55,858] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:43:00 [scheduled]>
[2017-04-13 09:42:55,858] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.also_run_this 2017-04-13 13:44:00 [scheduled]>
[2017-04-13 09:42:55,858] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.also_run_this 2017-04-13 13:44:00 [scheduled]>
[2017-04-13 09:42:55,864] {models.py:1120} INFO - Dependencies not met for <TaskInstance: example_bash_operator.run_after_loop 2017-04-13 13:44:00 [scheduled]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 3 non-success(es). upstream_tasks_state={'skipped': Decimal('0'), 'successes': Decimal('0'), 'done': 0, 'upstream_failed': Decimal('0'), 'failed': Decimal('0')}, upstream_task_ids=['runme_0', 'runme_1', 'runme_2']

スケジュールしようとすると、DAGエラーがスローされます。

Traceback (most recent call last):
  File "/anaconda3/bin/airflow", line 28, in <module>
    args.func(args)
  File "/anaconda3/lib/python3.5/site-packages/airflow/bin/cli.py", line 167, in backfill
    pool=args.pool)
  File "/anaconda3/lib/python3.5/site-packages/airflow/models.py", line 3330, in run
    job.run()
  File "/anaconda3/lib/python3.5/site-packages/airflow/jobs.py", line 200, in run
    self._execute()
  File "/anaconda3/lib/python3.5/site-packages/airflow/jobs.py", line 2021, in _execute
    raise AirflowException(err)
airflow.exceptions.AirflowException: ---------------------------------------------------

タスクに関する出力は次のとおりです。

BackfillJob is deadlocked. These tasks have succeeded:
set()
 These tasks have started:
{}
 These tasks have failed:
set()
 These tasks are skipped:
set()
 These tasks are deadlocked:
{<TaskInstance: example_bash_operator.runme_0 2017-04-13 13:44:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_1 2017-04-13 13:44:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_0 2017-04-13 13:46:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:44:00 [scheduled]>, <TaskInstance: example_bash_operator.also_run_this 2017-04-13 13:46:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_0 2017-04-13 13:45:00 [scheduled]>, <TaskInstance: example_bash_operator.run_this_last 2017-04-13 13:46:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_1 2017-04-13 13:46:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:46:00 [scheduled]>, <TaskInstance: example_bash_operator.run_after_loop 2017-04-13 13:46:00 [scheduled]>, <TaskInstance: example_bash_operator.also_run_this 2017-04-13 13:43:00 [scheduled]>, <TaskInstance: example_bash_operator.run_after_loop 2017-04-13 13:43:00 [scheduled]>, <TaskInstance: example_bash_operator.run_this_last 2017-04-13 13:45:00 [scheduled]>, <TaskInstance: example_bash_operator.also_run_this 2017-04-13 13:45:00 [scheduled]>, <TaskInstance: example_bash_operator.run_this_last 2017-04-13 13:43:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_0 2017-04-13 13:43:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_1 2017-04-13 13:45:00 [scheduled]>, <TaskInstance: example_bash_operator.run_after_loop 2017-04-13 13:45:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_1 2017-04-13 13:43:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:45:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:43:00 [scheduled]>, <TaskInstance: example_bash_operator.also_run_this 2017-04-13 13:44:00 [scheduled]>, <TaskInstance: example_bash_operator.run_after_loop 2017-04-13 13:44:00 [scheduled]>, <TaskInstance: example_bash_operator.run_this_last 2017-04-13 13:44:00 [scheduled]>}

Python 2.7およびPython 3.5でテスト済み

SequentialExecutorLocalExecutorを使用

PS。現時点で DAG をバックフィルすると、一度だけ実行され、スケジュールされたすべてのタスクに対して上記のエラーがスローされます。

4

1 に答える 1

4

気流インスタンスがデッドロック状態になっています。失敗したタスクは、今後のタスクの実行を許可していません。

Airflow は、各 DAG 実行の各タスクを新しいプロセスとして起動し、タスクが行き詰まり、これが処理されない場合、デッドロックの状況が発生します。

この状況を解決するには、次のいずれかを実行できます。

  1. useairflow clear <<dag_id>>これはデッドロックを解決し、DAG/タスクの将来の実行を可能にします
  2. 上記で問題が解決しない場合は、次の手順を実行する必要があります。use airflow resetdbこれにより、気流データベースがクリアされ、問題が解決されます。

将来は、

  • execution_timeout=timedelta(minutes=2)オペレーターを明示的に制御できるように、タイムアウトを設定してみてください
  • また、on_failure_callback=handle_failure障害時にオペレーターがクリーンに存在する を提供してください。

お役に立てれば、

乾杯!

于 2017-04-14T05:37:45.277 に答える