問題タブ [spring-kafka]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
spring - spring-kafka (非統合) 消費者がメッセージを消費しない
アプリケーションを spring-kafka (spring-integration-kafka ではない) と統合しています。プロジェクトの春のドキュメントは次のとおりです。http://docs.spring.io/spring-kafka/docs/1.0.1.RELEASE/reference/htmlsingle
プロデューサーは完全に動作しますが、コンシューマーはメッセージを消費していません。任意のポインター。
これが私の構成です:
**より多くの情報で編集**
ゲイリーさん、返信ありがとうございます。ログに例外はありません。またKafkaTemplate
、同様の構成で試してみたところ、メッセージをキューに発行できましたが、消費者にとってはうまくいきませんでした。Message オブジェクトの代わりに String を使用するようにコードを変更しています。ログの一部は次のとおりです。
また、ログに次のように表示されます。
java - Kafka サブスクライバーが代替メッセージのみを取得する
以下に示すように、1 つのパーティションと 1 つのレプリケーション ファクターのみを持つトピック car があります。
Topic:cars PartitionCount:1 ReplicationFactor:1 Configs:
Topic: cars Partition: 0 Leader: 0 Replicas: 0 Isr: 0
ローカルホストの 9092 で 1 つ、9093 で 1 つの 2 つのブローカーを実行しています。
私の Java アプリケーションはトピック cars にメッセージを送信します。非常に奇妙な動作が見られます。
- 私の Java アプリケーションは、メッセージ #1、メッセージ #3 などの代替メッセージしか受信しません。
この問題をデバッグするために、コンソール コンシューマーを開始しました。これで、Java アプリケーションが取得できなかったメッセージのみが表示されました。たとえば、メッセージ #2、メッセージ #4 など
メッセージがトピックに正しく投稿されていることは明らかですが、Java アプリケーションが代替メッセージしか受信しない原因は何ですか?
そして最後に、なぜコンシューマ コンソールは上記の動作しか示さないのですか?
spring - Spring Kafka、組み込み Kafka でのテスト
Servicetest と組み込みの Kafka で奇妙な動作が見られます。
テストはスポック テストです。JUnit ルール KafkaEmbedded を使用し、次のように brokersAsString を伝播します。
KafkaEmbedded のコードを調べると、インスタンスを構築すると、KafkaEmbedded(int count)
トピックごとに 2 つのパーティションを持つ 1 つの Kafka サーバーが作成されます。
テストでのパーティションの割り当てとサーバーとクライアントの同期の問題に取り組むために、spring-kafka の ContainerTestUtils クラスに見られる戦略に従います。
ログを観察すると、次のパターンに気付きます。
[..] 表記の省略行に注意してください
3000 ミリ秒に設定metadata.max.age.ms
した結果、メタデータ情報を頻繁に更新しようとします。
ここで困惑しているのは、2 つのパーティションが接続されるのを待つと、タイムアウトになることです。1 つのパーティションが接続されるのを待つ場合にのみ、しばらくするとすべてが正常に実行されます。
埋め込まれた Kafka にはトピックごとに 2 つのパーティションがあるというコードを間違って理解していましたか? リスナーに 1 つしか割り当てられないのは正常ですか?
apache-kafka - Spring Cloud Stream と Kafka 統合のエラー処理
Spring Cloud Stream と Kafka を統合して Spring Boot アプリケーションを作成しようとしています。Kafka で 1 つのパーティションを持つサンプル トピックを作成し、ここに示す指示に基づいて作成された Spring Boot アプリケーションからトピックに公開しました。
http://docs.spring.io/spring-cloud-stream/docs/1.0.2.RELEASE/reference/htmlsingle/index.html
と
https://blog.codecentric.de/en/2016/04/event-driven-microservices-spring-cloud-stream/
Spring Boot アプリ -
Kafka プロデューサー クラス
例外を処理したい場合を除いて、すべてがうまく機能しています。
Kafka トピックがダウンした場合、アプリのログ ファイルに ConnectionRefused 例外がスローされていることがわかりますが、組み込みの再試行ロジックは停止することなく継続的に再試行しているようです!
私が処理してさらに例外処理を行うためにスローされる例外はまったくありません。上記の Spring Cloud Stream ドキュメントで Kafka の Producer オプションと Binder オプションを読みましたが、この例外をスローしてキャプチャするためのカスタマイズ オプションが見つかりません。
私はSpring Boot / Spring Cloud Stream / Spring Integration(クラウドストリームプロジェクトの基盤となる実装のようです)を初めて使用しています。
この例外をSpring Cloud Streamアプリにカスケードするために知っていることは他にありますか?
java - SpringBoot/spring-kafka アプリケーションの Autowired KafkaTemplate が null ポインターをスローする
Spring Boot アプリケーション内でspring- kafka KafkaTemplateを使用して、メッセージを Kafka トピックに書き込もうとしています。
KafkaConfig クラスを作成しました。
...そして、私がKafkaに書いているクラスのKafkaTemplateを自動配線しました:
何らかの理由で、自動配線が機能していないようです。デバッグで実行すると、KafkaTemplate が null であることに気付きました。
このオブジェクトはヌルであってはなりません。KafkaTemplate オブジェクトである必要があります。これにより、null ポインター例外がスローされます。
kafka-spring は、過去にほぼ同じコードを使用してうまく機能していたため、今回はオートワイヤリングが機能しない理由が少しわかりません。私が間違っていることがわかりますか?
apache-kafka - カフカの消費者が初めてレコードを取得したときに、多くのレコードから単一のレコードのみを取得しますか?
spring-kafka および spring-kafka-test バージョン 1.0.2.RELEASE を使用しています。
私のテストの 1 つで、私のアプリケーションは、KafkaTemplate とほとんどデフォルトの構成設定を使用して、EmbeddedKafka インスタンスの単一の TopicPartion に 100 レコードを連続して送信します。
KafkaTestUtils.getRecords(consumer) メソッドを使用して、Kafka インスタンスからレコードを取得し、それらがすべて送信されたことを確認します。
初めて getRecords を呼び出したとき、1 つのレコードしか受け取りません。もう一度呼び出すと、残りの 99 が得られます。
コンシューマーの位置を TopicPartition の先頭に明示的に設定してから getRecords を呼び出すと、すべて 100 になります。
getRecords が最初に 1 つのレコードしか取得しないのはなぜですか? コンシューマーで seekToBeginning を明示的に呼び出して、一度に 100 個すべてを取得するより良い方法はありますか?
spring-integration - Spring XD で Kafka シンク モジュールから確認応答を取得する際に、Kafka ソース モジュールのオフセットを手動でコミットする方法は?
XD ストリームでは、メッセージはソース モジュールを介して Kafka トピックから消費され、シンク Kafka モジュールに送信されます。カスタム ソースおよびシンク Kafka モジュールを開発する理由は、正常に送信されたメッセージで、ダウンストリームのシンク モジュールから確認を取得した場合にのみ、ソース モジュールからのオフセットを更新したいからです。
Kafka 0.10.0.0 環境のトピックで Spring Integration Kafka 2.0.1.RELEASE および Spring Kafka 1.0.3.RELEASE を使用しています。私は次のことを試しました:
ソース モジュールの構成:
ソース モジュール: InboundKafkaMessageDrivenAdapter
シンク モジュール: 構成
シンク モジュール: SinkActivator
ソースはメッセージを受信してシンクに送信することに成功していますが、シンクで確認応答を取得しようとすると:
承認 acknowledgment = msg.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
次の例外がスローされます。
原因: java.lang.IllegalArgumentException: ヘッダー 'kafka_acknowledgment' に指定された型が正しくありません。[interface org.springframework.kafka.support.Acknowledgment] が期待されますが、実際の型は [class org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ConsumerAcknowledgment] です
spring-integration-kafka-2.0.1.RELEASE のソース コードでは、AckMode=MANUAL の場合にクラス KafkaMessageListenerContainer がメッセージに kafka_acknowledgment ヘッダーが追加されますが、型は ConsumerAcknowledgement の内部静的クラスです。
では、ソースから送信されたメッセージでシンク モジュールから確認応答を取得するにはどうすればよいでしょうか。
spring-integration - Spring Kafka すべてのトピックをリッスンし、パーティション オフセットを調整する
spring- kafkaのドキュメントに基づいて、アノテーション ベースの @KafkaListener を使用してコンシューマーを構成しています。
私が見ているのは -
開始時にオフセットをゼロに指定しない限り、Kafka コンシューマーは既存のメッセージではなく将来のメッセージを取得します。(私が望むものへのオフセットを指定していないので、これは予想される結果であることを理解しています)
ドキュメントには、トピックとパーティションの組み合わせを指定し、それに加えてゼロのオフセットを指定するオプションがありますが、これを行う場合は、消費者に聞いてもらいたいトピックを明示的に指定する必要があります。
上記のアプローチ 2 を使用すると、これが私の消費者の現在の外観です。
「topicPattern」ワイルドカードまたは「トピック」リストをアノテーション構成の一部として指定するオプションがあることは理解していますが、ゼロから開始するオフセット値を指定できる場所がわかりません。トピック/トピックパターンがリストされています。両方を組み合わせる方法はありますか?お知らせ下さい。