8

私は Python Kafka コンシューマを実行しています ( http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.htmlで kafka.consumer.SimpleConsumer または kafka.consumer.simple.SimpleConsumer を使用しようとしています)。次のコードを実行すると、すべてのメッセージが消費されたとしても、常に実行されます。すべてのメッセージを消費する場合、消費者が停止することを願っています。どうやってするの?また、stop() 関数 (基本クラス kafka.consumer.base.Consumer にあります) の使用方法もわかりません。

アップデート

シグナル ハンドラを使用して、consumer.stop() を呼び出しました。一部のエラー メッセージが画面に出力されました。しかし、プログラムはまだ for ループでスタックしていました。新しいメッセージが届くと、消費者はそれらを消費して印刷しました。client.close() も試しました。でも同じ結果。

for ループを適切に停止するには、いくつかの方法が必要です。

        client = KafkaClient("localhost:9092")
        consumer = SimpleConsumer(client, "test-group", "test")

        consumer.seek(0, 2)# (0,2) and (0,0)

        for message in consumer:
            print "Offset:", message.offset
            print "Value:", message.message.value

どんな助けでも大歓迎です。ありがとう。

4

4 に答える 4

2

待機時間を設定するには、iter_timeout パラメータを使用します。次のコードのように 10 に設定すると、10 秒以内に新しいメッセージが来なければ終了します。デフォルト値は None です。これは、新しいメッセージが来なくても、コンシューマーがここでブロックすることを意味します。

        self.consumer = SimpleConsumer(self.client, "test-group", "test",
                iter_timeout=10)

アップデート

上記は良い方法ではありません。大量のメッセージが入ってくると、停止を保証するのに十分小さい iter_timeout を設定するのは困難です。そのため、現在、1 つのメッセージを消費して停止しようとする get_message() 関数を使用しています。新しいメッセージがない場合は、None が返されます。

于 2015-08-06T15:52:22.980 に答える
1

Mohitの答えと同様の解決策end_offsetsですが、消費者の機能を使用しています。

from kafka import KafkaConsumer, TopicPartition

# settings
client = "localhost:9092"
topic = 'test'

# prepare consumer
tp = TopicPartition(topic,0)
consumer = KafkaConsumer(client)
consumer.assign([tp])
consumer.seek_to_beginning(tp)  

# obtain the last offset value
lastOffset = consumer.end_offsets([tp])[tp]

for message in consumer:
    print "Offset:", message.offset
    print "Value:", message.message.value
    if message.offset == lastOffset - 1:
        break
于 2019-01-10T10:27:44.690 に答える