STOMP 経由で ActiveMQ と対話しています。メッセージを発行する 1 つのプロセスと、メッセージをサブスクライブして処理する複数のプロセス (約 10 の並列インスタンス) があります。
メッセージを読んだ後、何らかの理由でアプリケーションが失敗/クラッシュした場合でも、メッセージが失われないことを確認したいと思います。当然のことながら、私はトランザクションに目を向けました。残念ながら、コンシューマーがトランザクションの一部としてメッセージを読み取ると、トランザクションが終了するまで、後続のすべてのメッセージが他のコンシューマーに送信されないことがわかりました。
テスト ケース:abc
キューには 100 個のメッセージがあります。2 つの異なるブラウザー タブで次のコードをアクティブにすると、最初のコードは 10 秒で返され、2 番目のコードは 20 秒で返されます。
<?php
// Reader.php
$con = new Stomp("tcp://localhost:61613");
$con->connect();
$con->subscribe(
"/queue/abc",
array()
);
$tx = "tx3".microtime();
echo "TX:$tx<BR>";
$con->begin($tx);
$messages = array();
for ($i = 0; $i < 10; $i++) {
$t = microtime(true);
$msg = $con->readFrame();
if (!$msg) {
die("FAILED!");
}
$t = microtime(true)-$t; echo "readFrame() took $t MS to complete<BR>";
array_push($messages, $msg);
$con->ack($msg, $tx);
sleep(1);
}
$con->abort($tx);
コード的に欠けているものはありますか? トランザクションがキューからアイテムを削除し、他のプロセスが他のメッセージを消費できるようにし、トランザクションが失敗またはタイムアウトした場合にアイテムを元に戻す ActiveMQ を構成する (またはヘッダーを送信する) 方法はありますか? ?
PS: 読み取りプロセスごとに DetentionQueue という別のキューを作成することを考えましたが、選択肢がある場合は実際には作成しません。