2

kafka イベントを消費するプログラムを作成しました。10秒後に終了したいデーモンがあります。

def kafkaConsumer():
consumer = KafkaConsumer(sys.argv[1],group_id='test-consumer-group',bootstrap_servers=sys.argv[2].split(','))
schema_path=sys.argv[3]
schema = avro.schema.parse(open(schema_path).read())

for msg in consumer:
    value = bytearray(msg.value)
    bytes_reader = io.BytesIO(value[9:])
    decoder = avro.io.BinaryDecoder(bytes_reader)
    reader = avro.io.DatumReader(schema)
    try:
        event = reader.read(decoder)
    except:
        pass
    eventInJsonFormat=json.dumps(event)
    print(eventInJsonFormat)

if __name__ == '__main__':
run_thread = Thread(target=kafkaConsumer())
run_thread.daemon = True
run_thread.start()
time.sleep(10)

インデントは無視してください。
しかし、このプログラムは 10 秒後に終了しません。ここで何が欠けているのか知りたいですか?

4

1 に答える 1

0

コンシューマーに 10 秒のタイムアウトを追加します。

consumer = KafkaConsumer(sys.argv[1],
consumer_timeout_ms=10000
group_id='test-consumer-group',
bootstrap_servers=sys.argv[2].split(','),
)
于 2018-01-12T09:25:46.597 に答える