41

こんにちは地球の人々!Airflow を使用して、Spark タスクのスケジュールと実行を行っています。この時点で見つかったのは、Airflow で管理できる Python DAG だけです。
DAG の例:

spark_count_lines.py
import logging

from airflow import DAG
from airflow.operators import PythonOperator

from datetime import datetime

args = {
  'owner': 'airflow'
  , 'start_date': datetime(2016, 4, 17)
  , 'provide_context': True
}

dag = DAG(
  'spark_count_lines'
  , start_date = datetime(2016, 4, 17)
  , schedule_interval = '@hourly'
  , default_args = args
)

def run_spark(**kwargs):
  import pyspark
  sc = pyspark.SparkContext()
  df = sc.textFile('file:///opt/spark/current/examples/src/main/resources/people.txt')
  logging.info('Number of lines in people.txt = {0}'.format(df.count()))
  sc.stop()

t_main = PythonOperator(
  task_id = 'call_spark'
  , dag = dag
  , python_callable = run_spark
)

問題は、私は Python コードが苦手で、いくつかのタスクを Java で記述していることです。私の質問は、Python DAG で Spark Java jar を実行する方法です。それとも、他の方法がありますか?spark submit を見つけました: http://spark.apache.org/docs/latest/submitting-applications.html
しかし、すべてを接続する方法がわかりません。たぶん、誰かが以前にそれを使用し、実際の例を持っています。お時間をいただきありがとうございます!

4

3 に答える 3

24

使えるはずですBashOperator。残りのコードはそのままにして、必要なクラスとシステム パッケージをインポートします。

from airflow.operators.bash_operator import BashOperator

import os
import sys

必要なパスを設定します。

os.environ['SPARK_HOME'] = '/path/to/spark/root'
sys.path.append(os.path.join(os.environ['SPARK_HOME'], 'bin'))

そして演算子を追加します:

spark_task = BashOperator(
    task_id='spark_java',
    bash_command='spark-submit --class {{ params.class }} {{ params.jar }}',
    params={'class': 'MainClassName', 'jar': '/path/to/your.jar'},
    dag=dag
)

これを簡単に拡張して、Jinja テンプレートを使用して追加の引数を提供できます。

bash_commandもちろん、ケースに適したテンプレートに置き換えることで、Spark 以外のシナリオでこれを調整できます。次に例を示します。

bash_command = 'java -jar {{ params.jar }}'

と調整params

于 2016-10-03T13:41:08.777 に答える
22

バージョン 1.8 (本日リリース) の Airflow には、

SparkSQLHook コード - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_sql_hook.py

SparkSubmitHook コード - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_submit_hook.py

これら 2 つの新しい Spark オペレーター/フックは、1.8 バージョンの時点で「contrib」ブランチにあるため、(十分に) 文書化されていないことに注意してください。

したがって、SparkSubmitOperator を使用して、Spark 実行用の Java コードを送信できます。

于 2017-03-20T17:06:01.660 に答える