3

Akka Streams Kafka を使用して、Kafka トピックの最後のメッセージを取得することはまったく可能ですか? Kafka トピックをリッスンする Websocket を作成していますが、現在、接続時に以前の非赤メッセージをすべて取得します。これは非常に多くのメッセージになる可能性があるため、最後のメッセージと今後のメッセージにのみ関心があります。(または将来のメッセージのみ)

起源:

def source(): Flow[Any, String, NotUsed] = {
  val source = Consumer.plainSource(consumerSettings, Subscriptions.topics(MyTopic))
  Flow.fromSinkAndSource[Any, String](Sink.ignore, source.map(_.value)
}

消費者設定:

  @Provides
def providesConsumerSettings(@Named("kafkaUrl") kafkaUrl: String): ConsumerSettings[String, String] = {
  val deserializer = new StringDeserializer()
  val config = configuration.getOptional[Configuration]("akka.kafka.consumer")
    .getOrElse(Configuration.empty)

  ConsumerSettings(config.underlying, deserializer, deserializer)
    .withBootstrapServers(kafkaUrl)
    .withGroupId(GroupId)
}

設定を追加してみましたConsumerSettings.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")

「オフセットを最新のオフセットに自動的にリセットする」必要がありますが、効果はないようです。

4

1 に答える 1