問題タブ [kafka-producer-api]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-kafka - Kafka にメッセージを送信する際に鍵は必要ですか?
現在、キー付きメッセージの一部としてキーなしでメッセージを送信していますが、それでも動作しdelete.retention.ms
ますか? メッセージの一部としてキーを送信する必要がありますか? メッセージの一部としてキーを作成するのはこれでよいでしょうか?
json - Kafka シリアライザー JSON
Kafka、シリアライゼーション、JSON は初めてです
私が望むのは、プロデューサーが kafka を介して JSON ファイルを送信し、コンシューマーが JSON ファイルを元のファイル形式で消費して操作することです。
私はそれを得ることができたので、JSONは文字列へのコンバーターであり、文字列シリアライザーを介して送信され、消費者は文字列を解析してJSONオブジェクトを再作成しますが、これは効率的ではないか、正しい方法ではないのではないかと心配しています(フィールドタイプが失われる可能性があります) JSON の場合)
そこで、JSON シリアライザーを作成し、プロデューサーの構成でそれを設定することを検討しました。
ここで JsonEncoder を使用しました: Kafka: カスタムシリアライザーの作成
しかし、今プロデューサーを実行しようとすると、エンコーダーの toBytes 関数で、try ブロックが私が望むようなものを決して返さないようです
らしいobjectMapper.writeValueAsString(object).getBytes()
。JSON obj ( {"name":"Kate","age":25}
) を受け取り、何も変換しません。
これは私のプロデューサーの実行関数です
私は何が欠けていますか?元の方法 (文字列に変換して送信し、JSON obj を再構築する) は問題ありませんか? それとも正しい方法ではありませんか?
ありがとう!
java - Apache Kafka Producer エラー: 3 回試行した後、メッセージを送信できませんでした
Linux Ubuntu サーバーに kafka をインストールし、バッチ ファイル ( kafka-console-producer.shおよびkafka-console-consumer.sh)を使用して通信をテストしたところ、メッセージを発行および受信できることがわかりました。
同じネットワーク上で実行されているWindows マシン上。コードが示されているJavaプロデューサークライアントを作成しました
クライアントを実行すると、次のエラーが発生します
log4j:WARN ロガー (kafka.utils.VerifiableProperties) のアペンダーが見つかりませんでした。log4j:WARN log4j システムを適切に初期化してください。log4j:WARN 詳細については 、http: //logging.apache.org/log4j/1.2/faq.html#noconfigを参照してください。3 回試行してもメッセージを送信できませんでした。
次のことを試しました
- Windowsマシンからubuntuマシンにpingを実行したところ、正常に動作しているようです
- Apache Kafka example error: Failed to send message after 3 attempts の解決策を試しましたが、うまくいきませんでした
サーバー上で次のコマンドを実行したときに観察した奇妙なことの 1 つ - bin/kafka-topics.sh --list --zookeeper localhost:2181 、トピックが Java コードから作成されたことがわかりましたが、メッセージは発行されませんでした
どんな助けでも大歓迎です
java - コンシューマがダウンしているときにカフカ トピックを作成する
kafka-storm を統合しようとしていました。いくつかの例から始めました。
GitHub からサンプルを実行できました。次に、KAFKA PRODUCER API を使用して kafka トピックにメッセージを発行するために、Eclipse で Producer クラスを作成しようとしています。
シナリオ 1:
トピック テストと言うテストを使用してコンシューマー シェルが実行されている場合、プロデューサー クラスを実行します。公開されたすべてのメッセージを含むコンシューマー シェルを確認できます。
シナリオ2
コンシューマーシェルを開始していません (コンシューマーがダウンしているとします)。そして、プロデューサークラスを実行します。メッセージはカフカに公開されています。
メッセージが発行された場合、ダウンタイム後にコンシューマー シェルを起動すると、既に発行されたトピックのメッセージが読み取られません。
なんで?トピック消費のログを保持していると思います。メッセージを読むべきではありませんか?
言及する必要がある構成パラメーターはありますか?
または、消費者を変えるために何かする必要がありますか。パッケージで提供されているコンシューマーシェルを使用しており、それを使用して開始しています
file - Kafka プロデューサーにファイルを書き込む方法
Kafka で標準入力の代わりに単純なテキスト ファイルを読み込もうとしています。Kafka をダウンロードした後、次の手順を実行しました。
飼育係を開始しました:
bin/zookeeper-server-start.sh config/zookeeper.properties
起動したサーバー
bin/kafka-server-start.sh config/server.properties
「テスト」という名前のトピックを作成しました:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
プロデューサーを実行しました:
消費者が聞いた:
標準入力の代わりに、コンシューマーが直接見ることができるデータ ファイルまたは単純なテキスト ファイルをプロデューサーに渡したいと考えています。どんな助けでも本当に感謝します。ありがとう!
java - Kafka クラスター Zookeeper の障害処理
飼育係用に 1 台、ブローカー用に 2 台の 3 台のマシンで構成される kafka クラスターを実装します。私は約 6 台のコンシューマー マシンと約 100 台のプロデューサーを持っています。
ブローカーの 1 つが失敗した場合でも、レプリケーション機能のおかげでデータ損失が回避されます。しかし、zookeeper に障害が発生し、同じマシンを起動できない場合はどうなるでしょうか。いくつか質問があります:
- Zookeeper に障害が発生した後でも、プロデューサーは指定されたブローカーにメッセージをプッシュし続けていることに気付きました。しかし、消費者はそれらを取得できなくなりました。消費者が登録解除されたためです。この場合、データは永久に失われますか?
- 実行時にブローカー構成で Zookeeper の IP を変更するにはどうすればよいですか? Zookeeper の IP を変更するには、シャットダウンする必要がありますか?
- 新しい ZooKeeper マシンが何らかの方法で以前のクラスターに持ち込まれたとしても、以前のデータは失われますか?
apache-kafka - プロデューサー、ブローカー、コンシューマーを 3 台の異なるマシンにインストールできますか?
私はカフカにかなり慣れていません。私は次のアーキテクチャを持っています:
私は混乱しています