問題タブ [kafka-producer-api]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
1 に答える
4420 参照

apache-kafka - 小さなメッセージでの Kafka Producer 0.9 のパフォーマンスの問題

小さなメッセージを送信するとき、Java Kafka Producer 0.9 クライアントで非常に低いパフォーマンスが観察されています。メッセージは大きな要求バッチに蓄積されないため、小さなレコードがそれぞれ個別に送信されます。

クライアント構成の何が問題になっていますか? それとも、これは別の問題ですか?


Kafka クライアント 0.9.0.0 を使用します。Kafka の未リリースの 9.0.1 または 9.1 の修正済みリストまたは未解決リストに関連する投稿は見られなかったため、クライアント構成とサーバー インスタンスに焦点を当てています。

linger.ms によって、クライアントがレコードをバッチに蓄積する必要があることは理解しています。

linger.ms を 10 に設定しました (そして 100 と 1000 も試しました) が、これらはバッチ蓄積レコードにはなりませんでした。レコード サイズが約 100 バイトで、リクエスト バッファ サイズが 16K の場合、1 回のリクエストで約 160 個のメッセージが送信されると予想されます。

クライアントでのトレースは、新しい Bluemix Messaging Hub (Kafka Server 0.9) サービス インスタンスを割り当てたにもかかわらず、パーティションがいっぱいである可能性があることを示しているようです。テスト クライアントは、他の I/O なしで複数のメッセージをループで送信しています。


ログには、「トピック mytopic パーティション 0 がいっぱいになっているか、新しいバッチを取得しているため、送信者を目覚めさせる」という疑わしい行を含む繰り返しシーケンスが表示されます。

テスト ケースでは、新しく割り当てられたパーティションは本質的に空である必要があります。


次のプロパティ ファイルが用意されています。


Kafka クライアントは、展開/マージされた構成リストを表示します (および linger.ms 設定を表示します)。


100 レコードを送信した後の Kafka メトリクス:

ありがとう

0 投票する
1 に答える
1507 参照

apache-kafka - Kafka コンシューマは、範囲外のスロー インデックスをデシリアライズできません

プロデューサーコードは以下

消費者コードは以下です

メッセージをオブジェクトに変換しようとすると失敗します。投稿の 1 つからカスタム シリアライザー コードを取得しました。以下のようになります。誰でも実装の問題点を指摘できますか? カスタムシリアライザーから FromBytes を使用してみましたが、役に立ちませんでした。シリアライザーが null オブジェクトを返しています

カスタムシリアライザ

PltResultPage は以下です。

0 投票する
1 に答える
812 参照

apache-kafka - Kafka で複数のコンシューマーを作成して、プロデューサーから同じメッセージを読み取り、それらを受信した後に各コンシューマーで異なるタスクを実行する方法

プロデューサーによって送信された 1 つ以上のパーティションからすべてのメッセージを読み取ることができる 3 つのコンシューマーを作成する必要があるユース ケースがあります。同じメッセージを受信した後、3 つのコンシューマーが 3 つの異なるタスクを実行する必要があります。

これを行う良い方法の 1 つは、異なる group.id を使用してコンシューマー グループを作成することです。

props.put("group.id", UUID.randomUUID().toString())

このアイデアについては、次のリンクを参照しました。

パーティションの Kafka 複数のコンシューマ

これを達成するために ConsumerGroupExample コードをどのように調整するかを知るのに行き詰まっていますか? 複数のコンシューマーを作成するにはどうすればよいですか? メッセージを受信した後、それらを個別に管理するにはどうすればよいですか? ConsumerGroupExample の複数のオブジェクトを作成する必要がありますか?

0 投票する
2 に答える
992 参照

java - 接続タイムアウトの問題を処理するためのアウトバウンド アダプターを使用したサーキット ブレーカーの構成

上記の構成では、次のことを試みています。

1.失敗したメッセージが error-channel="failedChannel2" に伝播されることを期待していますが、これは発生していません。変換された出力がコンソールに表示されなかったためです。

2.CircuitBreaker は ServiceActivator (上記のアプリケーション関連の例外) に対して機能していますが、アウトバウンド アダプターの失敗した場合に CB を構成するにはどうすればよいですか。例:SIチャネルから外部(kafka)サーバーにメッセージを送信する前に、接続がタイムアウトしたか、サーバーが突然ダウンした場合/ネットワーク接続の問題/環境の問題。

Circuit Breaker Advice に関する SI doc に従って、以下を参照してください。

「通常、このアドバイスは外部サービスに使用される可能性があり、失敗するまでに時間がかかる場合があります (ネットワーク接続の試行のタイムアウトなど)」。

これを達成する方法を提案してください。ありがとうございます。

更新された構成:

ログ:

