2

https://github.com/videlvaro/php-amqplibを使用して、rabbitmq の作業を行っています。

メッセージの準備が整うまでブロックし、何もない場合は null を返す代わりにそれを返す、basic_get のブロッキング バージョン (または、繰り返し呼び出して毎回 1 つのメッセージしか取得できない basic_consume のバージョン) を作成しようとしています。待ち行列に。

basic_consume を使用して単一のメッセージを取得しようとすると、問題が発生し、「準備ができていません」が未確認のメッセージが大量に表示されます。(この方法でメッセージを 1 つだけ取得すると、毎回機能します。2 つのメッセージを取得しようとすると、ハングアップすることもあれば、機能することもあります)

class Foo {
    ...
    private function blockingGet() {
            /*
                queue: Queue from where to get the messages
                consumer_tag: Consumer identifier
                no_local: Don't receive messages published by this consumer.
                no_ack: Tells the server if the consumer will acknowledge the messages.
                exclusive: Request exclusive consumer access, meaning only this consumer can access the queue
                nowait:
                callback: A PHP Callback
             */
            $this->ch->basic_consume($this->queueName, "consumer_".$this->consumerNum++, false, false, false, false, function($msg) {
                    $this->msgCache = json_decode($msg->body);
                    $this->ch->basic_ack($msg->delivery_info['delivery_tag']);
                    $this->ch->basic_cancel($msg->delivery_info['consumer_tag']);
            });
            while (count($this->ch->callbacks)) {
                    $this->ch->wait();
            }
            return $this->msgCache;
    }
}

$q = new Foo();
for ($i = 0; $i < 5; $i++) {
    print $q->blockingGet();
}
4

1 に答える 1