17

を使用してキューにサブスクライブする単純なパブリッシャーとコンシューマーを作成しましたbasic.consume

ジョブが例外なく実行されると、コンシューマーはメッセージを確認します。例外が発生するたびに、メッセージに確認応答せず、早期に戻ります。承認されたメッセージのみがキューから消えるため、正しく機能します。
ここで、コンシューマーに失敗したメッセージを再度取得してもらいたいのですが、それらのメッセージを再利用する唯一の方法は、コンシューマーを再起動することです。

このユースケースにどのようにアプローチする必要がありますか?

セットアップコード

$channel = new AMQPChannel($connection);

$exchange = new AMQPExchange($channel);

$exchange->setName('my-exchange');
$exchange->setType('fanout');
$exchange->declare();

$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declare();
$queue->bind('my-exchange');

消費者コード

$queue->consume(array($this, 'callback'));

public function callback(AMQPEnvelope $msg)
{
    try {
        //Do some business logic
    } catch (Exception $ex) {
        //Log exception
        return;
    }
    return $queue->ack($msg->getDeliveryTag());
}

生産者コード

$exchange->publish('message');
4

2 に答える 2

24

メッセージが確認されず、アプリケーションが失敗した場合、メッセージは自動的に再配信redeliveredされ、エンベロープのプロパティが に設定されます (フラグtrueを付けて消費しない限り)。no-ack = true

更新:

nackcatch ブロックで再配信フラグを使用してメッセージを送信する必要があります

    try {
        //Do some business logic
    } catch (Exception $ex) {
        //Log exception
        return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE);
    }

RabbitMQ および AMQP プロトコルでは再配信カウントがまったく実装されていないため、メッセージが無限にナッキングされることに注意してください。

そのようなメッセージを台無しにしたくなく、単に遅延を追加したい場合は、メソッド呼び出しの前sleep()またはusleep()前に遅延を追加したいかもしれませnackんが、それはまったく良い考えではありません。

サイクル再配信の問題に対処するには、複数の手法があります。

1.デッドレター交換に頼る

  • 長所:信頼できる、標準的、明確
  • 短所: 追加のロジックが必要

2.メッセージごとまたはキューごとの TTLを使用する

  • 長所:実装が簡単で、標準的で明確
  • 短所: キューが長いと、メッセージが失われる可能性があります

例 (キュー ttl には数値のみを渡し、メッセージ ttl には数値文字列になるものすべてを渡すことに注意してください):

2.1 メッセージごとの ttl:

$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declareQueue();
$queue->bind('my-exchange');

$exchange->publish(
    'message at ' . microtime(true),
    null,
    AMQP_NOPARAM,
    array(
        'expiration' => '1000'
    )
);

2.2. キューごとの ttl:

$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->setArgument('x-message-ttl', 1000);
$queue->declareQueue();
$queue->bind('my-exchange');

$exchange->publish('message at ' . microtime(true));

3. メッセージ本文またはヘッダーに再配信数または左再配信数 (別名、IP スタックのホップ制限または ttl) を保持します。

  • 長所:アプリケーション レベルでメッセージの有効期間をさらに制御できます
  • 短所:メッセージを変更して再度公開する必要がある間のかなりのオーバーヘッド、アプリケーション固有、明確でない

コード:

$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declareQueue();
$queue->bind('my-exchange');

$exchange->publish(
    'message at ' . microtime(true),
    null,
    AMQP_NOPARAM,
    array(
        'headers' => array(
            'ttl' => 100
        )
    )
);

$queue->consume(
    function (AMQPEnvelope $msg, AMQPQueue $queue) use ($exchange) {
        $headers = $msg->getHeaders();
        echo $msg->isRedelivery() ? 'redelivered' : 'origin', ' ';
        echo $msg->getDeliveryTag(), ' ';
        echo isset($headers['ttl']) ? $headers['ttl'] : 'no ttl' , ' ';
        echo $msg->getBody(), PHP_EOL;

        try {
            //Do some business logic
            throw new Exception('business logic failed');
        } catch (Exception $ex) {
            //Log exception
            if (isset($headers['ttl'])) {
                // with ttl logic

                if ($headers['ttl'] > 0) {
                    $headers['ttl']--;

                    $exchange->publish($msg->getBody(), $msg->getRoutingKey(), AMQP_NOPARAM, array('headers' => $headers));
                }

                return $queue->ack($msg->getDeliveryTag());
            } else {
                // without ttl logic
                return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE); // or drop it without requeue
            }

        }

        return $queue->ack($msg->getDeliveryTag());
    }
);

メッセージの再配信フローをより適切に制御する方法が他にもいくつかある場合があります。

結論: 特効薬の解決策はありません。自分のニーズに最適なソリューションを決定するか、他の何かを見つける必要がありますが、ここで共有することを忘れないでください;)

于 2013-07-15T14:57:46.767 に答える
1

コンシューマを再起動したくない場合は、basic.recoverAMQP コマンドが必要な場合があります。AMQP プロトコルによると:

basic.recover(bit requeue)

Redeliver unacknowledged messages.

This method asks the server to redeliver all unacknowledged messages on a specified channel. 
Zero or more messages may be redelivered. This method replaces the asynchronous Recover. 
于 2013-07-15T15:43:58.113 に答える