私は Airbnb の airflow を使い始めたばかりで、バックフィルがいつ、どのように行われるのかまだはっきりしていません。
具体的には、私を混乱させる2つのユースケースがあります:
数分間実行
airflow scheduler
し、1 分間停止してから再起動すると、DAG は最初の 30 秒ほど余分なタスクを実行しているように見えますが、その後は通常どおり (10 秒ごとに実行) 続行します。これらの余分なタスクは、以前の実行で完了できなかった「埋め戻された」タスクですか? もしそうなら、どうすれば気流にそれらのタスクをバックフィルしないように指示できますか?airflow scheduler
数分間実行してから実行してからairflow clear MY_tutorial
再起動airflow scheduler
すると、大量の余分なタスクが実行されるようです。これらのタスクも何らかの形で「埋め戻された」タスクですか? それとも私は何かを逃していますか。
現在、私は非常に単純なダグを持っています:
default_args = {
'owner': 'me',
'depends_on_past': False,
'start_date': datetime(2016, 10, 4),
'email': ['airflow@airflow.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG(
'MY_tutorial', default_args=default_args, schedule_interval=timedelta(seconds=10))
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 8)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
task_id='templated',
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag)
second_template = """
touch ~/airflow/logs/test
echo $(date) >> ~/airflow/logs/test
"""
t4 = BashOperator(
task_id='write_test',
bash_command=second_template,
dag=dag)
t1.set_upstream(t4)
t2.set_upstream(t1)
t3.set_upstream(t1)
気流構成で変更したのは2つだけです
- sqlite db の使用から postgres db の使用に変更しました
CeleryExecutor
a の代わりに aを使用していますSequentialExecutor
助けてくれてどうもありがとう!