0

大きなファイルを Kafka に保存し、レコードに関するメタデータを使用して将来それらを取得したいと考えています。

そこで、トピック、partition_id、offset を含むメッセージを送信してから、次の方法でファイルを取得しようとします。

def retrieve_file_from_kafka(topic_name, partition_id, offset):
    client = KafkaClient(hosts=BROKER_ADDRESS, broker_version="0.10.1.0")
    topic = client.topics[bytes(topic_name, "UTF-8")]

    consumer = topic.get_balanced_consumer(
    consumer_group=bytes("file_retrieve" + topic_name + str(partition_id) + str(offset), "UTF-8"))
    consumer.reset_offsets([(topic.partitions[partition_id], offset)])
    return consumer.consume()

ただし、機能せず、次のように出力されます。

Offset reset for partition 0 to timestamp 8 failed. Setting partition 0's internal counter to 8

このエラーは非常にわかりにくく、reset_offsets で発生します。消費しようとすると、プロセスは rebalancing_lock を待ってスタックします。私は何を間違っていますか?

4

0 に答える 0