0

以下に、Azure Service Bus キューを使用するかなり単純な .NET コンソール アプリを示します。

ご覧のとおり、Task.Factory を使用して 25 個のレシーバー タスクを起動し、APM スタイルの BeginMessageReceive メソッドを呼び出しています。次に、EndMessageReceive の最後で、もう一度 BeginMessageReceive を呼び出してループを続行します。

私の質問は、APM スタイルの BeginMessageReceive/EndMessageReceive から、再帰タスクを使用し、場合によっては C# 5.0 async/await を利用する TPL/TAP アプローチに切り替えて、同じようなことを達成するにはどうすればよいかということです。

using System;
using System.Configuration;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;


namespace ServiceBusConsumer
{
    class Program
    {
        private static QueueClient _queueClient;

        private static void Main(string[] args)
        {    
            var connectionString = ConfigurationManager.AppSettings["Microsoft.ServiceBus.ConnectionString"];
            _queueClient = QueueClient.CreateFromConnectionString(connectionString, "MyQueue");

            for (var i = 0; i < 25; i++ )
            {
                Task.Factory.StartNew(BeginMessageReceive);
            }

            Console.WriteLine("Waiting for messages...");
            Console.ReadKey();

            _queueClient.Close();

        } //end private static void Main(string[] args)

        private static void BeginMessageReceive()
        {
            _queueClient.BeginReceive(TimeSpan.FromMinutes(5), EndMessageReceive, null);
        }

        private static void EndMessageReceive(IAsyncResult iar)
        {
            var message = _queueClient.EndReceive(iar);
            try
            {
                if (message != null)
                {
                    var msg = message.GetBody<string>();
                    Console.WriteLine("Message: " + msg);

                    if (_queueClient.Mode == ReceiveMode.PeekLock)
                    {
                        // Mark brokered message as completed at which point it's removed from the queue.
                        message.Complete();
                    }
                }
            }
            catch (Exception ex)
            {
                if (_queueClient.Mode == ReceiveMode.PeekLock)
                {
                    // unlock the message and make it available 
                    message.Abandon();
                }

                Console.WriteLine("Exception was thrown: " + ex.GetBaseException().Message);
            }
            finally
            {
                if (message != null)
                {
                    message.Dispose();
                }
            }
            BeginMessageReceive();
        }

    }
}

MessageReceive タイムアウトの期限が切れた場合に再帰的に自分自身を再度呼び出すための新しい変更されたコード:

private static async Task MessageReceiveAsync()
{
    while (true)
    {
        using (var message = await _queueClient.ReceiveAsync(TimeSpan.FromMinutes(5)))try
        {
            if (message != null)
            {               
                try
                {

                    var msg = message.GetBody<string>();
                    Console.WriteLine("Message: " + msg);

                    if (_queueClient.Mode == ReceiveMode.PeekLock)
                    {
                        // Mark brokered message as completed at which point it's removed from the queue.
                        await message.CompleteAsync();
                    }
                }
                catch (Exception ex)
                {
                    message.AbandonAsync();
                    Console.WriteLine("Exception was thrown: " + ex.GetBaseException().Message);
                }
            }
        }
    }
}
4

1 に答える 1

4

Azure クライアント ライブラリはまだ TAP API で更新されていないようです。そこに何のホールドアップがあるのか​​ わかりません...

とにかく、次のように、を使用して独自のAPM->TAP ラッパーを作成できますTaskFactory.FromAsync

public static class MyAzureExtensions
{
  public static Task<BrokeredMessage> ReceiveAsync(this QueueClient @this,
      TimeSpan serverWaitTime)
  {
    return Task<BrokeredMessage>.Factory.FromAsync(
        @this.BeginReceive, @this.EndReceive, serverWaitTime, null);
  }

  public static Task CompleteAsync(this BrokeredMessage @this)
  {
    return Task.Factory.FromAsync(@this.BeginComplete, @this.EndComplete, null);
  }
}

Azure 呼び出しを TAP 対応 API にラップしたら、次のように使用できます。

private static void Main(string[] args)
{    
  var connectionString = ConfigurationManager.AppSettings["Microsoft.ServiceBus.ConnectionString"];
  _queueClient = QueueClient.CreateFromConnectionString(connectionString, "MyQueue");

  for (var i = 0; i < 25; i++ )
    MyMessageReceiveAsync();

  Console.WriteLine("Waiting for messages...");
  Console.ReadKey();

  _queueClient.Close();
}

private static async Task MyMessageReceiveAsync()
{
  while (true)
  {
    using (var message = await _queueClient.ReceiveAsync(TimeSpan.FromMinutes(5)))
    {
      try
      {
        var msg = message.GetBody<string>();
        Console.WriteLine("Message: " + msg);

        if (_queueClient.Mode == ReceiveMode.PeekLock)
        {
          // Mark brokered message as completed at which point it's removed from the queue.
          await message.CompleteAsync();
        }
      }
      catch (Exception ex)
      {
        if (_queueClient.Mode == ReceiveMode.PeekLock)
        {
          // unlock the message and make it available 
          message.Abandon();
        }

        Console.WriteLine("Exception was thrown: " + ex.GetBaseException().Message);
      }
    }
  }
}

このように使用する利点の 1 つasyncは、スレッド プールのスレッドを不必要に拘束しないことです。オリジナルではリッスンに 25 スレッドを使用していました。私のサンプルは、リッスンするためにスレッドを使用しません。サンプルでスレッド プール スレッドが拘束されるのは、(エラー処理ブランチで) メッセージが破棄されるときだけです。

元のコードとの大きな意味上の違い1 つあります。QueueClient元のコードでは、「受信」で例外が発生すると、プロセスがクラッシュします。私の例では、例外は無視されます。

于 2013-02-07T13:56:25.973 に答える