12

KafaConsumer最初から、または他の明示的なオフセットから読み取るのに 問題があります。

同じトピックのコンシューマー向けのコマンド ライン ツールを実行すると、--from-beginningオプションを含むメッセージが表示され、それ以外の場合はハングします

$ ./kafka-console-consumer.sh --zookeeper {localhost:port} --topic {topic_name} --from-beginning

Pythonで実行するとハングします。これは、消費者の構成が正しくないことが原因であると思われます

consumer = KafkaConsumer(topic_name,
                     bootstrap_servers=['localhost:9092'],
                     group_id=None,
                     auto_commit_enable=False,
                     auto_offset_reset='smallest')

print "Consuming messages from the given topic"
for message in consumer:
    print "Message", message
    if message is not None:
        print message.offset, message.value

print "Quit"

出力:

指定されたトピックからのメッセージを消費する (その後ハングする)

私は kafka-python 0.9.5 を使用しており、ブローカーは kafka 8.2 を実行しています。正確な問題が何であるかはわかりません。

コンソール コンシューマーの動作をエミュレートするには、dpkp の提案に従って _group_id=None_ を設定します。

4

6 に答える 6

10

console-consumer と投稿した python コンシューマー コードの違いは、python コンシューマーがコンシューマー グループを使用してオフセットを保存することですgroup_id="test-consumer-group"。代わりに group_id=None を設定すると、コンソール コンシューマと同じ動作が見られるはずです。

于 2016-03-15T04:33:49.610 に答える
0

以前に同じ問題に直面したため、テストするコードを実行しているマシンでローカルに kafka-topics を実行したところ、UnknownHostException が発生しました。IP とホスト名をhostsファイルに追加したところ、kafka-topics とコードの両方で正常に機能しました。メッセージをフェッチしようとしていたようですKafkaConsumerが、例外を発生させずに失敗しました。

于 2021-05-24T14:29:13.867 に答える