Spark ストリーミング オフセットをコンシューマに読み込もうとしていますが、正しく実行できないようです。
これが私のコードです。
val dfoffset = hiveContext.sql(s"select * from $db")
dfoffset.show()
val dfoffsetArray = dfoffset.collect()
println("printing array of data")
dfoffsetArray.foreach(println)
val fromOffsets = collection.mutable.Map[TopicAndPartition, Long]()
for (i <- dfoffsetArray) {
val topicAndPartition = (TopicAndPartition(i(1).toString, i(0).toString.toInt) -> (i(2).toString.toLong))
fromOffsets += topicAndPartition
}
val kafkaParams = Map[String, String]("bootstrap.servers" -> serverName, "group.id" -> "test")
val topics = Array(topicName).toSet
//stuck here
var directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
directKafkaStream.foreachRDD(rdd1 => { ..
これは、データフレームを表示することからの出力です
partition_number|topic_name|current_offset|
+----------------+----------+--------------+
| 0|TOPIC_NAME| 4421|
どんな助けでも大歓迎です。
spark 1.6 、Scala 2.10.5、kafka 10 を使用しています