Python から Kafka を読み込もうとしていますが、受信メッセージは None で、CLI にエラーはありません。パテ経由で宛先ホストへのポート転送を使用しており、telnet 経由でポートをテストするよりも問題ありません。さらに、私は Debian (WSL) で kafkacat を使用していますが、問題なく動作します。
kafkacat -C -b localhost:9092 -t topic1 -p 0 -o beginning -s avro -r http://localhost:28081
私は PyCharm を使用しています。私のコードは本文の下にあります。どうすればデバッグできますか?
from confluent_kafka.avro import AvroConsumer
from confluent_kafka import TopicPartition
from confluent_kafka.avro.serializer import SerializerError
topics = ['topic1', 'topic2']
c = AvroConsumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'mygroup',
'auto.offset.reset': 'smallest',
'schema.registry.url': 'http://localhost:28081',
'api.version.request': True
})
c.subscribe(topics)
tp = TopicPartition(topics[0], 0, 0)
c.assign([tp])
while True:
try:
msg = c.poll(1)
except SerializerError as e:
print("Message deserialization failed for {}: {}".format(msg, e))
break
if msg is None:
print('Message None')
continue
if msg.error():
print("AvroConsumer error: {}".format(msg.error()))
continue
print(msg.value())
c.close()
なので