1

Stomp を使用して AMQ から 1 つのメッセージを読み取ると、3 つまたは 4 つのメッセージがキューから取り出されます。理由はわかりません。

AMQ を入力するためのコード:

public function populate($queue, $c = 10) {

    for($i = 0; $i < $c; $i++) {
        $this->stomp->send($queue,' Random populated:'.rand(0, PHP_INT_MAX));
    }

}

入力後の ActiveMQ キュー 入力後の ActiveMQ メッセージ

AMQ を読み取るためのコード:

public function read($queue = null) {

    if(is_null($queue)) {
        if(!$this->isSubscribed()) {
            return false;
        }
    } else {
        $this->subscribe($queue);
    }

    return $this->stomp->readFrame();

}

readFrame() コードをストンプします。

public function readFrame ()
{
    if (!$this->hasFrameToRead()) {
        return false;
    }

    $rb = 1024;
    $data = '';
    $end = false;

    do {
        $read = fread($this->_socket, $rb);
        if ($read === false) {
            $this->_reconnect();
            return $this->readFrame();
        }
        $data .= $read;
        if (strpos($data, "\x00") !== false) {
            $end = true;
            $data = rtrim($data, "\n");
        }
        $len = strlen($data);
    } while ($len < 2 || $end == false);

    list ($header, $body) = explode("\n\n", $data, 2);
    $header = explode("\n", $header);
    $headers = array();
    $command = null;
    foreach ($header as $v) {
        if (isset($command)) {
            list ($name, $value) = explode(':', $v, 2);
            $headers[$name] = $value;
        } else {
            $command = $v;
        }
    }
    $frame = new StompFrame($command, $headers, trim($body));
    if (isset($frame->headers['transformation']) && $frame->headers['transformation'] == 'jms-map-json') {
        require_once 'Stomp/Message/Map.php';
        return new StompMessageMap($frame);
    } else {
        return $frame;
    }
    return $frame;
}

コードが 1 回だけ実行されていることは 100% 確信していますが、結果は次のようになります。 1 つのメッセージを読み取った後の ActiveMQ キュー 1 つのメッセージを読み取った後の ActiveMQ メッセージ

Var_dumped メッセージ:

object(StompFrame)[4]
public 'command' => string 'MESSAGE' (length=7)
public 'headers' => 
array (size=5)
  'message-id' => string 'ID:**********_-49723-1350635513276-2:1:-1:1:1' (length=45)
  'destination' => string '/queue/test' (length=11)
  'timestamp' => string '1350635842968' (length=13)
  'expires' => string '0' (length=1)
  'priority' => string '4' (length=1)
public 'body' => string 'Random populated:1859256320' (length=27)

この動作の原因を知っている人はいますか?

注意事項:

  • メッセージの ACK がないため、メッセージを 1 つでもデキューする必要はありません :|
  • ACK はクライアント モードです
  • プリフェッチ サイズは 1 に設定されます
4

1 に答える 1

3

接続コードは表示されませんが、自動確認モードを使用して接続していると想定します。Auto AckモードのSTOMPでは、メッセージはネットワークに到達するとすぐにackされます。また、プリフェッチサイズを変更していないと想定しているため、ブローカーはメッセージのバッチを送信して、ソケットなどを送信すると、キューから削除されます。メッセージの消費をよりきめ細かく制御したい場合は、クライアントackなどの別のackモードを使用して、メッセージが到着したときに各メッセージをackする必要があります。サブスクリプションのプリフェッチウィンドウを設定して、クライアントにバッチ処理されるメッセージの数を減らすこともできます。

AMQ STOMP構成オプションについては、このページを参照してください。また、STOMP仕様をもう一度確認することもできます。

于 2012-10-19T10:15:21.427 に答える