問題タブ [apache-kafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
1 に答える
3144 参照

apache-kafka - Kafka High Level Consumer を特定のパーティションに割り当てることはできますか?

Kafka の Consumer は 1 つのパーティションからのみメッセージを受信するように構成できるという事実を考えると、これに対する構成アプローチは見たことがありません。

Kafka ブローカーに伝える方法:

私がこのIDを持つCONSUMER Aであり、このGROUP Lの下で、TOPIC Xにサブスクライブし、 PARTITION Aのストリームを取得する意思があることを確認します。

次に、別のCONSUMER BをGROUP Lの下で開始します。これはTOPIC Xにサブスクライブし、 PARTITION Bのストリームを取得しますか?

つまり、カフカが記述したシナリオのように、

特定のユーザーのウォール フィードをサブスクライブするコンシューマーを開始するにはどうすればよいですか? ユーザーがログインしたときにコンシューマーを開始し、そのパーティションをトピックからコンシューマーに送信してクライアントにフィードを送信できますか?

0 投票する
1 に答える
1800 参照

java - Kafka ログにオフセットがありません - Simple Consumer は続行できません

3 ノードの kafka クラスターをセットアップしています。私は嵐を使ってカフカからのメッセージを読んでいます。システムの各トピックには 7 つのパーティションがあります。

今、私は奇妙な問題に直面しています。3日前までは、すべて正常に機能していました。ただし、現在、ストーム トポロジが 2 つのパーティション (#1 と #4) から読み取ることができないようです。

問題を掘り下げてみたところ、私の kafka ログで、これらのパーティションの両方で、1 つのオフセットが欠落していることがわかりました。つまり、5964511 の後に、次のオフセットは 5964512 ではなく 5964513 です。

オフセットが見つからないため、Simple Consumer は次のオフセットに進むことができません。私は何か間違ったことをしていますか、それとも既知のバグですか?

そのような行動の理由は何でしょうか?

次のコードを使用して、有効なオフセットのウィンドウを読み取ります。

これにより、次の出力が得られます。

したがって、私が提供しているオフセットは十分にオフセット範囲内にあります。

0 投票する
2 に答える
5193 参照

java - カフカ プロデューサー ユニット テスト (Java)

カフカ (Java コード) で最初のステップを実行するカフカ プロデューサーの簡単なテストを作成したいと思います。このようなもので、動物園の飼育係をモックできます (この実装は見栄えがしますが、そこにあるいくつかのクラス、特に EmbeddedZookeeper に到達できません)。および TestUtils)。

何か案は?

0 投票する
1 に答える
8861 参照

hdfs - kafka から hdfs への書き込み (cloudera cdk を使用?)

Kafka にメッセージを送信するアプリケーション ブラウザーを備えたシステムを設計したいと考えています。コンシューマーは、イベントを avro 形式で HDFS に書き込む必要があります。

これはどのように見えるべきですか?

メッセージを確認するのに適切なタイミングはいつですか?

hdfsファイルにどのように正確に追加できますか? cdk について読んだところ、良い方向に進んでいるように見えます。私ができないのは、レコードをいつどのようにフラッシュするかです。

誰かが良い例を持っていますか?

0 投票する
1 に答える
10783 参照

long-polling - カフカの長いポーリング

低遅延のメッセージ キューを実装するために kafka を検討しており、消費者のロング ポーリングについて調べています。ただし、実際にロング ポーリングを使用する方法や、ロング ポーリングを有効にするために設定する必要があるオプションについての例はありません。kafka Java APIを使用してロングポーリングを有効にするにはどうすればよいですか?

0 投票する
2 に答える
2767 参照

python - geventでpython kafkaクライアントを使用する方法 - 実際に動作するライブラリはありますか?

Python 2.7でbrodを使用して、geventを使用してkafka 0.7.2に書き込もうとしています。

ここに私が得るエラーメッセージがあります。ブロッキングが原因だと思います。brod は tornado をサポートしていますが、私は gevent を使用しています。

gevent-kakfa を使用しようとしましたが、gevent-zookeeper に依存しています。

Zookeeper に接続しようとすると、次のメッセージが表示されます。

動作する gevent を使用してメッセージを書き込むことができる Python ライブラリはありませんか?

0 投票する
1 に答える
3997 参照

streaming - Storm と Kafka を統合する方法

私はStormで作業し、ローカル テキスト ファイルを入力ソースとして使用する基本的なプログラムを開発しました。しかし今は、外部システムから継続的に送られてくるデータのストリーミングに取り組まなければなりません。この目的には、Kafka が最適です。

問題は、Spout に Kafka からストリーミング データを取得させる方法です。または、Storm を Kafka と統合する方法。Kafkaからのデータを処理できるようにするにはどうすればよいですか?

0 投票する
2 に答える
14976 参照

compression - Kafka メッセージ コーデック - 圧縮と解凍

kafkaを使用する場合、kafka プロデューサーの kafka.compression.codec プロパティを設定することでコーデックを設定できます。

プロデューサーでスナッピー圧縮を使用するとします。カフカコンシューマーを使用してカフカからメッセージを消費する場合、スナッピーからデータをデコードするために何かをする必要がありますか、それともカフカコンシューマーの組み込み機能ですか?

関連ドキュメントでは、kafka コンシューマーのエンコーディングに関連するプロパティを見つけることができませんでした (プロデューサーのみに関連しています)。

誰かこれクリアできる?

0 投票する
1 に答える
1042 参照

apache-kafka - Kafka がメッセージにプレフィックスを追加する

kafka 7.2を使用して、プロデューサーを使用してメッセージを送信すると、メッセージを消費すると、メッセージの先頭に追加のセクションが追加されて到着することがわかります。

たとえば、単純な文字列「King Daniel」を kafka に送信する場合、バイト配列では次のようになります。

しかし、何らかの理由でそれを消費すると、次のようになります。

「………………|King Daniel」という文字列はどれ?

したがって、メッセージの先頭に 12 文字が追加されます。これは何かのヘッダーですか?元のメッセージを取得するにはどうすればよいですか?

これが私の消費者コードです:

をファイルに書き込んでいmsg.message().payload().array()て、このファイルを開くと、最初に 12 文字が追加された元のコンテンツが表示されます。

元のメッセージを正確に取得するにはどうすればよいですか?

0 投票する
1 に答える
783 参照

hbase - ストームを使用してカフカからメッセージを取得する

Zookeeper から最後のオフセット時間を取得するには? ストーム スパウトを使用してカフカからメッセージを読み取る場合。コンテキスト: Kafka はメッセージを継続的に取得し、コンシューマーはしばらくの間読み取り、その後何らかの理由でシャットダウンし、コンシューマーは最新のメッセージのみを読み取りますが、最後に読み取られたオフセットからは読み取りません。