0

私は反応型カフカを使おうとしてきましたが、条件付き処理に問題があり、満足のいく答えが見つかりませんでした。

基本的に、膨大な数のメッセージ (1 日あたり約 100 億メッセージ) を含む 1 つの kafka トピックを消費し、メッセージのいくつかのプロパティに基づいてそれらのメッセージのいくつか (1 日あたり数千) のみを処理しようとしています。メッセージの処理済みバージョンを別のトピックにプッシュしますが、それを適切に行うのに苦労しています。

私の最初の試みは次のようなものでした:

// This is pseudo code.
Source(ProducerSettings(...))
    .filter(isProcessable(_))
    .map(process(_))
    .via(Producer.flow(producerSettings))
    .map(_.commitScalaDsl())
    .runWith(Sink.ignore)

このアプローチの問題は、処理できるメッセージを読んだときにのみコミットすることです。これは明らかにクールではありません。プログラムを停止して再起動する必要がある場合は、大量の役に立たないメッセージを読み直さなければならないためです。数が多いので、そんな余裕はありません。

次に、次の行の周りで何かをして、GraphDSL を使用しようとしました。

in ~> broadcast ~> isProcessable    ~> process ~> producer ~> merge ~> commit
   ~> broadcast ~>              isNotProcessable           ~> merge

処理できないメッセージはグラフの 2 番目のブランチを通過し、処理可能なメッセージが実際に宛先にプッシュされる前にコミットされるため、このソリューションは明らかに良くありません。少なくとも 1 回の配信も保証します。

この問題を解決する方法を知っている人はいますか?

4

1 に答える 1