6

Spark ストリーミングを使用して Kafka から古いメッセージを読み取ろうとしています。ただし、リアルタイムで送信されたメッセージしか取得できません (つまり、spark プログラムの実行中に新しいメッセージを入力すると、それらのメッセージが取得されます)。

私の groupID と consumerID を変更して、zookeeper が私のプログラムが以前に見たことを知っているメッセージを出していないことを確認しています。

spark が Zookeeper のオフセットを -1 と見なしていると仮定すると、キュー内のすべての古いメッセージを読み取るべきではありませんか? カフカキューの使用方法を誤解しているだけですか? 私はスパークとカフカに非常に慣れていないので、何かを誤解しているだけである可能性を排除できません。

package com.kibblesandbits

import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

import net.liftweb.json._

object KafkaStreamingTest {

  val cfg = new ConfigLoader().load
  val zookeeperHost = cfg.zookeeper.host
  val zookeeperPort = cfg.zookeeper.port
  val zookeeper_kafka_chroot = cfg.zookeeper.kafka_chroot

  implicit val formats = DefaultFormats 

  def parser(json: String): String = {
    return json
}

def main(args : Array[String]) {
  val zkQuorum = "test-spark02:9092"

  val group = "myGroup99"
  val topic = Map("testtopic" -> 1)
  val sparkContext = new SparkContext("local[3]", "KafkaConsumer1_New")
  val ssc = new StreamingContext(sparkContext, Seconds(3))
  val json_stream = KafkaUtils.createStream(ssc, zkQuorum, group, topic)
  var gp = json_stream.map(_._2).map(parser)

  gp.saveAsTextFiles("/tmp/sparkstreaming/mytest", "json")
  ssc.start()
}

これを実行すると、次のメッセージが表示されます。したがって、オフセットが設定されているためにメッセージが表示されないだけではないと確信しています。

14/12/05 13:34:08 INFO ConsumerFetcherManager: [ConsumerFetcherManager-1417808045047] パーティション ArrayBuffer([[testtopic,0], initOffset -1 のフェッチャーをブローカー id:1,host:test-spark02.vpc,port: に追加しました: 9092] , [[testtopic,1], initOffset -1 からブローカー ID:1,host:test-spark02.vpc,port:9092] , [[testtopic,2], initOffset -1 からブローカー ID:1,host: test-spark02.vpc,port:9092] , [[testtopic,3], initOffset -1 からブローカー ID:1,host:test-spark02.vpc,port:9092] , [[testtopic,4], initOffset -1ブローカー ID:1、ホスト:test-spark02.vpc、ポート:9092] へ)

次に、1000 件の新しいメッセージを入力すると、一時ディレクトリに保存された 1000 件のメッセージが表示されます。しかし、既存のメッセージの読み方がわかりません。(この時点で) 数万に上るはずです。

4

1 に答える 1

8

Kafka コンシューマに構成を提供できる代替ファクトリ メソッドKafkaUtilsを使用します。

def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag](
      ssc: StreamingContext,
      kafkaParams: Map[String, String],
      topics: Map[String, Int],
      storageLevel: StorageLevel
    ): ReceiverInputDStream[(K, V)]

次に、kafka 構成でマップを作成し、パラメーター 'kafka.auto.offset.reset' を 'smallest' に設定して追加します。

val kafkaParams = Map[String, String](
      "zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
      "zookeeper.connection.timeout.ms" -> "10000",
      "kafka.auto.offset.reset" -> "smallest"
)

その構成を上記のファクトリ メソッドに提供します。"kafka.auto.offset.reset" -> "smallest" は、消費者にトピックの最小オフセットから開始するように指示します。

于 2014-12-06T12:50:15.527 に答える