問題タブ [kafka-python]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
0 に答える
816 参照

python - Kafka Consumer Hangs on Startup

Due to one reason or another, we recently re-wrote our consumers and producers using different libraries than initially written in. However, I've been having some issues after the switch.

Using Kafka 0.9.0.2:
8 partitions

  • We consume from one topic, process the message, and push to another topic.

Due to extensive processing causing session timeouts before being able to commit offsets, the following config options were updated:
Consumer: session.timeout.ms: 1m40s
Broker: group.max.session.timeout.ms: 2m and group.min.session.timeout.ms: 6s

The issue I'm having is that once every several startups of my Consumer, it seems to hang while trying to fetch messages.
No errors or exceptions, it doesn't eventually timeout, it just sits. Considering my process which implements the consumer restarts every so often, this is a breaking problem, and I'm out of ideas. I don't know if it's a config update that needs to be made, or if I'm not handling shutdown properly, which is causing some sort of timeout to be exceeded on the broker.

  1. I add 100,000 messages to Kafka.
  2. I turn on this service and allow consumption of ~1000 messages.
  3. I restart the process to simulate what's happening.
  4. The process hangs, output: Consumer created

Has anyone seen their consumers hang on startup?
Is there anything I should look for in Kafka logs?

I'm also noticing consumption from the partitions to be increasingly slow.

0 投票する
4 に答える
9046 参照

python - Python カフカ コンシューマー グループ ID の問題

私の知る限り、

並列処理を実装するために、kafka のパーティションと (コンシューマー) グループの概念が導入されました。私はpythonを介してkafkaを使用しています。(たとえば) 2 つのパーティションがある特定のトピックがあります。つまり、2 つのコンシューマーを含むコンシューマー グループを開始すると、それらは異なるパーティションにマップ (サブスクライブ) されます。

しかし、kafkaPython でライブラリを使用すると、奇妙な問題が発生しました。基本的に同じグループ ID を持つ 2 つのコンシューマーを開始し、それらがメッセージを消費するためのスレッドを開始しました。

しかし、kafka-stream 内のすべてのメッセージは両方によって消費されています!! これはばかげているように思えますし、概念的にも正しくありません。コンシューマーを特定の (異なる) パーティションに手動でマップできる方法はありますか (それらが別のパーティションに自動的にマップされていない場合)。

コードは次のとおりです。

以下は、kafka-console-producer を使用して作成したいくつかのメッセージの出力です。

期待されていたのはそれぞれの1つでした。ところで、このトピックk-testには 2 つのパーティションがあります。

0 投票する
1 に答える
15079 参照

apache-kafka - Kafka に接続しようとすると、No Brokers Available エラーが発生します

CentOS で Python クライアントを使用して Kafka 0.10.0.0 にローカルで接続しようとすると、非常に奇妙な問題が発生します。

私の接続オプションは非常にシンプルでデフォルトです。

Kafka の server.properties ファイルでリスナーオプションを手動で設定すると、次のようになります。

curl やその他の Linux を使用して Kafka ブローカー サーバーに簡単に接続できるにもかかわらず、kafka.errors.NoBrokersAvailable を取得します。

アドバタイズされたリスナーやその他の非推奨のアドバタイズ オプションは、問題の解決に役立ちません。したがって、機能している構成の唯一の状態は、リスナーのない状態です。どうにかしてローカル クラスターをセットアップする必要があるため、これは確かに受け入れられません。

このばかげた問題の解決策は簡単なようで、いろいろ考えているのですが、私たち自身では解決できませんでした。

0 投票する
1 に答える
1375 参照

java - Java で Apache Kafka 0.10.0 API を使用して Kafka ブローカー クラスターを作成する

Kafka 0.10 APIPreferred withを使用してブローカー クラスターを作成したいと考えていますJava。私が読んだ限りでは、以下をkafka_2.11-0.10.0.0.jar使用したブローカーの作成をサポートしています:

しかし、そうするためのドキュメントが見つかりません。Kafka API最近、 inを使用してトピックを作成する方法を説明している [1] を読みましたJava。同様のことを行って 、ブローカー クラスターを作成し、パーティションを更新し、既存のデータ/パーティションを新しいブローカーに移行できますか? [2])

