AzureでServiceBusを実行しており、1秒あたり約10〜100のメッセージを送信しています。
最近、私は.net 4.5に切り替え、すべてのコードをリファクタリングして、各行で「async」と「await 」を少なくとも2回実行し、「正しく」行われたことを確認しました:)
今、私はそれが実際に良いのか悪いのか疑問に思っています。コードスニペットを見て、あなたの考えを教えてください。私は特に、スレッドコンテキストの切り替えが、すべての非同期性から、利益よりも悲しみを与えていないかどうかを心配していました...(!dumpheapを見ると、それは間違いなく要因です)
少し説明します-2つのメソッドを投稿します-1つはConcurrentQueueでwhileループを実行し、新しいメッセージを待機し、もう1つは一度に1つのメッセージを送信します。また、Azure博士が規定したとおりに、一時的な障害処理ブロックを使用しています。
送信ループ(最初から開始し、新しいメッセージを待機しています):
private async void SendingLoop()
{
try
{
await this.RecreateMessageFactory();
this.loopSemaphore.Reset();
Buffer<SendMessage> message = null;
while (true)
{
if (this.cancel.Token.IsCancellationRequested)
{
break;
}
this.semaphore.WaitOne();
if (this.cancel.Token.IsCancellationRequested)
{
break;
}
while (this.queue.TryDequeue(out message))
{
try
{
using (message)
{
//only take send the latest message
if (!this.queue.IsEmpty)
{
this.Log.Debug("Skipping qeued message, Topic: " + message.Value.Topic);
continue;
}
else
{
if (this.Topic == null || this.Topic.Path != message.Value.Topic)
await this.EnsureTopicExists(message.Value.Topic, this.cancel.Token);
if (this.cancel.Token.IsCancellationRequested)
break;
await this.SendMessage(message, this.cancel.Token);
}
}
}
catch (OperationCanceledException)
{
break;
}
catch (Exception ex)
{
ex.LogError();
}
}
}
}
catch (OperationCanceledException)
{ }
catch (Exception ex)
{
ex.LogError();
}
finally
{
if (this.loopSemaphore != null)
this.loopSemaphore.Set();
}
}
メッセージの送信:
private async Task SendMessage(Buffer<SendMessage> message, CancellationToken cancellationToken)
{
//this.Log.Debug("MessageBroadcaster.SendMessage to " + this.GetTopic());
bool entityNotFound = false;
if (this.MessageSender.IsClosed)
{
//this.Log.Debug("MessageBroadcaster.SendMessage MessageSender closed, recreating " + this.GetTopic());
await this.EnsureMessageSender(cancellationToken);
}
try
{
await this.sendMessageRetryPolicy.ExecuteAsync(async () =>
{
message.Value.Body.Seek(0, SeekOrigin.Begin);
using (var msg = new BrokeredMessage(message.Value.Body, false))
{
await Task.Factory.FromAsync(this.MessageSender.BeginSend, this.MessageSender.EndSend, msg, null);
}
}, cancellationToken);
}
catch (MessagingEntityNotFoundException)
{
entityNotFound = true;
}
catch (OperationCanceledException)
{ }
catch (ObjectDisposedException)
{ }
catch (Exception ex)
{
ex.LogError();
}
if (entityNotFound)
{
if (!cancellationToken.IsCancellationRequested)
{
await this.EnsureTopicExists(message.Value.Topic, cancellationToken);
}
}
}
上記のコードは、1メッセージ/秒を送信する「Sender」クラスからのものです。常に約50〜100のインスタンスが実行されているため、かなりの数のスレッドになる可能性があります。
ところで、EnsureMessageSender、RecreateMessageFactory、EnsureTopicExistsについてはあまり心配しないでください。これらは、それほど頻繁には呼び出されません。
必要なのは一度に1つのメッセージを送信するだけで、非同期のものを気にせず、それに伴うオーバーヘッドを回避することを条件として、メッセージキューを介して1つのバックグラウンドスレッドを処理し、メッセージを同期的に送信する方がよいのではないでしょうか。
通常、1つのメッセージをAzure Service Busに送信するのは数ミリ秒であり、それほど高価ではないことに注意してください。(低速、タイムアウト、またはService Busバックエンドに問題がある場合を除いて、送信しようとしてしばらくハングする可能性があります)。
長い投稿をありがとう、ごめんなさい、
ステボ
提案された解決策
この例は私の状況の解決策になりますか?
static void Main(string[] args)
{
var broadcaster = new BufferBlock<int>(); //queue
var cancel = new CancellationTokenSource();
var run = Task.Run(async () =>
{
try
{
while (true)
{
//check if we are not finished
if (cancel.IsCancellationRequested)
break;
//async wait until a value is available
var val = await broadcaster.ReceiveAsync(cancel.Token).ConfigureAwait(false);
int next = 0;
//greedy - eat up and ignore all the values but last
while (broadcaster.TryReceive(out next))
{
Console.WriteLine("Skipping " + val);
val = next;
}
//check if we are not finished
if (cancel.IsCancellationRequested)
break;
Console.WriteLine("Sending " + val);
//simulate sending delay
await Task.Delay(1000).ConfigureAwait(false);
Console.WriteLine("Value sent " + val);
}
}
catch (Exception ex)
{
Console.WriteLine(ex);
}
}, cancel.Token);
//simulate sending messages. One every 200mls
for (int i = 0; i < 20; i++)
{
Console.WriteLine("Broadcasting " + i);
broadcaster.Post(i);
Thread.Sleep(200);
}
cancel.Cancel();
run.Wait();
}