私が解決しなければならない問題は、(私の理解から)典型的な生産者/消費者問題です。24時間年中無休でデータを受信しています。着信データ(生データと呼びます)はテーブルに保存され、エンドユーザーは使用できません。次に、処理されていないすべての生データを選択し、1つずつ処理を開始します。データの各ユニットが処理された後、そのデータは別のテーブルに格納され、クライアントアプリケーションで使用できるようになります。生データのロードから処理済みデータの永続化までのプロセスには、平均で2〜5秒かかります。ただし、データの処理に使用するサードパーティのWebサービスに大きく依存しています。Webサービスが遅い場合、データを取得してバックログを蓄積するほど速くデータを処理できなくなるため、お客様はライブフィードを失うことになります。このプロセスをマルチスレッドプロセスにします。
LOADING-無期限に実行され、未処理のデータをDBから
BlockingCollection<T>
(または並行コレクションの他のバリエーション)にロードするローダータスク(プロデューサー)。私が選んだのBlockingCollection
は、それが生産者/消費者パターンを念頭に置いて設計されており、GetConsumingEnumerable()
方法を提供しているという事実によるものです。処理-上記のデータを消費する複数のコンシューマー
BlockingCollection<T>
。現在の実装では、各反復で2つのタスク継続でタスクを開始するParallel.ForEach
ループGetConsumingEnumerable()
があります。タスクの最初のステップは、サードパーティのWebサービスを呼び出し、結果を待って、2番目のタスクが消費する結果を出力することです。2番目のタスクは、最初のタスクの出力に基づいて計算を行い、3番目のタスクの結果を出力します。3番目のタスクは、基本的にその結果を2番目のタスクに格納しますBlockingCollection<T>
(これは出力コレクションです)。ですから、私の消費者は事実上生産者でもあります。理想的には、タスク1によってロードされたデータの各ユニットは、並列処理のためにキューに入れられます。永続化-単一のコンシューマーが
BlockingCollection
上記の2番目のコンシューマーに対して実行し、処理されたデータをデータベースに永続化します。
私が直面している問題は、上記のリストのアイテム番号2です。(を使用するだけでは)十分に高速ではないようですParallel.ForEach
。Parallel.ForEach
継続してタスクを直接開始するのではなく、内部で試しました。ラッピングスレッドを開始すると、処理タスクが開始されます。しかし、スレッド数が制御不能になり、すぐに1200に達したため、これによりOutOfMemory例外が発生しました。また、無駄なスレッドプールを使用して作業をスケジュールしてみました。
私のアプローチが私たちが行う必要のあることに対して十分に良いかどうか、またはそれを行うためのより良い方法はありますか?