3

DStream を介して Kafka から到着するデータがあります。いくつかのキーワードを取得するために特徴抽出を実行したいと考えています。

すべてのデータが到着するのを待ちたくない (潜在的に終了しない連続ストリームであることを意図しているため) ため、チャンクで抽出を実行したいと考えています。精度が少し低下しても問題ありません。

これまでのところ、次のようなものをまとめました。

def extractKeywords(stream: DStream[Data]): Unit = {

  val spark: SparkSession = SparkSession.builder.getOrCreate

  val streamWithWords: DStream[(Data, Seq[String])] = stream map extractWordsFromData

  val streamWithFeatures: DStream[(Data, Array[String])] = streamWithWords transform extractFeatures(spark) _

  val streamWithKeywords: DStream[DataWithKeywords] = streamWithFeatures map addKeywordsToData

  streamWithFeatures.print()
}

def extractFeatures(spark: SparkSession)
                   (rdd: RDD[(Data, Seq[String])]): RDD[(Data, Array[String])] = {

  val df = spark.createDataFrame(rdd).toDF("data", "words")

  val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(numOfFeatures)
  val rawFeatures = hashingTF.transform(df)

  val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
  val idfModel = idf.fit(rawFeatures)

  val rescaledData = idfModel.transform(rawFeature)

  import spark.implicits._
  rescaledData.select("data", "features").as[(Data, Array[String])].rdd
}

しかし、私は受け取ったjava.lang.IllegalStateException: Haven't seen any document yet.- 私は物事を一緒に廃棄しようとしているだけなので驚かない. データの到着を待っていないので、データで使用しようとすると、生成されたモデルが空になる可能性があることを理解しています.

この問題に対する正しいアプローチは何でしょうか?

4

1 に答える 1