毎秒わずか20メッセージ!それが私が得たすべてです!これは、キューから50個のメッセージをピークし、ReceiveByIdを使用してそれらを並行して受信するコードです。キュー内のメッセージの総数は500です。他の数もテストしました。ただし、上限は1秒あたり20メッセージです。私はどこか完全に邪魔になっていませんか?
編集1:
1-キューを回復可能にする必要があります。しかし、興味深いのは、リカバリ可能なオプションをfalseに設定した場合でもです。それでも上限は20メッセージ/秒です。
2-いくつかのレガシーアプリが関係しているため、ここでMSMQを使用することを余儀なくされています。しかし、このコードが正しく、この上位20の制限が実際に存在する場合は、グループに切り替えるように説得できます。したがって、MSMQを置き換えるための(実際の経験に基づく)推奨事項は大歓迎です(また、何らかの障害が発生した場合に備えて、メッセージを保持する必要があることに注意してください)。
3-役立つ場合に備えて、ThreadPoolのスレッド数を高い数に設定しましたが、実際にはこのコードでは100〜200のスレッドが作成されます。私は50から10000までの異なる数をテストしましたが、違いはありません。
4-各タスクで、ReceiveByIdがスレッドセーフではないため、新しいMessageQueueが作成されました。
5-コードでわかるように、メッセージサイズは非常に小さいです。文字列とintだけです。
編集2:[非常に奇妙な新しい結果]
私はこのコードのすべてのビットで遊んで、これを見つけました:行をコメントアウトするとsingleLocal.UseJournalQueue = false; 私のタスクでは、1秒あたり最大1200のメッセージを読み取ることができます。印象的ではありませんが、私の場合は受け入れられます。奇妙な部分は、UseJournalQueueのデフォルト値がfalseであるということです。なぜもう一度falseに設定すると、パフォーマンスにそのような違いが生じるのでしょうか。
static partial class Program
{
static void Main(string[] args)
{
ThreadPool.SetMaxThreads(15000, 30000);
ThreadPool.SetMinThreads(10000, 20000);
var qName = @".\private$\deep_den";
if (!MessageQueue.Exists(qName))
{
var q = MessageQueue.Create(qName);
}
var single = new MessageQueue(qName);
single.UseJournalQueue = false;
single.DefaultPropertiesToSend.AttachSenderId = false;
single.DefaultPropertiesToSend.Recoverable = true;
single.Formatter = new XmlMessageFormatter(new[] { typeof(Data) });
var count = 500;
var watch = new Stopwatch();
watch.Start();
for (int i = 0; i < count; i++)
{
var data = new Data { Name = string.Format("name_{0}", i), Value = i };
single.Send(new Message(data));
}
watch.Stop();
Console.WriteLine("sent {0} msec/message", watch.Elapsed.TotalMilliseconds / count);
Console.WriteLine("sent {0} message/sec", count / watch.Elapsed.TotalSeconds);
var enu = single.GetMessageEnumerator2();
watch.Reset();
watch.Start();
while (Interlocked.Read(ref __counter) < count)
{
var list = new List<Message>();
var peekCount = 50;
while (peekCount > 0 && enu.MoveNext(TimeSpan.FromMilliseconds(10)))
{
try
{
list.Add(enu.Current);
peekCount--;
}
catch (Exception ex2)
{
Trace.WriteLine(ex2.ToString());
break;
}
}
var tlist = new List<Task>();
foreach (var message in list)
{
var stupid_closure = message;
var t = new Task(() =>
{
using (var singleLocal = new MessageQueue(qName))
{
singleLocal.UseJournalQueue = false;
singleLocal.DefaultPropertiesToSend.AttachSenderId = false;
singleLocal.DefaultPropertiesToSend.Recoverable = true;
singleLocal.Formatter = new XmlMessageFormatter(new[] { typeof(Data) });
try
{
// processing the message and insert it into database
// workflow completed here, so we can safely remove the message from queue
var localM = singleLocal.ReceiveById(stupid_closure.Id);
var localSample = (Data)localM.Body;
Interlocked.Increment(ref __counter);
Console.WriteLine(Interlocked.Read(ref __counter));
}
catch (MessageQueueException ex) { if (ex.MessageQueueErrorCode != MessageQueueErrorCode.IOTimeout) Trace.WriteLine(ex.ToString()); }
catch (Exception ex2) { Trace.WriteLine(ex2.ToString()); }
}
}, TaskCreationOptions.PreferFairness);
tlist.Add(t);
}
foreach (var t in tlist) t.Start();
Task.WaitAll(tlist.ToArray());
list.Clear();
}
watch.Stop();
Console.WriteLine("rcvd {0} msec/message", watch.Elapsed.TotalMilliseconds / count);
Console.WriteLine("rcvd {0} message/sec", count / watch.Elapsed.TotalSeconds);
Console.WriteLine("press any key to continue ...");
Console.ReadKey();
}
static long __counter = 0;
}