5

AzureでServiceBusを実行しており、1秒あたり約10〜100のメッセージを送信しています。

最近、私は.net 4.5に切り替え、すべてのコードをリファクタリングして、各行で「async」と「await 」を少なくとも2回実行し、「正しく」行われたことを確認しました:)

今、私はそれが実際に良いのか悪いのか疑問に思っています。コードスニペットを見て、あなたの考えを教えてください。私は特に、スレッドコンテキストの切り替えが、すべての非同期性から、利益よりも悲しみを与えていないかどうかを心配していました...(!dumpheapを見ると、それは間違いなく要因です)

少し説明します-2つのメソッドを投稿します-1つはConcurrentQueueでwhileループを実行し、新しいメッセージを待機し、もう1つは一度に1つのメッセージを送信します。また、Azure博士が規定したとおりに、一時的な障害処理ブロックを使用しています。

送信ループ(最初から開始し、新しいメッセージを待機しています):

private async void SendingLoop()
    {
        try
        {
            await this.RecreateMessageFactory();

            this.loopSemaphore.Reset();
            Buffer<SendMessage> message = null;

            while (true)
            {
                if (this.cancel.Token.IsCancellationRequested)
                {
                    break;
                }
                this.semaphore.WaitOne();
                if (this.cancel.Token.IsCancellationRequested)
                {
                    break;
                }

                while (this.queue.TryDequeue(out message))
                {                       
                    try
                    {
                        using (message)
                        {
                            //only take send the latest message
                            if (!this.queue.IsEmpty)
                            {
                                this.Log.Debug("Skipping qeued message, Topic: " + message.Value.Topic);
                                continue;
                            }
                            else
                            {
                                if (this.Topic == null || this.Topic.Path != message.Value.Topic)
                                    await this.EnsureTopicExists(message.Value.Topic, this.cancel.Token);

                                if (this.cancel.Token.IsCancellationRequested)
                                    break;
                                await this.SendMessage(message, this.cancel.Token);
                            }
                        }
                    }
                    catch (OperationCanceledException)
                    {
                        break;
                    }
                    catch (Exception ex)
                    {
                        ex.LogError();
                    }
                }
            }
        }
        catch (OperationCanceledException)
        { }
        catch (Exception ex)
        {
            ex.LogError();
        }
        finally
        {
            if (this.loopSemaphore != null)
                this.loopSemaphore.Set();
        }
    }

メッセージの送信:

private async Task SendMessage(Buffer<SendMessage> message, CancellationToken cancellationToken)
    {
        //this.Log.Debug("MessageBroadcaster.SendMessage to " + this.GetTopic());
        bool entityNotFound = false;

        if (this.MessageSender.IsClosed)
        {
            //this.Log.Debug("MessageBroadcaster.SendMessage MessageSender closed, recreating " + this.GetTopic());
            await this.EnsureMessageSender(cancellationToken);
        }

        try
        {
            await this.sendMessageRetryPolicy.ExecuteAsync(async () =>
            {
                message.Value.Body.Seek(0, SeekOrigin.Begin);
                using (var msg = new BrokeredMessage(message.Value.Body, false))
                {
                    await Task.Factory.FromAsync(this.MessageSender.BeginSend, this.MessageSender.EndSend, msg, null);
                }
            }, cancellationToken);
        }
        catch (MessagingEntityNotFoundException)
        {
            entityNotFound = true;                
        }
        catch (OperationCanceledException)
        { }
        catch (ObjectDisposedException)
        { }
        catch (Exception ex)
        {
            ex.LogError();
        }

        if (entityNotFound)
        {
            if (!cancellationToken.IsCancellationRequested)
            {
                await this.EnsureTopicExists(message.Value.Topic, cancellationToken);
            }
        }
    }

上記のコードは、1メッセージ/秒を送信する「Sender」クラスからのものです。常に約50〜100のインスタンスが実行されているため、かなりの数のスレッドになる可能性があります。

ところで、EnsureMessageSender、RecreateMessageFactory、EnsureTopicExistsについてはあまり心配しないでください。これらは、それほど頻繁には呼び出されません。

必要なのは一度に1つのメッセージを送信するだけで、非同期のものを気にせず、それに伴うオーバーヘッドを回避することを条件として、メッセージキューを介して1つのバックグラウンドスレッドを処理し、メッセージを同期的に送信する方がよいのではないでしょうか。

