問題タブ [pykafka]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
python - TypeError: unhashable type: 'TopicAndPartition' when KafkaUtils.createDirectStream の理由は何ですか?
任意のオフセットから kafka メッセージを消費したいKafkaUtils.createDirectStream
.
私のソースコード:
しかし、以下のようなエラーが発生します:
pyspark ソースコード:
fromOffsets は dict である必要があり、dict のキーはTopicAndPartition
オブジェクトである必要があります。
これについて何か考えはありますか?
python - uwsgi と gevent で kafka-python または pykafka を非同期プロデューサーとして機能させるにはどうすればよいですか?
私のスタックは gevents の uwsgi です。APIエンドポイントをデコレータでラップして、すべてのリクエストデータ(URL、メソッド、ボディ、およびレスポンス)をkafkaトピックにプッシュしようとしていますが、うまくいきません。私の理論は、私がgeventsを使用していて、これらを非同期モードで実行しようとしているからです。実際にkafkaにプッシュする非同期スレッドは、geventsで実行できません。また、メソッドを同期させようとすると、それも機能しません。プロデュースワーカーで終了します。つまり、プロデュース後に呼び出しが返されません。どちらの方法もPythonシェルでうまく機能しますが、スレッドでuwsgiを実行すると.
サンプルコードに従います: 1. with kafka-python (async)
py-kafka (同期):
/li>
pykafka - いくつかのコンシューマーを起動するときにエラー PartitionOwnedError および ConsumerStoppedException が発生した理由
私は pykafka を使用して kafka トピックからメッセージを取得し、いくつかのプロセスを実行して mongodb に更新します。pymongodb は毎回 1 つの項目しか更新できないため、100 のプロセスを開始します。しかし、起動時に一部のプロセスで「PartitionOwnedError および ConsumerStoppedException」というエラーが発生しました。どうしてか分かりません。ありがとうございました。
>
>
apache-kafka - プロデューサーが DNS 経由でブローカーに接続できない
私は物理サーバーを持っており、advertized.host.name をサーバー ip に設定し、ルーターでポート転送を行います。しかし、プロデューサーは dns を使用してブローカーに接続できません。
ERROR:pykafka.connection:Failed to connect to 192.168.1.3:9092 WARNING:pykafka.producer:Broker 192.168.1.3:9092 disconnected. 再試行しています。
git - パブリッシャーがApache Kafkaのトピックにメッセージを発行する方法は?
私はApache Kafkaの初心者です。Apache Kafka のトピックとパーティションの構造と、プロデューサーがデータをパーティションにプッシュする方法がわかりません。
シナリオを考えてみましょう。2 つのプロデューサー PR1、PR2 と 3 つのブローカー B1、B2、B3 があります。そして、P1、P2、P3 として 3 つのパーティションを持つ 1 つのトピック T1 が 3 つのブローカーに分割されます。ここで、最初のプロデューサー PR1 が Zookeeper と調整し、Broker を見つけてメッセージをプッシュします (ログ サーバーがログ データを 1 秒あたり 1 レコードでプッシュするとします)。T1 - P1 にオフセットを設定し、オフセットを 0 に設定します。押されます。パーティション P2 または P3 にプッシュしますか? または、最初のレコード自体が 3 つのパーティションすべてに並行してプッシュされます。
これで、2 番目のパブリッシャーが参加し、メッセージをパーティションにパブリッシュします。メッセージはどこにプッシュされますか? P1 にプッシュされますか? すでに PR1 がメッセージを P1 にプッシュしている場合、PR1 と PR2 の両方が同時にメッセージを P1 に連続して追加し、オフセット 0,1,2,3,4,5.... を作成しますか?