問題タブ [kafka-python]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
1 に答える
3551 参照

python - 消費するためのpython kafkaが機能しません

私はカフカに書くことができます。ただし、消費は機能しません

0 投票する
1 に答える
4005 参照

python - kafka-python のクライアントでメッセージを消費する

私はカフカが初めてです。kafka-python のいくつかのオンライン チュートリアルの助けを借りて、次のコードを作成しました。

しかし、問題は、その最後の for ループ コードの実行がスタックしていて、私には理解できないことです。

0 投票する
2 に答える
24588 参照

python - PythonクライアントからJSONオブジェクトをkafkaに送信する方法

次のような単純な JSON オブジェクトがあります。

以下は、Kafka にメッセージを送信している私の python コードです。

メッセージが受信されていることをストーム ログで確認できますが、tuple { json structure in here } に対して Transformation null をスローしています。これを修正するために何をする必要があるかわかりません。

0 投票する
4 に答える
12515 参照

python - プログラムでPython Kafka Consumerを停止するには?

私は Python Kafka コンシューマを実行しています ( http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.htmlで kafka.consumer.SimpleConsumer または kafka.consumer.simple.SimpleConsumer を使用しようとしています)。次のコードを実行すると、すべてのメッセージが消費されたとしても、常に実行されます。すべてのメッセージを消費する場合、消費者が停止することを願っています。どうやってするの?また、stop() 関数 (基本クラス kafka.consumer.base.Consumer にあります) の使用方法もわかりません。

アップデート

シグナル ハンドラを使用して、consumer.stop() を呼び出しました。一部のエラー メッセージが画面に出力されました。しかし、プログラムはまだ for ループでスタックしていました。新しいメッセージが届くと、消費者はそれらを消費して印刷しました。client.close() も試しました。でも同じ結果。

for ループを適切に停止するには、いくつかの方法が必要です。

どんな助けでも大歓迎です。ありがとう。

0 投票する
1 に答える
1773 参照

python - Kafka を継続的に使用し、マルチプロセッシングを使用して特定の間隔でキューを更新する

カフカからのイベントを継続的に消費しようとしています。同じアプリケーションも、この消費されたデータを使用して、分析を実行し、n 秒間隔でデータベースを更新します (n = 60 秒と仮定)。

同じアプリケーションで、process1 = Kafka Consumer , process2= Data Analysis and database update logic.

process2計算とデータベースの更新に関係しているため、実行に 5 ~ 10 秒かかります。process1実行中にストールしたくありませんprocess2。したがって、私はmultiprocessing module( Pythonでモジュールを使用していた場合になりprocess1,process2ますが、GILについて読んだことと、モジュールがマルチコアアーキテクチャを活用できないため、モジュールを使用することにしました。)を使用しています。この場合の同時性。(上記のモジュールの制限についての私の理解が間違っている場合は、お詫び申し上げます。お気軽に訂正してください)。thread1,thread2ThreadingThreadingmultiprocessingGILThreading

私が使用しているアプリケーションでは、2 つのプロセス間で非常に単純な対話が行われprocess1、60 秒で受信したすべてのメッセージでキューがいっぱいになり、60 秒の終わりにすべてのメッセージが に転送されますprocess2

この転送ロジックに問題があります。キューの内容を から に転送するにはどうすればよいprocess1ですprocess2か (それはメイン プロセスまたは別のプロセスであると思いますか?それは私が持っている別の質問です。メイン プロセスに加えて 2 つのプロセスをインスタンス化する必要がありますか?) 60 秒の終わりにその後、キューの内容をクリアして、別の反復で再び開始します。

これまでのところ、次のものがあります。

どんな助けでも大歓迎です。