11

タスクまたはジョブが完了した直後にコンソール (Spark Shell または Spark 送信ジョブ) でこれらのメトリクスを収集するにはどうすればよいですか?

Spark を使用して Mysql から Cassandra にデータをロードしていますが、非常に巨大です (例: ~200 GB と 600M 行)。タスクが完了したら、spark が処理した行数を正確に確認したいと思います。Spark UI から番号を取得できますが、spark シェルまたは spark-submit ジョブからその番号 ("Output Records Written") を取得するにはどうすればよいでしょうか。

Mysql から Cassandra にロードするサンプル コマンド。

val pt = sqlcontext.read.format("jdbc").option("url", "jdbc:mysql://...:3306/...").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "payment_types").option("user", "hadoop").option("password", "...").load()

pt.save("org.apache.spark.sql.cassandra",SaveMode.Overwrite,options = Map( "table" -> "payment_types", "keyspace" -> "test"))

上記のタスクのすべての Spark UI メトリックを主に出力サイズと書き込みレコードを取得したいと考えています。

助けてください。

御時間ありがとうございます!

4

1 に答える 1

9

答えを見つけました。SparkListener を使用して統計を取得できます。

ジョブに入力メトリックまたは出力メトリックがない場合、None.get 例外が発生する可能性がありますが、if stmt を指定することで安全に無視できます。

sc.addSparkListener(new SparkListener() {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    val metrics = taskEnd.taskMetrics
    if(metrics.inputMetrics != None){
      inputRecords += metrics.inputMetrics.get.recordsRead}
    if(metrics.outputMetrics != None){
      outputWritten += metrics.outputMetrics.get.recordsWritten }
  }
})

以下の例を見つけてください。

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import com.datastax.spark.connector._
import org.apache.spark.sql._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

val conf = new SparkConf()
.set("spark.cassandra.connection.host", "...")
.set("spark.driver.allowMultipleContexts","true")
.set("spark.master","spark://....:7077")
.set("spark.driver.memory","1g")
.set("spark.executor.memory","10g")
.set("spark.shuffle.spill","true")
.set("spark.shuffle.memoryFraction","0.2")
.setAppName("CassandraTest")
sc.stop
val sc = new SparkContext(conf)
val sqlcontext = new org.apache.spark.sql.SQLContext(sc)

var outputWritten = 0L

sc.addSparkListener(new SparkListener() {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    val metrics = taskEnd.taskMetrics
    if(metrics.inputMetrics != None){
      inputRecords += metrics.inputMetrics.get.recordsRead}
    if(metrics.outputMetrics != None){
      outputWritten += metrics.outputMetrics.get.recordsWritten }
  }
})

val bp = sqlcontext.read.format("jdbc").option("url", "jdbc:mysql://...:3306/...").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "bucks_payments").option("partitionColumn","id").option("lowerBound","1").option("upperBound","14596").option("numPartitions","10").option("fetchSize","100000").option("user", "hadoop").option("password", "...").load()
bp.save("org.apache.spark.sql.cassandra",SaveMode.Overwrite,options = Map( "table" -> "bucks_payments", "keyspace" -> "test"))

println("outputWritten",outputWritten)

結果:

scala> println("outputWritten",outputWritten)
(outputWritten,16383)
于 2016-04-27T22:20:56.240 に答える