0

公式の Confluent Kafka Python API の基本的な使用法でエラーが発生します。

私は購読します:

kafka_consumer.subscribe(topics=["my-avro-topic"], on_assign=on_assign_callback, on_revoke=on_revoke_callback)

コールバックの使用:

def on_assign_callback(consumer, topic_partitions):
for topic_partition in topic_partitions:
    print("without position. topic={}. partition={}. offset={}. error={}".format(topic_partition.topic, topic_partition.partition,
                                   topic_partition.offset, topic_partition.error))

topic_partitions_with_offsets = consumer.position(topic_partitions)
print("assigned to {}->{} partitions".format(len(topic_partitions), len(topic_partitions_with_offsets)))

for topic_partition in topic_partitions_with_offsets:
    print("with position. topic={}. partition={}. offset={}. error={}".format(topic_partition.topic, topic_partition.partition,
                                   topic_partition.offset, topic_partition.error))

これにより、コンソール出力が生成されます。

without position. topic=my-avro-topic. partition=0. offset=-1001. error=None
assigned to 1->1 partitions
with position. topic=my-avro-topic. partition=0. offset=-1001. error=KafkaError{code=_UNKNOWN_PARTITION,val=-190,str="(null)"}

誰かがこれを説明できますか?不明なパーティションでコールバック通知を受け取るのはなぜですか? 同様のコードは、Java API を使用して完全に機能します。

4

1 に答える 1

5

これは、基礎となる C ライブラリ librdkafka のバグです。アップストリームの問題を参照してください。

保存されたオフセットから消費を開始したい場合、それらを取得するために実際に position() を呼び出す必要はありません。デフォルトのオフセット -1001 を変更しなければ、クライアントは自動的にこれを行います。

于 2016-06-15T09:17:39.540 に答える