0

STOMP 経由で ActiveMQ と対話しています。メッセージを発行する 1 つのプロセスと、メッセージをサブスクライブして処理する複数のプロセス (約 10 の並列インスタンス) があります。

メッセージを読んだ後、何らかの理由でアプリケーションが失敗/クラッシュした場合でも、メッセージが失われないことを確認したいと思います。当然のことながら、私はトランザクションに目を向けました。残念ながら、コンシューマーがトランザクションの一部としてメッセージを読み取ると、トランザクションが終了するまで、後続のすべてのメッセージが他のコンシューマーに送信されないことがわかりました。

テスト ケース:abcキューには 100 個のメッセージがあります。2 つの異なるブラウザー タブで次のコードをアクティブにすると、最初のコードは 10 秒で返され、2 番目のコードは 20 秒で返されます。

<?php
// Reader.php
$con = new Stomp("tcp://localhost:61613");
$con->connect();

$con->subscribe(
    "/queue/abc",
    array()
);

$tx = "tx3".microtime();
echo "TX:$tx<BR>";
$con->begin($tx);
$messages = array();
for ($i = 0; $i < 10; $i++) {
    $t = microtime(true);
    $msg = $con->readFrame();
    if (!$msg) {
        die("FAILED!");
    }
    $t = microtime(true)-$t; echo "readFrame() took $t MS to complete<BR>";
    array_push($messages, $msg);
    $con->ack($msg, $tx);
    sleep(1);
}
$con->abort($tx);

コード的に欠けているものはありますか? トランザクションがキューからアイテムを削除し、他のプロセスが他のメッセージを消費できるようにし、トランザクションが失敗またはタイムアウトした場合にアイテムを元に戻す ActiveMQ を構成する (またはヘッダーを送信する) 方法はありますか? ?

PS: 読み取りプロセスごとに DetentionQueue という別のキューを作成することを考えましたが、選択肢がある場合は実際には作成しません。

4

1 に答える 1

1

クライアント 2 がメッセージを取得する前に、ActiveMQ がキュー上のメッセージをクライアント 1 に送信しないように、サブスクリプションのプリフェッチ サイズを調整することをお勧めします。デフォルトでは 1000 に設定されているため、ユース ケースに合わせて調整することをお勧めします。

サブスクライブ フレームの「activemq.prefetchSize=1」ヘッダーを介してプリフェッチ サイズを設定できます。すべてのフレーム オプションについては、 ActiveMQ Stompページを参照してください。

于 2013-02-27T11:48:55.700 に答える