35

再接続できるように、サブスクリプションの問題がいつ発生したかを判断する方法についての指針はありますか?

私のサービスは、サブスクリプションにRabbitMQ.Client.MessagePatterns.Subscriptionを使用しています。しばらくすると、クライアントはメッセージの受信を黙って停止します。VPN接続の信頼性が最も低いため、ネットワークの問題が疑われます。

私はしばらくの間ドキュメントを読み、ネットワークの問題が原因でこのサブスクリプションが壊れている可能性があることを確認するための鍵を探していました。接続とチャネルがまだ開いていることを確認しようとしましたが、常に開いていると報告されているようです。

それが処理するメッセージは非常にうまく機能し、キューに戻されて確認されるので、「ack」の問題ではないと思います。

単純なものが欠けているに違いないと思いますが、まだ見つけていません。

public void Run(string brokerUri, Action<byte[]> handler)
{
    log.Debug("Connecting to broker: {0}".Fill(brokerUri));
    ConnectionFactory factory = new ConnectionFactory { Uri = brokerUri };

    using (IConnection connection = factory.CreateConnection())
    {
        using (IModel channel = connection.CreateModel())
        {
            channel.QueueDeclare(queueName, true, false, false, null);

            using (Subscription subscription = new Subscription(channel, queueName, false))
            {
                while (!Cancelled)
                {
                    BasicDeliverEventArgs args;

                    if (!channel.IsOpen)
                    {
                        log.Error("The channel is no longer open, but we are still trying to process messages.");
                        throw new InvalidOperationException("Channel is closed.");
                    }
                    else if (!connection.IsOpen)
                    {
                        log.Error("The connection is no longer open, but we are still trying to process message.");
                        throw new InvalidOperationException("Connection is closed.");
                    }

                    bool gotMessage = subscription.Next(250, out args);

                    if (gotMessage)
                    {
                        log.Debug("Received message");
                        try
                        {
                            handler(args.Body);
                        }
                        catch (Exception e)
                        {
                            log.Debug("Exception caught while processing message. Will be bubbled up.", e);
                            throw;
                        }

                        log.Debug("Acknowledging message completion");
                        subscription.Ack(args);
                    }
                }
            }
        }
    }
}

アップデート:

仮想マシンでサーバーを実行してネットワーク障害をシミュレートしました、接続を十分に長く切断すると例外(RabbitMQ.Client.Exceptions.OperationInterruptedException:AMQP操作が中断されました)が発生するため、ネットワークではない可能性があります問題。今はどうなるかわかりませんが、数時間実行すると失敗します。

4

1 に答える 1

64

編集:私はこれに賛成票を投じているので、.NET RabbitMQクライアントにこの機能が組み込まれていることを指摘する必要があります:https ://www.rabbitmq.com/dotnet-api-guide.html#connection-recovery

理想的には、これを使用して、再接続ロジックを手動で実装することを回避できる必要があります。


最近、ほぼ同じことを実装する必要がありました。私の知る限り、RabbitMQで入手可能な情報のほとんどは、ネットワークが非常に信頼できるか、メッセージを送受信するクライアントと同じマシンでRabbitMQブローカーを実行し、Rabbitが接続の問題に対処できることを前提としています。

接続の切断に対して堅牢になるようにRabbitクライアントを設定することは実際にはそれほど難しくありませんが、対処する必要のあるいくつかの特異性があります。

最初に行う必要があるのは、ハートビートをオンにすることです。

ConnectionFactory factory = new ConnectionFactory() 
{
  Uri = brokerUri,
  RequestedHeartbeat = 30,
}; 

「RequestedHeartbeat」を30に設定すると、接続がまだ有効かどうかをクライアントが30秒ごとにチェックします。これをオンにしないと、メッセージサブスクライバーは、接続が悪くなったという手がかりなしに、別のメッセージが着信するのを喜んで待っています。

