問題タブ [kafka-python]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-kafka - KafkaConsumer の利用可能な最も古いメッセージに巻き戻す際の問題
次のコードで、グループ G1 の 1 つの Kafka コンシューマのフェッチ オフセットを off1 に巻き戻しました。
上記の行で、off1 は、対応するパーティション内のトピックで使用可能な最も古いメッセージのオフセットです。
今、次のように別のグループ G2 で Kafka コンシューマーをインスタンス化しています。
ここでは、フェッチ オフセットを off1 として読み取っています。これは、グループ G1 の consumer1 に対してリセットしたものと同じです。異なるグループのオフセットは異なるはずなので、それは起こらないはずだと思います。誰かが明らかにしたら、私は本当に感謝します. 前もって感謝します。
python - Kafka トピックのメッセージを更新
Python Kafka トピックを使用しています。
Kafka のキュー内のメッセージを更新して、キューの先頭に再度追加できるプロビジョン プロデューサーはありますか?
Kafka の仕様によると、実現可能ではないようです。
docker - トピックを作成するが、Kubernetes 上の Python で Kafka FailedPayloadsError を取得する
Python kafka-library で SimpleProducer を使用しています。このスクリプトは、私が試した他のよりハードに構成された kafka セットアップで以前に問題なく動作しました。
このスクリプトを 1 回実行すると、python-console にこの応答が表示されます。
次に、zookeeper.log のノードに移動して、次を確認できます。
これは、Zookeeper がトピック用の新しい Znode を作成しているだけのようです。これは以前から存在していなかったからです。また、Kafka server.log には次のように出力されます。
ただし、私のメッセージはトピックに投稿されることはなく、次に python-script を実行すると、常に次のようになります。
私がそれを機能させた場合、advertized.host.name は常にノードの外部 IP でしたが、Kubernetes を介してそれを機能させることができないようです。外部 IP をコンテナー parhaps から呼び出し可能にすることは可能でしょうか?
私の kafka/config/server.properties は、すべてのブローカーで次のようになります。
message-queue - Kafka トピックまたはパーティション レベルでの並列処理
キーに基づいてデータを分離するには、同じトピック内で複数のトピックまたは複数のパーティションを使用する必要がありますか? サーバーで発生するオーバーヘッド、計算、データストレージ、および負荷に基づいて質問しています。
python - Python Kafkaトピックの下のすべてのメッセージを削除する方法
私はカフカが初めてです。csv ファイルから Kafka にデータをインポートしようとしています。前日のデータが廃止されている間、毎日インポートする必要があります。PythonでKafkaトピックの下にあるすべてのメッセージを削除するにはどうすればよいですか? または、PythonでKafkaトピックを削除するにはどうすればよいですか? または、誰かがデータの有効期限が切れるまで待つことを提案しているのを見ました。可能であれば、データの有効期限を設定するにはどうすればよいですか? どんな提案でも大歓迎です!
ありがとう
python - PubMed データを Kafka にプッシュする
PubMed データ ソースでは、出力を Kafka キューにプッシュする必要があります。各ソースは Kafka トピックとして表示できます。(私は Kafka の概念を知っており、Python を使用して Kafka を探索しました)
FireFTP を介して PubMed データを表示できます。
誰でも前進する方法を助けることができますか?
java - Kafka の低レベル API を使用して、データのフェッチが完了したらオフセットをコミットする必要がありますか?
このメソッドは Kafka ソースで見つけました。使うべきですか?
python - Python で kafka コンシューマー デーモンの無限ループから抜け出せない
kafka イベントを消費するプログラムを作成しました。10秒後に終了したいデーモンがあります。
インデントは無視してください。
しかし、このプログラムは 10 秒後に終了しません。ここで何が欠けているのか知りたいですか?
python - 次のコードのパフォーマンスを改善して、1 秒あたり 100 万レコードを取り込むにはどうすればよいですか?
次のコードは、毎秒 10k ~ 20k のレコードを取り込み、パフォーマンスを改善したいと考えています。json 形式を読み取り、Kafka を使用してデータベースに取り込みます。- Zookeeper と Kafka がインストールされた 5 つのノードのクラスターで実行しています。
改善するためのヒントを教えていただけますか?