3

メッセージを消費するために24時間年中無休で利用したい非常に単純なクライアントがあります。Windows プロセスで実行されています。

サーバーとメッセージの受信に問題はありません。それは単なるクライアントです。

動作は次のとおりです。

接続を新たに開始すると機能します。しばらくして、おそらく数時間後、私のクライアントは奇妙な状態になりました。含まれる接続は、未確認のメッセージを「保留」します。

つまり、Web 管理インターフェイスを使用すると、合計で 2 つの確認応答されていないメッセージがあることがわかります。接続を見ると、2 つの未確認メッセージが広がっているのがわかります。

しかし、処理は行われていません。

そして最終的に、私の接続は強制終了され、例外もログ メッセージもトリガーされません。これにより、すべてのメッセージが準備完了状態になります。

この問題を解決するための最初の試みは、IModel、IChannel、および QueueingBasicConsumer の i-var の状態をチェックする単純な外部ループを追加することでした。ただし、IModel/IChannel の IsOpen は、Web 管理者がアクティブな接続がないと報告した後でも常に true を報告し、QueueingBasicConsumer の IsRunning も常に true を報告します。

明らかに、接続が「アクティブ」かどうかを確認する別の方法が必要です。

要約すると、最初はうまく機能します。最終的に、診断チェックが無意味になり、サーバーに送信されたメッセージが確認されず、既存の接続全体に分散するという奇妙な状態になります。すぐに、デバッグや例外がスローされずに接続が切断され、診断チェックでは問題がないと報告されます。

ヘルプやベストプラクティスをいただければ幸いです。ハートビートと、BasicQos を使用して例外をチェックすることが提案されている IsOpen の「競合」状態を読みましたが、まず何が起こっているのかを理解したいと思います。

これが私が物事を始めるところです:

private void StartMessageLoop(string uri, string queueName) {
    this.serverUri = uri;
    this.queueName = queueName;

    Connect(uri);
    Task.Factory.StartNew(()=> MessageLoopTask(queueName));
}

接続方法は次のとおりです。

private void Connect(string serverAddress) {
    ConnectionFactory cf = new ConnectionFactory();

    cf.Uri = serverAddress;

    this.connection = cf.CreateConnection();

    this.connection.ConnectionShutdown += new ConnectionShutdownEventHandler(LogConnClose);
    this.channel = this.connection.CreateModel();
}

ここから無限ループが始まります:

private void MessageLoopTask(string queueName) {
    consumer = new QueueingBasicConsumer(channel);
    String consumerTag = channel.BasicConsume(queueName, false, consumer);

    while (true) {
        try {       
            BasicDeliverEventArgs e = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
            IBasicProperties props = e.BasicProperties;
            byte[] body = e.Body;       

            string messageContent = Encoding.UTF8.GetString(body);
            bool result = this.messageProcessor.ProcessMessage(messageContent);

            if(result){
                channel.BasicAck(e.DeliveryTag, false);
            }
            else{
                channel.BasicNack(e.DeliveryTag, false, true);
                // log
            }
        }
        catch (OperationInterruptedException ex) {
            // log
            break;
        }    
        catch(Exception e) {            
            // log      
            break;
        }
    }
    // log
}

よろしく、デーン

4

0 に答える 0