0

EMR Spark クラスター (YARN を使用) を実行しており、EMR マスターから直接 Luigi タスクを実行しています。S3 のデータに依存する一連のジョブがあり、いくつかの SparkSubmitTasks の後、最終的に Redshift になります。

import luigi
import luigi.format
from luigi.contrib.spark import SparkSubmitTask
from luigi.contrib.redshift import RedshiftTarget


class SomeSparkTask(SparkSubmitTask):

    # Stored in /etc/luigi/client.cfg
    host = luigi.Parameter(default='host')
    database = luigi.Parameter(default='database')
    user = luigi.Parameter(default='user')
    password = luigi.Parameter(default='password')
    table = luigi.Parameter(default='table')

    <add-more-params-here>

    app = '<app-jar>.jar'
    entry_class = '<path-to-class>'

    def app_options(self):
        return <list-of-options>

    def output(self):
        return RedshiftTarget(host=self.host, database=self.database, user=self.user, password=self.password,
                              table=self.table, update_id=<some-unique-identifier>)

    def requires(self):
        return AnotherSparkSubmitTask(<params>)

私は2つの主な問題に直面しています:

1) ときどき、luigi は SparkSubmitTask がいつ完了したかを判断できないことがあります。ジョブが完了したと判断できません。

2) 何らかの理由で SparkSubmitTasks を実行でき、上に配置したタスクが Spark ジョブを終了した場合、出力タスクは実行されず、マーカー テーブルは作成も入力もされません。ただし、実際のテーブルは、実行される Spark ジョブで作成されます。RedshiftTarget の呼び出し方法を誤解していますか?

それまでの間、私はソースコードに慣れようとしています。

ありがとう!

4

1 に答える 1