私はSpring 5、詳細にはReactorプロジェクトを使用して、膨大なMongoコレクションからKafkaトピックへの情報を読み取ります。残念ながら、Kafka メッセージの生成は、それらを消費するプログラムよりもはるかに高速です。したがって、背圧メカニズムを実装する必要があります。
毎秒 100 メッセージのスループットが必要だとします。少しグーグルで調べた結果、buffer(int maxSize)
メソッドの機能を組み合わせて、定義済みの間隔を使用してメッセージを送信する で結果を圧縮することにしました。Flux
// Create a clock that emits an event every second
final Flux<Long> clock = Flux.interval(Duration.ofMillis(1000L));
// Create a buffered producer
final Flux<ProducerRecord<String, Data>> outbound =
repository.findAll()
.map(this::buildData)
.map(this::createKafkaMessage)
.buffer(100)
// Limiting the emission in time interval
.zipWith(clock, (msgs, tick) -> msgs)
.flatMap(Flux::fromIterable);
// Subscribe a Kafka sender
kafkaSender.createOutbound()
.send(outbound)
.then()
.block();
これを行うよりスマートな方法はありますか?つまり、少し複雑に思えます (zip 部分、全体)。