ハートビートをオンにすると、サーバーは接続がまだ確立されているかどうかを確認します。これは非常に重要です。サブスクライバーがメッセージを取得した後、メッセージが確認される前に接続が切断された場合、サーバーはクライアントが長時間かかっていると見なし、メッセージが閉じられるまで、切断された接続で「スタック」します。ハートビートをオンにすると、サーバーは接続が不良になったときにそれを認識して閉じ、別のサブスクライバーがメッセージを処理できるようにメッセージをキューに戻します。ハートビートがない場合は、手動でアクセスし、Rabbit管理UIで接続を閉じて、スタックしたメッセージをサブスクライバーに渡す必要がありました。

次に、を処理する必要がありますOperationInterruptedException。お気づきのとおり、これは通常、接続が中断されたことに気付いたときにRabbitクライアントがスローする例外です。接続が中断されたときにが呼び出された場合IModel.QueueDeclare()、これは例外です。サブスクリプション、チャネル、接続を破棄し、新しいものを作成して、この例外を処理します。

最後に、閉じた接続からメッセージを消費しようとするときに、コンシューマーが行うことを処理する必要があります。残念ながら、Rabbitクライアントのキューからメッセージを消費する方法はそれぞれ異なるようです。 閉じた接続 を呼び出すとQueueingBasicConsumerスローされます。メッセージを待っているだけなので、何もしません。試してみたところ、使用しているクラスはへの呼び出しからtrueを返しているようですが、の値はnullです。繰り返しになりますが、接続、チャネル、サブスクリプションを破棄して再作成することで、これを処理します。EndOfStreamExceptionQueueingBasicConsumer.Queue.DequeueEventingBasicConsumerSubscriptionSubscription.Nextargs

ハートビートがオンの状態で接続が失敗すると、の値がconnection.IsOpenFalseに更新されるため、必要に応じて確認できます。subscription.Next()ただし、ハートビートは別のスレッドで実行されるため、接続を確認すると接続が開いているが、呼び出される前に閉じている場合を処理する必要があります。

最後に注意すべきことはですIConnection.Dispose()EndOfStreamException接続が閉じられた後にdisposeを呼び出すと、この呼び出しはaをスローします。これは私にはバグのように思えます。オブジェクトに対してdisposeを呼び出さないのは好きではないIDisposableので、それを呼び出して例外を飲み込みます。

それをすべてまとめて、簡単で汚い例にしましょう。

public bool Cancelled { get; set; }

IConnection _connection = null;
IModel _channel = null;
Subscription _subscription = null;

public void Run(string brokerUri, string queueName, Action<byte[]> handler)
{
    ConnectionFactory factory = new ConnectionFactory() 
    {
        Uri = brokerUri,
        RequestedHeartbeat = 30,
    };

    while (!Cancelled)
    {               
        try
        {
            if(_subscription == null)
            {
                try
                {
                    _connection = factory.CreateConnection();
                }
                catch(BrokerUnreachableException)
                {
                    //You probably want to log the error and cancel after N tries, 
                    //otherwise start the loop over to try to connect again after a second or so.
                    continue;
                }

                _channel = _connection.CreateModel();
                _channel.QueueDeclare(queueName, true, false, false, null);
                _subscription = new Subscription(_channel, queueName, false);
            }

            BasicDeliverEventArgs args;
            bool gotMessage = _subscription.Next(250, out args);
            if (gotMessage)
            {
                if(args == null)
                {
                    //This means the connection is closed.
                    DisposeAllConnectionObjects();
                    continue;
                }

                handler(args.Body);
                _subscription.Ack(args);
            }
        }
        catch(OperationInterruptedException ex)
        {
            DisposeAllConnectionObjects();
        }
    }
    DisposeAllConnectionObjects();
}

private void DisposeAllConnectionObjects()
{
    if(_subscription != null)
    {
        //IDisposable is implemented explicitly for some reason.
        ((IDisposable)_subscription).Dispose();
        _subscription = null;
    }

    if(_channel != null)
    {
        _channel.Dispose();
        _channel = null;
    }

    if(_connection != null)
    {
        try
        {
            _connection.Dispose();
        }
        catch(EndOfStreamException) 
        {
        }
        _connection = null;
    }
}
于 2012-10-04T20:23:51.817 に答える