10

以下のコードを使用して、トピックからメッセージを読み取ります。私は2つの問題に直面しています。コンシューマーを起動するたびに、キュー内のすべてのメッセージを読み取っていますか? 未読メッセージだけを読むにはどうすればよいですか?

from kafka import KafkaConsumer


consumer = KafkaConsumer('my-topic',
                         group_id='my-group',
                         bootstrap_servers=['localhost:9092'])
for message in consumer:
    consumer.commit() 
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))
4

1 に答える 1

11

@Kenjiが言ったように、でオフセットをコミットする必要がありますconsumer.commit()。手動でコミットしたくない場合は、に渡すことで自動コミットを有効にできenable_auto_commit=TrueますKafkaConsumerauto_commit_interval_ms各自動コミット間のミリ秒単位の間隔を調整することもできます。ここを参照してください: http://kafka-python.readthedocs.org/en/master/apidoc/KafkaConsumer.html .

于 2016-01-09T12:31:19.967 に答える