問題タブ [reactive-kafka]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
spring-cloud - Flux からリアクティブ Kafka SenderRecord を作成する
Spring Cloud Gateway と Reactive Kafka を統合するプロトタイプを作成しています。私の目標は、投稿本文をカフカにプッシュすることです。Fluxにアクセスできます。これにより、ボディをチャンクで提供できる可能性があります。この DataBuffer の Flux を SenderRecord に変換し、これを kafka ノンブロッキングにプッシュする方法について、専門家のアドバイスが必要です。
DataBufferUtils.join を使用して、 DataBufferをマージして bytes[] を取得し、これを使用して SenderRecord を作成しようとしました。これに対するより良い方法はありますか (バイト配列の割り当てを回避できますか)?
実装の詳細: Kafka 用の Spring Cloud Gateway グローバル フィルターがあります (url 構造: kafka://<producer-template>?topic=<topicname>
。次に、ボディ、すなわちフラックスを読み取り、kafkaSender.
ありがとうございました。
apache-kafka - 無期限にサブスクライブする Kafka コンシューマ スレッド - ReactiveKafkaConsumerTemplate
しばらくすると、消費者がトピックからのメッセージの消費をランダムに停止し、スローされたエラーがないという問題が発生しています。これはテスト環境であるため、トピックには多くのメッセージがありません。
そのトピックの消費者のラグは > 0 になり、その数で (それを消費する消費者がいないように) 無期限にスタックします。kafka ツールを使用してトピックを調べたところ、パーティションがコンシューマーに割り当てられていることがわかります。
トピックを消費するコンシューマーの複数のスレッドを実行しています。コードは次のとおりです。
}
私の質問: トピックにプッシュされたメッセージが一定期間ない場合、 kafkaConsumperTemplate.receive() が自動的にサブスクリプションを停止/終了/キャンセルする可能性はありますか? それとも、サブスクリプションが終了した後、スレッドが何らかの形で終了するのでしょうか?
リアクティブなカフカ コンシューマを使用すると、注釈がなくなる@KafkaListener
ため、ApplicationRunner を使用してコンシューマのスレッドを生成します。反応的なカフカ消費者を開始する別の正しい方法はありますか?