1

Python で kafka トピックを消費し、prometheus クライアントを使用して http 経由で提供しようとしていますが、トピックの消費がブロックされているようです。単純にメトリックを追加するためにいくつかのプレースホルダーを配置しましたが、その部分がブロックされているようです。

import os
from pykafka import KafkaClient
import threading
from kafka import KafkaConsumer
from prometheus_client import start_http_server, Metric, REGISTRY

class CustomCollector(threading.Thread):
    daemon = True

    def collect(self):
        client = KafkaClient(hosts=os.environ['KAFKA_ADDRESS'])
        topic = client.topics[b'os.environ['KAFKA_TOPIC']
        consumer = topic.get_simple_consumer()
        for message in consumer:
            if message is not None:
                print(message.value)

        metric = Metric('test_name', 'description', 'summary')
        metric.add_sample('test_name', 'description', 'summary')
        yield metric

if __name__ == '__main__':
    start_http_server(9998)
    REGISTRY.register(CustomCollector())
    while True: time.sleep(1)

コードを実行すると、トピック データが期待どおりにコンソールにストリーミングされていることがわかります。ただし、メトリック エンドポイントにはデータが入力されず、Web サーバーへのリクエストはアプリを強制終了するまでハングします。アプリはライブラリからの標準メトリックで応答します。

4

1 に答える 1