問題タブ [spring-kafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
1 に答える
265 参照

apache-kafka - Spring-kafka 大量処理

spring-kafka 1.0.5 を使用して、同時実行数が 10 の 10 個のパーティションを持つ忙しいトピックから消費しています。

私の現在のコードは、HashMap に保持されているパーティション ID に基づいてメッセージをキューに追加します。

残念ながら、そのデザインは、単純な消費の 2 倍の処理時間を要します。

私の要件は、パーティションを個別に処理することですが、@KafkaListener に基づくパーティションへの参照を含むハッシュマップを回避するにはどうすればよいですか。

これについてもっと効率的な方法はありますか?理想的には、リスナー アノテーションの各スレッドが独自のリストを管理します。パーティションIDに基づいて上記のハッシュマップなどの相互参照を持たずにそれを行う方法はありますか?

0 投票する
2 に答える
1666 参照

java - 同じ JVM から kafka コンシューマーとプロデューサーを実行するとプロデューサーが遅くなる

私はkafka 0.8とspring-integration-kafka 1.2.0.RELEASEを使用しています

プライマリとセカンダリという名前の 2 つのトピックがあります。プライマリ トピックから消費する必要があり、いくつかの処理の後、次の一連の処理を後で実行するためにセカンダリ トピックを生成する必要があります。

プライマリ トピックからの消費は正常に機能しますが、セカンダリ トピックへの生成は数分後に失敗し始めます。問題は、設定した 500 ミリ秒後に kafka タイムアウトにリクエストを送信することから始まります。スレッドプールが枯渇して終了。

セカンダリ トピックのイベントを別の kafka クラスターに生成しようとしても、問題なく動作します。

それぞれ 200 のパーティションを持つ両方のトピックで実行されている 4 つのコンシューマーがあります。

私はカフカに少し慣れていません。知識が不足していることをお許しください。私が提供する必要がある不足している情報についてコメントしてください。

0 投票する
2 に答える
284 参照

apache-kafka - Kafka - Spring : kafka コンシューマーは、オフセットに基づいてトピックからメッセージを読み取ります

オフセットに基づいて Kafka トピックからメッセージを消費する方法はありますか。つまり、以前にトピックで公開したオフセット ID を持っているということです。ここで、渡したオフセット ID に基づいてトピックからメッセージを取得する必要があります。

0 投票する
1 に答える
2169 参照

spring - メッセージを「失う」ことなく @KafkaListener-method 内で確認する

現在、基本的に次の簡略化されたメカニズムでメッセージを確認しています。

基本的に、メッセージを一時的に処理できない場合 (IOExceptions の場合) はいつでも、後でもう一度受信する必要があります。

しかし、確認応答は同じパーティション内の以前のすべてのメッセージが正常に処理されたと想定しているため、これは機能しません。また、IOException の場合、失敗したメッセージはスキップされますが、同じパーティションのより高いインデックスを持つ別のメッセージによって確認応答される可能性があります。

これを修正する方法はいくつかありますが、これは、KafkaListener メソッド内で直接確認応答を呼び出すことを回避するための厄介な回避策を意味します。私たちのユースケースは非常に具体的なものですか、それともSpring Kafkaユーザーが想定する「デフォルト」の動作に似ていませんか?

この種の問題に対する春のカフカの解決策はありますか? または、これを「正しく」解決するアイデアはありますか?

0 投票する
1 に答える
181 参照

spring - Spring Integration - 宣言型アプローチを使用して SOAP Web サービスの確認応答を返す前に、トピックにメッセージを追加する方法

inbound-gateway を使用して SOAP Web サービスを使用しています。メッセージを kafka トピックに配置し、Spring 統合の宣言的な方法を使用してリクエスタに同期確認を返す必要があります。これは可能ですか?

0 投票する
1 に答える
2965 参照

spring-boot - Spring-Boot アプリで Spring-integration-kafka を使用して Kafka メッセージの手動コミットを無効にできない

私はカフカが初めてなので、私のグループはSpring-bootアプリケーションでSpring-Integration-kafka:2.0.0.RELEASEを使用しています。この例hereに基づいて、KafkaConsumer で kafka メッセージを消費することができました。

このスプリング ブート アプリケーションには、Application.java があります。

これは私の KafkaConsumerConfig.java です

そして最後に私の Listener.java

私の質問は次のとおりです。

1) ConsumerConfig 設定で、既に "ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG" をfalseに設定し、"ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG" をコメントアウトしましたが、なぜ自動的にコミットするのですか? (スプリング ブートを再起動するとメッセージが表示されなくなるため) app- コミットされるまで再送信を試みると思います)

2) 手動でコミットするには、Listener.listen 内に ack.acknowledge() を追加するだけでよいと思います (プレースホルダーを参照してください)。これが手動コミットであることを確認するために、これ以外に必要なものはありますか?

3) 私は最も単純で最もクリーンな kafkaconsumer を持ちたいと思っています。私が持っているものが最も単純な方法だと思うかどうか疑問に思います。もともと私はシングルスレッドのコンシューマーを探していましたが、そこには多くの例がないので、並行リスナーに固執します.

ご協力いただきありがとうございます。

0 投票する
1 に答える
3185 参照

docker - Kafka プロデューサーが「TimeoutException: Batch Expired」例外をスローする

twitter 用の Spring Cloud Stream アプリをテストしています。Kafka に関連する次の環境プロパティを使用して docker コンテナーを開始しました。

私のkafkaのproducerConfigの値は次のとおりです。

2017-01-12 14:47:09.985 INFO 1 --- [itterSource-1-1] oakafka.common.utils.AppInfoParser: Kafka バージョン: 0.9.0.1

しかし、プロデューサーは継続的に次の例外をスローします。

docker コンテナーからブローカー 192.168.127.188:9092 および 2181 に telnet できます。また、私の kafka サーバーは docker コンテナーではありません

「advertised.host.name」を追加するなどの解決策を見ましたが、機能しませんでした。または、env props を指定した正しい方法ですか。

何か助けはありますか?