問題タブ [kafka-consumer-api]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
4 に答える
79042 参照

apache-kafka - Apache Kafka のコンテキストで「リバランス」とはどういう意味ですか?

私は Kafka の新しいユーザーで、現在約 2 ~ 3 週間試しています。現時点では、Kafka の大部分がどのように機能するかをよく理解していると思いますが、API を自分の Kafka コンシューマに適合させようとした後 (これはあいまいですが、想定されている新しい KafkaConsumer のガイドラインに従っています) v 0.9 で利用可能で、これは「trunk」レポ atm にあります) 同じグループ ID を持つ複数のコンシューマーがある場合、トピックから消費する際に遅延の問題が発生しました。

このセットアップでは、私のコンソールは一貫して「リバランス トリガー」に関する問題をログに記録します。新しいコンシューマーをコンシューマー グループに追加するとリバランスが発生しますか? 同じ groupID 内のどのコンシューマー インスタンスがどのパーティションを取得するかを把握するためにトリガーされますか?

また、 https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Designからこの一節に出くわしましたが、理解できないようです。その意味は非常に高く評価されます:

リバランスとは、(同じグループに属する) コンシューマー インスタンスのグループが、グループがサブスクライブしているトピックのパーティションの相互に排他的なセットを所有するように調整するプロセスです。コンシューマ グループのリバランス操作が正常に終了すると、サブスクライブされたすべてのトピックのすべてのパーティションが、グループ内の単一のコンシューマ インスタンスによって所有されます。リバランスの仕組みは以下の通りです。すべてのブローカーは、コンシューマー グループのサブセットのコーディネーターとして選出されます。グループのコーディネーター ブローカーは、サブスクライブされたトピックのコンシューマー グループ メンバーシップの変更またはパーティションの変更に対するリバランス操作の調整を担当します。また、リバランス操作中のグループのすべてのコンシューマーに、結果のパーティション所有権の構成を伝える役割もあります。

0 投票する
1 に答える
859 参照

kafka-consumer-api - Kafka ConsumerConnector のトピック数パラメーターとは

私はApache Kafkaが初めてで、与えられた例を試してみてください。

次のコード スニペットは、ConsumerConnector を初期化するために使用されます。トピック カウント パラメーターに混乱しています。そのトピックに対応する数のストリームをカフカが配布するようです。ただし、何度か試してみましたが、最初のストリームのみがメッセージを生成します。では、2 つの質問があります。1. トピックのカウント数を特定するにはどうすればよいですか? 2. 分割されたメッセージはどのようにストリームを越えますか?

前もって感謝します。

0 投票する
2 に答える
368 参照

java - Kafka メッセージ - Java のプロデューサーおよびコンシューマー クライアント

事前にご協力いただきありがとうございます。

Kafka ブローカーで次の形式のメッセージをプッシュおよびプルしたいと考えています。

Java のプロデューサー クライアントはこの形式でプッシュできる必要があり、コンシューマー クライアントはこのメッセージを読み取って解析できる必要があります。これどうやってするの?Kafka Java API でこれを行う特定の方法はありますか?

簡単なテキスト メッセージをプッシュおよびプルするプロデューサーおよびコンシューマー Java クライアントを既に作成しました。

本当にありがとうございました。

再度、感謝します。

0 投票する
0 に答える
1854 参照

java - Kafka メッセージを読み取る HBase に挿入します。

私はKafka + Hbaseを初めて使用します。あなたの助けに感謝。

Kafka メッセージ ストリームを取得し、Kafka から読み取る Hbase にデータを挿入したいと考えています。

トピックからメッセージを読み取る Java コンシューマー (単一トピック、単一パーティション、および 1 つのスレッド) を作成しました。すべて良好。スレッドで、Hbase に接続してメッセージをテーブルに挿入しようとしていますが、機能していません。hbase に接続しようとする (HTable のインスタンスを作成する) 瞬間に、ストリームを読み取る kafka スレッドが強制終了されます。これを克服してHbaseにデータを挿入するにはどうすればよいですか? この問題に関するあなたの考えと助けに大いに感謝します。

Kafka 高レベル コンシューマー コード

}

0 投票する
1 に答える
7996 参照

scala - Scala:クラスを別のクラスに渡すときに、クラスは型パラメータを取りますか?

スレッドで使用する実行可能なクラスを作成しようとしています。このクラスの目的は、KafkaStreamを取得し、ストリームを介して受信するすべてのメッセージに関数を適用することですが、クラスにストリームを渡そうとすると、次のコンパイラ エラーが発生します: "scala:45: class KafkaStream takes typeパラメーター"。

これがクラスです。この問題は、Scala のクラスにパラメーターを渡す方法に関係しているに違いありませんが、私はまだ Scala に慣れていません。

さらに、関数を実行可能にすることは可能ですか? クラスよりも関数に渡す方が簡単だと思います。

0 投票する
1 に答える
441 参照

java - Java で Kafka サーバー (Zookeeper ではない) からすべての ConsumerGroup のリストを取得する方法

すべての消費者グループのリストと、それらが消費しているトピック/パーティションを提供するJava ApiがKafkaにありますか?また、kafkaサーバー側からzookeeperリストをフェッチできる方法はありますか? 注: Zookeeper から上記の情報を取得できました。しかし、Kafka Server から取得したいのです。これについて私を助けてくれませんか!!

前もって感謝します!!

0 投票する
1 に答える
159 参照

apache-spark - SimpleConsumerを介してKafkaでフェッチリクエストなしでメッセージのサイズ(メタデータ)を取得する方法は?

私は SimpleConsumer を使用しており、spark を使用してメッセージ サイズ (バイト) をプルしようとしています。

メタデータ リクエストを使用して最も古いオフセットと最新のオフセットを取得できますが、kafka (0.8.0) でバイト数を取得する方法がわかりません。

十分なデータをチェックしたいので、アクションを実行するためにsparkジョブ(ストリーミングではなく)のみを実行するため、フェッチリクエストを使用したくありません。

0 投票する
1 に答える
230 参照

apache-kafka - DSE Spark ストリーミング + Kafka NoSuchMethodError

Kafka トピックから文字列の行を読み取るだけの Spark Streaming + Kafka ジョブを送信しようとしています。ただし、次の例外が発生しています

15/07/24 22:39:45 エラー TaskSetManager: ステージ 2.0 のタスク 0 が 4 回失敗しました。スレッド「Thread-49」でジョブ例外を中止します。 112.93): java.lang.NoSuchMethodException: kafka.serializer.StringDecoder.(kafka.utils.VerifiableProperties) java.lang.Class.getConstructor0(Class.java:2892) java.lang.Class.getConstructor(Class.java:1723) org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:106) org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121) org.apache.spark.streaming.receiver. ReceiverSupervisor.start(ReceiverSupervisor.scala:106) org.apache.spark.

DSE で使用される spark jar ファイルを確認したところ、そのコンストラクターを持つ kafka_2.10-0.8.0.jar を使用していることがわかります。エラーの原因がわかりません。ここに私の消費者コードがあります

更新この例外は、ジョブを送信したときにのみ発生するようです。コードを貼り付けてスパークシェルを使用してジョブを実行すると、正常に動作します