問題タブ [kafka-python]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
python - python-kafka: コンシューマがメッセージ属性に基づいてメッセージをスキップすることは可能ですか?
与えられた一連のメッセージが与えられ、それぞれに優先度属性があります。優先度属性が十分に高くないメッセージを消費者がスキップまたは無視することは可能ですか?
最初に、kafka-python コンシューマによって消費されるメッセージをデシリアライズし、優先度レベルをチェックして、必要な優先度レベルを満たしていないメッセージを破棄しました。
しかし、メッセージが大きい場合、それ以上処理されないメッセージを逆シリアル化するためにかなりの時間が浪費されるのではないかと心配しています。カフカキューからの読み取り時にこれを行う方法はありませんか?
apache-kafka - How to delete specific number of lines from kafka topic by using python or using any inbuilt method?
I am facing a problem while using consumer.poll() method .After fetching data by using poll() method consumer won't have any data to commit so Please help me to remove specific number of lines from the kafka topic .
python - Python: 統合テストのために Kafka をモックアウトする
私は統合テストにやや慣れていません。Kafka を使用して互いにメッセージを渡す 2 つのサービスがあります。ただし、統合テストでは、テストを実行するために必ずしも Kafka を実行する必要はありません。Kafkaをモックアウトする標準的な方法はありますか? または、これは自分で作成する必要があるものですか? MockKafka キューとアプリの適切な場所にパッチを適用しますか? さらに、これは統合テストがすべきことに違反していますか? これについての私の見解は、私は Kafka の機能をテストしていないということです。統合テストのために、モック アウトする必要があります。
apache-kafka - コンシューマーグループリバランスコールバック関数でkafkaの外部にオフセットを保存して取得したいときにグループIDを取得する方法
しかし、これらのコールバック関数で consumer_group_id を取得できません
python-2.7 - PySpark 処理ストリーム データと処理されたデータのファイルへの保存
場所の座標をストリーミングしているデバイスを複製し、データを処理してテキスト ファイルに保存しようとしています。私は Kafka と Spark ストリーミング (pyspark 上) を使用しています。これが私のアーキテクチャです。
1-Kafka プロデューサーは、次の文字列形式で test という名前のトピックにデータを発行します。
プロデューサーコード:
プロデューサーは正常に動作し、ストリーミングされたデータをコンシューマー (さらにはスパーク) で取得します
2- Spark ストリーミングはこのストリームを受信してpprint()
います。
Spark ストリーミング処理コード
エラーとして私は得る:
そして他の例外。
私が実際に望んでいるのは、各エントリ"LG float LT float"
を JSON 形式でファイルに保存することですが、最初に単に座標をファイルに保存したいのですが、それを実現できないようです。何かアイデアはありますか?
必要に応じて完全なスタック トレースを提供できます
python - 一部のブローカーが利用できない場合、python kafka プロデューサーはどのように機能しますか?
3 ノードの kafka クラスターをセットアップし、次のように python をプロデューサーとして使用しました。
"n0" と "n1" が利用可能で "n2" が利用できない場合 (ブローカーの故障またはネットワーク エラー)、プロデューサーは "n0n1" に送信することで正常に動作できず、エラーをスローします。
apache-spark - Pyspark Kafka オフセット範囲単位
Kafka からのログを処理するために Spark をバッチとして使用しています。各サイクルで、コードは kafka コンシューマーに到達するものをすべて取得する必要があります。ただし、各サイクルで kafka から取得するデータの量に制限を加えたいと考えています。5 GB または 500000 ログ行としましょう..
ドライバーに障害が発生した場合に備えて、メモリとディスクにオフセットを保存します。しかし、これらのカフカオフセットを課して、サイクルごとの最大データを制限するにはどうすればよいですか? カフカのオフセット範囲の単位は??
前もって感謝します!