16

Cron の代わりに Airflow を使用してみます。しかし、schedule_interval は期待どおりに機能しません。

以下のようなPythonコードを書きました。
私の理解では、Airflow は「2016/03/30 8:15:00」に実行されるべきでしたが、その時点では機能しませんでした。

「'schedule_interval': timedelta(minutes = 5)」のように変更すると、正しく動作したと思います。

「notice_slack.sh」は、自分のチャンネルに slack api を呼び出すためのものです。

# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals
import os
from airflow.operators import BashOperator
from airflow.models import DAG
from datetime import datetime, timedelta

args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 3, 29, 8, 15),
}

dag = DAG(
    dag_id='notice_slack',
    default_args=args,
    schedule_interval="@daily",
    dagrun_timeout=timedelta(minutes=1))

# cmd file name
CMD = '/tmp/notice_slack.sh'

run_this = BashOperator(
    task_id='run_transport', bash_command=CMD, dag=dag)

この cron 設定のように、毎日特定の時間にいくつかのスクリプトを実行したいと考えています。

15 08 * * * bash /tmp/notice_slack.sh

Scheduling & Triggersというドキュメントを読んだことがありますが、cron とは少し異なります。
そこで、「start_date」と「schedule_interval」の設定で並べてみます。

誰が私が何をすべきか知っていますか?

気流バージョン

INFO - エグゼキュータ LocalExecutor の使用

v1.7.0

amazon-linux-ami/2015.09-リリースノート

4

5 に答える 5

17

これを試して:

# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals
import os
from airflow.operators import BashOperator
from airflow.models import DAG
from datetime import datetime, timedelta

args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 3, 29),
}

dag = DAG(
    dag_id='notice_slack',
    default_args=args,
    schedule_interval="15 08 * * *",
    dagrun_timeout=timedelta(minutes=1))

# cmd file name
CMD = 'bash /tmp/notice_slack.sh'

run_this = BashOperator(
    task_id='run_transport', bash_command=CMD, dag=dag)

start_date(datetime) – タスクの start_date は、最初のタスク インスタンスの execution_date を決定します。ベスト プラクティスは、start_date を DAG の schedule_interval に丸めることです。

schedule_interval(datetime.timedelta または dateutil.relativedelta.relativedelta または cron 式として機能する str) – DAG の実行頻度を定義します。この timedelta オブジェクトは、最新のタスク インスタンスの execution_date に追加され、次のスケジュールを把握します。

cron 設定でschedule_intervalandを同じように構成するだけで問題ありません。bash_command

于 2016-04-07T02:15:05.757 に答える
15

2016/03/30 8:15:00 + スケジュール間隔 (毎日) が経過すると、Airflow は DAG を開始します。したがって、DAG は 2016/03/31 8:15:00 に実行されます。

Airflow FAQを確認できます

于 2016-06-28T01:26:36.157 に答える
9

まず、開始日は過去の日付にする必要があります -'start_date': datetime(2016, 3, 29, 8, 15) 試しますかの代わりに'start_date': datetime(2016, 2, 29, 8, 15)

'catchup':False を適用してバックフィルを防ぎます - これがあなたがやりたいことでない限り。

Airflow のドキュメントから - Airflow スケジューラは、start_date + schedule_interval が経過した直後にタスクをトリガーします。

スケジュール間隔は cron として指定できます。毎日午前 8 時 15 分に実行する場合、式は - * '15 8 * * 'になります。

10 月 31 日の午前 8 時 15 分にのみ実行する場合、式は - * '15 8 31 10 'になります。

これを提供するに'schedule_inteval':'15 8 * * *'は、Dag プロパティで

これについては、 https://crontab.guru/で詳しく知ることができます。

または、Airflow プリセットがあります - ここに画像の説明を入力

これらのいずれかが要件を満たしている場合は、単純に次のようになります。'schedule_interval':'@hourly'

最後に、スケジュールを python timedelta オブジェクトとして適用することもできます (例: 午後 12 時)。

'schedule_interval': timedelta(hours=12)

于 2020-10-22T18:15:49.700 に答える
6

エアフロー cron 式の作成方法がよくわからない場合は、crontab.guruを使用してみてください。

于 2019-08-04T16:19:54.963 に答える