7

私のスタックは gevents の uwsgi です。APIエンドポイントをデコレータでラップして、すべてのリクエストデータ(URL、メソッド、ボディ、およびレスポンス)をkafkaトピックにプッシュしようとしていますが、うまくいきません。私の理論は、私がgeventsを使用していて、これらを非同期モードで実行しようとしているからです。実際にkafkaにプッシュする非同期スレッドは、geventsで実行できません。また、メソッドを同期させようとすると、それも機能しません。プロデュースワーカーで終了します。つまり、プロデュース後に呼び出しが返されません。どちらの方法もPythonシェルでうまく機能しますが、スレッドでuwsgiを実行すると.

サンプルコードに従います: 1. with kafka-python (async)

    try:
        kafka_producer = KafkaProducer(bootstrap_servers=KAFKAHOST.split(','))
    except NoBrokersAvailable:
        logger.info(u'Kafka Host not available: {}'.format(KAFKAHOST))
        kafka_producer = None


    def send_message_to_kafka(topic, key, message):
        """
        :param topic: topic name
        :param key: key to decide partition
        :param message: json serializable object to send
        :return:
        """
        if not kafka_producer:
            logger.info(u'Kafka Host not available: {}'.format(KAFKAHOST))
            return
        data = json.dumps(message)
        try:
            start = time.time()
            kafka_producer.send(topic, key=str(key), value=data)
            logger.info(u'Time take to push to Kafka: {}'.format(time.time() - start))
        except KafkaTimeoutError as e:
            logger.info(u'Message not sent: {}'.format(KAFKAHOST))
            logger.info(e)
            pass
        except Exception as e:
            logger.info(u'Message not sent: {}'.format(KAFKAHOST))
            logger.exception(e)
            pass
  1. py-kafka (同期):

    try:
        client = KafkaClient(hosts=KAFKAHOST)
    except Exception as e:
        logger.info(u'Kafka Host Not Found: {}'.format(KAFKAHOST))
        client = None
    
    
    def send_message_to_kafka(topic, key, message):
        """
        :param topic: topic name
        :param key: key to decide partition
        :param message: json serializable object to send
        :return:
        """
        if not client:
            logger.info(u'Kafka Host is None')
            return
        data = json.dumps(message)
        try:
            start = time.time()
            topic = client.topics[topic]
            with topic.get_sync_producer() as producer:
                producer.produce(data, partition_key='{}'.format(key))
            logger.info(u'Time take to push to Kafka: {}'.format(time.time() - start))
        except Exception as e:
            logger.exception(e)
            pass
    
4

1 に答える 1