Spark ストリーミングを使用して Kafka から古いメッセージを読み取ろうとしています。ただし、リアルタイムで送信されたメッセージしか取得できません (つまり、spark プログラムの実行中に新しいメッセージを入力すると、それらのメッセージが取得されます)。
私の groupID と consumerID を変更して、zookeeper が私のプログラムが以前に見たことを知っているメッセージを出していないことを確認しています。
spark が Zookeeper のオフセットを -1 と見なしていると仮定すると、キュー内のすべての古いメッセージを読み取るべきではありませんか? カフカキューの使用方法を誤解しているだけですか? 私はスパークとカフカに非常に慣れていないので、何かを誤解しているだけである可能性を排除できません。
package com.kibblesandbits
import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import net.liftweb.json._
object KafkaStreamingTest {
val cfg = new ConfigLoader().load
val zookeeperHost = cfg.zookeeper.host
val zookeeperPort = cfg.zookeeper.port
val zookeeper_kafka_chroot = cfg.zookeeper.kafka_chroot
implicit val formats = DefaultFormats
def parser(json: String): String = {
return json
}
def main(args : Array[String]) {
val zkQuorum = "test-spark02:9092"
val group = "myGroup99"
val topic = Map("testtopic" -> 1)
val sparkContext = new SparkContext("local[3]", "KafkaConsumer1_New")
val ssc = new StreamingContext(sparkContext, Seconds(3))
val json_stream = KafkaUtils.createStream(ssc, zkQuorum, group, topic)
var gp = json_stream.map(_._2).map(parser)
gp.saveAsTextFiles("/tmp/sparkstreaming/mytest", "json")
ssc.start()
}
これを実行すると、次のメッセージが表示されます。したがって、オフセットが設定されているためにメッセージが表示されないだけではないと確信しています。
14/12/05 13:34:08 INFO ConsumerFetcherManager: [ConsumerFetcherManager-1417808045047] パーティション ArrayBuffer([[testtopic,0], initOffset -1 のフェッチャーをブローカー id:1,host:test-spark02.vpc,port: に追加しました: 9092] , [[testtopic,1], initOffset -1 からブローカー ID:1,host:test-spark02.vpc,port:9092] , [[testtopic,2], initOffset -1 からブローカー ID:1,host: test-spark02.vpc,port:9092] , [[testtopic,3], initOffset -1 からブローカー ID:1,host:test-spark02.vpc,port:9092] , [[testtopic,4], initOffset -1ブローカー ID:1、ホスト:test-spark02.vpc、ポート:9092] へ)
次に、1000 件の新しいメッセージを入力すると、一時ディレクトリに保存された 1000 件のメッセージが表示されます。しかし、既存のメッセージの読み方がわかりません。(この時点で) 数万に上るはずです。