私はAirflowの初心者です。この記事https://stlong0521.github.io/20161023%20-%20Airflow.htmlを参照して、Celery Executor を使用して Airflow の分散モードをセットアップしようとしています。
仕様について詳しく説明する前に、PostgreSQL を別のインスタンスにインストールしたことを確認したいと思います。
セットアップの仕様は次のとおりです。
Airflow コア/サーバー コンピューター
- パイソン3.5
- エアフロー (AIRFLOW_HOME = ~/airflow)
- セロリ
- psycogp2
- RabbitMQ
airflow.cfg で行われた構成:
sql_alchemy_conn = postgresql+psycopg2://username:password@192.168.2.12:5432/airflow
executor = CeleryExecutor
broker_url = amqp://username:password@192.168.1.12:5672//
celery_result_backend = db+postgresql://username:password@192.168.2.12:5432/airflow
実行されたテスト:
RabbitMQ is running
Can connect to PostgreSQL and have confirmed that Airflow has created tables
Can start and view the webserver (including custom dags)
Airflow ワーカー コンピューター
以下がインストールされています:
- Python 3.5 と
- エアフロー (AIRFLOW_HOME = ~/airflow)
- セロリ
- psycogp2
airflow.cfg で行われた構成は、サーバーとまったく同じです。
sql_alchemy_conn = postgresql+psycopg2://username:password@192.168.2.12:5432/airflow
executor = CeleryExecutor
broker_url = amqp://username:password@192.168.1.12:5672//
celery_result_backend = db+postgresql://username:password@192.168.2.12:5432/airflow
ワーカー マシンで実行されたコマンドからの出力:
エアフローフラワーを実行する場合:
[2018-02-19 14:58:14,276] {__init__.py:57} INFO - Using executor CeleryExecutor
[2018-02-19 14:58:14,360] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python3.5/lib2to3/Grammar.txt
[2018-02-19 14:58:14,384] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python3.5/lib2to3/PatternGrammar.txt
[I 180219 14:58:15 command:139] Visit me at http://0.0.0.0:5555
[I 180219 14:58:15 command:144] Broker: amqp://username:password@192.168.1.12:5672//
[I 180219 14:58:15 command:147] Registered tasks:
['celery.accumulate',
'celery.backend_cleanup',
'celery.chain',
'celery.chord',
'celery.chord_unlock',
'celery.chunks',
'celery.group',
'celery.map',
'celery.starmap']
[I 180219 14:58:15 mixins:224] Connected to amqp://username:password@192.168.1.12:5672//
Airflow Core マシンで DAG を渡しています。また、DAG が処理するサンプル データ (Excel シート) を同じコア マシンにコピーしました。
私のワーカーログ
raise CalledProcessError(retcode, cmd)
subprocess.CalledProcessError: Command 'airflow run dag_name_x task_name_xx 2018-02-19T10:15:41.657243 --local -sd /home/Distributedici/airflow/dags/sample_data_xx.py' returned non-zero exit status 1
今私のクエリは
1) DAG フォルダもワーカー コンピュータにコピーする必要がありますか?
2) 現在、ワーカー コンピューターに dag フォルダーをコピーしていないため、ワーカー プロセスがタスクを取得するのを確認できません。
私が間違いを犯している場所と、ワーカープロセスがプロセスを取得する方法を教えてください。