問題タブ [spring-kafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
1 に答える
631 参照

spring-integration - スプリング クラウド データ フローで kafka を使用すると、シンク コンポーネントが適切なデータを取得しない

私は英語のネイティブ スピーカーではありませんが、質問をできるだけ明確に表現するように努めています。この問題に遭遇したため、2 日間混乱しましたが、まだ解決策が見つかりません。

Hadoop YARN の Spring Could Data Flow で実行されるストリームを構築しました。

ストリームは、HTTP ソース、プロセッサ、およびファイル シンクで構成されます。

1.Http ソース
HTTP ソース コンポーネントには、application.properties で定義された dest1 と dest2 という 2 つの異なる宛先にバインドする 2 つの出力チャネルがあります。

spring.cloud.stream.bindings.output.destination=dest1 spring.cloud.stream.bindings.output2.destination=dest2

以下は、参考用の HTTP ソースのコード スニペットです。

2. プロセッサ
プロセッサには、2 つの複数入力チャネルと、異なる宛先にバインドする 2 つの出力チャネルがあります。宛先バインディングは、プロセッサ コンポーネント プロジェクトの application.properties で定義されます。

以下は、プロセッサのコード スニペットです。

3. ファイル シンク コンポーネント。

Spring の公式の fil sink コンポーネントを使用します。maven://org.springframework.cloud.stream.app:file-sink-kafka:1.0.0.BUILD-SNAPSHOT

そして、applicaiton.properties ファイルに宛先バインディングを追加するだけです。spring.cloud.stream.bindings.input.destination=fileSink

4.発見:

私が期待したデータフローは次のようになります。

Source.handleRequest() -->Processor.handleRequest()

Source.handleRequest2() -->Processor.handleRequest2() --> Sink.fileWritingMessageHandler();

"processed by transform2" という文字列のみがファイルに保存されます。

しかし、私のテストの後、データフローは実際には次のようになります。

Source.handleRequest() -->Processor.handleRequest() --> Sink.fileWritingMessageHandler();

Source.handleRequest2() -->Processor.handleRequest2() --> Sink.fileWritingMessageHandler();

"processed by transform1" と "processed by transform2" の両方の文字列がファイルに保存されます。

5.質問:

Processor.handleRequest() の出力チャネルの宛先は、fileSink ではなく hdfsSink にバインドされますが、データは引き続きファイル Sink に流れます。私はこれを理解できず、これは私が望むものではありません。Processor.handleRequest2() からのデータのみが、両方ではなくファイル シンクに流れます。私が正しく行わない場合、誰かがそれを行う方法と解決策を教えてもらえますか? 2日間私を混乱させました。

ご親切にありがとうございました。

アレックス

0 投票する
1 に答える
932 参照

spring-kafka - KafkaTemplate からトピック メタデータを取得する

KafkaTemplate の実装から、実際の Kafka Producer にアクセスできないことがわかりました。metrics()この Producer ラッピングは適切かもしれませんが、やのように必要な Kafka Producer のメソッドがいくつかありpartitionsFor(java.lang.String topic)ます。KafkaTemplate では、実際の Kafka Producer メソッドをラップする同じメソッドを使用できます。これは新しいバージョンで実装される可能性がありますか? 実装してプル リクエストを送信できますか?

0 投票する
1 に答える
1871 参照

spring-kafka - Spring カフカ メッセージ コンシューマの遅延

Kafkaメッセージを消費するためにSpring kafka 1.0.3を使用しています。kafka には 2 つのトピックがあり、各トピックには 1 つのパーティションがあります。Java コードでは、各トピック メッセージを消費する 2 つの @KafkaListener があります。ConcurrentKafkaListenerContainerFactory の同時実行数は 1 に設定されています。ただし、メッセージが 20 秒以上遅れることがあります。

@KafkaListenr コードは以下のとおりです。

Kafka メッセージは spring kafka によって送信され、ログは次のとおりです。

ログにいくつかありますがpartitions assigned:[]、なぜ割り当てられたものが空なのかわかりません。そして、webOutMessage メッセージが で消費された2016-09-26 16:16:45,870後、次のアクションは で 15 秒以上後に消費者の再調整だった2016-09-26 16:17:01,099ので、次のメッセージは 15 秒以上遅れました。

誰かが理由を知っていますか?

デバッグ ログを追加します。遅延は毎回ではありません。場合によっては問題ありません:</p>

0 投票する
1 に答える
307 参照

spring-boot - spring-data-stream プロジェクトで同じコンシューマー グループを持つすべてのコンシューマーに発行されたメッセージ

Zookeeper と 3 つの kafka ブローカーをローカルで実行しました。1 つのプロデューサーと 1 つのコンシューマーを開始しました。コンシューマーがメッセージを消費していることがわかります。

次に、同じコンシューマー グループ名で 3 つのコンシューマーを開始しました (Spring Boot プロジェクトなのでポートが異なります)。しかし、私が見つけたのは、すべてのコンシューマーが現在メッセージを消費 (受信) していることです。ただし、メッセージのみがコンシューマー間で繰り返されないという点で、メッセージの負荷が分散されることを期待しています。何が問題なのかわからない。

ここに私のプロパティファイルがあります

ここでのグループは timerGroup です。

消費者コード: https://github.com/codecentric/edmp-sample-stream-sink

プロデューサーコード: https://github.com/codecentric/edmp-sample-stream-source

0 投票する
1 に答える
3795 参照

java - Spring Integration Kafka と Spring Kafka の比較

ワーカー スプリング アプリケーションとコンシューマー スプリング アプリケーションの間にメッセージ チャネルを実装しようとしています (複数の JVM に同じコンシューマーのレプリカが存在します)。

Java Config では、Spring 統合に関するドキュメントが限られているため、Spring Kafka のドキュメントを見つけることができました。依存関係がどのように機能しているか正確にはわかりませんが、

  1. Spring Kafka 統合は、Spring Kafka に基づいています。これについてのアイデアを教えてください。
  2. Spring Integration Kafka の新しいリリースに関する適切なドキュメントはどこにありますか?
0 投票する
1 に答える
425 参照

spring - Spring 統合 kafka kafka に生成するときにエラーを処理する方法

int-kafka:outbound-channel-adapter を使用して kafka を生成している間、使用可能なエラー チャネルがないようです。この場合、再試行回数後にメッセージを処理してカフカに生成できませんでしたか?

Produce to Kafka の失敗を引き起こす可能性のあるエラー。(次のコードは、インターネットからのコード スニペットにすぎません。エラー ハンドルを追加する方法を考えているだけです)