25

Airflow を使用して単純なタスク python を実行しようとしています。

from __future__ import print_function
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from datetime import datetime, timedelta


from pprint import pprint

seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
                                  datetime.min.time())

args = {
    'owner': 'airflow',
    'start_date': seven_days_ago,
}

dag = DAG(dag_id='python_test', default_args=args)


def print_context(ds, **kwargs):
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'

run_this = PythonOperator(
    task_id='print',
    provide_context=True,
    python_callable=print_context,
    dag=dag)

たとえば、試してみると:

気流テスト python_test print 2015-01-01

できます!

def print_context(ds, **kwargs)今、私は自分の関数を他のpythonファイルに入れたいと思っています。だから私はという別のファイルを作成します: simple_test.py と変更:

run_this = PythonOperator(
    task_id='print',
    provide_context=True,
    python_callable=simple_test.print_context,
    dag=dag)

今、私は再び実行しようとします:

気流テスト python_test print 2015-01-01

そしてOK!それはまだ動作します!

ただし、ファイルを使用してワーカー モジュールなどのモジュールを作成する場合は、それをSimplePython.pyインポート ( from worker import SimplePython) して試してください。

気流テスト python_test print 2015-01-01

次のメッセージが表示されます。

ImportError: ワーカーという名前のモジュールがありません

質問:

  1. DAG 定義内にモジュールをインポートすることはできますか?
  2. Airflow+Celery は、必要なすべての Python ソース ファイルをワーカー ノード間でどのように配布しますか?
4

4 に答える 4

2

2 番目の質問: Airflow+Celery は、必要なすべての python ソース ファイルをワーカー ノード間でどのように配布しますか?

ドキュメントから : ワーカーはその DAGS_FOLDER にアクセスできる必要があり、独自の方法でファイル システムを同期する必要があります。一般的なセットアップは、DAGS_FOLDER を Git リポジトリに保存し、Chef、Puppet、Ansible、または環境内のマシンの構成に使用するものを使用してマシン間で同期することです。すべてのボックスに共通のマウント ポイントがある場合、そこでパイプライン ファイルを共有することも同様に機能するはずです。

http://pythonhosted.org/airflow/installation.html?highlight=chef

于 2016-03-31T07:42:30.633 に答える
1

最初の質問については、可能です。

__init__.pyそして、同じディレクトリの下に名前が付けられた空のファイルを作成する必要があると思いますSimplePython.py(あなたの場合はworkerディレクトリです)。そのworkerディレクトリを実行すると、Python モジュールと見なされます。

次に、DAG 定義で を試してくださいfrom worker.SimplePython import print_context

あなたの場合、カスタマイズした機能を削除せずに airflow コア プロジェクトをアップグレードしたい場合があるため、airflow 用のプラグインを作成した方がよいと思います。

于 2015-11-24T21:16:06.140 に答える