問題タブ [kafka-consumer-api]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
0 に答える
1373 参照

java - OffsetCommitRequestを使用してkafkaトピックでオフセットをコミットする方法は?

https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafkaリンクを 調べたところ、特定のパーティションのオフセット処理のみを扱う新しいトピックが Kafka によって提供されていることがわかりました。トピックで。

新しいトピックを作成し、auto.commit.enable プロパティのみを false として無効にしました。Kafka Tool 1.0 Beta2 を使用して、トピックのオフセットとメッセージを確認できます。現在、オフセットはまだあり、プログラムでコミットする必要があります。

  1. 上記の OffsetCommitRequest Api の例がオフセットをコミットするかどうかを知りたかったのは、同じ例を使用してコミットできないためです。
  2. もう 1 つ、SimpleConsumer Example を使用して Kafka トピックのオフセットをコミットする必要があります。Zookeeper でコミットしませんか?
  3. 最後に、ConsumerConnecter の目的は何ですか?

メッセージを一度だけ消費してから、オフセットも追跡したいと考えています。

私は Kafka 0.8.2.1 を使用しており、Apache Camel と統合しています。camel-kafka も使用します。

0 投票する
1 に答える
6495 参照

node.js - kafka-node コンシューマーが offsetOutOfRange エラーを受け取る

私は kafka-node (kafka のノード クライアント) を使用しており、コンシューマーを使用してトピックに関するメッセージを取得しています。残念ながら、「offsetOutOfRange」状態を受け取ります (offsetOutOfRange コールバックが呼び出されます)。私のアプリケーションは、消費者が生産者から大幅に遅れて、最初のオフセットと最新のオフセットの間に多少大きなギャップが残るまで、正常に動作していました。この時点で、私は (おそらく間違って) コンシューマがメッセージを受信し続けることができる (できればプロデューサーに追いつくことができる) と想定しました。

私のカフカ コンシューマ クライアント コードは次のとおりです。

私は何か間違ったことをしていますか、それとも何かが欠けていますか?

そうでない場合は、いくつか質問があります。

(a) この状態を適切に処理するようにクライアントを記述する、認められた「最善の」方法はありますか?

(b) なぜこの条件が提起されるのでしょうか? (クライアントは中断したところからメッセージを読み続け、最終的に (理想的には) 追いつくことができたはずだと思います...)

(c) この条件を処理するコード/ロジックを記述し、コンシューマー オフセットを明示的に再配置して読み取る必要がありますか? (これは少し面倒なようです)...

どんな助けでも大歓迎です。

0 投票する
1 に答える
3284 参照

java - kafkaの__consumer_offsetsトピックでオフセットにアクセスしてコミットする方法は?

カフカのドキュメントによると

オフセット マネージャーが OffsetCommitRequest を受け取ると、__consumer_offsets という名前の特別な圧縮された Kafka トピックに要求を追加します。

プログラムで直接アクセスすることは可能ですか? Kafka Tool 1.0 Beta 2 を使用すると、__consumer_offets トピックとその中の多くのパーティションが表示され、データを持つパーティションは 1 つだけです。パーティションがどのように作成されるのか、なぜ 1 つのパーティションにしかデータがないのか理解できません。

また、このトピックの特定のパーティションでコミットするにはどうすればよいですか?

0 投票する
1 に答える
458 参照

c++ - パーティション サイズに関する rdkafka コンシューマ クエリ

対象のパーティションでコミットする一連のプロデューサーにアクセスできず、一連の C++ コンシューマーを制御できるとします。複雑なプログラムでベンチマークを実行しているので、消費者がフェッチしているオフセットとパーティションに保存されている合計オフセットとの間の広がりを知りたいです。

例えば、>> reading message #1234 of 5678 total in partition 0 of topic foo

RdKafka::Consumer->outq_len()との目的を誤解していました。RdKafka::Topic->OFFSET_ENDなぜなら、それらは常にとに等しいように見えるからです。0-1

5678私の例の値を取得するにはどうすればよいですか?

0 投票する
0 に答える
1649 参照

spring-integration - kafka でメッセージを使用してバッチ プロセスの調整を実行するために必要な推奨事項

マイクロサービス アーキテクチャを使用してバッチ プロセスを開発しています。バッチ プロセス全体で実行する必要がある一連のステップは、さまざまなマイクロ サービスによって実行されます。Kafka はメッセージ バスとして使用され、すべてのマイクロサービスは kafka を使用して対話します。

バッチ プロセスでは、最初のマイクロ サービスがデータベースからデータをプルし、プルされたレコードごとにメッセージを作成し、それが 2 番目のマイクロ サービスで処理され、再びメッセージをバスにパブリッシュします。

最終的には、最初のマイクロ サービスによって公開されたメッセージと、最後のマイクロ サービスによって公開されたメッセージの調整を実行する必要があります。

現在、Spring Integration Kafka を使用してこれら 2 つのトピックからのメッセージを読み取る別のマイクロサービスを使用して調整を実行しようとしていますが、このアプローチは次の理由で複雑に思われます: 1) 開始トピックと終了トピックは複数使用されます。バッチ処理。そのため、各プロセスの調整を実行しながら、同じメッセージを何度も読むことになります。2) これら 2 つのトピックから読み取ったメッセージを集約して調整を実行する方法がわかりませんか? 3) 調整をやり直す必要がある場合の課題は?

