問題タブ [kafka-consumer-api]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
1 に答える
5317 参照

apache-spark - Spark Streaming (Spark 1.0.0) に Kafka (Kafka Broker 0.8.1) から最新のデータを読み込ませる方法

Spark ストリーミング アプリケーションは、Kafka からデータをフェッチし、それらを処理します。

アプリケーションに障害が発生した場合、大量のデータが Kafka に保存され、Spark Streaming アプリケーションの次の起動時に、一度に大量のデータが消費されてクラッシュします。私のアプリケーションは過去のデータを気にしないので、現在 (最新) のデータのみを使用してもまったく問題ありません。

「auto.reset.offest」オプションを見つけましたが、Spark では動作が少し異なります。設定されている場合、Zookeeper に保存されているオフセットを削除します。ただし、予期しない動作にもかかわらず、削除後に最新のものからデータを取得することになっています。

しかし、そうではないことがわかりました。データを消費する前に、すべてのオフセットがクリーンアップされるのを見ました。次に、デフォルトの動作により、期待どおりにデータを取得する必要があります。しかし、データが多すぎるためにクラッシュします。

オフセットをクリーンアップし、「Kafka-Console-Consumer」を使用して最新のデータを消費し、アプリケーションを実行すると、期待どおりに動作します。

そのため、「auto.reset.offset」が機能しないように見え、スパーク ストリーミングの kafka コンシューマーは、デフォルトで「最小」オフセットからデータを取得します。

最新の Spark ストリーミングから Kafka データを使用する方法について何か考えはありますか?

spark-1.0.0 と Kafka-2.10-0.8.1 を使用しています。

前もって感謝します。

0 投票する
1 に答える
384 参照

message-queue - Kafka コンシューマー作業キュー

SOA アプリケーションには次のシナリオがあります。ServiceA は、ServiceB のインスタンスによって非同期的に処理される必要があるいくつかのジョブを生成します。基本的に、これは、各ワーカーが ServiceB のインスタンスであるワーク キューの問題につながります。メッセージブローカーとして Kafka を使用しており、次の設定があります。

5 ブローカー B1、B2、B3、B4、および B5。10 個のパーティション (P1、P2、....P10) を持つトピック (A) があり、それぞれのレプリケーション ファクターは 3 です。パーティションの割り当てが次のようであると仮定します。レプリカとしての B(i+1) および B(i+2)。

ServiceB の 3 つのインスタンスが実行されています。このセットアップが与えられた場合、C1 が 3 つのパーティションから消費する消費モデルを実現するために、高レベル コンシューマー API をどのように使用すればよいでしょうか。3 つのパーティションからの C2。残りの 4 つのパーティションからの C3

0 投票する
2 に答える
11675 参照

message-queue - 消費者のリバランスは Kafka でどのように機能しますか?

新しいコンシューマー/ブローカーが追加またはダウンすると、Kafka はリバランス操作をトリガーします。Kafka Rebalancing はブロック操作ですか。リバランス操作の進行中に Kafka コンシューマーはブロックされますか?

0 投票する
1 に答える
3244 参照

apache-kafka - Simple Consumer を使用して Apache Kafka で未処理のメッセージを読み取る

リンクをたどるのに疲れた

https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

SimpleConsumer を使用してメッセージを消費しますが、使用中に次のような突然の動作が見つかりました。

コンシューマは、特定のパーティションからメッセージを消費しています。しかし、問題は、コンシューマーが実行されていて、プロデューサーを使用してメッセージをトピックにプッシュすると、そのパーティションからのメッセージが消費されることです。しかし、コンシューマーが現在実行されておらず、いくつかのメッセージをトピックにプッシュして再度コンシューマーを起動すると、プロデューサーによってプッシュされたメッセージは消費されませんが、今すぐプッシュされるメッセージを消費する準備ができています。未処理のメッセージのみを消費したいので、EarliestTime() の代わりに LatestTime() を使用しています。

例えば

ケース-1

コンシューマーは実行中です:

プロデューサーは M1、M2、M3 メッセージをトピック 1 のパーティション 1 にプッシュしました

結果: コンシューマーは 3 つのメッセージすべてを消費します。

ケース - 2

コンシューマが実行されていません

プロデューサーは、m4、m5、m6 メッセージをトピック 1 のパーティション 1 にプッシュします。

消費者は今呼び出されます

