問題タブ [spring-kafka]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
spring-integration - スプリング クラウド データ フローで kafka を使用すると、シンク コンポーネントが適切なデータを取得しない
私は英語のネイティブ スピーカーではありませんが、質問をできるだけ明確に表現するように努めています。この問題に遭遇したため、2 日間混乱しましたが、まだ解決策が見つかりません。
Hadoop YARN の Spring Could Data Flow で実行されるストリームを構築しました。
ストリームは、HTTP ソース、プロセッサ、およびファイル シンクで構成されます。
1.Http ソース
HTTP ソース コンポーネントには、application.properties で定義された dest1 と dest2 という 2 つの異なる宛先にバインドする 2 つの出力チャネルがあります。
spring.cloud.stream.bindings.output.destination=dest1 spring.cloud.stream.bindings.output2.destination=dest2
以下は、参考用の HTTP ソースのコード スニペットです。
2. プロセッサ
プロセッサには、2 つの複数入力チャネルと、異なる宛先にバインドする 2 つの出力チャネルがあります。宛先バインディングは、プロセッサ コンポーネント プロジェクトの application.properties で定義されます。
以下は、プロセッサのコード スニペットです。
3. ファイル シンク コンポーネント。
Spring の公式の fil sink コンポーネントを使用します。maven://org.springframework.cloud.stream.app:file-sink-kafka:1.0.0.BUILD-SNAPSHOT
そして、applicaiton.properties ファイルに宛先バインディングを追加するだけです。spring.cloud.stream.bindings.input.destination=fileSink
4.発見:
私が期待したデータフローは次のようになります。
Source.handleRequest() -->Processor.handleRequest()
Source.handleRequest2() -->Processor.handleRequest2() --> Sink.fileWritingMessageHandler();
"processed by transform2" という文字列のみがファイルに保存されます。
しかし、私のテストの後、データフローは実際には次のようになります。
Source.handleRequest() -->Processor.handleRequest() --> Sink.fileWritingMessageHandler();
Source.handleRequest2() -->Processor.handleRequest2() --> Sink.fileWritingMessageHandler();
"processed by transform1" と "processed by transform2" の両方の文字列がファイルに保存されます。
5.質問:
Processor.handleRequest() の出力チャネルの宛先は、fileSink ではなく hdfsSink にバインドされますが、データは引き続きファイル Sink に流れます。私はこれを理解できず、これは私が望むものではありません。Processor.handleRequest2() からのデータのみが、両方ではなくファイル シンクに流れます。私が正しく行わない場合、誰かがそれを行う方法と解決策を教えてもらえますか? 2日間私を混乱させました。
ご親切にありがとうございました。
アレックス
spring-kafka - KafkaTemplate からトピック メタデータを取得する
KafkaTemplate の実装から、実際の Kafka Producer にアクセスできないことがわかりました。metrics()
この Producer ラッピングは適切かもしれませんが、やのように必要な Kafka Producer のメソッドがいくつかありpartitionsFor(java.lang.String topic)
ます。KafkaTemplate では、実際の Kafka Producer メソッドをラップする同じメソッドを使用できます。これは新しいバージョンで実装される可能性がありますか? 実装してプル リクエストを送信できますか?
spring-kafka - Spring カフカ メッセージ コンシューマの遅延
Kafkaメッセージを消費するためにSpring kafka 1.0.3を使用しています。kafka には 2 つのトピックがあり、各トピックには 1 つのパーティションがあります。Java コードでは、各トピック メッセージを消費する 2 つの @KafkaListener があります。ConcurrentKafkaListenerContainerFactory の同時実行数は 1 に設定されています。ただし、メッセージが 20 秒以上遅れることがあります。
@KafkaListenr コードは以下のとおりです。
Kafka メッセージは spring kafka によって送信され、ログは次のとおりです。
ログにいくつかありますがpartitions assigned:[]
、なぜ割り当てられたものが空なのかわかりません。そして、webOutMessage メッセージが で消費された2016-09-26 16:16:45,870
後、次のアクションは で 15 秒以上後に消費者の再調整だった2016-09-26 16:17:01,099
ので、次のメッセージは 15 秒以上遅れました。
誰かが理由を知っていますか?
デバッグ ログを追加します。遅延は毎回ではありません。場合によっては問題ありません:</p>
spring-boot - spring-data-stream プロジェクトで同じコンシューマー グループを持つすべてのコンシューマーに発行されたメッセージ
Zookeeper と 3 つの kafka ブローカーをローカルで実行しました。1 つのプロデューサーと 1 つのコンシューマーを開始しました。コンシューマーがメッセージを消費していることがわかります。
次に、同じコンシューマー グループ名で 3 つのコンシューマーを開始しました (Spring Boot プロジェクトなのでポートが異なります)。しかし、私が見つけたのは、すべてのコンシューマーが現在メッセージを消費 (受信) していることです。ただし、メッセージのみがコンシューマー間で繰り返されないという点で、メッセージの負荷が分散されることを期待しています。何が問題なのかわからない。
ここに私のプロパティファイルがあります
ここでのグループは timerGroup です。
消費者コード: https://github.com/codecentric/edmp-sample-stream-sink
プロデューサーコード: https://github.com/codecentric/edmp-sample-stream-source
java - Spring Integration Kafka と Spring Kafka の比較
ワーカー スプリング アプリケーションとコンシューマー スプリング アプリケーションの間にメッセージ チャネルを実装しようとしています (複数の JVM に同じコンシューマーのレプリカが存在します)。
Java Config では、Spring 統合に関するドキュメントが限られているため、Spring Kafka のドキュメントを見つけることができました。依存関係がどのように機能しているか正確にはわかりませんが、
- Spring Kafka 統合は、Spring Kafka に基づいています。これについてのアイデアを教えてください。
- Spring Integration Kafka の新しいリリースに関する適切なドキュメントはどこにありますか?
spring - Spring 統合 kafka kafka に生成するときにエラーを処理する方法
int-kafka:outbound-channel-adapter を使用して kafka を生成している間、使用可能なエラー チャネルがないようです。この場合、再試行回数後にメッセージを処理してカフカに生成できませんでしたか?
Produce to Kafka の失敗を引き起こす可能性のあるエラー。(次のコードは、インターネットからのコード スニペットにすぎません。エラー ハンドルを追加する方法を考えているだけです)