[1] API を使用して IDE から Kafka でトピックを作成する方法

[2] https://kafka.apache.org/0100/ops.html#basic_ops_cluster_expansion

0 投票する
1 に答える
1164 参照

apache-kafka - uwsgi pythonアプリでのpython-kafkaプロデューサーのタイムアウト

フラスコ アプリケーションが uwsgi/nginx で実行されているときに、kafka サーバーとの通信に問題があります。コマンドラインで python を使用してアプリケーションを起動すると、すべてがうまくいきます。しかし、uwsgi エンペラーで実行すると、プロデューサーが応答待ちでタイムアウトになります。プロデューサーの作成は期待どおりに機能します。新しいメッセージを送信しようとすると問題が発生するだけです。メッセージがサーバーに届かないことを確認しましたが、応答を待っているときにスローされる例外は、単に「5 秒間待機した後のタイムアウト」です。

この問題のトラブルシューティング方法を教えてください。アプリケーションのすべてが正常に動作しますが、kafka メッセージを送信できません。アプリケーションが他のソケットを介して通信できるようにするために特別な構成が必要ですか?

私が試したこと:

  • 私のini設定でclose-on-execを設定する
  • スレッド化の無効化
  • 単一のプロセスに制限する
0 投票する
6 に答える
41872 参照

python - NoBrokersAvailable: NoBrokersAvailable-Kafka エラー

私はすでにカフカを学び始めています。その上で基本的な操作を試しています。「ブローカー」についてのポイントに固執しました。

私のカフカは実行されていますが、パーティションを作成したいとき。

トレースバック (最新の最後の呼び出し): ファイル ""、1 行目、ファイル "/usr/local/lib/python2.7/dist-packages/kafka/consumer/group.py"、284 行目、 init self._client 内= KafkaClient(metrics=self._metrics, **self.config) ファイル "/usr/local/lib/python2.7/dist-packages/kafka/client_async.py"、202 行目、init self.config['api_version '] = self.check_version(timeout=check_timeout) ファイル "/usr/local/lib/python2.7/dist-packages/kafka/client_async.py"、791 行目、check_version で Errors.NoBrokersAvailable() kafka.errors を発生させます。 NoBrokersAvailable: NoBrokersAvailable

0 投票する
3 に答える
4781 参照

apache-kafka - KafkaException: 間違ったリクエスト タイプ 18

最近、Kafka トピックからのメッセージを消費しようとしましたが、次のエラー メッセージが表示されました。

おそらくエラー メッセージからわかるように、kafka ブローカーは localhost にマップされています。

他の誰かがこの問題を抱えていますか?

0 投票する
1 に答える
623 参照

python - 2 つの DStreams(pyspark) を結合するには?

いくつかの入力トピックを含むカフカ ストリームがあります。これは、kafka ストリームを受け入れるために私が書いたコードです。

次に、元のストリームのキーと値の 2 つの DStream を作成します。

次に、値 DStream でいくつかの計算を実行します。例えば、

ここで、キーと val DStream を組み合わせて、結果を Kafka ストリームの形式で返す必要があります。

対応するキーに val を結合する方法は?

0 投票する
1 に答える
1689 参照

python - python kafka:各メッセージを最初からグループで1回だけ消費する方法

ここではKafka コンシューマ(バージョン 1.3.1)を使用しています。

私が達成しようとしているもの:

  • パーティションは 10 個あります。各パーティションはオフセット 0 から始まります。

  • コンシューマーのグループ (1、2、3 など) があります。

  • 場合によっては、1 つのコンシューマーが停止または稼働していることがあります。

  • そのため、グループのメンバーは変わる可能性があります。しかし、各パーティションの各メッセージは、グループによって 1 回だけ (1 または 2 または 3) 消費される必要があります。

私のコードは次のとおりです。

上記の構成で十分ですか?どんなコメントも歓迎します。ありがとう

アップデート

次のコードを試しました。パーティション 760 では毎回、各メッセージが 1 つのグループの 2 人のコンシューマーによって 2 回消費される可能性があります。なんで?何か問題でも?

出力 1:

別のウィンドウで同じファイルを実行すると、次のように出力されます。