問題タブ [kafka-python]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
2 に答える
2509 参照

python - パーティションとレプリケーションを使用してpykafkaで新しいトピックを作成するには?

pykafkaを使用して Kafka でプログラムでトピックを作成できるようにしたいと考えています。TopicDict にアクセスすると、トピックが存在しない場合にトピックが自動的に作成されることはわかっていますが、それを使用してパーティション/レプリカの数を制御する方法がわかりません。また、Kafka がダウンすると無限ループになってしまうという厄介なバグがあります。基本的に私は次のようなことをしたい:

0 投票する
1 に答える
9953 参照

python - 起動時にすべてのメッセージを読み取る Kafka python コンシューマ

以下のコードを使用して、トピックからメッセージを読み取ります。私は2つの問題に直面しています。コンシューマーを起動するたびに、キュー内のすべてのメッセージを読み取っていますか? 未読メッセージだけを読むにはどうすればよいですか?

0 投票する
2 に答える
372 参照

apache-kafka - Spark Streaming を使用して Kafka で受信したトピックを区別する方法

次のコードを使用して、カフカからメッセージを取得しています

スカラコード:

これが私のサンプル プロデューサー コードです。

ストリーミング レシーバーの出力

質問:

編集:keyedProducerを使用

これは私にエラーを投げています

0 投票する
1 に答える
4334 参照

apache-kafka - pykafkaはkafkaブローカーに接続できません

次のコードを介してクラスターpykafkaを接続するために使用する場合:kafka

次のように例外が発生しました。

raise Exception('ブローカに接続してメタデータを取得できません。')

例外: ブローカに接続してメタデータをフェッチできません。

しかし、次のようなコマンドラインを使用していたとき:

kafka-console-producer --broker-list 10.0.0.101:9092 --topic userCND

正常に動作しますが、警告メッセージが表示されます。

WARN プロパティのトピックが無効です (kafka.utils.VerifiableProperties)

0 投票する
0 に答える
1686 参照

kafka-python - 例外 AttributeError: 「'KafkaProducer' オブジェクトには属性 '_closed' がありません」

私はPythonを使用してメッセージを生成していましたが、スクリプトの下にあるこの例外を取得しました..

`from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers='192.168.1.6:9092') producer.send('test', b'Welcome Nagarajan').get(timeout=60)

エラーメッセージを取得しています...

トレースバック (最後の最後の呼び出し): ファイル ""、1 行目、ファイル "/usr/local/lib/python2.7/dist-packages/kafka/producer/kafka.py"、245 行目、init self.config ['api_version'] = client.check_version() ファイル "/usr/local/lib/python2.7/dist-packages/kafka/client_async.py"、607 行目、check_version connect() ファイル "/usr/local/ lib/python2.7/dist-packages/kafka/client_async.py"、575 行目、接続で Errors.NodeNotReadyError(node_id) kafka.common.NodeNotReadyError: None を発生させます

0 投票する
6 に答える
17230 参照

python - kafka-python コンシューマーがメッセージを受信しない

KafaConsumer最初から、または他の明示的なオフセットから読み取るのに 問題があります。

同じトピックのコンシューマー向けのコマンド ライン ツールを実行すると、--from-beginningオプションを含むメッセージが表示され、それ以外の場合はハングします

Pythonで実行するとハングします。これは、消費者の構成が正しくないことが原因であると思われます

出力:

指定されたトピックからのメッセージを消費する (その後ハングする)

私は kafka-python 0.9.5 を使用しており、ブローカーは kafka 8.2 を実行しています。正確な問題が何であるかはわかりません。

コンソール コンシューマーの動作をエミュレートするには、dpkp の提案に従って _group_id=None_ を設定します。

0 投票する
2 に答える
9905 参照

python - Flask - ライブ ストリーム kafka データのプル - Kafka と Python Flask の統合

このプロジェクトはreal time search engine - log analysisパフォーマンスのためのものです。

Spark 処理から Kafka へのライブ ストリーミング データがあります。

Kafka の出力を使用して、get the data from the Kafka using Flask.. およびvisualize it using Chartjsまたはその他の視覚化を行いたい..

からライブ ストリーミング データを取得するにはどうすればよいKafka using the python flaskですか?

どのように始めればよいですか?

どんな助けでも大歓迎です!

ありがとう!

0 投票する
1 に答える
2168 参照

python - Kafka コンシューマがカフカ プロデューサーよりも遅いのはなぜですか?

完全にクロールできるデータのストリームを取得します。データはすべて Kafka に入れられ、その後 Cassandra に送信されます。現在、kafka コンシューマーは非常に遅く、プロデューサーよりもはるかに遅いです。私はそれらがまったく同じであることを望みます。この結果を得るにはどうすればよいですか、またはコードの何が問題なのですか?

Python での私の Kafka コンシューマー コードは次のとおりです。

編集:

プロファイル結果も追加しましたが、コードの遅い行は次のようです

お返事ありがとうございます。