公式の Confluent Kafka Python API の基本的な使用法でエラーが発生します。
私は購読します:
kafka_consumer.subscribe(topics=["my-avro-topic"], on_assign=on_assign_callback, on_revoke=on_revoke_callback)
コールバックの使用:
def on_assign_callback(consumer, topic_partitions):
for topic_partition in topic_partitions:
print("without position. topic={}. partition={}. offset={}. error={}".format(topic_partition.topic, topic_partition.partition,
topic_partition.offset, topic_partition.error))
topic_partitions_with_offsets = consumer.position(topic_partitions)
print("assigned to {}->{} partitions".format(len(topic_partitions), len(topic_partitions_with_offsets)))
for topic_partition in topic_partitions_with_offsets:
print("with position. topic={}. partition={}. offset={}. error={}".format(topic_partition.topic, topic_partition.partition,
topic_partition.offset, topic_partition.error))
これにより、コンソール出力が生成されます。
without position. topic=my-avro-topic. partition=0. offset=-1001. error=None
assigned to 1->1 partitions
with position. topic=my-avro-topic. partition=0. offset=-1001. error=KafkaError{code=_UNKNOWN_PARTITION,val=-190,str="(null)"}
誰かがこれを説明できますか?不明なパーティションでコールバック通知を受け取るのはなぜですか? 同様のコードは、Java API を使用して完全に機能します。