問題タブ [apache-kafka]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-kafka - Kafka High Level Consumer を特定のパーティションに割り当てることはできますか?
Kafka の Consumer は 1 つのパーティションからのみメッセージを受信するように構成できるという事実を考えると、これに対する構成アプローチは見たことがありません。
Kafka ブローカーに伝える方法:
私がこのIDを持つCONSUMER Aであり、このGROUP Lの下で、TOPIC Xにサブスクライブし、 PARTITION Aのストリームを取得する意思があることを確認します。
次に、別のCONSUMER BをGROUP Lの下で開始します。これはTOPIC Xにサブスクライブし、 PARTITION Bのストリームを取得しますか?
つまり、カフカが記述したシナリオのように、
特定のユーザーのウォール フィードをサブスクライブするコンシューマーを開始するにはどうすればよいですか? ユーザーがログインしたときにコンシューマーを開始し、そのパーティションをトピックからコンシューマーに送信してクライアントにフィードを送信できますか?
java - Kafka ログにオフセットがありません - Simple Consumer は続行できません
3 ノードの kafka クラスターをセットアップしています。私は嵐を使ってカフカからのメッセージを読んでいます。システムの各トピックには 7 つのパーティションがあります。
今、私は奇妙な問題に直面しています。3日前までは、すべて正常に機能していました。ただし、現在、ストーム トポロジが 2 つのパーティション (#1 と #4) から読み取ることができないようです。
問題を掘り下げてみたところ、私の kafka ログで、これらのパーティションの両方で、1 つのオフセットが欠落していることがわかりました。つまり、5964511 の後に、次のオフセットは 5964512 ではなく 5964513 です。
オフセットが見つからないため、Simple Consumer は次のオフセットに進むことができません。私は何か間違ったことをしていますか、それとも既知のバグですか?
そのような行動の理由は何でしょうか?
次のコードを使用して、有効なオフセットのウィンドウを読み取ります。
これにより、次の出力が得られます。
したがって、私が提供しているオフセットは十分にオフセット範囲内にあります。
java - カフカ プロデューサー ユニット テスト (Java)
カフカ (Java コード) で最初のステップを実行するカフカ プロデューサーの簡単なテストを作成したいと思います。このようなもので、動物園の飼育係をモックできます (この実装は見栄えがしますが、そこにあるいくつかのクラス、特に EmbeddedZookeeper に到達できません)。および TestUtils)。
何か案は?
hdfs - kafka から hdfs への書き込み (cloudera cdk を使用?)
Kafka にメッセージを送信するアプリケーション ブラウザーを備えたシステムを設計したいと考えています。コンシューマーは、イベントを avro 形式で HDFS に書き込む必要があります。
これはどのように見えるべきですか?
メッセージを確認するのに適切なタイミングはいつですか?
hdfsファイルにどのように正確に追加できますか? cdk について読んだところ、良い方向に進んでいるように見えます。私ができないのは、レコードをいつどのようにフラッシュするかです。
誰かが良い例を持っていますか?
long-polling - カフカの長いポーリング
低遅延のメッセージ キューを実装するために kafka を検討しており、消費者のロング ポーリングについて調べています。ただし、実際にロング ポーリングを使用する方法や、ロング ポーリングを有効にするために設定する必要があるオプションについての例はありません。kafka Java APIを使用してロングポーリングを有効にするにはどうすればよいですか?
python - geventでpython kafkaクライアントを使用する方法 - 実際に動作するライブラリはありますか?
Python 2.7でbrodを使用して、geventを使用してkafka 0.7.2に書き込もうとしています。
ここに私が得るエラーメッセージがあります。ブロッキングが原因だと思います。brod は tornado をサポートしていますが、私は gevent を使用しています。
gevent-kakfa を使用しようとしましたが、gevent-zookeeper に依存しています。
Zookeeper に接続しようとすると、次のメッセージが表示されます。
動作する gevent を使用してメッセージを書き込むことができる Python ライブラリはありませんか?