3

Spark 2.0.x では、JobProgressListener実装を使用して、クラスターからジョブ/ステージ/タスクの進行状況情報をリアルタイムで取得しています。イベント フローがどのように機能するかを理解しており、作業に関する最新情報を正常に受け取ることができます。

私の問題は、同じ Spark コンテキストで同時に複数の異なる送信を実行していることです。各送信に属するジョブ/ステージ/タスクを区別することは一見不可能です。各ジョブ/ステージ/タスクは一意の ID を受け取ります。これは素晴らしいことです。ただし、受信した JobProgressListener イベント オブジェクトと一緒に返される送信 "id" または "name" を提供する方法を探しています。

「ジョブ グループ」は Spark コンテキストで設定できることは認識していますが、複数のジョブが同じコンテキストで同時に実行されていると、それらがスクランブルされてしまいます。

単一の SQLContext のリスナー イベントで返されるカスタム プロパティを忍び込ませる方法はありますか? そうすることで、後続の Stage および Task イベントをリンクして、必要なものを取得できるはずです。

注意: 私はこれらのジョブに spark-submit を使用していません。これらは、SparkSession/SQLContext への Java 参照を使用して実行されています。

解決策やアイデアをありがとう。

4

1 に答える 1

0

ローカル プロパティを使用しています。これは、onStageSubmit イベント中にリスナーからアクセスできます。その後、その段階で実行されたタスクを識別するために、対応する stageId を使用しています。

Future({
      sc.setLocalProperty("job-context", "second")
      val listener = new MetricListener("second")
      sc.addSparkListener(listener)
      //do some spark actions
      val df = spark.read.load("...")
      val countResult = df.filter(....).count()
      println(listener.rows)
      sc.removeSparkListener(listener)
    })

class MetricListener(name:String) extends SparkListener{

  var rows: Long = 0L
  var stageId = -1
  
  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
    if (stageSubmitted.properties.getProperty("job-context") == name){
      stageId = stageSubmitted.stageInfo.stageId
    }
  }

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    if (taskEnd.stageId == stageId)
      rows = rows + taskEnd.taskMetrics.inputMetrics.recordsRead
  }
  
}
于 2021-06-15T08:09:52.157 に答える