Kafka は、調整に使用できるメッセージ/データを格納するためのストアとして使用することを意図していますか、または調整の実行に使用できるメッセージをプッシュする必要がある他のストアを探す必要がありますか? その場合、このシナリオではどのようなストアを使用できますか?

ポインタは役に立ちますか?

0 投票する
3 に答える
12811 参照

java - Kafka コンシューマ - コンシューマ プロセスとスレッドとトピック パーティションとの関係

私は最近 Kafka を使用していますが、消費者グループの消費者に関して少し混乱しています。混乱の中心は、コンシューマーをプロセスとして実装するかスレッドとして実装するかです。この質問では、高レベルのコンシューマーを使用していると仮定します。

私が実験したシナリオを考えてみましょう。私のトピックには 2 つのパーティションがあります (簡単にするために、レプリケーション ファクターが 1 であると仮定します)。groupでconsumer ( ConsumerConnector) プロセスを作成し、次にサイズ 2 のトピック カウント マップを作成し、そのプロセスの下で2 つのコンシューマー スレッドを生成しました。パーティションを消費しているようで、パーティションを消費しています。この動作は常に決定論的ですか? 以下はコードスニペットです。Classは、私のコンシューマ スレッド クラスです。consumer1group1consumer1_thread1consumer1_thread2consumer1_thread10consumer1_thread21TestConsumer

consumer1ここで、2 つのコンシューマー プロセスを開始し、consumer2両方が同じグループgroup1を持ち、それぞれがシングル スレッド プロセスである別のシナリオ (私は実験していませんが、興味があります) を考えてみましょう。今私の質問は次のとおりです。

  1. この場合、2 つの独立したコンシューマー プロセス (同じグループの下にあるにもかかわらず) は、どのようにパーティションに関連付けられますか? 上記のシングル プロセス マルチスレッド シナリオとの違いは何ですか?

  2. 一般に、コンシューマー スレッドまたはプロセスは、トピック内のパーティションにどのようにマップまたは関連していますか?

  3. Kafka のドキュメントには、コンシューマー グループの下の各コンシューマーが 1 つのパーティションを消費すると書かれています。ただし、それは消費者スレッド (上記のコード例のように) または独立した消費者プロセスを指しますか?

  4. 消費者をプロセスとスレッドとして実装することに関して、ここで見逃している微妙なことはありますか? 前もって感謝します。

0 投票する
1 に答える
6505 参照

python - Kafka トピックのメッセージを更新

Python Kafka トピックを使用しています。

Kafka のキュー内のメッセージを更新して、キューの先頭に再度追加できるプロビジョン プロデューサーはありますか?

Kafka の仕様によると、実現可能ではないようです。

0 投票する
2 に答える
358 参照

scala - Kafka コンシューマーで複数のデコーダー (またはトピックごとに 1 つ) を指定する方法はありますか? 他の誰かがこれの必要性を感じましたか?

私は Scala ( ref )で Kafka の作業を介して Spark Streaming を行っています。

バッチ間隔ごとに、同じDStreamおよび基礎となるさまざまなタイプのメッセージ (異なるデコーダーを必要とする) を受信したいと考えています。RDD複数のトピックをリッスンし、各トピックが 1 つのメッセージ タイプに対応するため、独自のDecoder. 現在、トピックごとに提供する方法はないようですkafka.serializer.Decoder<?>(ありますか?)。トピックごとに異なるタイプのメッセージを送信する可能性がかなり高いようです (protobufシリアライズされたバイト?)。他の誰かがこの問題に遭遇しましたか?

ありがとう。

C.

ここのどこかtopicへのマッピングが役立つようです。valueDecoder

0 投票する
1 に答える
947 参照

bigdata - レプリカの 1 つがレプリケーション ファクターに追いつくためにダウンしている場合、kafka は新しいフォロワーを作成しますか

以下は、トピックxx_json_topicのパーティション情報です。これは、3 つのノードを持つ Kafka クラスターです。

すべてのノードが稼働中:

この時点で..ノード「node-1」を停止すると..以下のようになります。

私の質問は、ノード 1 がダウンしていて、レプリケーション ファクターを維持する必要があることを kafka が認識している場合、ノード 3 をパーティション 1 のレプリカにし、ノード 2 をパーティション 0 のレプリカにしないでください。 -3 と node-2 は Isr の一部ですか?

または、Kafka はそれを約束していないと思います...レプリケーション係数が 2 の場合..データが常に少なくとも 2 つのノードで利用できるという意味ではありません (---Cassandra の一貫性レベルのように) 。

0 投票する
1 に答える
1188 参照

java - Kafka で最も高速に書き込まれると見なされるデータ形式は?

Kafka には、文字列形式、バイト配列など、データを書き込むためのさまざまなオプションがあります。Kafka での書き込み中に最速と見なされるデータ形式はどれですか。

さらに、kafka は、データ全体を一度圧縮してから書き込むためのユーティリティを提供します。

また、解凍する同じメッセージを消費する際にも考慮する必要があるため、データの読み取りコストが増加します。