ハイパースレッディングが有効になっている Windows7 クアッドコアでは、何百ものスレッドが通信してBlockingCollection<T>
います (すべてデフォルトのコンストラクターで初期化されているため、ConcurrentQueue<T>
内部で使用されます)。
すべてのスレッドは、1日に 4 から 20 のメッセージしか受信しないスレッドを除いて、毎秒 10 から 100 のメッセージを受信します。
私の問題は、この最後の消費者に関連しています。ほとんどの場合、新しいメッセージを待つためにブロックされますが、メッセージを消費する準備ができたら、できるだけ早く、おそらく即座に処理する必要があります。
問題は、メッセージがBlockingCollection
このコンシューマー専用に追加されると、数秒後に受信されることです (Take() は、メッセージがエンキューされてから 3 秒から 12 秒後に返されます)。
問題は、Windows がこのスレッドをスケジュールする方法に関連していると思います。
この消費者の ThreadPriority を改善せずに上げようとしました。次に、プロセッサのアフィニティを専用コアに設定しようとしました。他のすべてのスレッドのアフィニティを他のコアを使用するように変更しましたが、まだ改善されていません。
どうすればこの問題に対処できますか? 実際の問題は何ですか?
これはスレッド ループです (commands
は ですBlockingCollection
):
while (!commands.IsCompleted)
{
_log.Debug("Waiting for command.");
command = commands.Take();
_log.DebugFormat("Received: {0}", command);
command.ApplyTo(target);
_log.InfoFormat("Dispatched: {0}", command);
}
保留中のメッセージがない場合、を直接使用する必要ConcurrentQueue
がありますか?
CPU が 50% を超えて使用していないことに注意してください。
遅延については、ログによると、(ほとんどの場合)「Dispatched: ...」と「Waiting for command」の間に数秒が経過しています。(aka on commands.IsCompleted
) と "Waiting for command" の間の他のいくつか。および「受信: ...」 (別名 on commands.Take()
)
すべての「健全な」スレッドは I/O バウンドですが、協力しています。信じてください。私はそれらの設計を変更することはできません。ただし、それらはかなりうまく機能します。唯一の誤動作は、別の種類の作業を行う低負荷スレッドです。スレッドの優先度が悪であることは知っていますが、より良い解決策が見つかりませんでした。
問題を (ほぼ) 再現するテスト サーバー上で、問題をほぼ
再現
するテストを次に示します。テストでは CPU に負荷がかかる (すべて 100% になる) ため、「ほぼ」ですが、実際のアプリケーションではすべての CPU が 50% 未満です。
public class ThreadTest
{
class TimedInt
{
public int Value;
public DateTime Time;
}
[Test]
public void BlockingCollection_consumedWithLowLoad_delaySomeTimes()
{
// arrange:
int toComplete = 0;
BlockingCollection<KeyValuePair<DateTime, TimedInt>> results = new BlockingCollection<KeyValuePair<DateTime, TimedInt>>();
BlockingCollection<TimedInt> queue = new BlockingCollection<TimedInt>();
Action<int> producer = a =>
{
int i = 1;
int x = Convert.ToInt32(Math.Pow(a, 7));
while (i < 200000000)
{
if (i % x == 0)
{
queue.Add(new TimedInt { Time = DateTime.Now, Value = i });
Thread.SpinWait(100); // just to simulate a bit of actual work here
queue.Add(new TimedInt { Time = DateTime.Now, Value = i + 1 });
}
i++;
}
Interlocked.Decrement(ref toComplete);
};
Action consumer = () =>
{
Thread.CurrentThread.Priority = ThreadPriority.Highest; // Note that the consumer has an higher priority
Thread.CurrentThread.Name = "Consumer";
while (toComplete > 0)
{
TimedInt v;
if (queue.TryTake(out v, 1000))
{
DateTime now = DateTime.Now;
results.Add(new KeyValuePair<DateTime, TimedInt>(now, v));
}
}
};
// act:
List<Thread> threads = new List<Thread>();
threads.Add(new Thread(new ThreadStart(consumer)));
for (int i = 0; i < 200; i++)
{
var t = new Thread(new ThreadStart(() => producer(7 + (i % 3))));
t.Name = "Producer " + i.ToString();
threads.Add(t);
toComplete++;
}
threads.ForEach(t => t.Start());
threads.ForEach(t => t.Join());
// assert:
Assert.AreEqual(0, results.Where(kvp => (kvp.Key - kvp.Value.Time).TotalMilliseconds > 1000).Count());
}
}
何か案は?