整数1から5を返す2つのシーケンスがあるとしましょう。
最初は1、2、3を非常に速く返しますが、4と5はそれぞれ200msかかります。
public static IEnumerable<int> FastFirst()
{
for (int i = 1; i < 6; i++)
{
if (i > 3) Thread.Sleep(200);
yield return i;
}
}
2番目は200msの遅延で1、2、および3を返しますが、4と5は高速で返されます。
public static IEnumerable<int> SlowFirst()
{
for (int i = 1; i < 6; i++)
{
if (i < 4) Thread.Sleep(200);
yield return i;
}
}
これらの両方のシーケンスを結合すると、1から5までの数字が得られます。
FastFirst().Union(SlowFirst());
2つの方法のどちらがどの時点で遅延があるかを保証できないため、実行の順序で解決策を保証することはできません。したがって、私の例での(人為的な)遅延を最小限に抑えるために、ユニオンを並列化したいと思います。
実際のシナリオ:いくつかのエンティティを返すキャッシュと、すべてのエンティティを返すデータソースがあります。キャッシュされた結果ができるだけ速く生成されるように、リクエストをキャッシュとデータソースの両方に内部的に並列化するメソッドからイテレータを返すことができるようにしたいと思います。
注1:これはまだCPUサイクルを浪費していることに気づきました。シーケンスが遅い要素を反復処理するのをどのように防ぐことができるのか、できるだけ速くそれらを結合する方法を尋ねているのではありません。
更新1:複数のプロデューサーを受け入れ、ContinueWhenAllを使用してBlockingCollectionのCompleteAddingを1回だけ設定するように、achitakaさんのすばらしい応答を調整しました。コメントのフォーマットがないために失われるため、ここに配置しました。それ以上のフィードバックは素晴らしいでしょう!
public static IEnumerable<TResult> SelectAsync<TResult>(
params IEnumerable<TResult>[] producer)
{
var resultsQueue = new BlockingCollection<TResult>();
var taskList = new HashSet<Task>();
foreach (var result in producer)
{
taskList.Add(
Task.Factory.StartNew(
() =>
{
foreach (var product in result)
{
resultsQueue.Add(product);
}
}));
}
Task.Factory.ContinueWhenAll(taskList.ToArray(), x => resultsQueue.CompleteAdding());
return resultsQueue.GetConsumingEnumerable();
}