キュー メッセージ サーバーとして RabbitMQ を使用し、.NET C# クライアントを使用します。キューからのメッセージの処理中にエラーが発生した場合、メッセージは認識されず、キューにスタックしたままになり、文書として理解したとおりに再処理されません。
いくつかの構成やコードブロックが欠けているかどうかはわかりません。
私の考えは、エラーが発生した場合にメッセージを手動で自動確認し、このメッセージを手動で再度キューにプッシュすることです。
別のより良い解決策があることを願っています。
どうもありがとう。
私のコード
public void Subscribe(string queueName)
{
while (!Cancelled)
{
try
{
if (subscription == null)
{
try
{
//try to open connection
connection = connectionFactory.CreateConnection();
}
catch (BrokerUnreachableException ex)
{
//You probably want to log the error and cancel after N tries,
//otherwise start the loop over to try to connect again after a second or so.
log.Error(ex);
continue;
}
//crate chanel
channel = connection.CreateModel();
// This instructs the channel not to prefetch more than one message
channel.BasicQos(0, 1, false);
// Create a new, durable exchange
channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, true, false, null);
// Create a new, durable queue
channel.QueueDeclare(queueName, true, false, false, null);
// Bind the queue to the exchange
channel.QueueBind(queueName, exchangeName, queueName);
//create subscription
subscription = new Subscription(channel, queueName, false);
}
BasicDeliverEventArgs eventArgs;
var gotMessage = subscription.Next(250, out eventArgs);//250 millisecond
if (gotMessage)
{
if (eventArgs == null)
{
//This means the connection is closed.
DisposeAllConnectionObjects();
continue;//move to new iterate
}
//process message
channel.BasicAck(eventArgs.DeliveryTag, false);
}
}
catch (OperationInterruptedException ex)
{
log.Error(ex);
DisposeAllConnectionObjects();
}
}
DisposeAllConnectionObjects();
}
private void DisposeAllConnectionObjects()
{
//dispose subscription
if (subscription != null)
{
//IDisposable is implemented explicitly for some reason.
((IDisposable)subscription).Dispose();
subscription = null;
}
//dipose channel
if (channel != null)
{
channel.Dispose();
channel = null;
}
//check if connection is not null and dispose it
if (connection != null)
{
try
{
connection.Dispose();
}
catch (EndOfStreamException ex)
{
log.Error(ex);
}
catch (OperationInterruptedException ex)//handle this get error from dispose connection
{
log.Error(ex);
}
catch (Exception ex)
{
log.Error(ex);
}
connection = null;
}
}