3

私はSpring 5、詳細にはReactorプロジェクトを使用して、膨大なMongoコレクションからKafkaトピックへの情報を読み取ります。残念ながら、Kafka メッセージの生成は、それらを消費するプログラムよりもはるかに高速です。したがって、背圧メカニズムを実装する必要があります。

毎秒 100 メッセージのスループットが必要だとします。少しグーグルで調べた結果、buffer(int maxSize)メソッドの機能を組み合わせて、定義済みの間隔を使用してメッセージを送信する で結果を圧縮することにしました。Flux

 // Create a clock that emits an event every second
 final Flux<Long> clock = Flux.interval(Duration.ofMillis(1000L));
 // Create a buffered producer
 final Flux<ProducerRecord<String, Data>> outbound =
            repository.findAll()
                      .map(this::buildData)
                      .map(this::createKafkaMessage)
                      .buffer(100)
                      // Limiting the emission in time interval
                      .zipWith(clock, (msgs, tick) -> msgs)
                      .flatMap(Flux::fromIterable);
 // Subscribe a Kafka sender
 kafkaSender.createOutbound()
            .send(outbound)
            .then()
            .block();

これを行うよりスマートな方法はありますか?つまり、少し複雑に思えます (zip 部分、全体)。

4

1 に答える 1