Akka Streams Kafka を使用して、Kafka トピックの最後のメッセージを取得することはまったく可能ですか? Kafka トピックをリッスンする Websocket を作成していますが、現在、接続時に以前の非赤メッセージをすべて取得します。これは非常に多くのメッセージになる可能性があるため、最後のメッセージと今後のメッセージにのみ関心があります。(または将来のメッセージのみ)
起源:
def source(): Flow[Any, String, NotUsed] = {
val source = Consumer.plainSource(consumerSettings, Subscriptions.topics(MyTopic))
Flow.fromSinkAndSource[Any, String](Sink.ignore, source.map(_.value)
}
消費者設定:
@Provides
def providesConsumerSettings(@Named("kafkaUrl") kafkaUrl: String): ConsumerSettings[String, String] = {
val deserializer = new StringDeserializer()
val config = configuration.getOptional[Configuration]("akka.kafka.consumer")
.getOrElse(Configuration.empty)
ConsumerSettings(config.underlying, deserializer, deserializer)
.withBootstrapServers(kafkaUrl)
.withGroupId(GroupId)
}
設定を追加してみましたConsumerSettings.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
「オフセットを最新のオフセットに自動的にリセットする」必要がありますが、効果はないようです。