複数のプロデューサーから複数のスレッドでイベント/アイテムを取得し、それらすべてを単一のスレッドで消費できるキューを実装したいと思います。このキューはいくつかの重要な環境で機能するので、私はそれの安定性に非常に関心があります。
Rx機能を使用して実装しましたが、2つの質問があります。
- この実装は大丈夫ですか?それとも、私が知らない何らかの欠陥があるのでしょうか?(代替として-キューとロックを使用した手動実装)
- Dispatcherのバッファ長はどれくらいですか?キューに入れられたアイテムの100kを処理できますか?
以下のコードは、単純なTestMethodを使用した私のアプローチを示しています。その出力は、すべての値が異なるスレッドから入力されているが、別の単一のスレッドで処理されていることを示しています。
[TestMethod()]
public void RxTest()
{
Subject<string> queue = new Subject<string>();
queue
.ObserveOnDispatcher()
.Subscribe(s =>
{
Debug.WriteLine("Value: {0}, Observed on ThreadId: {1}", s, Thread.CurrentThread.ManagedThreadId);
},
() => Dispatcher.CurrentDispatcher.InvokeShutdown());
for (int j = 0; j < 10; j++)
{
ThreadPool.QueueUserWorkItem(o =>
{
for (int i = 0; i < 100; i++)
{
Thread.Sleep(10);
queue.OnNext(string.Format("value: {0}, from thread: {1}", i.ToString(), Thread.CurrentThread.ManagedThreadId));
}
queue.OnCompleted();
});
}
Dispatcher.Run();
}