(注:.Net4.5ではなく.Net4を使用しているため、TPLのDataflowBlockクラスを使用できません。)
TL;DRバージョン
最終的には、無制限の出力バッファーを必要とせずに、最終出力での順序を維持する方法で、複数のスレッドを使用して順次作業項目を処理する方法を探しています。
動機
データの複数のブロックを処理するためのマルチスレッドメカニズムを提供する既存のコードがあります。1つのI/Oバウンドスレッド(「サプライヤー」)がデータのブロックを処理のためにキューに入れる責任があります。これらのデータブロックは、作業項目を構成します。
1つ以上のスレッド(「プロセッサー」)は、一度に1つの作業項目をデキューする役割を果たし、次の作業項目をデキューする前に、処理してから処理済みデータを出力キューに書き込みます。
最終的なI/Oバウンドスレッド(「コンシューマー」)は、完了した作業項目を出力キューからデキューし、それらを最終的な宛先に書き込む役割を果たします。これらの作業項目は、キューに入れられたのと同じ順序で書き込まれます(また、書き込まれる必要があります)。これは、各アイテムの優先度がソースインデックスによって定義される同時優先度キューを使用して実装しました。
私はこのスキームを使用して、大規模なデータストリームでカスタム圧縮を実行しています。この場合、圧縮自体は比較的低速ですが、非圧縮データの読み取りと圧縮データの書き込みは比較的高速です(I / Oバウンドではありますが)。
私は64Kのオーダーのかなり大きなチャンクでデータを処理するので、パイプラインのオーバーヘッドは比較的小さいです。
私の現在のソリューションはうまく機能していますが、6年前に多くの同期イベントを使用して記述された多くのカスタムコードが含まれており、設計はやや不格好なようです。したがって、私は、より近代的な.Netライブラリを使用して書き直すことができるかどうかを確認するために、学術的な演習に着手しました。
新しいデザイン
私の新しいデザインはクラスを使用しており、このMicrosoftの記事BlockingCollection<>
にある程度基づいています。
特に、「複数のプロデューサーを使用した負荷分散」というタイトルのセクションを参照してください。私はそのアプローチを使用しようとしました。したがって、それぞれが共有入力BlockingCollectionから作業項目を取得し、完了した項目を独自のBlockingCollection出力キューに書き込むいくつかの処理タスクがあります。
各処理タスクには独自の出力キューがあるためBlockingCollection.TakeFromAny()
、最初に使用可能な完了した作業項目をデキューするために使用しようとしています。
マルチプレクサの問題
これまでのところ良好ですが、ここで問題が発生します。Microsoftの記事は次のように述べています。
ギャップが問題です。パイプラインの次のステージである画像の表示ステージでは、画像を順番に、シーケンスにギャップなしで表示する必要があります。これがマルチプレクサの出番です。TakeFromAnyメソッドを使用して、マルチプレクサは両方のフィルタステージプロデューサキューからの入力を待機します。画像が到着すると、マルチプレクサは画像のシーケンス番号が予想されるシーケンスの次の番号であるかどうかを確認します。そうである場合、マルチプレクサはそれをDisplayImageステージに渡します。イメージがシーケンスの次のイメージでない場合、マルチプレクサは値を内部先読みバッファに保持し、先読み値を持たない入力キューに対してテイク操作を繰り返します。このアルゴリズムにより、マルチプレクサは、値をソートせずに順番を保証する方法で、着信プロデューサキューからの入力をまとめることができます。
さて、何が起こるかというと、処理タスクはほぼすべての順序で完成品を生産できるということです。マルチプレクサは、これらの項目を正しい順序で出力する役割を果たします。
でも...
処理するアイテムが1000個あると想像してください。さらに、奇妙な理由で、最初のアイテムが他のすべてのアイテムを組み合わせた処理に時間がかかることを想像してみてください。
私の現在のスキームを使用すると、マルチプレクサは、出力することになっている次のキューを見つけるまで、すべての処理出力キューからアイテムを読み取り、バッファリングし続けます。待機しているアイテムは(上記の「想像する」によると)他のすべての作業アイテムが処理された後にのみ表示されるため、入力全体のすべての作業アイテムを効果的にバッファリングします。
データ量が多すぎるため、これを実行できません。出力キューが特定の最大サイズに達したとき(つまり、制限された出力キュー)に、処理タスクが完了した作業項目を出力しないようにする必要があります。ただし、作業項目がマルチプレクサが待機しているものである場合を除きます。
そして、それは私が少し立ち往生しているところです。これを実際に実装する方法はたくさん考えられますが、それらはすべて、私が置き換えようと考えているコードよりも優れていないほど複雑すぎるようです。
私の質問は何ですか?
私の質問は:私はこれについて正しい方法で進んでいますか?
これはよく理解されている問題だと思いましたが、私の調査では、他のすべての作業項目と比較して、作業項目に非常に長い時間がかかる場合に発生する無制限のバッファリングの問題を無視しているように見える記事しか見つかりませんでした。
これを達成するための合理的な方法を説明している記事を誰かに教えてもらえますか?
TL;DRバージョン
最終的には、無制限の出力バッファーを必要とせずに、最終出力での順序を維持する方法で、複数のスレッドを使用して順次作業項目を処理する方法を探しています。