2

Producer がトピック交換にメ​​ッセージを送信しています。各メッセージにはルーティング キーが含まれています。(初歩的な図で申し訳ありません)

       P
       |
       X
     /| |\
    / | | \
   /  | |  \
   Q1 Q2 Q3 Q4
   |   / /  /
   |  / /  /
   | / /  /
   |/ /  /
      C

私はphp-amqplibを使用しており、多数のキューを消費しようとしています。私が達成しようとしているのは、各キューを順番にテストし、メッセージがあるかどうかを確認し、メッセージがある場合は処理し、そうでない場合は次のキューに移動することです。また、メッセージが見つかった場合は、再度 Q1 からチェックを開始してください。次のコードは機能しませんが、私がやりたいことのロジックを示します。

$connection = new AMQPConnection(HOST, PORT, USER, PASS, VHOST);
$channel = $connection->channel();

$channel->exchange_declare('myexchange', 'topic', false, false, false);

$channel->queue_declare("Q1", false, true, false, false);
$channel->queue_bind("Q1", 'myexchange', 'priority.1');

$channel->queue_declare("Q2", false, true, false, false);
$channel->queue_bind("Q2", 'myexchange', 'priority.2');

$channel->queue_declare("Q3", false, true, false, false);
$channel->queue_bind("Q3", 'myexchange', 'priority.3');

$channel->queue_declare("DFQ4", false, true, false, false);
$channel->queue_bind("DFQ4", 'myexchange', 'priority.4');

$queues = array('Q1','Q2','Q3','Q4');

$priority = 0;
while (1) {

    $priority = ($priority<4)? $priority+1 : 0;

    $msg = $channel->basic_consume($queues[$priority], $consumer_tag, false, false, false, false);
    if(isset($msg->body)) {
        echo ' [x] ',$msg->delivery_info['routing_key'], "\n";
        $channel->basic_ack($msg->delivery_info['delivery_tag']);
        $priority = 0;
    }
}

$channel->close();
$connection->close();
4

1 に答える 1