問題タブ [pykafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
1 に答える
1179 参照

python - TypeError: unhashable type: 'TopicAndPartition' when KafkaUtils.createDirectStream の理由は何ですか?

任意のオフセットから kafka メッセージを消費したいKafkaUtils.createDirectStream.

私のソースコード:

しかし、以下のようなエラーが発生します:

pyspark ソースコード:

fromOffsets は dict である必要があり、dict のキーはTopicAndPartitionオブジェクトである必要があります。

これについて何か考えはありますか?

0 投票する
1 に答える
6524 参照

python - uwsgi と gevent で kafka-python または pykafka を非同期プロデューサーとして機能させるにはどうすればよいですか?

私のスタックは gevents の uwsgi です。APIエンドポイントをデコレータでラップして、すべてのリクエストデータ(URL、メソッド、ボディ、およびレスポンス)をkafkaトピックにプッシュしようとしていますが、うまくいきません。私の理論は、私がgeventsを使用していて、これらを非同期モードで実行しようとしているからです。実際にkafkaにプッシュする非同期スレッドは、geventsで実行できません。また、メソッドを同期させようとすると、それも機能しません。プロデュースワーカーで終了します。つまり、プロデュース後に呼び出しが返されません。どちらの方法もPythonシェルでうまく機能しますが、スレッドでuwsgiを実行すると.

サンプルコードに従います: 1. with kafka-python (async)

  1. py-kafka (同期):

    /li>
0 投票する
3 に答える
540 参照

pykafka - いくつかのコンシューマーを起動するときにエラー PartitionOwnedError および ConsumerStoppedException が発生した理由

私は pykafka を使用して kafka トピックからメッセージを取得し、いくつかのプロセスを実行して mongodb に更新します。pymongodb は毎回 1 つの項目しか更新できないため、100 のプロセスを開始します。しかし、起動時に一部のプロセスで「PartitionOwnedError および ConsumerStoppedException」というエラーが発生しました。どうしてか分かりません。ありがとうございました。

>

>

0 投票する
0 に答える
860 参照

apache-kafka - プロデューサーが DNS 経由でブローカーに接続できない

私は物理サーバーを持っており、advertized.host.name をサーバー ip に設定し、ルーターでポート転送を行います。しかし、プロデューサーは dns を使用してブローカーに接続できません。

ERROR:pykafka.connection:Failed to connect to 192.168.1.3:9092 WARNING:pykafka.producer:Broker 192.168.1.3:9092 disconnected. 再試行しています。

0 投票する
1 に答える
1802 参照

git - パブリッシャーがApache Kafkaのトピックにメッセージを発行する方法は?

私はApache Kafkaの初心者です。Apache Kafka のトピックとパーティションの構造と、プロデューサーがデータをパーティションにプッシュする方法がわかりません。

シナリオを考えてみましょう。2 つのプロデューサー PR1、PR2 と 3 つのブローカー B1、B2、B3 があります。そして、P1、P2、P3 として 3 つのパーティションを持つ 1 つのトピック T1 が 3 つのブローカーに分割されます。ここで、最初のプロデューサー PR1 が Zookeeper と調整し、Broker を見つけてメッセージをプッシュします (ログ サーバーがログ データを 1 秒あたり 1 レコードでプッシュするとします)。T1 - P1 にオフセットを設定し、オフセットを 0 に設定します。押されます。パーティション P2 または P3 にプッシュしますか? または、最初のレコード自体が 3 つのパーティションすべてに並行してプッシュされます。

これで、2 番目のパブリッシャーが参加し、メッセージをパーティションにパブリッシュします。メッセージはどこにプッシュされますか? P1 にプッシュされますか? すでに PR1 がメッセージを P1 にプッシュしている場合、PR1 と PR2 の両方が同時にメッセージを P1 に連続して追加し、オフセット 0,1,2,3,4,5.... を作成しますか?