ポートとレシーバーを介して 15 の非同期操作を連鎖させています。これにより、スレッド間のメッセージング時間、特にタスクがデータをポートに投稿してから、新しいタスクが別のスレッドで同じデータの処理を開始するまでの時間に非常に懸念を抱くようになりました。各スレッドが開始時にアイドル状態であるという最良の状況を想定して、ストップ ウォッチ クラスを使用して、それぞれが 1 つのスレッドで最高の優先度で動作する 2 つの異なるディスパッチャからの時間を測定するテストを生成しました。
驚いたことに、私の開発リグは Windows 7 x64 を実行している Q6600 クアッド コア 2.4 Ghz コンピューターであり、私のテストでの平均コンテキスト スイッチ時間は 5.66 マイクロ秒で、標準偏差は 5.738 マイクロ秒で、最大値はほぼ 1.58 ミリ秒でした ( 282 倍!)。ストップウォッチの周波数は 427.7 ナノ秒なので、センサー ノイズからはまだ十分離れています。
私がやりたいことは、スレッド間のメッセージング時間を可能な限り短縮することです。同様に重要なことは、コンテキスト スイッチの標準偏差を減らすことです。Windows はリアルタイム OS ではなく、保証もありませんが、Windows スケジューラは公平なラウンド ロビンの優先度ベースのスケジュールであり、このテストの 2 つのスレッドは両方とも最高の優先度です (その必要がある唯一のスレッドは高い) したがって、スレッド上でコンテキスト スイッチが発生することはありません (1.58 ミリ秒の最大時間で明らかです... Windows の量子は 15.65 ミリ秒だと思いますか?) 私が考えることができる唯一のことは、OS 呼び出しのタイミングの変動です。スレッド間でメッセージを渡すために CCR によって使用されるロック メカニズムに。
他の誰かがスレッド間メッセージング時間を測定し、それを改善する方法について何か提案があれば教えてください。
私のテストのソースコードは次のとおりです。
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using Microsoft.Ccr.Core;
using System.Diagnostics;
namespace Test.CCR.TestConsole
{
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Starting Timer");
var sw = new Stopwatch();
sw.Start();
var dispatcher = new Dispatcher(1, ThreadPriority.Highest, true, "My Thread Pool");
var dispQueue = new DispatcherQueue("Disp Queue", dispatcher);
var sDispatcher = new Dispatcher(1, ThreadPriority.Highest, true, "Second Dispatcher");
var sDispQueue = new DispatcherQueue("Second Queue", sDispatcher);
var legAPort = new Port<EmptyValue>();
var legBPort = new Port<TimeSpan>();
var distances = new List<double>();
long totalTicks = 0;
while (sw.Elapsed.TotalMilliseconds < 5000) ;
int runCnt = 100000;
int offset = 1000;
Arbiter.Activate(dispQueue, Arbiter.Receive(true, legAPort, i =>
{
TimeSpan sTime = sw.Elapsed;
legBPort.Post(sTime);
}));
Arbiter.Activate(sDispQueue, Arbiter.Receive(true, legBPort, i =>
{
TimeSpan eTime = sw.Elapsed;
TimeSpan dt = eTime.Subtract(i);
//if (distances.Count == 0 || Math.Abs(distances[distances.Count - 1] - dt.TotalMilliseconds) / distances[distances.Count - 1] > 0.1)
distances.Add(dt.TotalMilliseconds);
if(distances.Count > offset)
Interlocked.Add(ref totalTicks,
dt.Ticks);
if(distances.Count < runCnt)
legAPort.Post(EmptyValue.SharedInstance);
}));
//Thread.Sleep(100);
legAPort.Post(EmptyValue.SharedInstance);
Thread.Sleep(500);
while (distances.Count < runCnt)
Thread.Sleep(25);
TimeSpan exTime = TimeSpan.FromTicks(totalTicks);
double exMS = exTime.TotalMilliseconds / (runCnt - offset);
Console.WriteLine("Exchange Time: {0} Stopwatch Resolution: {1}", exMS, Stopwatch.Frequency);
using(var stw = new StreamWriter("test.csv"))
{
for(int ix=0; ix < distances.Count; ix++)
{
stw.WriteLine("{0},{1}", ix, distances[ix]);
}
stw.Flush();
}
Console.ReadKey();
}
}
}