問題タブ [airflow]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
python - 気流がすべての依存タスクを UI から一度に実行できない
私の DAG には 3 つのタスクがあり、UI から個々のタスクをトリガーする必要があるため、Celery executor を使用しています。UI から個々のタスクを実行できます。
現在直面している問題は、タスクの依存関係を設定したにもかかわらず、UI から DAG のすべてのステップを一度に実行できないことです。
コマンド ラインから完全な DAG を実行できますが、UI を介して同じことを実行する方法はありますか?
airflow - airflow: 1.7: DAG が GUI で更新されない
test1.py
ディレクトリの下に定義されたdagファイルがあります/opt/airflow/dags
。最初は、ファイルには 1 つのタスクしかありませんtest1_task01
。私の GUI では、dag は 1 つのタスクだけで正しく表示されます。ただし、次のような別のタスクを DAG に追加すると、次のようになります。
更新された DAG が GUI に表示されません。GUI にはまだ古い DAG があります。airflow list_tasks <dagname>
は正しい DAG 関係を示し、airflow scheduler
最新バージョンを実行しますが、GUI は何らかの形で最新情報を取得していません。
GUI を介して DAG の更新されたビューを取得するにはどうすればよいですか?
java - Airflow で Spark コードを実行するには?
こんにちは地球の人々!Airflow を使用して、Spark タスクのスケジュールと実行を行っています。この時点で見つかったのは、Airflow で管理できる Python DAG だけです。
DAG の例:
問題は、私は Python コードが苦手で、いくつかのタスクを Java で記述していることです。私の質問は、Python DAG で Spark Java jar を実行する方法です。それとも、他の方法がありますか?spark submit を見つけました: http://spark.apache.org/docs/latest/submitting-applications.html
しかし、すべてを接続する方法がわかりません。たぶん、誰かが以前にそれを使用し、実際の例を持っています。お時間をいただきありがとうございます!
python - BaseOperator.xcom_pull のコンテキスト引数は何ですか
私は API ドキュメントを読んでいましたが、 BaseOperator.xcom_pullのcontext引数が何であるかが不明でした。
dag.default_argsになると思ったのですが、受け取りましたKeyError: 'ti'
こちらの push() の例に従って、前のタスク内で xcom_push を実行しました。
airflow - Airflow で動的に作成されたタスク間の依存関係を作成する方法
以下のコードを使用して動的タスクを作成しています。これらの動的に作成されたタスクへの依存関係を作成したいと考えています。たとえば、runStep_0 は runStep_1 などに依存する必要があります。
python - Airflow が「LocalExecutor」で同時 DAG をトリガーしない
気流 1.7.1.3 を使用しています。
同時実行 DAG / タスクに問題があります。DAG が実行されている場合、スケジューラは他の DAG を起動しません。スケジューラは完全にフリーズしているようです (ログはもうありません) ... 実行中の DAG が終了するまで。次に、新しい DAGrun がトリガーされます。私のさまざまなタスクは長時間実行される ECS タスクです (~10 分)
私は使用LocalExecutor
し、 と についてデフォルトの設定をparallelism=32
しdag_concurrency=16
ました。airflow scheduler -n 20
自動的に使用して再起動し'depends_on_past': False
、すべての DAG 宣言を設定します。
参考までに、ECS クラスターで実行されているコンテナーにエアフローをデプロイしました。max_threads = 2
利用可能なCPUは2つしかありません。
何か案は ?ありがとう
airflow - 気流の埋め戻しの明確化
私は Airbnb の airflow を使い始めたばかりで、バックフィルがいつ、どのように行われるのかまだはっきりしていません。
具体的には、私を混乱させる2つのユースケースがあります:
数分間実行
airflow scheduler
し、1 分間停止してから再起動すると、DAG は最初の 30 秒ほど余分なタスクを実行しているように見えますが、その後は通常どおり (10 秒ごとに実行) 続行します。これらの余分なタスクは、以前の実行で完了できなかった「埋め戻された」タスクですか? もしそうなら、どうすれば気流にそれらのタスクをバックフィルしないように指示できますか?airflow scheduler
数分間実行してから実行してからairflow clear MY_tutorial
再起動airflow scheduler
すると、大量の余分なタスクが実行されるようです。これらのタスクも何らかの形で「埋め戻された」タスクですか? それとも私は何かを逃していますか。
現在、私は非常に単純なダグを持っています:
気流構成で変更したのは2つだけです
- sqlite db の使用から postgres db の使用に変更しました
CeleryExecutor
a の代わりに aを使用していますSequentialExecutor
助けてくれてどうもありがとう!
airflow - SlackAPIPostOperator で「ds」を取得するにはどうすればよいですか?
datetime パラメータを必要とする python スクリプトを実行し、その出力を slack に投稿したいと考えています。しかし、気流テンプレート変数を取得する方法がわかりませんds
。
たとえば、以下のコードがあるとします。
で実行するairflow backfill dag_id -s 2016-10-01
ので、ds
(ここでは 2016-10-01) はスラック テキストに渡す必要があります。
Pythonスクリプトの出力をファイルに書き込んでから、それを読み取ってスラックテキストに直接渡します。しかし、それは完璧な解決策ではないと思います。