問題タブ [kafka-consumer-api]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
1 に答える
918 参照

java - カフカスパウトがデータを送信していません

Kafka を Storm と統合しようとしています。Kafka Spout を使用して Kafka トピックからデータを取得し、それをさらに処理するために Storm Bolt に供給しています。トポロジーを正常に送信できますが、Spout はデータを出力していません。エラーもスローしません。私は Kafka と Storm に非常に慣れていないため、この問題の背後にある理由を理解できません。変更を提案してください。事前に感謝します!!

トポロジを送信した後の Storm UI のスクリーン ショット

私のトポロジ:

0 投票する
2 に答える
4393 参照

kafka-consumer-api - カフカの消費者イテレータはどのように機能しますか

Kafkaトピックを正常に読み取り、各メッセージを画面に出力するこのコードを見つけました。画面に印刷するだけでなく、文字列に対して他の操作を実行するように拡張したいと思います。このために、メッセージを反復する while ループで何が起こっているのかを理解したいと思います。it.hasNext() は何をしますか? 次のメッセージまたは新しいメッセージのリストを探しますか。このwhileループから抜け出すのはいつですか?

0 投票する
4 に答える
62602 参照

apache-kafka - トピックの Kafka コンシューマ フェッチ メタデータが失敗しました

サードパーティの Kafka および ZooKeeper サーバー用の Java クライアントを作成しようとしています。トピックを一覧表示して説明することはできますが、読み込もうとすると aClosedChannelExceptionが発生します。ここでは、コマンド ライン クライアントを使用してそれらを再現します。

代替コマンドは成功します:

(ips は編集され、255.255.255.255 に置き換えられました)

この例外をグーグルで検索すると、プロデューサー側の問題が表示されます。実際、ソースは、ClientUtils.fetchTopicMetadataこれが主にプロデューサーによって使用されていることを示唆しています。

私が懸念していることの 1 つは、これがネットワーク レイアウトの産物である可能性があることです。パケットは Haproxy によって破壊され、VPN 経由で送信されます。

ここで正確に何が働いているのですか?

0 投票する
2 に答える
10065 参照

apache-kafka - パーティションの Kafka 複数のコンシューマ

トピック/パーティションにメッセージを書き込むプロデューサーがあります。順序を維持するために、単一のパーティションを使用したいと思います。12 のコンシューマーがこの単一のパーティションからすべてのメッセージを読み取るようにします (コンシューマー グループはなく、すべてのメッセージはすべてのコンシューマーに送信される必要があります)。これは達成可能ですか?パーティションごとに 1 人の消費者しか読めないフォーラムをいくつか読みました。

0 投票する
1 に答える
1700 参照

apache-kafka - kafka:「soTimeout」、「bufferSize」、および「minBytes」はSimpleConsumerに対して何を意味しますか?

Kafka 0.8.2.1 SimpleConsumer を使用しています。SimpleConsumer と FetchRequestBuilder のいくつかの構成パラメーターの意味を誰かが明確にすることはできますか? KAfka のソース コードを読んでも、ドキュメントは見つかりませんでした。(この質問をkafkaユーザーグループに投稿しようとしましたが、うまくいきませんでした):

-- Q1: SimpleConsumer コンストラクターの署名に、Int ' soTimeout'パラメーターがあります。このタイムアウトの意味は何ですか? これは Kafka ブローカーに接続するためのタイムアウトですか? [または特定の??] Kafkaへのリクエスト(FetchRequestなど)からのレスポンスを取得する際のタイムアウト? 他の何か?

-- Q2: また、SimpleConsumer コンストラクターは Int の 'bufferSize' パラメーターを取ります。その意味は何ですか?これは、fetchRequest が発行されたときに SimpleConsumer が読み取るバイト数ですか? それとも、Kafka からの 1 回のフェッチごとに読み取られる最大バイト数ですか?さらに多くのデータが利用可能な場合、複数のフェッチが発生しますか?

-- FetchRequestBuilder を介して FetchRequest を構築する場合 (以下を参照)、' fetchSize 'も指定する必要があります。

FetchRequestBuilder のソース コードを見ると、(私は Scala のプロではありませんが) これらの呼び出しは以下のメソッド呼び出しに変換されると思います。FetchRequest に渡される最後のパラメーターは「minBytes」と呼ばれ、これがおそらく正確なフェッチサイズ?. 少なくとも「minBytes」のデータが利用可能でない限り、何もフェッチしないということですか?

だから、私の最後の質問は:

-- Q3: ' bufferSize ' と ' fetchSize/minBytes ' はどのように関連していますか? 彼らは正確に何を定義していますか?一方が他方よりも小さいか大きいかを確認する必要がありますか?

ありがとう、

マリーナ

0 投票する
1 に答える
12720 参照

