私は反応型カフカを使おうとしてきましたが、条件付き処理に問題があり、満足のいく答えが見つかりませんでした。
基本的に、膨大な数のメッセージ (1 日あたり約 100 億メッセージ) を含む 1 つの kafka トピックを消費し、メッセージのいくつかのプロパティに基づいてそれらのメッセージのいくつか (1 日あたり数千) のみを処理しようとしています。メッセージの処理済みバージョンを別のトピックにプッシュしますが、それを適切に行うのに苦労しています。
私の最初の試みは次のようなものでした:
// This is pseudo code.
Source(ProducerSettings(...))
.filter(isProcessable(_))
.map(process(_))
.via(Producer.flow(producerSettings))
.map(_.commitScalaDsl())
.runWith(Sink.ignore)
このアプローチの問題は、処理できるメッセージを読んだときにのみコミットすることです。これは明らかにクールではありません。プログラムを停止して再起動する必要がある場合は、大量の役に立たないメッセージを読み直さなければならないためです。数が多いので、そんな余裕はありません。
次に、次の行の周りで何かをして、GraphDSL を使用しようとしました。
in ~> broadcast ~> isProcessable ~> process ~> producer ~> merge ~> commit
~> broadcast ~> isNotProcessable ~> merge
処理できないメッセージはグラフの 2 番目のブランチを通過し、処理可能なメッセージが実際に宛先にプッシュされる前にコミットされるため、このソリューションは明らかに良くありません。少なくとも 1 回の配信も保証します。
この問題を解決する方法を知っている人はいますか?