0

Spark の Web サイトで提供されているオンライン リソースを使用して、サンプル モデルの開発を練習していました。モデルを作成し、Spark-Shell を使用してサンプル データに対して実行することはできましたが、実際に運用環境でモデルを実行するにはどうすればよいでしょうか。Spark Jobサーバー経由ですか?

import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.regression.LabeledPoint  
import org.apache.spark.mllib.linalg.Vectors

val data = sc.textFile("hdfs://mycluster/user/Cancer.csv")
val parsedData = data.map { line =>
  val parts = line.split(',')
  LabeledPoint(parts.last.toDouble,     Vectors.dense(parts.take(9).map(_.toDouble)))
}
var svm = new SVMWithSGD().setIntercept(true)
val model = svm.run(parsedData)
var predictedValue = model.predict(Vectors.dense(5,1,1,1,2,1,3,1,1))
println(predictedValue)

上記のコードは、spark-shell で実行すると完璧に動作しますが、本番環境でモデルを実際に実行する方法がわかりません。spark jobserver 経由で実行しようとしましたが、エラーが発生しました。

curl -d "input.string = 1, 2, 3, 4, 5, 6, 7, 8, 9" 'ptfhadoop01v:8090/jobs?appName=SQL&classPath=spark.jobserver.SparkPredict'

プログラムはそれがベクトル要素であることを期待しているのに対し、文字列値を渡しているためだと確信しています。誰かがこれを達成する方法を教えてくれますか? また、これはデータが本番環境でモデルに渡される方法ですか? それとも他の方法ですか?

4

1 に答える 1

2

Spark Job-server は、Spark ジョブのパイプラインを設計し、(オプションで) REST API を介して複数のジョブで SparkContext を使用する本番ユース ケースで使用されます。Sparkplugは Spark Job-server の代替であり、同様の構造を提供します。

ただし、実稼働環境で (単一の) Spark ジョブを実行する方法についての質問に答えるには、そうするためにサードパーティのライブラリは必要ありません。SparkContext オブジェクトを構築し、それを使用して Spark ジョブをトリガーするだけです。たとえば、コード スニペットの場合、必要なのは次のとおりです。

package runner

import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors

import com.typesafe.config.{ConfigFactory, Config}
import org.apache.spark.{SparkConf, SparkContext}
/**
 * 
 */
object SparkRunner {

  def main (args: Array[String]){

    val config: Config = ConfigFactory.load("app-default-config") /*Use a library to read a config file*/
    val sc: SparkContext = constructSparkContext(config)

    val data = sc.textFile("hdfs://mycluster/user/Cancer.csv")
    val parsedData = data.map { line =>
      val parts = line.split(',')
      LabeledPoint(parts.last.toDouble, Vectors.dense(parts.take(9).map(_.toDouble)))
    }
    var svm = new SVMWithSGD().setIntercept(true)
    val model = svm.run(parsedData)
    var predictedValue = model.predict(Vectors.dense(5,1,1,1,2,1,3,1,1))
    println(predictedValue)
  }


  def constructSparkContext(config: Config): SparkContext = {
    val conf = new SparkConf()
    conf
      .setMaster(config.getString("spark.master"))
      .setAppName(config.getString("app.name"))
    /*Set more configuration values here*/

    new SparkContext(conf)
  }


}

必要に応じて、Spark ライブラリ自体で提供されるspark-submit スクリプトSparkSubmitのラッパーを使用することもできます。

于 2016-07-19T11:40:00.543 に答える