OpenStack は、RabbitMQ をメッセージング システムとして使用します。この目的のために、いくつかの交換とキューがあります。「トピック」タイプの「nova」という名前の交換がメッセージ転送に使用されていることがわかりました。Exchange は、ルーティング キーを使用してメッセージをキューにルーティングします ( http://www.rabbitmq.com/tutorials/amqp-concepts.html )。 ( http://www.rabbitmq.com/img/tutorials/intro/hello-world-example-routing.pngの有用な画像- ここに投稿するには評判が不十分です) OpenStack には、compute、cert、network などのいくつかのキューがあります。同じ名前のルーティング キーを使用します。そこで、メッセージを処理するコンシューマーにバインドするために、これらのルーティング キーを使用していくつかの新しいキューを作成しました。たとえば、「compute」という名前のルーティング キーを使用する「compute」という名前のキューがあります。同じルーティング キーを使用する新しいキュー "my_compute" を作成しました。私はそれがうまくいくはずだと思うので、私はメッセージを受け取ります。
交換に接続し、キューとコンシューマーを作成するコードがいくつかあります。
def connect(params):
connection = kombu.Connection(hostname=params['host'])
exchange = kombu.entity.Exchange(name=params['exchange_name'],
type=params['exchange_type'],
durable=params['exchange_durable'],
auto_delete=params['exchange_auto_delete'],
internal=params['exchange_internal'])
queue_list = []
for queue in params['queues_params']:
queue_list.append(kombu.messaging.Queue(name=queue['name'],
exchange=exchange,
routing_key=queue['routing_key'],
channel=connection.channel(),
durable=queue['durable'],
auto_delete=queue['auto_delete']))
consumer = kombu.messaging.Consumer(channel=connection.channel(),
queues=queue_list,
no_ack=True,
callbacks=[self._process_message])
consumer.consume()
return connection
引数「params」は、json ファイルから取得したマップです。
{
"host" : "xxx",
"exchange_name" : "nova",
"exchange_type" : "topic",
"exchange_durable" : false,
"exchange_auto_delete" : false,
"exchange_internal" : false,
"queues_params" : [
{
"name" : "my_compute",
"routing_key" : "compute",
"durable" : false,
"auto_delete" : false,
"arguments" : [ ]
},
{
"name" : "my_network",
"routing_key" : "network",
"durable" : false,
"auto_delete" : false,
"arguments" : [ ]
},
.
.
.
それは働いています。しかし、 network queue のメッセージしか取得しません。他にメッセージがあるかどうかはわかりませんが、あるようです。私は正しいですか?それとも何かが間違っていますか?他のメッセージはありますか?どうすれば入手できますか?