3

私には、かなり高速でデータを生成する大量のセンサーと、それを消費する必要のある消費者がいるプログラムがあります。消費者は非常に異なる割合で消費します。

IObserver / IObservableを使用しているので、簡単な解決策は、イベントごとにタスクを作成し、OnNext()呼び出しとデータをlamdaでラップすることでした。これは非常にうまく機能しました。生の呼び出しよりもオーバーヘッドが少ないことに驚きました。

問題は、これらのコンシューマーの一部は、厳密に適用されるイベントの順序を必要とし、イベントを見逃すことはできないということです。「PerferFairness」は十分ではありません。

私が思いついた最善の解決策は、event / OnNext()ペアをラップする代わりに、InsertをParallelQueueにラップし、コンシューマーごとに1つのキューを作成し、キューのもう一方の端にスレッドを作成してOnNext()を作成することです。呼び出します。

このアプローチに関する3つの差し迫った問題。Task / OnNext()ラッピングソリューションよりもはるかに低速です。ParallelQueueにはブロッキングデキューがない(またはありますか?)ため、実装には少し注意が必要です。3つ目は、これは非常に一般的な問題のように思われるため、見逃した順序を強制する方法がないことを想像できません。たとえば、複数のタスクファクトリが基になるプールを共有し、各ファクトリに厳密に強制する設定があります。注文。

誰かが私がやろうとしていることを達成するための適切な方法を知っていますか?

編集:コンシューマーまたはプロデューサーごとのスレッドを含むソリューションは機能しません。生産者/消費者は長いチェーンを形成し、それぞれが数百あります。

4

2 に答える 2

4

TPL DataFlowライブラリは、アプリケーションに適している場合があります。TPLをデータフローパラダイムで拡張し、処理グラフを構成して高性能の実行環境で実行できるようにします。

TPL Dataflowは、.NET 4上のライブラリとして利用可能であり、.NET4.5の一部として出荷される必要があります。

于 2011-11-07T13:13:58.490 に答える
1

半順序を強制するための最良の抽象化についてのコメントはありませんが、BlockingCollection<T>ラッパーを使用すると、要素をデキューConcurrentQueue<T>するためのブロック操作が可能になります。Take例えば:

// the default is ConcurrentQueue, so you don't have to specify, but if you
// wanted different behavior you could use e.g. ConcurrentStack

var coll = new BlockingCollection<int>(new ConcurrentQueue<int>());

coll.Add(5); // blocks if the collection is at max capacity

int five = coll.Take(); // blocks if the collection is empty
于 2010-10-29T19:48:36.707 に答える