1

キュー メッセージ サーバーとして RabbitMQ を使用し、.NET C# クライアントを使用します。キューからのメッセージの処理中にエラーが発生した場合、メッセージは認識されず、キューにスタックしたままになり、文書として理解したとおりに再処理されません。

いくつかの構成やコードブロックが欠けているかどうかはわかりません。

私の考えは、エラーが発生した場合にメッセージを手動で自動確認し、このメッセージを手動で再度キューにプッシュすることです。

別のより良い解決策があることを願っています。

どうもありがとう。

私のコード

        public void Subscribe(string queueName)
    {
        while (!Cancelled)
        {
            try
            {
                if (subscription == null)
                {
                    try
                    {
                        //try to open connection
                        connection = connectionFactory.CreateConnection();
                    }
                    catch (BrokerUnreachableException ex)
                    {
                        //You probably want to log the error and cancel after N tries, 
                        //otherwise start the loop over to try to connect again after a second or so.
                        log.Error(ex);
                        continue;
                    }

                    //crate chanel
                    channel = connection.CreateModel();
                    // This instructs the channel not to prefetch more than one message
                    channel.BasicQos(0, 1, false);
                    // Create a new, durable exchange
                    channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, true, false, null);
                    // Create a new, durable queue
                    channel.QueueDeclare(queueName, true, false, false, null);
                    // Bind the queue to the exchange
                    channel.QueueBind(queueName, exchangeName, queueName);
                    //create subscription
                    subscription = new Subscription(channel, queueName, false);
                }

                BasicDeliverEventArgs eventArgs;
                var gotMessage = subscription.Next(250, out eventArgs);//250 millisecond
                if (gotMessage)
                {
                    if (eventArgs == null)
                    {
                        //This means the connection is closed.
                        DisposeAllConnectionObjects();
                        continue;//move to new iterate
                    }

                    //process message

                   channel.BasicAck(eventArgs.DeliveryTag, false);


                }
            }
            catch (OperationInterruptedException ex)
            {
                log.Error(ex);
                DisposeAllConnectionObjects();
            }
        }

        DisposeAllConnectionObjects();
    }

    private void DisposeAllConnectionObjects()
    {
        //dispose subscription
        if (subscription != null)
        {
            //IDisposable is implemented explicitly for some reason.
            ((IDisposable)subscription).Dispose();
            subscription = null;
        }

        //dipose channel
        if (channel != null)
        {
            channel.Dispose();
            channel = null;
        }

        //check if connection is not null and dispose it
        if (connection != null)
        {
            try
            {
                connection.Dispose();
            }
            catch (EndOfStreamException ex)
            {
                log.Error(ex);
            }
            catch (OperationInterruptedException ex)//handle this get error from dispose connection 
            {
                log.Error(ex);
            }
            catch (Exception ex)
            {
                log.Error(ex);
            }
            connection = null;
        }
    }
4

1 に答える 1

9

RabbitMQ のドキュメントを誤解している可能性があります。メッセージがコンシューマから ack されない場合、Rabbit ブローカは消費のためにメッセージをキューに再キューイングします。メッセージを確認してから再キューイングするための提案された方法は良い考えではなく、問題をより複雑にするだけだと思います。

コンシューマーがメッセージの処理に問題を抱えていたためにメッセージを明示的に「拒否」したい場合は、Rabbit の Nack 機能を使用できます。

たとえば、catch 例外ブロック内では、次を使用できます。

subscription.Model.BasicNack(eventArgs.DeliveryTag, false, true);

これにより、Rabbit ブローカーにメッセージを再キューイングするように通知されます。基本的に、配信タグを渡します。複数のメッセージではない場合は false、メッセージを再キューイングする場合は true です。メッセージを拒否して再キューイングしない場合は、true を false に変更します。

さらに、サブスクリプションを作成したので、チャネル経由ではなく、これに対して直接 ack を実行する必要があると思います。

変化する:

channel.BasicAck(eventArgs.DeliveryTag, false);

に:

subscription.Ack(); 

この確認応答の方法は、既にサブスクライブしているチャンネルをいじるのではなく、サブスクリプション関連のすべてをサブスクリプション オブジェクトに保持するため、はるかにクリーンです。

于 2013-09-04T12:42:10.617 に答える