0

RabbitMQ について助けを探しています。メッセージが実際にサーバーに配信されなかった場合に insert_order_queue() がメッセージを再送信できるように、関数を修正したいと考えています。

これは私の現在のコードです:

def insert_order_queue(self, msg, max_retries=3):
    '''Publish ``msg`` as JSON to the configured exchange, re-sending on AMQP errors.

    If publishing raises an AMQP connection or channel error, the method
    reconnects via ``self._connect()``, stores the fresh channel on
    ``self.channel`` and publishes the same message again.

    Args:
        msg: JSON-serialisable payload. A falsy value is logged as an
            error and nothing is published.
        max_retries: How many times to reconnect and re-send after the
            first failed attempt before giving up (default 3).

    Returns:
        None. AMQP errors are logged, never propagated to the caller.

    NOTE(review): a publish that raises no exception is treated as success
    here. Whether the broker actually accepted the message can only be
    known with publisher confirms enabled on the channel -- confirm whether
    that is needed for this queue.
    '''
    if not msg:
        logger.error('Something wrong')
        return

    msg_props = pika.BasicProperties(delivery_mode=conf.rabbit_msg_props_delivery_mode,
                                     content_type=conf.rabbit_msg_props_content_type)
    logger.info('Message : %s' % msg)
    # Serialise once; every retry re-sends the identical body.
    body = json.dumps(msg)

    retries_left = max_retries
    while True:
        try:
            self.channel.basic_publish(body=body,
                                       exchange=conf.rabbit_exchange_name,
                                       properties=msg_props,
                                       routing_key=conf.rabbit_exchange_routing_key)
            return
        except (pika.exceptions.AMQPConnectionError, pika.exceptions.AMQPChannelError) as error:
            if retries_left <= 0:
                logger.error('Giving up publishing message after %s retries: %s'
                             % (max_retries, error))
                return
            retries_left -= 1
            logger.error('AMQP Connection failed. Trying again... %s' % error)
            # _connect() returns a new channel; the old one is unusable, so
            # it must replace self.channel before the next publish attempt.
            self.channel = self._connect()

そして、これが私の _connect() メソッドです:

def _connect(self):
    '''Connect to RabbitMQ, declare and bind the queue, and return the channel.

    Retries forever, pausing 3 seconds between attempts, until a
    connection is opened and the channel has been fully set up.

    Returns:
        The opened channel, with QoS prefetch configured and the queue
        declared and bound to the configured exchange and routing key.
    '''
    logger.info('Trying to connect to RabbitMQ')
    while True:
        conn_broker = None
        try:
            conn_broker = pika.BlockingConnection(
                pika.ConnectionParameters(
                    host=conf.rabbit_server,
                    port=conf.rabbit_port,
                    virtual_host=conf.rabbit_vhost,
                    ssl=conf.rabbit_ssl,  # do not set it to True if there is no ssl!
                    heartbeat_interval=conf.rabbit_heartbeat_interval,
                    credentials=pika.PlainCredentials(
                        conf.rabbit_user,
                        conf.rabbit_pass)))
            logger.info('Successfully connected to Rabbit at %s:%s' % (conf.rabbit_server, conf.rabbit_port))
            channel = conn_broker.channel()
            # Don't dispatch a new message to a worker until it has processed and acknowledged the previous one
            channel.basic_qos(prefetch_count=conf.rabbit_prefetch_count)
            status = channel.queue_declare(queue=conf.rabbit_queue_name,
                                           durable=conf.rabbit_queue_durable,
                                           exclusive=conf.rabbit_queue_exclusive,
                                           passive=conf.rabbit_queue_passive)
            if status.method.message_count == 0:
                logger.info("Queue empty")
            else:
                logger.info('Queue status: %s' % status)
            channel.queue_bind(
                queue=conf.rabbit_queue_name,
                exchange=conf.rabbit_exchange_name,
                routing_key=conf.rabbit_exchange_routing_key)
            return channel
        except (pika.exceptions.AMQPConnectionError, pika.exceptions.AMQPChannelError) as error:
            # Log first so the failure is visible before the back-off pause.
            logger.error('Exception while connecting to Rabbit %s' % error)
            # The connection may have opened before channel setup failed;
            # close it so each retry does not leave another one behind.
            if conn_broker is not None and conn_broker.is_open:
                try:
                    conn_broker.close()
                except (pika.exceptions.AMQPConnectionError, pika.exceptions.AMQPChannelError) as close_error:
                    # Best-effort cleanup: a failed close must not stop the retry loop.
                    logger.error('Failed to close half-open Rabbit connection %s' % close_error)
            time.sleep(3)
4

1 に答える 1