DStream を介して Kafka から到着するデータがあります。いくつかのキーワードを取得するために特徴抽出を実行したいと考えています。
すべてのデータが到着するのを待ちたくない (潜在的に終了しない連続ストリームであることを意図しているため) ため、チャンクで抽出を実行したいと考えています。精度が少し低下しても問題ありません。
これまでのところ、次のようなものをまとめました。
def extractKeywords(stream: DStream[Data]): Unit = {
val spark: SparkSession = SparkSession.builder.getOrCreate
val streamWithWords: DStream[(Data, Seq[String])] = stream map extractWordsFromData
val streamWithFeatures: DStream[(Data, Array[String])] = streamWithWords transform extractFeatures(spark) _
val streamWithKeywords: DStream[DataWithKeywords] = streamWithFeatures map addKeywordsToData
streamWithFeatures.print()
}
def extractFeatures(spark: SparkSession)
(rdd: RDD[(Data, Seq[String])]): RDD[(Data, Array[String])] = {
val df = spark.createDataFrame(rdd).toDF("data", "words")
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(numOfFeatures)
val rawFeatures = hashingTF.transform(df)
val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
val idfModel = idf.fit(rawFeatures)
val rescaledData = idfModel.transform(rawFeature)
import spark.implicits._
rescaledData.select("data", "features").as[(Data, Array[String])].rdd
}
しかし、私は受け取ったjava.lang.IllegalStateException: Haven't seen any document yet.
- 私は物事を一緒に廃棄しようとしているだけなので驚かない. データの到着を待っていないので、データで使用しようとすると、生成されたモデルが空になる可能性があることを理解しています.
この問題に対する正しいアプローチは何でしょうか?