10

私はすでにカフカを学び始めています。その上で基本的な操作を試しています。「ブローカー」についてのポイントに固執しました。

私のカフカは実行されていますが、パーティションを作成したいとき。

 from kafka import TopicPartition
(ERROR THERE) consumer = KafkaConsumer(bootstrap_servers='localhost:1234')
 consumer.assign([TopicPartition('foobar', 2)])
 msg = next(consumer)

トレースバック (最新の最後の呼び出し): ファイル ""、1 行目、ファイル "/usr/local/lib/python2.7/dist-packages/kafka/consumer/group.py"、284 行目、 init self._client 内= KafkaClient(metrics=self._metrics, **self.config) ファイル "/usr/local/lib/python2.7/dist-packages/kafka/client_async.py"、202 行目、init self.config['api_version '] = self.check_version(timeout=check_timeout) ファイル "/usr/local/lib/python2.7/dist-packages/kafka/client_async.py"、791 行目、check_version で Errors.NoBrokersAvailable() kafka.errors を発生させます。 NoBrokersAvailable: NoBrokersAvailable

4

6 に答える 6

27

カフカストリーミング中に同じエラーが発生しました。以下のコードは私のエラーを解決しました: KafkaProducer で API バージョンを定義する必要があります。

KafkaProducer(bootstrap_servers=['localhost:9092'],
              api_version=(0,11,5),
              value_serializer=lambda x: dumps(x).encode('utf-8'))
于 2019-06-04T18:34:32.760 に答える
5

私にとっての問題は、Google Cloudで Kafka を実行しているため、ファイアウォール ルールでした。

昨日はうまくいきましたが、今日はなぜもううまくいかないのかと頭を悩ませていました。

ローカル システムのパブリック IP アドレスは、別の LAN または WiFi に接続するたびに変わるため、ファイアウォール ルールでローカル システムのパブリック IP を許可する必要がありました。固定パブリック IP との接続を使用するか、接続を切り替える/変更するたびにこれを確認することをお勧めします。

これらの構成の小さな変更は、デバッグして修正するのに時間がかかりすぎます。このために1時間を無駄にしたように感じました。

于 2019-10-22T08:45:59.270 に答える
3

パーティションを作成する代わりに、メッセージの消費を開始したいようです。それにもかかわらず、ポート 1234 で kafka に到達できますか? 9092 は kafkas のデフォルト ポートで、これを試すことができます。適切なポートが見つかったにもかかわらずアプリケーションでエラーが発生する場合は、コンソール コンシューマーを使用してセットアップをテストできます。

bin/kafka-console-producer.sh --broker-list localhost:<yourportnumber> --topic foobar

コンソール コンシューマーは、標準の kafka ディストリビューションの一部です。そうすれば、問題の原因に少し近づくことができるかもしれません。

于 2016-08-09T20:01:04.370 に答える