9

毎秒わずか20メッセージ!それが私が得たすべてです!これは、キューから50個のメッセージをピークし、ReceiveByIdを使用してそれらを並行して受信するコードです。キュー内のメッセージの総数は500です。他の数もテストしました。ただし、上限は1秒あたり20メッセージです。私はどこか完全に邪魔になっていませんか?

編集1:

1-キューを回復可能にする必要があります。しかし、興味深いのは、リカバリ可能なオプションをfalseに設定した場合でもです。それでも上限は20メッセージ/秒です。

2-いくつかのレガシーアプリが関係しているため、ここでMSMQを使用することを余儀なくされています。しかし、このコードが正しく、この上位20の制限が実際に存在する場合は、グループに切り替えるように説得できます。したがって、MSMQを置き換えるための(実際の経験に基づく)推奨事項は大歓迎です(また、何らかの障害が発生した場合に備えて、メッセージを保持する必要があることに注意してください)。

3-役立つ場合に備えて、ThreadPoolのスレッド数を高い数に設定しましたが、実際にはこのコードでは100〜200のスレッドが作成されます。私は50から10000までの異なる数をテストしましたが、違いはありません。

4-各タスクで、ReceiveByIdがスレッドセーフではないため、新しいMessageQueueが作成されました。

5-コードでわかるように、メッセージサイズは非常に小さいです。文字列とintだけです。

編集2:[非常に奇妙な新しい結果]

私はこのコードのすべてのビットで遊んで、これを見つけました:行をコメントアウトするとsingleLocal.UseJournalQueue = false; 私のタスクでは、1秒あたり最大1200のメッセージを読み取ることができます。印象的ではありませんが、私の場合は受け入れられます。奇妙な部分は、UseJournalQueueのデフォルト値がfalseであるということです。なぜもう一度falseに設定すると、パフォーマンスにそのような違いが生じるのでしょうか。

static partial class Program
{
    static void Main(string[] args)
    {
        ThreadPool.SetMaxThreads(15000, 30000);
        ThreadPool.SetMinThreads(10000, 20000);

        var qName = @".\private$\deep_den";

        if (!MessageQueue.Exists(qName))
        {
            var q = MessageQueue.Create(qName);
        }

        var single = new MessageQueue(qName);
        single.UseJournalQueue = false;
        single.DefaultPropertiesToSend.AttachSenderId = false;
        single.DefaultPropertiesToSend.Recoverable = true;
        single.Formatter = new XmlMessageFormatter(new[] { typeof(Data) });

        var count = 500;
        var watch = new Stopwatch();

        watch.Start();
        for (int i = 0; i < count; i++)
        {
            var data = new Data { Name = string.Format("name_{0}", i), Value = i };

            single.Send(new Message(data));
        }
        watch.Stop();

        Console.WriteLine("sent {0} msec/message", watch.Elapsed.TotalMilliseconds / count);
        Console.WriteLine("sent {0} message/sec", count / watch.Elapsed.TotalSeconds);

        var enu = single.GetMessageEnumerator2();

        watch.Reset();
        watch.Start();
        while (Interlocked.Read(ref __counter) < count)
        {
            var list = new List<Message>();
            var peekCount = 50;

            while (peekCount > 0 && enu.MoveNext(TimeSpan.FromMilliseconds(10)))
            {
                try
                {
                    list.Add(enu.Current);
                    peekCount--;
                }
                catch (Exception ex2)
                {
                    Trace.WriteLine(ex2.ToString());
                    break;
                }
            }

            var tlist = new List<Task>();
            foreach (var message in list)
            {
                var stupid_closure = message;

                var t = new Task(() =>
                {
                    using (var singleLocal = new MessageQueue(qName))
                    {
                        singleLocal.UseJournalQueue = false;
                        singleLocal.DefaultPropertiesToSend.AttachSenderId = false;
                        singleLocal.DefaultPropertiesToSend.Recoverable = true;
                        singleLocal.Formatter = new XmlMessageFormatter(new[] { typeof(Data) });

                        try
                        {
                            // processing the message and insert it into database
                            // workflow completed here, so we can safely remove the message from queue

                            var localM = singleLocal.ReceiveById(stupid_closure.Id);
                            var localSample = (Data)localM.Body;

                            Interlocked.Increment(ref __counter);
                            Console.WriteLine(Interlocked.Read(ref __counter));
                        }
                        catch (MessageQueueException ex) { if (ex.MessageQueueErrorCode != MessageQueueErrorCode.IOTimeout) Trace.WriteLine(ex.ToString()); }
                        catch (Exception ex2) { Trace.WriteLine(ex2.ToString()); }
                    }
                }, TaskCreationOptions.PreferFairness);

                tlist.Add(t);
            }

            foreach (var t in tlist) t.Start();

            Task.WaitAll(tlist.ToArray());

            list.Clear();
        }
        watch.Stop();
        Console.WriteLine("rcvd {0} msec/message", watch.Elapsed.TotalMilliseconds / count);
        Console.WriteLine("rcvd {0} message/sec", count / watch.Elapsed.TotalSeconds);

        Console.WriteLine("press any key to continue ...");
        Console.ReadKey();
    }
    static long __counter = 0;
}
4

