1

次の例のように、RabbitMQを使用してPHPでRPCサービスを構築しようとしています。http://www.rabbitmq.com/tutorials/tutorial-six-java.html このPECL拡張機能を使用しています:http ://pecl。 php.net/package/amqp(バージョン1.0.3)

問題は、フラグAMQP_EXCLUSIVEをサーバーに追加すると、コールバックキュー(クライアントスクリプトで宣言)がサーバーに対してロックされることです。

これが私のサーバーです

// connect to server
$cnn = new AMQPConnection('...');
$cnn->connect();
$channel = new AMQPChannel($cnn);
// create exchange
$exchangeName = 'k-exchange';
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declare();

// declare queue to consume messages from
$queue = new \AMQPQueue($channel);
$queue->setName('tempQueue');
$queue->declare();

// start consuming messages
$queue->consume(function($envelope, $queue)
    use ($channel, $exchange) {

    // create callback queue
    $callbackQueue = new \AMQPQueue($channel);
    $callbackQueue->setName($envelope->getReplyTo());
    $callbackQueue->setFlags(AMQP_EXCLUSIVE); // set EXCLUSIVE flag

    /* WARNING: Following code line causes error. See rabbit logs below:
     *  connection <0.1224.10>, channel 1 - error:
     *  {amqp_error,resource_locked,
     *  "cannot obtain exclusive access to locked queue 'amq.gen-Q6J...' in vhost '/'",
     *  'queue.bind'}
     */
    $callbackQueue->bind($exchange->getName(), 'rpc_reply');

    // trying to publish response back to client's callback queue
    $exchange->publish(
        json_encode(array('processed by remote service!')),
        'rpc_reply',
        AMQP_MANDATORY & AMQP_IMMEDIATE
    );

    $queue->ack($envelope->getDeliveryTag());
});

そしてこれが私のClient.phpです

// connect to server
$cnn = new AMQPConnection('...');
$cnn->connect();
$channel = new AMQPChannel($cnn);
// create exchange
$exchangeName = 'k-exchange';
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declare();

// create a queue which we send messages to server via
$queue = new \AMQPQueue($channel);
$queue->setName('tempQueue');
$queue->declare();

// binding exchange to queue
$queue->bind($exchangeName, 'temp_action');

// create correlation_id
$correlationId = sha1(time() . rand(0, 1000000));

// create anonymous callback queue to get server response response via
$callbackQueue = new \AMQPQueue($channel);
$callbackQueue->setFlags(AMQP_EXCLUSIVE); // set EXCLUSIVE flag
$callbackQueue->declare();

// publishing message to exchange (passing it to server)
$exchange->publish(
    json_encode(array('process me!')),
    'temp_action',
    AMQP_MANDATORY,
    array(
        'reply_to' => $callbackQueue->getName(), // pass callback queue name
        'correlation_id' => $correlationId
    )
);

// going to wait for remote service complete tasks. tick once a second
$attempts = 0;
while ($attempts < 5)
{
    echo 'Attempt ' . $attempts . PHP_EOL;
    $envelope = $callbackQueue->get();
    if ($envelope) {
        echo 'Got response! ';
        print_r($envelope->getBody());
        echo PHP_EOL;
        exit;
    }

    sleep(1);
    $attempts++;
}

したがって、最終的には、RabbitMQのログにエラーが表示されます。

connection <0.1224.10>, channel 1 - error:
{amqp_error,resource_locked,
    "cannot obtain exclusive access to locked queue 'amq.gen-Q6J...' in vhost '/'", 
    'queue.bind'}

質問:Server.phpでcallbackQueueオブジェクトを作成する適切な方法は何ですか?私のServer.phpは、RabbitMQサーバーへのClient.php接続とは異なるようです。ここで何をすればいいですか?Server.php側で同じ(Client.phpの)接続を「共有」するにはどうすればよいですか。

更新 ここにいくつかのRabbitMQログがあります

私のServer.php接続(Idは:<0.22322.27>)

=INFO REPORT==== 20-Jun-2012::13:30:22 ===
    accepting AMQP connection <0.22322.27> (127.0.0.1:58457 -> 127.0.0.1:5672)

私のClient.php接続(Idは:<0.22465.27>)

=INFO REPORT==== 20-Jun-2012::13:30:38 ===
    accepting AMQP connection <0.22465.27> (127.0.0.1:58458 -> 127.0.0.1:5672)

これで、Server.phpがエラーを引き起こすことがわかります。

=ERROR REPORT==== 20-Jun-2012::13:30:38 ===
    connection <0.22322.27>, channel 1 - error:
{amqp_error,resource_locked,
"cannot obtain exclusive access to locked queue 'amq.gen-g6Q...' in vhost '/'",
'queue.bind'}

Client.phpとServer.phpは同じIDとの接続を共有していないため、私の仮定では、Client.phpで宣言された排他キューを両方で使用することは不可能です

4

3 に答える 3

0

サーバー上で、キューを排他的として宣言する必要もあります。RabbitMQ キューには同じフラグが必要です。たとえば、「durable」に設定されているキューを宣言する場合、相手側もキューを「durable」と宣言する必要があるため、サーバーで$callbackQueue->setFlags(AMQP_EXCLUSIVE);クライアントからのようなフラグを設定します。

于 2012-06-20T03:58:56.937 に答える
0

この質問に対する私の回答は、RabbitMQ 公式メーリング リストで回答されました

ここでは同じライブラリを使用していませんが、PHP に移植された公式のチュートリアルがあります。

https://github.com/rabbitmq/rabbitmq-tutorials/tree/master/php

コードの問題は、さまざまなオプションでキューを宣言していることです。

したがって、ある回答が言うように、キュー A を永続的であると宣言した場合、そのキューの他のすべての宣言は永続的でなければなりません。専用フラグも同様。

また、メッセージを発行するためにキューを再宣言する必要はありません。RPC サーバーとして、'reply_to' プロパティで送信されたアドレスが既に存在していると想定します。返信を待っているキューが既に存在することを確認するのは、RpcClientの責任だと思います。

補遺:

キューの排他性は、キューを宣言したチャネルのみがキューにアクセスできることを意味します。

于 2012-06-21T08:38:07.347 に答える