0

私たちは最近、Airflow を「データ ワークフロー」エンジンとして採用しようとしました。ほとんどのことは理解できましたが、DAG をトリガーするタイミングをスケジューラがどのように計算するかについては、まだ灰色の領域にいます。

この単純なダグを見てください:

from airflow import DAG
from datetime import datetime
from airflow.operators.bash_operator import BashOperator

dag_options = {                
            'owner':                'Airflow',  
            'depends_on_past':      False,      
            'start_date':           datetime.now()
}

with DAG('test_dag1', schedule_interval="5 * * * *", default_args=dag_options) as dag:
                task1 = BashOperator(      
                task_id='task1', 
                bash_command='date',                
                dag=dag)      

スケジュールはこれを取得しますが、実行しません。ここで、「start_date」を次のように変更すると:

datetime(year=xxxx,month=yyyy=day=zzzz) 

xxxx、yyyy、zzzz は今日の日付で、実行が開始されます。この原因は、スケジューラがソース DAG フォルダーからこの DAG を再読み取りし続け、毎回 datetime.now() を実行し、開始日が現在キューに入れられている日付と異なることに気付き、この DAG を再追加して再スケジュール/実行日を前倒ししますか (私のdag_dir_list_intervalは 300 です)?

また、気流では、私が理解しているように、dag の一時停止が解除される (または dags_are_paused_at_creation = False で追加される) と、スケジューラーは次のように実行をスケジュールします。

  • 1 日目の実行: 直後 (開始日 + 間隔)
  • 2 日目の実行: (開始日 + (間隔 * 2)) の直後
  • 3 日目の実行: (開始日 + (間隔 * 3)) の直後

これは正しい仮定ですか?

更新 (2017 年 7 月 30 日)

上記の仮定に基づいて、今日 (07/30/2017) にこの DAG を作成しました。

from airflow import DAG
from datetime import datetime
from airflow.operators.bash_operator import BashOperator

dag_options = {                
            'owner':             'Airflow',  
            'depends_on_past':   False,      
            'start_date':   
datetime(year=2017,month=7,day=30,hour=20,minute=10)
}

with DAG('test_dag_100', schedule_interval="*/10 * * * *", 
default_args=dag_options) as dag:
                task1 = BashOperator(      
                task_id='task_100', 
                bash_command='date',                
                dag=dag)      

(UTC)に開始する必要があります:

  • 2017/7/30 20:20:00
  • 2017/7/30 20:30:00
  • 2017/7/30 20:40:00

残念ながら、これは起こっていません。これが私のダッシュボードのスクリーンショットです。

20:21:00 に DAG が実行されなかった理由を誰か説明できますか? 20:31:00 以降、まだ実行されませんでした...ここで何が欠けていますか?

ちなみに、何らかの理由で、ダッシュボードから手動で DAG を開始するたびに、DAG が「実行中」の段階にあることに気付きました。どうしてこれなの?手動で開始することは、開始タイミング オプション (start_date/interval/etc) と関係がありますか??

あなたが提供できる説明をありがとう

4

1 に答える 1

2

あなたの仮定は正しいです。Airflow は、開始日から指定されたスケジュール間隔が経過した後に、最初の DAG 実行をスケジュールします。datetime.now() を開始日として使用すると、Airflow で DAG がトリガーされることはほとんどありません。スケジューリングドキュメントに記載されています。

「5 * * * *」のスケジュール間隔で datetime(2017,7,27,1,0) のように特定の開始日を指定した場合、7/27 の午前 1:05 に DAG は初めて実行するようにトリガーされます。その後も 5 分ごとに実行され続けます。

于 2017-07-27T15:43:16.353 に答える