3

CSV ファイル (1 行のデータ) を読み取り、事前に構築されたランダム フォレスト モデル オブジェクトを使用して予測を行う単純なスパーク ジョブを実装しようとしています。このジョブには、データの前処理やデータ操作は含まれていません。

アプリケーションをローカルで実行して、スタンドアロン モードで spark を実行しています。構成は次のとおりです。 RAM: 8GB メモリ: 40GB コア数: 2 Spark バージョン: 1.5.2 Scala バージョン: 2.10.5 入力ファイル サイズ: 1KB (1 行のデータ) モデル ファイル サイズ: 1,595 KB (400 ツリー)ランダムフォレスト)

現在、spark-submit での実装には約 13 秒かかります。ただし、実行時間はこのアプリケーションにとって大きな懸念事項であるため、

  1. 実行時間を 1 秒または 2 秒に短縮するためにコードを最適化する方法はありますか? (優先度高)

  2. 起動とコンテキストの設定に約 5 ~ 6 秒かかるのに対し、実際のコードの実行には約 7 ~ 8 秒かかることに気付きました。

アプリケーションコードはこちら

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object RF_model_App {
  def main(args: Array[String]) {

val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature4.{RandomForestfeature4Model, RandomForestClassifier}
import org.apache.spark.ml.evaluation.Multiclassfeature4Evaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
import org.apache.spark.sql.functions.udf
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.feature.StringIndexer
import sqlContext.implicits._
val Test = sqlContext.read.format("com.databricks.spark.csv").option("header","true").load("/home/ubuntu/Test.csv")
Test.registerTempTable("Test")
val model_L1 = sc.objectFile[RandomForestfeature4Model]("/home/ubuntu/RF_L1.model").first()

val toInt = udf[Int, String]( _.toInt)
val toDouble = udf[Double, String]( _.toDouble)
val featureDf = Test.withColumn("id1", toInt(Test("id1")))  .withColumn("id2", toInt(Test("id2")))  .withColumn("id3", toInt(Test("id3")))  .withColumn("id4", toInt(Test("id4")))   .withColumn("feature3", toInt(Test("feature3")))   .withColumn("feature9", toInt(Test("feature9")))    .withColumn("feature10", toInt(Test("feature10")))  .withColumn("feature12", toInt(Test("feature12")))  .withColumn("feature14", toDouble(Test("feature14")))   .withColumn("feature15", toDouble(Test("feature15")))   .withColumn("feature16", toInt(Test("feature16")))  .withColumn("feature17", toDouble(Test("feature17")))   .withColumn("feature18", toInt(Test("feature18")))

val feature4_index = new StringIndexer()  .setInputCol("feature4")  .setOutputCol("feature4_index")
val feature6_index = new StringIndexer()  .setInputCol("feature6")  .setOutputCol("feature6_index")
val feature11_index = new StringIndexer()  .setInputCol("feature11")  .setOutputCol("feature11_index")
val feature8_index = new StringIndexer()  .setInputCol("feature8")  .setOutputCol("feature8_index")
val feature13_index = new StringIndexer()  .setInputCol("feature13")  .setOutputCol("feature13_index")
val feature2_index = new StringIndexer()  .setInputCol("feature2")  .setOutputCol("feature2_index")
val feature5_index = new StringIndexer()  .setInputCol("feature5")  .setOutputCol("feature5_index")
val feature7_index = new StringIndexer()  .setInputCol("feature7")  .setOutputCol("feature7_index")
val vectorizer_L1 =  new VectorAssembler()  .setInputCols(Array("feature3",  "feature2_index", "feature6_index", "feature4_index", "feature8_index", "feature7_index", "feature5_index", "feature10", "feature9", "feature12", "feature11_index", "feature13_index", "feature14", "feature15", "feature18", "feature17", "feature16")).setOutputCol("features_L1")
val feature_pipeline_L1 = new Pipeline()  .setStages(Array( feature4_index, feature6_index, feature11_index,feature8_index, feature13_index,  feature2_index, feature5_index, feature7_index,vectorizer_L1))
val testPredict= feature_pipeline_L1.fit(featureDf).transform(featureDf)
val getPOne = udf((v: org.apache.spark.mllib.linalg.Vector) => v(1))
val getid2 = udf((v: Int) => v)
val L1_output = model_L1.transform(testPredict).select(getid2($"id2") as "id2",getid2($"prediction") as "L1_prediction",getPOne($"probability") as "probability")

L1_output.repartition(1).write.format("com.databricks.spark.csv").option("header", "true").mode("overwrite").save("/home/L1_output")

  }
};
4

1 に答える 1

1

単に間違っていることから始めましょう:

  • 使用する機能メカニズムが正しくありません。StringIndexerデータの分布に基づいてインデックスを割り当てるため、同じレコードが他のレコードに応じて異なるエンコードになります。StringIndexerModelトレーニング、テスト、予測には同じ (-s) を使用する必要があります。
  • val getid2 = udf((v: Int) => v)高価なアイデンティティです。

持続的にSparkContext

job-serverまたはを含む永続的なコンテキストを保持する複数のツールがありますLivy

最後に、単純に Spark Streaming を使用して、データをそのまま処理することができます。

シャッフリング

単一の作成にも使用repartitionしているため、1つのファイルのCSVを想定しています。このアクションは非常にコストがかかりますが、定義上、RDD 内のデータをランダムに再シャッフルして、作成するパーティションを増やしたり減らしたりして、パーティション間でバランスを取ります。これにより、ネットワーク上のすべてのデータが常にシャッフルされます。

その他の考慮事項:

待ち時間が重要で、パフォーマンスの低い単一のマシンのみを使用する場合は、Spark をまったく使用しないでください。ここで得るものは何もありません。このような場合、優れた地元の図書館はより良い仕事をすることができます.

:

お客様のデータやハードウェアにはアクセスしないため、時間を 7 秒に短縮するなどの要件はまったく意味がありません。

于 2016-02-26T15:00:26.627 に答える