3

複数のプロデューサーから複数のスレッドでイベント/アイテムを取得し、それらすべてを単一のスレッドで消費できるキューを実装したいと思います。このキューはいくつかの重要な環境で機能するので、私はそれの安定性に非常に関心があります。

Rx機能を使用して実装しましたが、2つの質問があります。

  1. この実装は大丈夫ですか?それとも、私が知らない何らかの欠陥があるのでしょうか?(代替として-キューとロックを使用した手動実装)
  2. Dispatcherのバッファ長はどれくらいですか?キューに入れられたアイテムの100kを処理できますか?

以下のコードは、単純なTestMethodを使用した私のアプローチを示しています。その出力は、すべての値が異なるスレッドから入力されているが、別の単一のスレッドで処理されていることを示しています。

[TestMethod()]
public void RxTest()
{
    Subject<string> queue = new Subject<string>();

    queue
        .ObserveOnDispatcher()
        .Subscribe(s =>
                        {
                            Debug.WriteLine("Value: {0}, Observed on ThreadId: {1}", s, Thread.CurrentThread.ManagedThreadId);
                        },
                    () => Dispatcher.CurrentDispatcher.InvokeShutdown());

    for (int j = 0; j < 10; j++)
    {
        ThreadPool.QueueUserWorkItem(o =>
        {
            for (int i = 0; i < 100; i++)
            {
                Thread.Sleep(10);
                queue.OnNext(string.Format("value: {0}, from thread: {1}", i.ToString(), Thread.CurrentThread.ManagedThreadId));
            }
            queue.OnCompleted();
        });
    }


    Dispatcher.Run();
}
4

2 に答える 2

4

Subject非常にマルチスレッド化されたシナリオでの動作についてはよくわかりません。あなたが話している状況では、のようなものBlockingCollection(およびその基礎となるもの)がよく着用されていると想像できます。ConcurrentQueue起動も簡単です。

var queue = new BlockingCollection<long>();

// subscribing
queue.GetConsumingEnumerable()
     .ToObservable(Scheduler.NewThread)
     .Subscribe(i => Debug.WriteLine("Value: {0}, Observed on ThreadId: {1}", i, Thread.CurrentThread.ManagedThreadId));

// sending
Observable.Interval(TimeSpan.FromMilliseconds(500), Scheduler.ThreadPool)
          .Do(i => Debug.WriteLine("Value: {0}, Sent on ThreadId: {1}", i, Thread.CurrentThread.ManagedThreadId))
          .Subscribe(i => queue.Add(i));

あなたは確かにキューやロックに触れたくありません。実装は優れており、ConcurrentQueueあなたが話しているサイズのキューを効果的に処理します。

于 2012-07-02T11:27:21.133 に答える
3

を見てくださいEventLoopScheduler。RXに組み込まれており、必要なことはすべて実行できると思います。

任意の数のオブザーバブルを取得して、.ObserveOn(els)elsのインスタンスであるEventLoopScheduler)を呼び出すことができます。これで、複数のスレッドから1つのスレッドに複数のオブザーバブルをマーシャリングし、各呼び出しをOnNextシリアルにキューに入れることができます。

于 2012-07-02T11:52:55.097 に答える