問題タブ [spring-cloud-stream]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
spring-cloud-stream - コンシューマ グループのすべてのコンシューマがメッセージを受信します
Kafka トピックからメッセージを受信するための春のクラウド ストリーム アプリを作成しました。アプリをスケーリングすると、アプリ インスタンスの 1 つだけが Kafka トピックからメッセージを受信するように、コンシューマー グループをセットアップしようとしています。
以下は私の application.yml
spring:
cloud:
stream:
bindings:
orderTopic:
group: orderGroup
destination: orderTopic
kafka:
binder:
brokers: 192.168.61.21
defaultBrokerPort: 9092
zkNodes: 192.168.61.21
defaultZkPort: 2181
です。アプリをクラウド ファウンドリー (pcfdev) にデプロイし、アプリのインスタンス数を 2 に設定しました。注文を orderTopic トピックに送信すると、2 つのアプリ インスタンスのうちの 1 つだけが注文を受け取ることを期待しています。 、ただし、両方のアプリ インスタンスは、以下に示すように同じ順序を受け取ります
2016-05-10T16:33:46.42-0600 [APP/0] OUT Order Number 23 received.
2016-05-10T16:33:47.42-0600 [APP/1] OUT Order Number 24 received.
2016-05-10T16:33:47.42-0600 [APP/0] OUT Order Number 24 received.
2016-05-10T16:33:48.42-0600 [APP/1] OUT Order Number 25 received.
2016-05-10T16:33:48.42-0600 [APP/0] OUT Order Number 25 received.
2016-05-10T16:33:49.43-0600 [APP/1] OUT Order Number 26 received.
2016-05-10T16:33:49.43-0600 [APP/0] OUT Order Number 26 received.
助けてください
java - Spring Cloud Stream @ServiceActivator が例外で errorChannel にメッセージを送信しない
spring-cloud-starter-stream-kafka を使用して、Spring Cloud Stream を使用しています。次のように、チャネルを kafka トピックにバインドしましたapplication.properties
。
プログラムでエラー チャネルに例外メッセージを生成することができません。驚いたことに、私は別のスレッドにいるにもかかわらず、それを生成しようとさえしていないようです (@MessagingGateway
メッセージを にダンプする がgatewayOutput
あり、残りのフローは非同期で行われます)。my の定義は次のServiceActivator
とおりです。
これが生成されたログです (完全な例外を切り捨てました)。ありません...
- サブスクライバーがいない errorChannel に関する苦情
- Kafka プロデューサー スレッドのロギング
編集:これが私のチャンネルクラスの内容です:
java - Spring-Cloud-Stream で Kafka への再接続を構成する方法
問題なく動作する spring-cloud-stream を使用して、単純な Kafka コンシューマを開発しました。Kafka がシャットダウンされると、フレームワークは自動再接続も実行します。問題は、再接続が全速力で試行されることです (私のマシンでは 1 秒あたり約 10 回)。
質問: 2 回の試行間の間隔、バックオフなどに関して再接続動作を構成するにはどうすればよいですか?
更新 マリウスが提案したように、私は問題を開きました。これは、希望する機能が存在しないことを示唆しています。答えてくれてありがとう!
https://github.com/spring-cloud/spring-cloud-stream/issues/541
spring-cloud-stream - spring-cloud-stream カフカ エラー処理
spring-cloud-stream 1.0.0.RELEASE のドキュメントを少し調べましたが、エラー処理に関するドキュメントが見つからないようです。
kafka 0.9 での観察によると、消費者が RuntimeException をスローすると、3 回の再試行が見られます。3 回再試行した後、ログに次のように表示されます。
この時点で、コンシューマー オフセットは 1 遅れます。コンシューマーを再起動すると、メッセージは 3 回再試行されます。ただし、コンシューマーが例外をスローしないように同じパーティションに別のメッセージを送信すると、コンシューマー オフセットが更新され、例外をスローした元のメッセージは再起動後に再試行されなくなります。
これは、私が見つけられなかったどこかに文書化されていますか? エラー処理はバインダー固有ですか、それともバインダー間で一貫性を保つために scs を抽象化していますか? これは、消費者オフセットが kafka バインダーで更新される方法の予期しない結果であると思われます。enableDlq kafka コンシューマー プロパティが追加されたことがわかりました。それをテストしようとしていますが、kafka でデッド レターを処理する方法がわかりません。私はrabbitmqのデッドレターキューに精通していますが、rabbitmqではrabbitmq shovelプラグインを使用してdlqメッセージを再発行して再試行し、一時的なサービス停止が原因で失敗した場合をカバーできます. 同様のユーティリティを自分で作成する以外に、kafka で使用できる同様の機能については知りません。
更新: enableDlq kafka コンシューマー プロパティを有効にしてテストすると、エラー処理に関する同じコンシューマー オフセットの問題が示されます。コンシューマーが RuntimeException をスローすると、再試行が 3 回表示された後、エラー メッセージがログに記録されずerror.<destination>.<group>
、文書化されているように発行されたメッセージが表示されますが、コンシューマー オフセットは更新されず、1 遅れます。コンシューマーを再起動すると、元のトピック パーティションから同じ失敗したメッセージの処理を再試行し、3 回再試行して、同じメッセージをerror.<destination>.<group>
トピックに再度配置します (重複した dlq メッセージ)。コンシューマーが RuntimeException をスローしない同じトピック パーティションに別のメッセージを発行すると、オフセットが更新され、元の失敗したメッセージは再起動時に再試行されなくなります。
enableDlqがtrueかどうかに関係なく、消費者がエラーをスローしたときに、消費者はkafkaで消費者オフセットを更新する必要があると思います。これにより、すべての再試行に失敗したメッセージが破棄される (enableDlq が false の場合) か、dlq に発行されて再試行されない (enableDlq が true の場合) という一貫性が少なくとも保たれます。
spring-cloud-stream - Spring Cloud Stream 1.0.0-RELEASE とスレッド
以前 RC2 を使用していたとき、各 kafka コンシューマーが独自のスレッド (この例ではpool-6-thread-1 )で実行されているように見えることに気付きました。
しかし、1.0.0RELEASE にアップグレードした後、プロパティの設定に関係なく、同じスレッド ( kafka-binder- ) がすべての消費に使用されているようです。concurrency
これは新しいデフォルトの動作ですか? それぞれ独自のスレッドで実行される MessageChannelBinders のプールを利用するようにアプリケーションを構成するにはどうすればよいですか?
spring-integration - spring-cloud-stream カフカ アブロ
spring-cloud-stream のコンフルエント プラットフォームからの avro および/またはスキーマ レジストリのサポートはありますか、またはサポートする予定はありますか? spring-cloud-stream-binder-kafka から除外されている spring-integration-kafka 1.3.0.RELEASE には avro への依存関係があることがわかりますが、spring-integration-kafka (2.0) の master ブランチと spring- kafka には、私が見つけることができる avro 依存関係はありません。
java - Spring Cloud Stream メッセージをインターセプトする方法は?
Spring では、RestTemplate や SpringMVC など、多くの製品のメッセージを傍受できます。Spring Cloud Stream メッセージを傍受することは可能ですか? 受信メッセージと送信メッセージの両方。