1

これが事です。

送信されたすべての電子メールの重要な情報を処理するために、PHP AMQP を使用して Rabbitmq から結果キューを読み取っています。これが完了したら、そのメッセージを削除するか、書き込み済みとしてマークする必要があるため、次にキューを読み取ったときにメッセージが処理済みにならないようにします。

Rabbitmq サーバーは 1 時間に 10,000 通を超えるメールを送信しているため、結果の送信を処理するためにキューを読み取るたびに、スクリプトはキュー内のすべてのメッセージを処理するために少なくとも 5 分間実行できます。この 5 分間に数百件の新しいメッセージが送信されました。これにより、スクリプトの実行中に処理されていないメッセージの場所が削除されるため、スクリプトの終了後にキューをパージすることができなくなります。

それは私に1つの選択肢しか残していません。AMQP スクリプトによって処理または読み取られた直後に、メッセージをマークまたは削除します。

それを行う方法はありますか?(スクリプトはこちら)

<?php
/**
 *  result.php
 *  Script that connects to RabbitMQ, and takes the result message from
 *  the result message queue.
 */

// include the settings
 require_once('settings.php');

// try to set up a connection to the RabbitMQ server
try
{
    // construct the connection to the RabbitMQ server
    $connection = new AMQPConnection(array(
        'host'      =>  $hostname,
        'login'     =>  $username,
        'password'  =>  $password,
        'vhost'     =>  $vhost
    ));

    // connect to the RabbitMQ server
    $connection->connect();
}
catch (AMQPException $exception)
{
    echo "Could not establish a connection to the RabbitMQ server.\n";
}

// try to create the channel
try
{
    // open the channel
    $channel = new AMQPChannel($connection);
}
catch (AMQPConnectionException $exception)
{
    echo "Connection to the broker was lost (creating channel).\n";
}

// try to create the queue
try
{
    // create the queue and bind the exchange
    $queue   = new AMQPQueue($channel);
    $queue->setName($resultbox);
    $queue->setFlags(AMQP_DURABLE);
    $queue->bind('exchange1', 'key1');
    $queue->declare();
}
catch (AMQPQueueException $exception)
{
    echo "Channel is not connected to a broker (creating queue).\n";
}
catch (AMQPConnectionException $exception)
{
    echo "Connection to the broker was lost. (creating queue)/\n";
}

// Get the message from the queue. 
while ($envelope = $queue->get()) {
    //Function that processes the message
    process_message($envelope->getBody());
}
    $queue->purge();

// done, close the connection to RabbitMQ
$connection->disconnect();
?>
4

1 に答える 1

3

処理が成功した後にメッセージを確認するか、フラグ$queue->ack()を付けてそれらを消費/取得します。AMQP_AUTOACK

更新:

あなたのコードに基づいて:

1. メッセージの確認
while ($envelope = $queue->get()) {
    //Function that processes the message
    process_message($envelope->getBody());
    $queue->ack($envelope->getDeliveryTag());
}
2.AMQP_AUTOACKフラグで取得:
while ($envelope = $queue->get(AMQP_AUTOACK)) {
    //Function that processes the message
    process_message($envelope->getBody());
}

PS:

AMQPQueue::consumeのドキュメントを確認してください。こちらの方が適しているようです。

3. 処理後にメッセージを消費して確認できます。
$queue->consume(function ($envelope, $queue) {
        process_message($envelope->getBody());
        $queue->ack($envelope->getDeliveryTag());
});
4.またはAMQP_AUTOACKフラグを使用して消費しますが、処理が失敗すると、メッセージを再度処理できなくなります。
$queue->consume(function ($envelope, $queue) {
        process_message($envelope->getBody());
        $queue->ack($envelope->getDeliveryTag());
}, AMQP_AUTOACK);

結論: #3 の解決策を使用することをお勧めしますが、それはあなた次第です。

于 2013-07-25T14:00:46.893 に答える