問題タブ [kafka-python]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
python - 消費するためのpython kafkaが機能しません
私はカフカに書くことができます。ただし、消費は機能しません
python - kafka-python のクライアントでメッセージを消費する
私はカフカが初めてです。kafka-python のいくつかのオンライン チュートリアルの助けを借りて、次のコードを作成しました。
しかし、問題は、その最後の for ループ コードの実行がスタックしていて、私には理解できないことです。
python - PythonクライアントからJSONオブジェクトをkafkaに送信する方法
次のような単純な JSON オブジェクトがあります。
以下は、Kafka にメッセージを送信している私の python コードです。
メッセージが受信されていることをストーム ログで確認できますが、tuple { json structure in here } に対して Transformation null をスローしています。これを修正するために何をする必要があるかわかりません。
python - プログラムでPython Kafka Consumerを停止するには?
私は Python Kafka コンシューマを実行しています ( http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.htmlで kafka.consumer.SimpleConsumer または kafka.consumer.simple.SimpleConsumer を使用しようとしています)。次のコードを実行すると、すべてのメッセージが消費されたとしても、常に実行されます。すべてのメッセージを消費する場合、消費者が停止することを願っています。どうやってするの?また、stop() 関数 (基本クラス kafka.consumer.base.Consumer にあります) の使用方法もわかりません。
アップデート
シグナル ハンドラを使用して、consumer.stop() を呼び出しました。一部のエラー メッセージが画面に出力されました。しかし、プログラムはまだ for ループでスタックしていました。新しいメッセージが届くと、消費者はそれらを消費して印刷しました。client.close() も試しました。でも同じ結果。
for ループを適切に停止するには、いくつかの方法が必要です。
どんな助けでも大歓迎です。ありがとう。
python - Kafka を継続的に使用し、マルチプロセッシングを使用して特定の間隔でキューを更新する
カフカからのイベントを継続的に消費しようとしています。同じアプリケーションも、この消費されたデータを使用して、分析を実行し、n 秒間隔でデータベースを更新します (n = 60 秒と仮定)。
同じアプリケーションで、process1 = Kafka Consumer , process2= Data Analysis and database update logic.
process2
計算とデータベースの更新に関係しているため、実行に 5 ~ 10 秒かかります。process1
実行中にストールしたくありませんprocess2
。したがって、私はmultiprocessing module
( Pythonでモジュールを使用していた場合になりprocess1,process2
ますが、GILについて読んだことと、モジュールがマルチコアアーキテクチャを活用できないため、モジュールを使用することにしました。)を使用しています。この場合の同時性。(上記のモジュールの制限についての私の理解が間違っている場合は、お詫び申し上げます。お気軽に訂正してください)。thread1,thread2
Threading
Threading
multiprocessing
GIL
Threading
私が使用しているアプリケーションでは、2 つのプロセス間で非常に単純な対話が行われprocess1
、60 秒で受信したすべてのメッセージでキューがいっぱいになり、60 秒の終わりにすべてのメッセージが に転送されますprocess2
。
この転送ロジックに問題があります。キューの内容を から に転送するにはどうすればよいprocess1
ですprocess2
か (それはメイン プロセスまたは別のプロセスであると思いますか?それは私が持っている別の質問です。メイン プロセスに加えて 2 つのプロセスをインスタンス化する必要がありますか?) 60 秒の終わりにその後、キューの内容をクリアして、別の反復で再び開始します。
これまでのところ、次のものがあります。
どんな助けでも大歓迎です。