6

In my test application I can see messages that were processed with an exception being automatically inserted into the default EasyNetQ_Default_Error_Queue, which is great. I can then successfully dump or requeue these messages using the Hosepipe, which also works fine, but requires dropping down to the command line and calling against both Hosepipe and the RabbitMQ API to purge the queue of retried messages.

So I'm thinking the easiest approach for my application is to simply subscribe to the error queue, so I can re-process them using the same infrastructure. But in EastNetQ, the error queue seems to be special. We need to subscribe using a proper type and routing ID, so I'm not sure what these values should be for the error queue:

bus.Subscribe<WhatShouldThisBe>("and-this", ReprocessErrorMessage);

Can I use the simple API to subscribe to the error queue, or do I need to dig into the advanced API?

If the type of my original message was TestMessage, then I'd like to be able to do something like this:

bus.Subscribe<ErrorMessage<TestMessage>>("???", ReprocessErrorMessage);

where ErrorMessage is a class provided by EasyNetQ to wrap all errors. Is this possible?

4

2 に答える 2

3

簡単な API を使用してエラー キューをサブスクライブすることはできません。これは、EasyNetQ キュー タイプの命名規則に従っていないためです。これは修正する必要があるかもしれません ;)

しかし、高度な API は問題なく動作します。元のメッセージを取り戻すことはできませんが、(Newtonsoft.JSON を使用して) 非常に簡単に逆シリアル化できる JSON 表現を簡単に取得できます。サブスクリプション コードの例を次に示します。

[Test]
[Explicit("Requires a RabbitMQ server on localhost")]
public void Should_be_able_to_subscribe_to_error_messages()
{
    var errorQueueName = new Conventions().ErrorQueueNamingConvention();
    var queue = Queue.DeclareDurable(errorQueueName);
    var autoResetEvent = new AutoResetEvent(false);

    bus.Advanced.Subscribe<SystemMessages.Error>(queue, (message, info) =>
    {
        var error = message.Body;

        Console.Out.WriteLine("error.DateTime = {0}", error.DateTime);
        Console.Out.WriteLine("error.Exception = {0}", error.Exception);
        Console.Out.WriteLine("error.Message = {0}", error.Message);
        Console.Out.WriteLine("error.RoutingKey = {0}", error.RoutingKey);

        autoResetEvent.Set();
        return Task.Factory.StartNew(() => { });
    });

    autoResetEvent.WaitOne(1000);
}

これが機能する前に、EasyNetQ でコードを記述しているエラー メッセージの小さなバグを修正する必要があったため、試してみる前にバージョン >= 0.9.2.73 を取得してください。ここでコード例を見ることができます

于 2013-02-25T10:15:45.660 に答える
0

動作するコード: (私は推測しました)

「foo」の厄介な点は、その関数 HandleErrorMessage2 を Consume 呼び出しに渡すだけでは、それが Task ではなく void を返すことを理解できないため、どのオーバーロードを使用するかを理解できないためです。(VS 2012) var に代入すると満足します。オブジェクトを破棄してサブスクライブを解除できるようにするには、呼び出しの戻り値をキャッチする必要があります。

また、誰かが EasyNetQueue などにする代わりに System Object 名 (キュー) を使用したことにも注意してください。そのため、コンパイラの使用に関する説明を追加するか、それを完全に指定する必要があります。

 using Queue = EasyNetQ.Topology.Queue;

  private const string QueueName = "EasyNetQ_Default_Error_Queue";
  public static void Should_be_able_to_subscribe_to_error_messages(IBus bus)
  {
     Action <IMessage<Error>, MessageReceivedInfo> foo = HandleErrorMessage2;

     IQueue queue = new Queue(QueueName,false);
     bus.Advanced.Consume<Error>(queue, foo);
  }

  private static void HandleErrorMessage2(IMessage<Error> msg, MessageReceivedInfo info)
 {
 }
于 2014-07-17T19:56:32.337 に答える