こんにちは地球の人々!Airflow を使用して、Spark タスクのスケジュールと実行を行っています。この時点で見つかったのは、Airflow で管理できる Python DAG だけです。
DAG の例:
spark_count_lines.py
import logging
from airflow import DAG
from airflow.operators import PythonOperator
from datetime import datetime
args = {
'owner': 'airflow'
, 'start_date': datetime(2016, 4, 17)
, 'provide_context': True
}
dag = DAG(
'spark_count_lines'
, start_date = datetime(2016, 4, 17)
, schedule_interval = '@hourly'
, default_args = args
)
def run_spark(**kwargs):
import pyspark
sc = pyspark.SparkContext()
df = sc.textFile('file:///opt/spark/current/examples/src/main/resources/people.txt')
logging.info('Number of lines in people.txt = {0}'.format(df.count()))
sc.stop()
t_main = PythonOperator(
task_id = 'call_spark'
, dag = dag
, python_callable = run_spark
)
問題は、私は Python コードが苦手で、いくつかのタスクを Java で記述していることです。私の質問は、Python DAG で Spark Java jar を実行する方法です。それとも、他の方法がありますか?spark submit を見つけました: http://spark.apache.org/docs/latest/submitting-applications.html
しかし、すべてを接続する方法がわかりません。たぶん、誰かが以前にそれを使用し、実際の例を持っています。お時間をいただきありがとうございます!