0

次の非同期キュー処理ルーティングがあります。

      var commandQueue = new BlockingCollection<MyCommand>();
      commandQueue
            .GetConsumingEnumerable()
            .ToObservable(new LimitedConcurrencyLevelTaskPoolScheduler(5))
            .Subscribe(c =>
                           {
                               try
                               {
                                   ProcessCommand(c);
                               }
                               catch (Exception ex)
                               {
                                   Trace.TraceError(ex.ToString());
                               }
                           }
            );

ある特定のシナリオ(データを取得しようとしているとき)では、外に出てデータを取得する前に、commandQueueが空であることを確認する必要があります。この操作は同期して行われることが期待されます。基本的にはこういうことをしたい

  public void GetData()
  {
     commandQueue.WaitForEmpty(); 

     // could potentially be expressed: 
     // while (commandQueue.Count > 0) Thread.Sleep(10);

     return GoGetTheData()
  }

理想的なシナリオでは、すべての呼び出し元が非同期で「GetData」を実行することを認識しています...ただし、同期して実行する必要がある場合もあります...したがって、一貫性を確保するためにコマンドキューが空になるのを待つ必要があります。私のデータの最新性。

ManualResetEventを使用してこれを非常に簡単に行う方法を知っていますが、System.Reactive/TPLを使用して簡単な方法があるかどうかを知りたいです。

ありがとう。

4

3 に答える 3

1

これは、最初に思われるよりも難しい質問です。生産者/消費者のジョブセマンティクスに必要BlockingCollectionな(およびその基礎となる)。ConcurrentQueueただし、「空の」信号を待つなど、これらのコレクションで何が起こっているかを観察できるようにする必要もあります。

最善の策は、ここから見JobQueueていくことです。ParallelJobQueue

http://social.msdn.microsoft.com/Forums/en-US/rx/thread/2817c6e5-e5a4-4aac-91c1-97ba7de88ff7

これには、WhenQueueEmpty同時に実行されるジョブとキューに入れられたジョブ(この場合はコマンドの概念と同義のジョブ)の監視可能であり、その数を制御できます。

于 2012-06-28T00:46:11.210 に答える
0

これ使ってもらえますか?

    var dataObservable = Observable.Start(() =>
    {
        commandQueue.WaitForEmpty(); 
        return GoGetTheData();
    });
于 2012-06-27T07:04:09.913 に答える
0

私にはあなたの要件は

  • データを非同期で取得する
  • このデータを並列処理します(最大5度の並列処理)
  • プロセスを繰り返します

これらが要件であり、BlockingCollectionを使用する必要がない場合、つまり既存のAPIでない場合は、Rxだけでこれを非常に簡単に解決できると思います。

var dataRequestScheduler = new EventLoopScheduler();
var subscription = GetTheData()
    .Repeat()
    .SubscribeOn(dataRequestScheduler)
    .ObserveOn(Scheduler.TaskPool)//new LimitedConcurrencyLevelTaskPoolScheduler(5)
    .Subscribe(c =>
           {
               try
               {
                   ProcessCommand(c);
               }
               catch (Exception ex)
               {
                   Trace.TraceError(ex.ToString());
               }
           }
        );

GetTheDataメソッドがIObservableを返す場所

カスタムスケジューラを必要とせずに、Observable.StartとMerge(5)を活用して、最大5つのスレッドを取得できる可能性があります。

于 2012-07-06T11:38:01.313 に答える