結果:消費者はメッセージm4、m5、m6を消費しませんが、オフセットを確認すると7に設定されています。これは、プロデューサーがメッセージの生成中にオフセットを7に進めたことを意味します。その結果、コンシューマーは現在オフセット7からメッセージを消費します

コンシューマーが再び起動したときに m4 からメッセージを読み取る必要がある場合に、理想的に助けてください。

0 投票する
3 に答える
38410 参照

java - Kafka - シャットダウン (kafka.server.KafkaServer)、Kafka-Server-Start の起動に関する問題

最初から始めます。私は openSuse 13.2 を持っています。jdk_1.7.0_51、scala-2.11.4、gradle-2.2.1 も持っています。kafka-0.8.2-bet-src のソースをダウンロードし、readme ファイルに書かれているとおりに ./gradlew コマンドを実行しました。テストを除くすべてのコマンドが成功しました (成功した場合は 93%、失敗した場合は 19%)。すべてがうまくいき、zookeeper を実行すると正しく起動しますが、kafka-server-start を実行すると次のようになります。

これは私のserver.propertiesファイルです:

そして私の /etc/hosts ファイル:

誰でも私を助けることができますか?

0 投票する
2 に答える
5515 参照

java - ベンチマーク Kafka - 平凡なパフォーマンス

EC2 サーバーで 1k サイズのメッセージをストリーミングして、Kafka 0.8.1.1 のベンチマークを行っています。

2 つの m3.xlarge サーバーに Zookeeper をインストールし、次の構成を使用しました。

次に、32Gb RAM と追加の 6 つの SSD ドライブを備えた i2.2xlarge マシンに単一の Kafka サーバーをインストールし、各ディスクは としてパーティション分割されまし/mnt/a , mnt/b, etc....た。サーバーには、ブローカーが 1 つ、ポート 9092 にトピックが 1 つ、レプリケーション ファクターが 1 の 8 つのパーティションがあります。

すべてのテストは別のインスタンスから実行され、インスタンス間のレイテンシは 1 ミリ秒未満です。パーティション キーが 0 から 7 までの乱数である場合、1 つのスレッド プロデューサーと 8 つのスレッド コンシューマーを使用してプロデューサー/コンシューマー Java クライアントを作成しました。カスタム エンコーダーを提供することで、Json を使用して各メッセージをシリアル化しました。

私の消費者プロデューサーのプロパティは次のとおりです。

100k メッセージを送信すると、1 秒あたり 10k メッセージの容量と約 1 ミリ秒の遅延が発生します。

これは毎秒 10 メガバイト、つまり 80Mb/s であることを意味します。これは悪くありませんが、同じゾーンに配置されたインスタンスからはより良いパフォーマンスが期待できます。

構成に何か不足していますか?

0 投票する
4 に答える
14392 参照

apache-kafka - すべてのメッセージが読み取られたら、Kafka コンソール コンシューマを終了します

これを行う方法が必要であることは知っていますが、これを理解することはできません。キューからすべてのメッセージを読み取ったら、kafka コンシューマーを停止する必要があります。

誰かがこれに関する情報を提供できますか?

0 投票する
2 に答える
13916 参照

apache-kafka - kafka 消費者セッションのタイムアウト

コンシューマーがメッセージを読み取り、スレッドがデータベースへのアクセスなど、メッセージが別のトピックに生成される前にさまざまなことを行うアプリケーションがあります。スレッドでメッセージを消費してから生成するまでの時間は、数分かかる場合があります。メッセージが新しいトピックに生成されると、コンシューマー キュー メッセージの作業が完了したことを示すためにコミットが行われます。このため、自動コミットは無効になっています。

私は高レベルのコンシューマーを使用していますが、私が気づいているのは、ズーキーパーとカフカのセッションがタイムアウトすることです。これは、コンシューマー キューで何かを行う前に時間がかかりすぎて、スレッドが戻ってコンシューマーからより多くを読み取るたびにカフカがリバランスすることになるためです。しばらくすると、コンシューマーが新しいメッセージを読み取るまでに長い時間がかかり始めます。

Zookeeper セッションのタイムアウトを非常に高く設定して問題にならないようにすることはできますが、それに応じてリバランス パラメータを調整する必要があり、kafka は他の副作用の中でもしばらくの間、新しいコンシューマを取得しません。

この問題を解決するためのオプションは何ですか? 両方を幸せに保つためにカフカと飼育係にハートビートする方法はありますか? 単純なコンシューマを使用した場合でも、これらと同じ問題がありますか?