1

私は 2 つのサーバーを持っており、それらを A と B と呼びます。B は RabbitMQ を実行し、A は Kombu 経由で RabbitMQ に接続します。B で RabbitMQ を再起動すると、kombu 接続が切断され、メッセージが配信されなくなります。次に、A のプロセスをリセットして、接続を再確立する必要があります。より良いアプローチはありますか?つまり、RabbitMQ プロセスが再起動された場合でも、Kombu が自動的に再接続する方法はありますか?

私の基本的なコードの実装は以下のとおりです。よろしくお願いします。:)

def start_consumer(routing_key, incoming_exchange_name, outgoing_exchange_name):
    global rabbitmq_producer

    incoming_exchange = kombu.Exchange(name=incoming_exchange_name, type='direct')
    incoming_queue = kombu.Queue(name=routing_key+'_'+incoming_exchange_name, exchange=incoming_exchange, routing_key=routing_key)#, auto_delete=True)

    outgoing_exchange = kombu.Exchange(name=outgoing_exchange_name, type='direct')
    rabbitmq_producer = kombu.Producer(settings.rabbitmq_connection0, exchange=outgoing_exchange, serializer='json', compression=None, auto_declare=True)

    settings.rabbitmq_connection0.connect()
    if settings.rabbitmq_connection0.connected:
        callbacks=[]
        queues=[]

        callbacks.append(callback)
        # if push_queue:
        #   callbacks.append(push_message_callback)
        queues.append(incoming_queue)

        print 'opening a new *incoming* rabbitmq connection to the %s exchange for the %s queue' % (incoming_exchange.name, incoming_queue.name)
        incoming_exchange(settings.rabbitmq_connection0).declare()
        incoming_queue(settings.rabbitmq_connection0).declare()

        print 'opening a new *outgoing* rabbitmq connection to the %s exchange' % outgoing_exchange.name
        outgoing_exchange(settings.rabbitmq_connection0).declare()

        with settings.rabbitmq_connection0.Consumer(queues=queues, callbacks=callbacks) as consumer:
            while True:
                settings.rabbitmq_connection0.drain_events()
4

1 に答える 1

0

コンシューマー側では、kombu.mixins.ConsumerMixinは、接続が切断されたときに再接続を処理します (また、ハートビートなども実行し、コードの記述を減らすことができます)。残念ながらはないようですがProducerMixin、コードを掘り下げて適応させることができる可能性があります...?

于 2016-08-09T14:17:48.287 に答える