0

Python から Kafka を読み込もうとしていますが、受信メッセージは None で、CLI にエラーはありません。パテ経由で宛先ホストへのポート転送を使用しており、telnet 経由でポートをテストするよりも問題ありません。さらに、私は Debian (WSL) で kafkacat を使用していますが、問題なく動作します。

kafkacat -C -b localhost:9092 -t topic1 -p 0 -o beginning -s avro -r http://localhost:28081

私は PyCharm を使用しています。私のコードは本文の下にあります。どうすればデバッグできますか?

from confluent_kafka.avro import AvroConsumer
from confluent_kafka import TopicPartition
from confluent_kafka.avro.serializer import SerializerError

topics = ['topic1', 'topic2']
c = AvroConsumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'mygroup',
    'auto.offset.reset': 'smallest',
    'schema.registry.url': 'http://localhost:28081',
    'api.version.request': True
})

c.subscribe(topics)
tp = TopicPartition(topics[0], 0, 0)
c.assign([tp])

while True:
    try:
        msg = c.poll(1)

    except SerializerError as e:
        print("Message deserialization failed for {}: {}".format(msg, e))
        break

    if msg is None:
        print('Message None')
        continue

    if msg.error():
        print("AvroConsumer error: {}".format(msg.error()))
        continue

    print(msg.value())

c.close()

なので

4

1 に答える 1