0

Spark Kafka ダイレクト ストリーミングを使用して Kafka からメッセージを読み込んでいます。メッセージ損失ゼロを実装したいのですが、スパークを再起動した後、欠落したメッセージを Kafka から読み取る必要があります。チェックポイントを使用してすべての読み取りオフセットを保存しているため、次回スパークは保存されたオフセットから読み取りを開始します。これが私の理解です。

以下のコードを使用しました。スパークを停止し、いくつかのメッセージを Kafka にプッシュしました。Kafka からの見逃したメッセージを読み取っていないスパークを再起動した後。Spark は kafka から最新のメッセージを読み取ります。Kafka からの見逃したメッセージを読むには?

val ssc = new StreamingContext(spark.sparkContext, Milliseconds(6000))
ssc.checkpoint("C:/cp")

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("test")
val ssc = new StreamingContext(spark.sparkContext, Milliseconds(50))
val msgStream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

注: アプリケーション ログには、 auto.offset.resetがlatestではなくnoneと表示されます。なぜ ?

WARN KafkaUtils: overriding auto.offset.reset to none for executor

SBT

scalaVersion := "2.11.8"
val sparkVersion = "2.2.0"
val connectorVersion = "2.0.7"
val kafka_stream_version = "1.6.3"

窓 : 7

4

2 に答える 2