01-05@23:44:18,598 DEBUG org.springframework.integration.config.ServiceActivatorFactoryBean$1 - org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6 受信メッセージ: GenericMessage [payload=hello, headers={timestamp=1452017658598, id =e0591162-3b93-9bb6-0699-89b15b20e904}] DEBUG: - com.XXX.ProducerMessageHandler#0 受信メッセージ: GenericMessage [payload=hello, headers={timestamp=1452017658598, id=e0591162-3b93-9bb6-0699-89b15b20e90} ] 例外を取得しました: org.springframework.messaging.MessageHandlingException: メッセージ ハンドラでエラーが発生しました [com.XXX.ProducerMessageHandler#0]; ネストされた例外は java.lang.RuntimeException: test foo 01-05@23:44:18,606 DEBUG org.springframework.integration.channel.PublishSubscribeChannel - チャネル 'toKafka' で preSend、メッセージ: GenericMessage [payload=hello, headers={timestamp=1452017658605, id=61597941-b2f8-314d-141d-8f2c058dda4d}] 01-05@23:44:18,606 DEBUG org.springframework.integration.config.ServiceActivatorFactoryBean$1 - org.springframework.integration.config.ServiceActivatorFactoryBean $1@6a0ef4b6 受信メッセージ: GenericMessage [payload=hello, headers={timestamp=1452017658605, id=61597941-b2f8-314d-141d-8f2c058dda4d}] DEBUG: - com.XXX.ProducerMessageHandler#0 受信メッセージ: GenericMessage [payload=hello , headers={timestamp=1452017658605, id=61597941-b2f8-314d-141d-8f2c058dda4d}] 例外が発生しました: org.springframework.messaging.MessageHandlingException: メッセージハンドラでエラーが発生しました [com.XXX.ProducerMessageHandler#0]; ネストされた例外は java.lang.RuntimeException: test foo 01-05@23:44:18,606 DEBUG org.springframework.integration.channel です。PublishSubscribeChannel - チャネル「toKafka」で preSend、メッセージ: GenericMessage [payload=hello, headers={timestamp=1452017658606, id=119afbf1-6104-feb1-eb44-f646aa932277}] 01-05@23:44:18,606 DEBUG org.springframework .integration.config.ServiceActivatorFactoryBean$1 - org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6 受信メッセージ: GenericMessage [payload=hello, headers={timestamp=1452017658606, id=119afbf1-6104-feb1-eb44-f646aa932277}] 得ました例外: org.springframework.messaging.MessageHandlingException: メッセージハンドラでエラーが発生しました [org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6]; ネストされた例外は org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice$CircuitBreakerOpenException: Circuit Breaker is Open for org.springframework です。integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6 01-05@23:44:18,606 デバッグid=8dafe2e0-8efe-c827-e745-1387e6045e7d}] 01-05@23:44:18,606 DEBUG org.springframework.integration.config.ServiceActivatorFactoryBean$1 - org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6 受信メッセージ: GenericMessage [payload=hello, headers={timestamp=1452017658606, id=8dafe2e0-8efe-c827-e745-1387e6045e7d}] 例外が発生しました: org.springframework.messaging.MessageHandlingException: メッセージハンドラでエラーが発生しました [org.springframework.integration.config. ServiceActivatorFactoryBean$1@6a0ef4b6]; ネストされた例外は org.springframework.integration です。

0 投票する
0 に答える
559 参照

apache-kafka - kafka パーティション再割り当てツールを使用して kafka を実行した後に、「パーティションが存在しません」という警告/失敗が表示される

私はカフカ 0.8.1.1 を使用しています。3 ノードの kafka クラスターがあり、いくつかのトピックには約 5 つのパーティションがあります。クラスター内のノード数を 5 に増やし、いくつかのパーティションを既存のトピックから新しいブローカーに移動することを計画しました。

表示されるエラー メッセージ:

検索しましたが、関連する回答が見つかりませんでした。これを整理するためのガイダンス/ヘルプに感謝します。

0 投票する
1 に答える
416 参照

scala - 1 秒あたりのメッセージ数を調整できる Kafka プロデューサー

安定しているが調整可能な出力で Apache Kafka プロデューサーを作成する最良の方法は何ですか。

例:プロデューサーは、一定の 1000 メッセージ/秒をブローカーに送信する必要があります。ランタイム中、出力は 10 または 10000 メッセージ/秒に調整できる必要があります。

1 つの方法として、1 秒ごとに実行されるスケジューラを設定し、事前定義された量のメッセージをバッチで送信する方法があります。

追加:このプロデューサはパフォーマンス テスト フレームワークの一部である必要があるため、送信する必要があるメッセージの量は非常に多くなります。非常に高い負荷をどのように処理しますか? そのためにAkkaを使用することは有益でしょうか?

ターゲット言語は Scala ですが、どの言語のサンプル コードも大歓迎です。

0 投票する
1 に答える
1417 参照

java - バイナリ ファイル (.mp3 ファイルなど) を単一のメッセージとして Kafka に入れることは可能ですか? はいの場合、どのように?

私はApache Kafkaに比較的慣れていません。小さなプロジェクトの一環として、テキスト ログ ファイルを単一のメッセージとして Kafka に配置しようとしていました。いくつかのエンコード エラーが発生しました。Kafka の Java API (私が理解しているように) には、主に文字列エンコーディングの規定が含まれています。

回避策として、各行が 1 つのメッセージを表す Kafka でログ ファイルを 1 行ずつスローしましたが、これでは最初の問題ステートメント (1 つのファイルが 1 つのメッセージ) は解決しません。

0 投票する
1 に答える
995 参照

java - pojo を confluent.io の汎用レコードに変換して KafkaProducer 経由で送信する

私は Kafka と avro がまったく初めてで、confluent パッケージを使用しようとしています。JPA に使用する既存の POJO があり、各値を汎用レコードに手動で反映することなく、POJO のインスタンスを簡単に生成できるようにしたいと考えています。ドキュメントでこれがどのように行われるかが欠けているようです。

例では汎用レコードを使用し、次のように各値を 1 つずつ設定します。

クラスからスキーマを取得する例がいくつかあり、必要に応じてそのスキーマを変更するための注釈を見つけました。では、POJO のインスタンスを取得し、それをそのままシリアライザーに送信して、クラスからスキーマを照合し、値を汎用レコードにコピーする作業をライブラリに行わせるにはどうすればよいでしょうか? 私はこれについてすべて間違っていますか?私がやりたいことは次のようなものです:

ありがとう!