0

概要

切断された接続を検出できるように、コンシューマーでハートビートを設定しようとしています。たとえば、ブローカーが再起動された場合、フェイルオーバーに再接続する可能性があります。

Consumer/Dispatcher をローカルで実行し、キューを AWS で実行してコードをテストしたところ、すべて正常に動作しました。ただし、コードを AWS に移動すると、コンシューマーはサーバー/ブローカーでハートビートを設定しますが、ハートビートはサーバー/ブローカーによって送信されたり、クライアント/コンシューマーによって受信されたりすることはありません。その結果、HeartbeatException要求されたサーバーのハートビート間隔が経過するとすぐに がスローされます。

私のコードは、stomp-php-examples github の例に基づいています。

なぜこれが機能しないのかについての次の最良の推測は、AWS によって提供されるデフォルトの構成を使用しているため、キュー構成に関連するものです (と思います)。ハートビートの構成設定についてグーグルで検索しましたが、これは新しいトピックであるため、あまり進んでいません。どんな助けでも大歓迎です!

設定

  • アマゾン MQ (ActiveMQ 5.15.14)
  • stomp-php 5.0 (本日現在の最新バージョン)

セットアップの詳細を提供できることをうれしく思います。

コード

コンシューマー (簡易版)

abstract class AmqConsumerAbstract
{
    /** @var StatefulStomp */
    protected $consumer;

    const HEARTBEAT = 5000;

    public function listen(): void
    {
        $observer = new ServerAliveObserver();
        $this->client->getConnection()->getObservers()->addObserver($observer);
        $this->client->setHeartbeat(0, self::HEARTBEAT); // heartbeats setup here
        // Note: the heartbeat above is longer than the read-timeout below

        $this->client->getConnection()->setReadTimeout(2, 0);

        $this->client->connect();

        $this->consumer = new StatefulStomp($this->client);
        $this->consumer->subscribe(
            $this->getQueueName(),
            null,
            'client'
        );

        if (!$observer->isEnabled()) {
            // we never get here so I assume everything is working OK
            echo 'The Server is not supporting heartbeats.');
            exit(1);
        } else {
            echo sprintf('The Server should send us signals every %d ms.', $observer->getInterval() * 1000);
        }

        try {
            while (true) {
                $frame = $this->consumer->read(); // I assumed this line would read the heartbeat?

                // there is then some logic that deals with payload and does
                // $this->consumer->begin();
                // $this->consumer->ack($frame);
                // $this->consumer->commit();

                if ($observer->isDelayed()) {
                    $this->echoLog('ServerAliveObserver: Server has been delayed.');
                }
            }
        } catch (HeartbeatException $e) {
            echo 'AMQ (STOMP) Error, the server failed to send us heartbeats within the defined interval: ' . $e->getMessage()
            $this->reconnect();
        } catch (ConnectionException $e) {
            echo $e->getMessage();
            $this->reconnect();
        } catch (Exception $e) {
            echo 'AMQ (STOMP) Queue error: ' . $e->getMessage();
            exit(1);
        }
    }
}

バージョン 5.15.14 の単一インスタンス ブローカーで再現しました。

ブローカーの STOMP デバッグをオンに切り替えようとしましたが、uri下の属性<transportConnector>は許可されていないようです。構成は保存されますが、属性が引き出されuriます。

<transportConnector>Amazon MQ 設定で許可される要素とその属性

Spring XML 構成ファイル (xsd ファイル) の操作

4

0 に答える 0