問題タブ [apache-kafka]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-kafka - Kafka High Level Consumer を特定のパーティションに割り当てることはできますか?
Kafka の Consumer は 1 つのパーティションからのみメッセージを受信するように構成できるという事実を考えると、これに対する構成アプローチは見たことがありません。
Kafka ブローカーに伝える方法:
私がこのIDを持つCONSUMER Aであり、このGROUP Lの下で、TOPIC Xにサブスクライブし、 PARTITION Aのストリームを取得する意思があることを確認します。
次に、別のCONSUMER BをGROUP Lの下で開始します。これはTOPIC Xにサブスクライブし、 PARTITION Bのストリームを取得しますか?
つまり、カフカが記述したシナリオのように、
特定のユーザーのウォール フィードをサブスクライブするコンシューマーを開始するにはどうすればよいですか? ユーザーがログインしたときにコンシューマーを開始し、そのパーティションをトピックからコンシューマーに送信してクライアントにフィードを送信できますか?
java - Kafka ログにオフセットがありません - Simple Consumer は続行できません
3 ノードの kafka クラスターをセットアップしています。私は嵐を使ってカフカからのメッセージを読んでいます。システムの各トピックには 7 つのパーティションがあります。
今、私は奇妙な問題に直面しています。3日前までは、すべて正常に機能していました。ただし、現在、ストーム トポロジが 2 つのパーティション (#1 と #4) から読み取ることができないようです。
問題を掘り下げてみたところ、私の kafka ログで、これらのパーティションの両方で、1 つのオフセットが欠落していることがわかりました。つまり、5964511 の後に、次のオフセットは 5964512 ではなく 5964513 です。
オフセットが見つからないため、Simple Consumer は次のオフセットに進むことができません。私は何か間違ったことをしていますか、それとも既知のバグですか?
そのような行動の理由は何でしょうか?
次のコードを使用して、有効なオフセットのウィンドウを読み取ります。
これにより、次の出力が得られます。
したがって、私が提供しているオフセットは十分にオフセット範囲内にあります。
java - カフカ プロデューサー ユニット テスト (Java)
カフカ (Java コード) で最初のステップを実行するカフカ プロデューサーの簡単なテストを作成したいと思います。このようなもので、動物園の飼育係をモックできます (この実装は見栄えがしますが、そこにあるいくつかのクラス、特に EmbeddedZookeeper に到達できません)。および TestUtils)。
何か案は?
hdfs - kafka から hdfs への書き込み (cloudera cdk を使用?)
Kafka にメッセージを送信するアプリケーション ブラウザーを備えたシステムを設計したいと考えています。コンシューマーは、イベントを avro 形式で HDFS に書き込む必要があります。
これはどのように見えるべきですか?
メッセージを確認するのに適切なタイミングはいつですか?
hdfsファイルにどのように正確に追加できますか? cdk について読んだところ、良い方向に進んでいるように見えます。私ができないのは、レコードをいつどのようにフラッシュするかです。
誰かが良い例を持っていますか?
long-polling - カフカの長いポーリング
低遅延のメッセージ キューを実装するために kafka を検討しており、消費者のロング ポーリングについて調べています。ただし、実際にロング ポーリングを使用する方法や、ロング ポーリングを有効にするために設定する必要があるオプションについての例はありません。kafka Java APIを使用してロングポーリングを有効にするにはどうすればよいですか?
python - geventでpython kafkaクライアントを使用する方法 - 実際に動作するライブラリはありますか?
Python 2.7でbrodを使用して、geventを使用してkafka 0.7.2に書き込もうとしています。
ここに私が得るエラーメッセージがあります。ブロッキングが原因だと思います。brod は tornado をサポートしていますが、私は gevent を使用しています。
gevent-kakfa を使用しようとしましたが、gevent-zookeeper に依存しています。
Zookeeper に接続しようとすると、次のメッセージが表示されます。
動作する gevent を使用してメッセージを書き込むことができる Python ライブラリはありませんか?
streaming - Storm と Kafka を統合する方法
私はStormで作業し、ローカル テキスト ファイルを入力ソースとして使用する基本的なプログラムを開発しました。しかし今は、外部システムから継続的に送られてくるデータのストリーミングに取り組まなければなりません。この目的には、Kafka が最適です。
問題は、Spout に Kafka からストリーミング データを取得させる方法です。または、Storm を Kafka と統合する方法。Kafkaからのデータを処理できるようにするにはどうすればよいですか?
apache-kafka - Kafka がメッセージにプレフィックスを追加する
kafka 7.2を使用して、プロデューサーを使用してメッセージを送信すると、メッセージを消費すると、メッセージの先頭に追加のセクションが追加されて到着することがわかります。
たとえば、単純な文字列「King Daniel」を kafka に送信する場合、バイト配列では次のようになります。
しかし、何らかの理由でそれを消費すると、次のようになります。
「………………|King Daniel」という文字列はどれ?
したがって、メッセージの先頭に 12 文字が追加されます。これは何かのヘッダーですか?元のメッセージを取得するにはどうすればよいですか?
これが私の消費者コードです:
をファイルに書き込んでいmsg.message().payload().array()
て、このファイルを開くと、最初に 12 文字が追加された元のコンテンツが表示されます。
元のメッセージを正確に取得するにはどうすればよいですか?
hbase - ストームを使用してカフカからメッセージを取得する
Zookeeper から最後のオフセット時間を取得するには? ストーム スパウトを使用してカフカからメッセージを読み取る場合。コンテキスト: Kafka はメッセージを継続的に取得し、コンシューマーはしばらくの間読み取り、その後何らかの理由でシャットダウンし、コンシューマーは最新のメッセージのみを読み取りますが、最後に読み取られたオフセットからは読み取りません。