0

Spark ストリーミング オフセットをコンシューマに読み込もうとしていますが、正しく実行できないようです。

これが私のコードです。

val dfoffset = hiveContext.sql(s"select * from $db")
dfoffset.show()
val dfoffsetArray = dfoffset.collect()
println("printing array of data")
dfoffsetArray.foreach(println)
val fromOffsets = collection.mutable.Map[TopicAndPartition, Long]()
for (i <- dfoffsetArray) {
  val topicAndPartition = (TopicAndPartition(i(1).toString, i(0).toString.toInt) -> (i(2).toString.toLong))
  fromOffsets += topicAndPartition
}

val kafkaParams = Map[String, String]("bootstrap.servers" -> serverName, "group.id" -> "test")
val topics = Array(topicName).toSet
//stuck here 
var directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

directKafkaStream.foreachRDD(rdd1 => { ..

これは、データフレームを表示することからの出力です

partition_number|topic_name|current_offset|
+----------------+----------+--------------+
|               0|TOPIC_NAME|          4421|

どんな助けでも大歓迎です。

spark 1.6 、Scala 2.10.5、kafka 10 を使用しています

4

1 に答える 1

1

公式ドキュメントで KafkaUtils.createDirectStream が示されているよう、 createDirectStream の 3 番目のパラメーターとしてを渡す必要がありfromOffsetsます (4 番目のパラメーターを忘れないでくださいmessageHandler)。

fromOffsetsパラメーターは であると想定されます。collection.immutable.Map[TopicAndPartition, Long]通常、Scala では可能な限り可変ではなく不変を使用します。を次のように
変換できます。dfoffsetArrayimmutable.Map[TopicAndPartition, Long]

val fromOffsets = dfoffsetArray.map( i =>
  TopicAndPartition(i(1).toString, i(0).toString.toInt) -> (i(2).toString.toLong)
).toMap

は、メッセージのキーと値を扱うmessageHandlerの型です。(MessageAndMetadata[K, V]) ⇒ R)単純なハンドラーを次のように定義できます。

val messageHandler =
  (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)

それからあなたのcreateDirectStream意志は次のようになります...

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder,
  (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)

これで、ストリームを自由に変換できます。ハッピーストリーミング!


私は数ヶ月前にこの記事で指導を受けました。多分あなたはそれが役に立つと思うでしょう。

于 2017-12-15T09:35:15.410 に答える