1

その情報(ドキュメントを含む)を検索しましたが、見つかりません。

RabbitMQ v. 2.7.1 で最新バージョンのphp-amqplibを使用しています。3 つのキューと 3 つの交換があります。

// Declare the exchanges
$this->channel->exchange_declare(self::EXCHANGE_TO_PROCESS, 'direct', false, true, false, false, false);
$this->channel->exchange_declare(self::EXCHANGE_WAITING, 'direct', false, true, false, false, false);
$this->channel->exchange_declare(self::EXCHANGE_TO_CLEAN, 'direct', false, true, false, false, false);

// Messages in the to_process queue are sent to to_clean after 24 hours without being processed
$this->channel->queue_declare(self::QUEUE_TO_PROCESS, false, true, false, false, false, array(
    'x-dead-letter-exchange' => array('S', self::EXCHANGE_TO_CLEAN),
    'x-message-ttl' => array('I', 86400000), // 1 day in milli-seconds
));

// Messages in the waiting queue are sent to to_process after 5 minutes (wait period before retry)
$this->channel->queue_declare(self::QUEUE_WAITING, false, true, false, false, false, array(
    'x-dead-letter-exchange' => array('S', self::EXCHANGE_TO_PROCESS),
    'x-message-ttl' => array('I', 300000), // 5 minutes in milli-seconds
));

// Messages in the to_clean queue are kept until they are processed
$this->channel->queue_declare(self::QUEUE_TO_CLEAN, false, true, false, false, false);

// Bind the queues to the exchanges
$this->channel->queue_bind(self::QUEUE_TO_PROCESS, self::EXCHANGE_TO_PROCESS);
$this->channel->queue_bind(self::QUEUE_TO_CLEAN, self::EXCHANGE_TO_CLEAN);
$this->channel->queue_bind(self::QUEUE_WAITING, self::EXCHANGE_WAITING);

動作は非常に簡単です。メッセージは .xml ファイルに発行されますEXCHANGE_TO_PROCESS。外部ワーカーがメッセージを処理します。処理が A-OK になった場合、メッセージは単純に ACK され、キューから削除されます (この部分は完全に機能します)。処理がうまくいかない場合、メッセージは代わりに に挿入され、 5 分間の TTL の後、再処理EXCHANGE_WAITINGのために DLX を介して に再挿入されます。EXCHANGE_TO_PROCESSただし、3 回目の失敗の後、cron ジョブが来る場所に挿入され、EXCHANGE_TO_CLEANメッセージやログ エラーなどをクリーンアップします。

ただし、私が遭遇した問題は、コードが (予想どおり) を明確に にバインドしQUEUE_WAITINGているEXCHANGE_WAITINGことですが、RabbitMQ 管理ページを調べると、2 つのキューがその交換にバインドされていることに気付きました。注文。5 分が経過すると、メッセージは消えます。理由はよくわかりません。QUEUE_TO_PROCESSQUEUE_WAITING

これらすべてが私の質問に私たちをもたらします:デッドレター交換は暗黙的にパラメータの交換をキューにバインドしますか? そして : 失われたメッセージに何が起こっている可能性がありますか?

編集

私は私よりもさらに混乱しています。私は次の非常に基本的なコードを試しました:

    $this->channel->exchange_declare('exchangeA', 'fanout', false, true, false, false, false);
    $this->channel->exchange_declare('exchangeB', 'fanout', false, true, false, false, false);
    $this->channel->queue_declare('queueA', false, true, false, false, false, array(
        'x-dead-letter-exchange' => array('S', 'exchangeB'),
        'x-message-ttl' => array('I', 5000)
    ));
    $this->channel->queue_declare('queueB', false, true, false, false, false);
    $this->channel->queue_bind('queueA', 'exchangeA');
    $this->channel->queue_bind('queueB', 'exchangeB');

    $msg = new AMQPMessage('hello!');
    $this->channel->basic_publish($msg, 'exchangeA');

これにより、2 つのキューと 2 つの exchange が作成され (ルーティング キーの煩わしさを避けるために、私は見てきましたfanout)、queueA を exchangeA に、queueB を exchangeB にバインドし、queueA に TTL を設定し、その DLX を exchangeB に設定します。管理ページで何が起こっているかを観察すると、予想どおり、メッセージが queueA で 5 秒間費やされた後、上記のより複雑なコードのようにメッセージが消えます。

4

2 に答える 2

3

メッセージ フローが循環する可能性があるようです。その場合、RabbitMQ は公式ドキュメント (配信不能メッセージのルーティングセクション)で指定されているようにメッセージを黙ってドロップします。

配信不能キューのサイクルを形成することができます。たとえば、デッドレタールーティングキーを指定せずに、キューがデフォルトの交換にデッドレターメッセージを送信すると、これが発生する可能性があります。このようなサイクルのメッセージ (つまり、同じキューに 2 回到達するメッセージ) は、サイクル全体がメッセージの期限切れによるものである場合、ドロップされます。

サイクリングの問題に対処するには、1 つの選択肢を選択する必要があります。

  1. サイクルを完全に断ち切ります (ある時点でメッセージをドロップします)。
  2. または、停止したキューからメッセージを消費し、ワークフローに従って手動で再発行します。

これにより、アプリケーションが多少複雑になりますが、パフォーマンスと安定性のために支払わなければならない代償です。

PS:

RabbitMQ メーリング リストを調べたところ、Dead Letter、TTL、Cycle のような同様の質問が見つかりました。

現時点では、RabbitMQ の外部でヘッダーを消去するには、使用して再公開する必要があります。

于 2014-06-05T15:28:03.740 に答える
0

このブログに出くわし、私たちと非常によく似たケースのポスターを見て、問題なく動作していると彼が言ったとき、何かがおかしいのではないかと疑いました...それで、もう少し掘り下げ始めました。

私たちが抱えていた問題は、単純にバージョンの問題でした。RabbitMQ パッケージは最新であると言われていましたが、Ubuntu 12.04 LTS を使用しているため、「最新」バージョンは 2.7.1 でした。これは 3 年以上前のバージョンです。

私たちと同じケース (古いディストリビューションを使用している) の場合は、RabbitMQ のダウンロード ページをチェックアウトし、ディストリビューションに適したものを選択してください。Ubuntuの場合、公式レポを追加し (単に .dpkg ファイルをダウンロードすることもできます)、実行しapt-get updateて、サーバーが再起動するのを待ちました。その後、上記のコードはほとんどそのまま機能しました。

于 2014-06-09T15:50:59.593 に答える