1

気流とcronの実行は非常に混乱していると言わざるを得ません。今から5分ごとにcronを開始したいだけです。複雑な DAG の作成は簡単です。ドキュメントに基づいてcronを実行するロジックを理解することはそうではありません。

以下のコードを実行するとどうなりますか? 実行される毎秒、以下を印刷します。

1 2016-08-26 17:17:01.584360
2 2016-08-26 17:17:08.124035
3 2016-08-26 17:17:15.293874
1 2016-08-26 17:17:24.100623
2 2016-08-26 17:17:31.637739
3 2016-08-26 17:17:37.919901
1 2016-08-26 17:17:45.255641
2 2016-08-26 17:17:52.859954
3 2016-08-26 17:17:59.048536
1 2016-08-26 17:18:06.175670
2 2016-08-26 17:18:12.759000
3 2016-08-26 17:18:20.112758
1 2016-08-26 17:18:26.909130
2 2016-08-26 17:18:34.396926
on and on....WOWEE

以下は私のコードです。

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators import PythonOperator
from airflow.operators import TriggerDagRunOperator
from airflow.operators import DummyOperator
from datetime import datetime, timedelta

def checkfornewdata():
    f = open('/tmp/first_text.log','a')
    f.write('1 %s\n' % datetime.now())
    f.close()
    return True

def fetchdata():
    f = open('/tmp/first_text.log','a')
    f.write('2 %s\n' % datetime.now())
    f.close()
    return True

def uploadtoes():
    f = open('/tmp/first_text.log','a')
    f.write('3 %s\n' % datetime.now())
    f.close()
    return True


mytime = datetime.combine(datetime.now()-timedelta(minutes=5),
                                  datetime.min.time())


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    "start_date": mytime,
    'email': ['test@gmail.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

# */5 * * * *
dag = DAG('first_test', schedule_interval="*/5 * * * *", default_args=default_args)

node_0 = PythonOperator(
    task_id='isnewdata',
    provide_context=False,
    python_callable=checkfornewdata,
    dag=dag)


node_0_1 = PythonOperator(
    task_id='fetchdata',
    provide_context=False,
    python_callable=fetchdata,
    dag=dag)

node_0_1_2 = PythonOperator(
    task_id='uploadtoes',
    provide_context=False,
    python_callable= uploadtoes,
    dag=dag)


node_0_1.set_upstream(node_0)
node_0_1_2.set_upstream(node_0_1)
4

1 に答える 1

1

少なくとも、"start_date": mytime非動的な時間を使用するように変更する必要があります。特にFAQ を参照してください

于 2017-01-20T12:03:35.943 に答える