apache-kafka - 複数のトピックから読み取る Kafka コンシューマ

私はカフカが初めてです。私は 2 つのトピックを作成し、2 人のプロデューサーからこれら 2 つのトピックについて公開しています。両方のトピックからのメッセージを消費する 1 つのコンシューマーがあります。優先度に応じて処理したいからです。

両方のトピックからストリームを取得していますが、ストリームの繰り返しを開始するとすぐConsumerItreatorにブロックされます。ドキュメントに書かれている通り、新しいメッセージを受け取るまでブロックされます。

単一の Kafka Consumer から 2 つのトピックと 2 つのストリームを読み取る方法を知っている人はいますか?

0 投票する
1 に答える
3577 参照

java - Eclipse でカスタム Kafka プロデューサーを使用して NoClassDefFoundError を修正するには?

プロジェクトに Kafka モジュールを含めようとしています。

次の jar を eclipse の外部 jar ライブラリとして追加し、build.xml を更新して jar への参照を含めました。

  • kafka-clients-0.8.2.0.jar
  • kafka_2.10-0.8.2.0.jar
  • scala-library-2.10.4.jar

サンプル Producer クラスを作成しました

このプロジェクトを開始する前に、Zookeeper と Kafka ブローカーが実行されていることを確認しました。ただし、NoClassDefFoundError()インスタンス化しようとしているときに表示されますnew KafkaProducer()

明らかな何かが欠けていますか?

0 投票する
2 に答える
7749 参照

apache-kafka - Kafka 0.8.2 のコンシューマー API を使用するには?

最新の Kafka ドキュメントhttp://kafka.apache.org/documentation.htmlを使い始めています。しかし、新しい Consumer API を使用しようとすると、いくつかの問題が発生します。私は次の手順で仕事をしました:

1. 新しい依存関係を追加する

2.構成を追加する

3. KafkaConsumer API を使用する

ただし、ブローカーからメッセージをポーリングしようとすると、null しか得られませんでした。

そして、ソースコードを確認した後、消費者の何が問題なのかがわかります。

さらに悪いことに、0.8.2 API に関するその他の有用な情報を見つけることができません。Kafka に関するすべての使用法が最新バージョンと互換性がないためです。誰でも私を助けることができますか?どうもありがとう。

0 投票する
4 に答える
79042 参照

apache-kafka - Apache Kafka のコンテキストで「リバランス」とはどういう意味ですか?

私は Kafka の新しいユーザーで、現在約 2 ~ 3 週間試しています。現時点では、Kafka の大部分がどのように機能するかをよく理解していると思いますが、API を自分の Kafka コンシューマに適合させようとした後 (これはあいまいですが、想定されている新しい KafkaConsumer のガイドラインに従っています) v 0.9 で利用可能で、これは「trunk」レポ atm にあります) 同じグループ ID を持つ複数のコンシューマーがある場合、トピックから消費する際に遅延の問題が発生しました。

このセットアップでは、私のコンソールは一貫して「リバランス トリガー」に関する問題をログに記録します。新しいコンシューマーをコンシューマー グループに追加するとリバランスが発生しますか? 同じ groupID 内のどのコンシューマー インスタンスがどのパーティションを取得するかを把握するためにトリガーされますか?

また、 https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Designからこの一節に出くわしましたが、理解できないようです。その意味は非常に高く評価されます:

リバランスとは、(同じグループに属する) コンシューマー インスタンスのグループが、グループがサブスクライブしているトピックのパーティションの相互に排他的なセットを所有するように調整するプロセスです。コンシューマ グループのリバランス操作が正常に終了すると、サブスクライブされたすべてのトピックのすべてのパーティションが、グループ内の単一のコンシューマ インスタンスによって所有されます。リバランスの仕組みは以下の通りです。すべてのブローカーは、コンシューマー グループのサブセットのコーディネーターとして選出されます。グループのコーディネーター ブローカーは、サブスクライブされたトピックのコンシューマー グループ メンバーシップの変更またはパーティションの変更に対するリバランス操作の調整を担当します。また、リバランス操作中のグループのすべてのコンシューマーに、結果のパーティション所有権の構成を伝える役割もあります。

0 投票する
1 に答える
859 参照

kafka-consumer-api - Kafka ConsumerConnector のトピック数パラメーターとは

私はApache Kafkaが初めてで、与えられた例を試してみてください。

次のコード スニペットは、ConsumerConnector を初期化するために使用されます。トピック カウント パラメーターに混乱しています。そのトピックに対応する数のストリームをカフカが配布するようです。ただし、何度か試してみましたが、最初のストリームのみがメッセージを生成します。では、2 つの質問があります。1. トピックのカウント数を特定するにはどうすればよいですか? 2. 分割されたメッセージはどのようにストリームを越えますか?

前もって感謝します。