私には、これは TPL の問題のように思えます。保存されている既知のデータ セットがあります。負荷の高い処理を分割して並行して実行し、負荷をバッチ処理できるようにしたいと考えています。
あなたの問題のどこにも、非同期であるソース、動いているデータであるソース、またはリアクティブである必要があるコンシューマーは見当たりません。これが、代わりに TPL を使用することを提案する私の根拠です。
別のメモとして、マジック ナンバー 10 を並列処理するのはなぜですか? これはビジネス要件ですか、それともパフォーマンスを最適化するための試みですか? 通常、TaskPool がコア数と現在の負荷に基づいてクライアント CPU に最適なものを見つけられるようにすることがベスト プラクティスです。これは、デバイスとその CPU 構造 (シングル コア、マルチ コア、メニー コア、低電力/無効なコアなど) の大きなバリエーションにより、ますます重要になると思います。
LinqPad で実行できる方法の 1 つを次に示します (ただし、Rx がないことに注意してください)。
void Main()
{
var source = new List<Item>();
for (int i = 0; i < 100; i++){source.Add(new Item(i));}
//Put into batches of ten, but only then pass on the item, not the temporary tuple construct.
var batches = source.Select((item, idx) =>new {item, idx} )
.GroupBy(tuple=>tuple.idx/10, tuple=>tuple.item);
//Process one batch at a time (serially), but process the items of the batch in parallel (concurrently).
foreach (var batch in batches)
{
"Processing batch...".Dump();
var results = batch.AsParallel().Select (item => item.Process());
foreach (var result in results)
{
result.Dump();
}
"Processed batch.".Dump();
}
}
public class Item
{
private static readonly Random _rnd = new Random();
private readonly int _id;
public Item(int id)
{
_id = id;
}
public int Id { get {return _id;} }
public double Process()
{
var threadId = Thread.CurrentThread.ManagedThreadId;
string.Format("Processing on thread:{0}", threadId).Dump(Id);
var loopCount = _rnd.Next(10000,1000000);
Thread.SpinWait(loopCount);
return _rnd.NextDouble();
}
public override string ToString()
{
return string.Format("Item:{0}", _id);
}
}
移動中のデータの問題または反応的な消費者の問題があるかどうかを知りたいのですが、説明を簡単にするために質問を「軽視」しただけです。