問題タブ [kafka-producer-api]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
3 に答える
117828 参照

apache-kafka - Kafka にメッセージを送信する際に鍵は必要ですか?

現在、キー付きメッセージの一部としてキーなしでメッセージを送信していますが、それでも動作しdelete.retention.msますか? メッセージの一部としてキーを送信する必要がありますか? メッセージの一部としてキーを作成するのはこれでよいでしょうか?

0 投票する
2 に答える
21772 参照

json - Kafka シリアライザー JSON

Kafka、シリアライゼーション、JSON は初めてです

私が望むのは、プロデューサーが kafka を介して JSON ファイルを送信し、コンシューマーが JSON ファイルを元のファイル形式で消費して操作することです。

私はそれを得ることができたので、JSONは文字列へのコンバーターであり、文字列シリアライザーを介して送信され、消費者は文字列を解析してJSONオブジェクトを再作成しますが、これは効率的ではないか、正しい方法ではないのではないかと心配しています(フィールドタイプが失われる可能性があります) JSON の場合)

そこで、JSON シリアライザーを作成し、プロデューサーの構成でそれを設定することを検討しました。

ここで JsonEncoder を使用しました: Kafka: カスタムシリアライザーの作成

しかし、今プロデューサーを実行しようとすると、エンコーダーの toBytes 関数で、try ブロックが私が望むようなものを決して返さないようです

らしいobjectMapper.writeValueAsString(object).getBytes()。JSON obj ( {"name":"Kate","age":25}) を受け取り、何も変換しません。

これは私のプロデューサーの実行関数です

私は何が欠けていますか?元の方法 (文字列に変換して送信し、JSON obj を再構築する) は問題ありませんか? それとも正しい方法ではありませんか?

ありがとう!

0 投票する
1 に答える
5132 参照

java - Apache Kafka Producer エラー: 3 回試行した後、メッセージを送信できませんでした

Linux Ubuntu サーバーに kafka をインストールし、バッチ ファイル ( kafka-console-producer.shおよびkafka-console-consumer.sh)を使用して通信をテストしたところ、メッセージを発行および受信できることがわかりました。

同じネットワーク上で実行されているWindows マシン上。コードが示されているJavaプロデューサークライアントを作成しました

クライアントを実行すると、次のエラーが発生します

log4j:WARN ロガー (kafka.utils.VerifiableProperties) のアペンダーが見つかりませんでした。log4j:WARN log4j システムを適切に初期化してください。log4j:WARN 詳細については 、http: //logging.apache.org/log4j/1.2/faq.html#noconfigを参照してください。3 回試行してもメッセージを送信できませんでした。

次のことを試しました

  1. Windowsマシンからubuntuマシンにpingを実行したところ、正常に動作しているようです
  2. Apache Kafka example error: Failed to send message after 3 attempts の解決策を試しましたが、うまくいきませんでした

サーバー上で次のコマンドを実行したときに観察した奇妙なことの 1 つ - bin/kafka-topics.sh --list --zookeeper localhost:2181 、トピックが Java コードから作成されたことがわかりましたが、メッセージは発行されませんでした

どんな助けでも大歓迎です

0 投票する
1 に答える
1396 参照

java - コンシューマがダウンしているときにカフカ トピックを作成する

kafka-storm を統合しようとしていました。いくつかの例から始めました。

GitHub からサンプルを実行できました。次に、KAFKA PRODUCER API を使用して kafka トピックにメッセージを発行するために、Eclipse で Producer クラスを作成しようとしています。

シナリオ 1:

トピック テストと言うテストを使用してコンシューマー シェルが実行されている場合、プロデューサー クラスを実行します。公開されたすべてのメッセージを含むコンシューマー シェルを確認できます。

シナリオ2

コンシューマーシェルを開始していません (コンシューマーがダウンしているとします)。そして、プロデューサークラスを実行します。メッセージはカフカに公開されています。

メッセージが発行された場合、ダウンタイム後にコンシューマー シェルを起動すると、既に発行されたトピックのメッセージが読み取られません。

なんで?トピック消費のログを保持していると思います。メッセージを読むべきではありませんか?

言及する必要がある構成パラメーターはありますか?

または、消費者を変えるために何かする必要がありますか。パッケージで提供されているコンシューマーシェルを使用しており、それを使用して開始しています

0 投票する
4 に答える
77506 参照

file - Kafka プロデューサーにファイルを書き込む方法

Kafka で標準入力の代わりに単純なテキスト ファイルを読み込もうとしています。Kafka をダウンロードした後、次の手順を実行しました。

飼育係を開始しました:

bin/zookeeper-server-start.sh config/zookeeper.properties

起動したサーバー

bin/kafka-server-start.sh config/server.properties

「テスト」という名前のトピックを作成しました:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

プロデューサーを実行しました:

消費者が聞いた:

標準入力の代わりに、コンシューマーが直接見ることができるデータ ファイルまたは単純なテキスト ファイルをプロデューサーに渡したいと考えています。どんな助けでも本当に感謝します。ありがとう!

0 投票する
1 に答える
2516 参照

java - Kafka クラスター Zookeeper の障害処理

飼育係用に 1 台、ブローカー用に 2 台の 3 台のマシンで構成される kafka クラスターを実装します。私は約 6 台のコンシューマー マシンと約 100 台のプロデューサーを持っています。

ブローカーの 1 つが失敗した場合でも、レプリケーション機能のおかげでデータ損失が回避されます。しかし、zookeeper に障害が発生し、同じマシンを起動できない場合はどうなるでしょうか。いくつか質問があります:

  1. Zookeeper に障害が発生した後でも、プロデューサーは指定されたブローカーにメッセージをプッシュし続けていることに気付きました。しかし、消費者はそれらを取得できなくなりました。消費者が登録解除されたためです。この場合、データは永久に失われますか?
  2. 実行時にブローカー構成で Zookeeper の IP を変更するにはどうすればよいですか? Zookeeper の IP を変更するには、シャットダウンする必要がありますか?
  3. 新しい ZooKeeper マシンが何らかの方法で以前のクラスターに持ち込まれたとしても、以前のデータは失われますか?
0 投票する
1 に答える
1522 参照

apache-kafka - プロデューサー、ブローカー、コンシューマーを 3 台の異なるマシンにインストールできますか?

私はカフカにかなり慣れていません。私は次のアーキテクチャを持っています:

私は混乱しています