問題タブ [akka-kafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
1 に答える
366 参照

scala - 反応型カフカを使用してメッセージを条件付きで処理する

私は反応型カフカを使おうとしてきましたが、条件付き処理に問題があり、満足のいく答えが見つかりませんでした。

基本的に、膨大な数のメッセージ (1 日あたり約 100 億メッセージ) を含む 1 つの kafka トピックを消費し、メッセージのいくつかのプロパティに基づいてそれらのメッセージのいくつか (1 日あたり数千) のみを処理しようとしています。メッセージの処理済みバージョンを別のトピックにプッシュしますが、それを適切に行うのに苦労しています。

私の最初の試みは次のようなものでした:

このアプローチの問題は、処理できるメッセージを読んだときにのみコミットすることです。これは明らかにクールではありません。プログラムを停止して再起動する必要がある場合は、大量の役に立たないメッセージを読み直さなければならないためです。数が多いので、そんな余裕はありません。

次に、次の行の周りで何かをして、GraphDSL を使用しようとしました。

処理できないメッセージはグラフの 2 番目のブランチを通過し、処理可能なメッセージが実際に宛先にプッシュされる前にコミットされるため、このソリューションは明らかに良くありません。少なくとも 1 回の配信も保証します。

この問題を解決する方法を知っている人はいますか?

0 投票する
1 に答える
407 参照

playframework - websocket との接続時に akka-stream-kafka を使用して kafka トピックから最後のメッセージを取得する

Akka Streams Kafka を使用して、Kafka トピックの最後のメッセージを取得することはまったく可能ですか? Kafka トピックをリッスンする Websocket を作成していますが、現在、接続時に以前の非赤メッセージをすべて取得します。これは非常に多くのメッセージになる可能性があるため、最後のメッセージと今後のメッセージにのみ関心があります。(または将来のメッセージのみ)

起源:

消費者設定:


設定を追加してみましたConsumerSettings.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")

「オフセットを最新のオフセットに自動的にリセットする」必要がありますが、効果はないようです。

0 投票する
0 に答える
87 参照

apache-kafka - Akkaストリームの内部および明示的なバッファは、alpakka Kafkaの基礎となるkafkaクライアント設定とどのように相互作用しますか?

akka ストリーム バッファを使用してストリームのスループットを向上させようとしていますが、それが Kafka にどのように適用されるのか疑問に思っています

特に、

基礎となる Kafka API に関して、ここで正確に何が起こるのでしょうか?

基礎となる Kafka クライアントに次の構成があります。

したがってMAX_POLL_RECORDS_CONFIG、私は を持っFETCH_MAX_BYTES_CONFIGていますMAX_PARTITION_FETCH_BYTES_CONFIG

私が疑問に思っているのは、基になるクライアントで構成されたフェッチに関してバッファーがどのように再生されるかということです。

  1. Consumer.committableSource独自のアクターで具体化され、そのバッファーを介して、基礎となる Kafka クライアントからメッセージを受信しますか? 基になるクライアントが最大 100 万のメッセージを取得するように構成されており、Actor が1000?のバッファーとして設定されているとします。それはどう言う意味ですか?どうなりますか?アクター バッファーは、Kafka クライアントのポーリング リクエストをオーバーライドしますか? それとも、ポーリングの結果 (基になるクライアントで構成された最大値) が通過するまで、Kafka クライアントがプッシュするメール ボックスにデータを取得しますか?

Kafka ストリームの内部バッファまたは明示的バッファがポーリング リクエストの設定とどのように相互作用するかを知る必要があると思います。