アイテムのストリームに非同期ラムダを適用することによって生成されるタスクの非同期ストリームがあります。
IAsyncEnumerable<int> streamOfItems = AsyncEnumerable.Range(1, 10);
IAsyncEnumerable<Task<string>> streamOfTasks = streamOfItems.Select(async x =>
{
await Task.Delay(100);
return x.ToString();
})
上記のメソッドはパッケージAsyncEnumerable.Range
からSelect
提供されます。System.Linq.Async
私が望む結果は、 として表される結果のストリームですIAsyncEnumerable<string>
。結果は、元のタスクと同じ順序でストリーミングする必要があります。また、特定の時間にアクティブになるタスクの数が指定された数を超えないように、ストリームの列挙を調整する必要があります。
型の拡張メソッドの形で解決策が欲しいIAsyncEnumerable<Task<T>>
ので、それを複数回チェーンして処理パイプラインを形成できます。TPL Dataflowパイプラインと機能的に似ていますが、流暢に表現できます。以下は、望ましい拡張メソッドのシグネチャです。
public async static IAsyncEnumerable<TResult> AwaitResults<TResult>(
this IAsyncEnumerable<Task<TResult>> source,
int concurrencyLevel);
引数としても受け入れるCancellationToken
ことは素晴らしい機能です。
更新:AwaitResults
完全を期すために、メソッドを 2 回チェーンすることによって形成された流暢な処理パイプラインの例を含めています。このパイプラインは、PLINQ と Linq.Async の混在が可能であることを示すために、PLINQ ブロックから始まります。
int[] results = await Partitioner
.Create(Enumerable.Range(1, 20), EnumerablePartitionerOptions.NoBuffering)
.AsParallel()
.AsOrdered()
.WithDegreeOfParallelism(2)
.WithMergeOptions(ParallelMergeOptions.NotBuffered)
.Select(x =>
{
Thread.Sleep(100); // Simulate some CPU-bound operation
return x;
})
.ToAsyncEnumerable()
.Select(async x =>
{
await Task.Delay(300); // Simulate some I/O operation
return x;
})
.AwaitResults(concurrencyLevel: 5)
.Select(x => Task.Run(() =>
{
Thread.Sleep(100); // Simulate another CPU-bound operation
return x;
}))
.AwaitResults(concurrencyLevel: 2)
.ToArrayAsync();
Console.WriteLine($"Results: {String.Join(", ", results)}");
期待される出力:
結果: 1、2、3、4、5、6、7、8、9、10、11、12、13、14、15、16、17、18、19、20
注:振り返ってみると、AwaitResults
メソッドの名前はおそらく でMerge
、concurrencyLevel
引数の名前は であるはずmaxConcurrent
です。その機能はRxライブラリMerge
に存在する演算子に似ているからです。System.Interactive.Asyncパッケージには、を生成するという名前の演算子が含まれていますが、そのオーバーロードはいずれもソースに対して動作しません。ソース上で動作します。待機/マージ操作に必要なバッファーのサイズを明示的に制御するために、パラメーターを追加することもできます。Merge
IAsyncEnumerable<T>
IAsyncEnumerable<Task<T>>
IEnumerable<IAsyncEnumerable<TSource>>
IAsyncEnumerable<IAsyncEnumerable<TSource>>
bufferCapacity