通常、1つのメッセージをAzure Service Busに送信するのは数ミリ秒であり、それほど高価ではないことに注意してください。(低速、タイムアウト、またはService Busバックエンドに問題がある場合を除いて、送信しようとしてしばらくハングする可能性があります)。

長い投稿をありがとう、ごめんなさい、

ステボ

提案された解決策

この例は私の状況の解決策になりますか?

static void Main(string[] args)
    {
        var broadcaster = new BufferBlock<int>(); //queue
        var cancel = new CancellationTokenSource();

        var run = Task.Run(async () =>
        {
            try
            {
                while (true)
                {
                    //check if we are not finished
                    if (cancel.IsCancellationRequested)
                        break;                       

                    //async wait until a value is available
                    var val = await broadcaster.ReceiveAsync(cancel.Token).ConfigureAwait(false);
                    int next = 0;

                    //greedy - eat up and ignore all the values but last
                    while (broadcaster.TryReceive(out next))
                    {
                        Console.WriteLine("Skipping " + val);
                        val = next;
                    }

                    //check if we are not finished
                    if (cancel.IsCancellationRequested)
                        break;

                    Console.WriteLine("Sending " + val);

                    //simulate sending delay
                    await Task.Delay(1000).ConfigureAwait(false); 

                    Console.WriteLine("Value sent " + val);                        
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
            }

        }, cancel.Token);

        //simulate sending messages. One every 200mls 
        for (int i = 0; i < 20; i++)
        {
            Console.WriteLine("Broadcasting " + i);
            broadcaster.Post(i);
            Thread.Sleep(200);
        }

        cancel.Cancel();
        run.Wait();
    }
4

2 に答える 2

5

あなたは言う:

上記のコードは、1メッセージ/秒を送信する「Sender」クラスからのものです。常に約50〜100のインスタンスが実行されているため、かなりの数のスレッドになる可能性があります。

これは非同期の良いケースです。ここでたくさんのスレッドを保存します。非同期はスレッドベースではないため、コンテキストの切り替えを減らします。待機が必要な場合は、コンテキストスイッチは行われません。代わりに、次の作業項目が同じスレッドで処理されています(存在する場合)。

そのため、非同期ソリューションは同期ソリューションよりも確実に拡張性が高くなります。ワークフローの50〜100インスタンスで実際に使用するCPUが少ないかどうかを測定する必要があります。インスタンスが多いほど、非同期が速くなる可能性が高くなります。

ここで、実装に1つの問題がありますConcurrentQueue。非同期対応ではないを使用しています。したがって、非同期バージョンでも実際には50〜100のスレッドを使用します。それらはブロックするか(回避したかった)、またはビジーウェイトで100%CPUを燃焼するのを待ちます(これはあなたの実装の場合のようです!)。この問題を取り除き、キューイングも非同期にする必要があります。SemaphoreSlim非同期で待機できるので、ここで役立つかもしれません。

于 2013-03-25T12:01:15.097 に答える
4

まず、Task!= Thread。タスク(およびasyncメソッドの継続)はスレッドプールにスケジュールされます。ここで、Microsoftは、タスクがかなり短い限り、驚異的に機能する多数の最適化を導入しました。

コードを確認すると、1行でフラグが立てられますsemaphore.WaitOne。キューにデータがあることを示す一種のシグナルとしてこれを使用していると思います。これは、メソッド内のブロッキング待機であるため、悪いことです。asyncブロッキング待機を使用することにより、コードは軽量の継続からはるかに重いスレッドプールスレッドに変わります。

したがって、@ usrの推奨に従い、キュー(およびセマフォ)をasync-readyキューに置き換えます。TPL DataflowBufferBlock<T>は、NuGetを介してasync利用できる準備が整ったプロデューサー/コンシューマーキューです。プロジェクトでは、キューとしてだけでなくデータフローをより広範囲に使用することでメリットが得られるように思われるため、最初にこれをお勧めします(ただし、キューは開始に適した場所です)。

他のasync準備ができたデータ構造が存在します。私のAsyncExライブラリにはそれらがいくつかあります。簡単なものを自分で作成することも難しくありません。この件に関するブログ投稿があります。ただし、状況に応じてTPLデータフローをお勧めします。

于 2013-03-25T13:35:42.503 に答える