メッセージが確認されず、アプリケーションが失敗した場合、メッセージは自動的に再配信redelivered
され、エンベロープのプロパティが に設定されます (フラグtrue
を付けて消費しない限り)。no-ack = true
更新:
nack
catch ブロックで再配信フラグを使用してメッセージを送信する必要があります
try {
//Do some business logic
} catch (Exception $ex) {
//Log exception
return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE);
}
RabbitMQ および AMQP プロトコルでは再配信カウントがまったく実装されていないため、メッセージが無限にナッキングされることに注意してください。
そのようなメッセージを台無しにしたくなく、単に遅延を追加したい場合は、メソッド呼び出しの前sleep()
またはusleep()
前に遅延を追加したいかもしれませnack
んが、それはまったく良い考えではありません。
サイクル再配信の問題に対処するには、複数の手法があります。
1.デッドレター交換に頼る
- 長所:信頼できる、標準的、明確
- 短所: 追加のロジックが必要
2.メッセージごとまたはキューごとの TTLを使用する
- 長所:実装が簡単で、標準的で明確
- 短所: キューが長いと、メッセージが失われる可能性があります
例 (キュー ttl には数値のみを渡し、メッセージ ttl には数値文字列になるものすべてを渡すことに注意してください):
2.1 メッセージごとの ttl:
$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declareQueue();
$queue->bind('my-exchange');
$exchange->publish(
'message at ' . microtime(true),
null,
AMQP_NOPARAM,
array(
'expiration' => '1000'
)
);
2.2. キューごとの ttl:
$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->setArgument('x-message-ttl', 1000);
$queue->declareQueue();
$queue->bind('my-exchange');
$exchange->publish('message at ' . microtime(true));
3. メッセージ本文またはヘッダーに再配信数または左再配信数 (別名、IP スタックのホップ制限または ttl) を保持します。
- 長所:アプリケーション レベルでメッセージの有効期間をさらに制御できます
- 短所:メッセージを変更して再度公開する必要がある間のかなりのオーバーヘッド、アプリケーション固有、明確でない
コード:
$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declareQueue();
$queue->bind('my-exchange');
$exchange->publish(
'message at ' . microtime(true),
null,
AMQP_NOPARAM,
array(
'headers' => array(
'ttl' => 100
)
)
);
$queue->consume(
function (AMQPEnvelope $msg, AMQPQueue $queue) use ($exchange) {
$headers = $msg->getHeaders();
echo $msg->isRedelivery() ? 'redelivered' : 'origin', ' ';
echo $msg->getDeliveryTag(), ' ';
echo isset($headers['ttl']) ? $headers['ttl'] : 'no ttl' , ' ';
echo $msg->getBody(), PHP_EOL;
try {
//Do some business logic
throw new Exception('business logic failed');
} catch (Exception $ex) {
//Log exception
if (isset($headers['ttl'])) {
// with ttl logic
if ($headers['ttl'] > 0) {
$headers['ttl']--;
$exchange->publish($msg->getBody(), $msg->getRoutingKey(), AMQP_NOPARAM, array('headers' => $headers));
}
return $queue->ack($msg->getDeliveryTag());
} else {
// without ttl logic
return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE); // or drop it without requeue
}
}
return $queue->ack($msg->getDeliveryTag());
}
);
メッセージの再配信フローをより適切に制御する方法が他にもいくつかある場合があります。
結論: 特効薬の解決策はありません。自分のニーズに最適なソリューションを決定するか、他の何かを見つける必要がありますが、ここで共有することを忘れないでください;)