3 に答える 3

1

Kaveh さん、使用している MessageQueue オブジェクトのコンストラクターは、メッセージ キュー オブジェクトのジャーナル設定が有効になっている場合に、UseJournalQueue プロパティを true に設定します。なんとなく .\private$\deep_den のジャーナル設定が有効になっていると思われます。編集 - 事前に作成されたキューを使用していますか?

于 2013-01-08T11:19:54.617 に答える
1

ベンチマークを行うときは、バックグラウンド ノイズがテストに干渉するのを避けるために、コードを最小限に抑えることが重要です。

残念ながら、あなたのテストは非常にうるさいので、遅延の原因を正確に見つけるのは非常に困難です.

  • スレッドを使用しないでください。マルチスレッドが役立つことはめったになく、通常は良いことよりも悪いことの方が多くなります。
  • 1 つのことだけをテストします。ReceiveById をテストするときは、コストがかかる GetMessageEnumerator2 を使用しないでください。最後に結果から削除する必要があります。
  • MessageQueue を一度だけ作成し、再利用します。新しい MessageQueue クラスの作成ではなく、ReceiveById のみをテストします。

テストを書き直したところ、はるかに良い結果が得られました。MSMQ はブロックで最速のキューではありませんが、遅くはありません。

    var qName = @".\private$\deep_den";

    if (!MessageQueue.Exists(qName))
    {
        var q = MessageQueue.Create(qName);
    }

    var single = new MessageQueue(qName);
    single.UseJournalQueue = true;
    single.DefaultPropertiesToSend.AttachSenderId = false;
    single.DefaultPropertiesToSend.Recoverable = true;
    single.Formatter = new XmlMessageFormatter(new[] { typeof(Data) });

    var count = 500;
    var watch = new Stopwatch();

    watch.Start();
    for (int i = 0; i < count; i++)
    {
        var data = new Data { Name = string.Format("name_{0}", i), Value = i };

        single.Send(new Message(data));
    }
    watch.Stop();

    Console.WriteLine("sent {0} msec/message", watch.Elapsed.TotalMilliseconds / count);
    Console.WriteLine("sent {0} message/sec", count / watch.Elapsed.TotalSeconds);

    var enu = single.GetMessageEnumerator2();

    watch.Reset();
    watch.Start();

    var queue = new MessageQueue(qName);
    queue.UseJournalQueue = true;
    queue.DefaultPropertiesToSend.AttachSenderId = false;
    queue.DefaultPropertiesToSend.Recoverable = true;
    queue.Formatter = new XmlMessageFormatter(new[] { typeof(Data) });

    List<Data> lst = new List<Data>();
    while (lst.Count != count && enu.MoveNext(TimeSpan.FromDays(1)))
    {
        var message = queue.ReceiveById(enu.Current.Id);
        lst.Add((Data)message.Body);
    }
    watch.Stop();
    Console.WriteLine("rcvd {0} msec/message", watch.Elapsed.TotalMilliseconds / count);
    Console.WriteLine("rcvd {0} message/sec", count / watch.Elapsed.TotalSeconds);

    Console.WriteLine("press any key to continue ...");
    Console.ReadKey();

于 2014-04-01T21:00:29.820 に答える
0

Kaveh、私はここで完全に間違っている可能性がありますが、あなたの問題は XML シリアライゼーションだと思います。XmlSerializer が作成されると、まだ遅くなる可能性がありますが、実際に時間がかかるのはコンストラクターです。

シリアル化を完全に削除してデータを文字列として読み取るか、事前に単一の XmlSerializer または XmlMessageFormatter を作成してスレッドに渡すことをお勧めします。スレッドの問題には気をつけてください。

于 2013-01-29T12:58:34.857 に答える