「これらの python ファイルを実行したい (Python オペレーターを介した Python 関数ではない)」と尋ねていることは承知しています。しかし、これはおそらくAirflowの使用があなたよりも効果的ではないと思います. 以前に書かれた回答にも混乱が見られるので、これがあなたが望んでいた方法であり、タスクを実行するために私が推奨する方法です。
仮定:
dags/
my_dag_for_task_1_and_2.py
tasks/
file1.py
file2.py
を避けるためのあなたの要求PythonOperator
:
# my_dag_for_task_1_and_2.py
import datetime as dt
from airflow import DAG
from airflow.operators import BashOperator
with DAG(
'my_dag_for_task_1_and_2',
default_args={
'owner': 'me',
'start_date': datetime(…),
…,
},
schedule_interval='8 * * * *',
) as dag:
task_1 = BashOperator(
task_id='task_1',
bash_command='/path/to/python /path/to/dags/tasks/file1.py',
)
task_2 = BashOperator(
task_id='task_2',
bash_command='/path/to/python /path/to/dags/tasks/file2.py',
)
task_1 >> task_2
Airflow 用にゼロから Python を作成したわけではありませんが、以下を使用しPythonOperator
ます。
# my_dag_for_task_1_and_2.py
import datetime as dt
from airflow import DAG
from airflow.operators import PythonOperator
import tasks.file1
import tasks.file2
with DAG(
'my_dag_for_task_1_and_2',
default_args={
'owner': 'me',
'start_date': datetime(…),
…,
},
schedule_interval='8 * * * *',
) as dag:
task_1 = PythonOperator(
task_id='task_1',
python_callable=file1.function_in_file1,
)
task_2 = PythonOperator(
task_id='task_2',
python_callable=file2.function_in_file2, # maybe main?
)
task_1 >> task_2