問題タブ [reactive-kafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
0 に答える
253 参照

spring-cloud - Flux からリアクティブ Kafka SenderRecord を作成する

Spring Cloud Gateway と Reactive Kafka を統合するプロトタイプを作成しています。私の目標は、投稿本文をカフカにプッシュすることです。Fluxにアクセスできます。これにより、ボディをチャンクで提供できる可能性があります。この DataBuffer の Flux を SenderRecord に変換し、これを kafka ノンブロッキングにプッシュする方法について、専門家のアドバイスが必要です。

DataBufferUtils.join を使用して、 DataBufferをマージして bytes[] を取得し、これを使用して SenderRecord を作成しようとしました。これに対するより良い方法はありますか (バイト配列の割り当てを回避できますか)?

実装の詳細: Kafka 用の Spring Cloud Gateway グローバル フィルターがあります (url 構造: kafka://<producer-template>?topic=<topicname>。次に、ボディ、すなわちフラックスを読み取り、kafkaSender.

ありがとうございました。

0 投票する
0 に答える
446 参照

apache-kafka - 無期限にサブスクライブする Kafka コンシューマ スレッド - ReactiveKafkaConsumerTemplate

しばらくすると、消費者がトピックからのメッセージの消費をランダムに停止し、スローされたエラーがないという問題が発生しています。これはテスト環境であるため、トピックには多くのメッセージがありません。

そのトピックの消費者のラグは > 0 になり、その数で (それを消費する消費者がいないように) 無期限にスタックします。kafka ツールを使用してトピックを調べたところ、パーティションがコンシューマーに割り当てられていることがわかります。

トピックを消費するコンシューマーの複数のスレッドを実行しています。コードは次のとおりです。

}

私の質問: トピックにプッシュされたメッセージが一定期間ない場合、 kafkaConsumperTemplate.receive() が自動的にサブスクリプションを停止/終了/キャンセルする可能性はありますか? それとも、サブスクリプションが終了した後、スレッドが何らかの形で終了するのでしょうか?

リアクティブなカフカ コンシューマを使用すると、注釈がなくなる@KafkaListenerため、ApplicationRunner を使用してコンシューマのスレッドを生成します。反応的なカフカ消費者を開始する別の正しい方法はありますか?