2

アイテムのストリームに非同期ラムダを適用することによって生成されるタスクの非同期ストリームがあります。

IAsyncEnumerable<int> streamOfItems = AsyncEnumerable.Range(1, 10);
IAsyncEnumerable<Task<string>> streamOfTasks = streamOfItems.Select(async x =>
{
    await Task.Delay(100);
    return x.ToString();
})

上記のメソッドはパッケージAsyncEnumerable.RangeからSelect提供されます。System.Linq.Async

私が望む結果は、 として表される結果のストリームですIAsyncEnumerable<string>。結果は、元のタスクと同じ順序でストリーミングする必要があります。また、特定の時間にアクティブになるタスクの数が指定された数を超えないように、ストリームの列挙を調整する必要があります。

型の拡張メソッドの形で解決策が欲しいIAsyncEnumerable<Task<T>>ので、それを複数回チェーンして処理パイプラインを形成できます。TPL Dataflowパイプラインと機能的に似ていますが、流暢に表現できます。以下は、望ましい拡張メソッドのシグネチャです。

public async static IAsyncEnumerable<TResult> AwaitResults<TResult>(
    this IAsyncEnumerable<Task<TResult>> source,
    int concurrencyLevel);

引数としても受け入れるCancellationTokenことは素晴らしい機能です。


更新:AwaitResults完全を期すために、メソッドを 2 回チェーンすることによって形成された流暢な処理パイプラインの例を含めています。このパイプラインは、PLINQ と Linq.Async の混在が可能であることを示すために、PLINQ ブロックから始まります。

int[] results = await Partitioner
    .Create(Enumerable.Range(1, 20), EnumerablePartitionerOptions.NoBuffering)
    .AsParallel()
    .AsOrdered()
    .WithDegreeOfParallelism(2)
    .WithMergeOptions(ParallelMergeOptions.NotBuffered)
    .Select(x =>
    {
        Thread.Sleep(100); // Simulate some CPU-bound operation
        return x;
    })
    .ToAsyncEnumerable()
    .Select(async x =>
    {
        await Task.Delay(300); // Simulate some I/O operation
        return x;
    })
    .AwaitResults(concurrencyLevel: 5)
    .Select(x => Task.Run(() =>
    {
        Thread.Sleep(100); // Simulate another CPU-bound operation
        return x;
    }))
    .AwaitResults(concurrencyLevel: 2)
    .ToArrayAsync();

Console.WriteLine($"Results: {String.Join(", ", results)}");

期待される出力:

結果: 1、2、3、4、5、6、7、8、9、10、11、12、13、14、15、16、17、18、19、20


注:振り返ってみると、AwaitResultsメソッドの名前はおそらく でMergeconcurrencyLevel引数の名前は であるはずmaxConcurrentです。その機能はRxライブラリMergeに存在する演算子に似ているからです。System.Interactive.Asyncパッケージには、を生成するという名前の演算子が含まれていますが、そのオーバーロードはいずれもソースに対して動作しません。ソース上で動作します。待機/マージ操作に必要なバッファーのサイズを明示的に制御するために、パラメーターを追加することもできます。MergeIAsyncEnumerable<T>IAsyncEnumerable<Task<T>>IEnumerable<IAsyncEnumerable<TSource>>IAsyncEnumerable<IAsyncEnumerable<TSource>>bufferCapacity

4

1 に答える 1

3

これが私のAwaitResultsメソッドの実装です。これはSemaphoreSlim、同時実行レベルを制御するための とChannel<Task<TResult>>、非同期キューとして使用される に基づいています。ソースの列挙はIAsyncEnumerable<Task<TResult>>、ホット タスクをチャネルにプッシュするファイア アンド フォーゲット タスク (フィーダー) 内で行われます。また、セマフォが解放される各タスクに継続を添付します。

メソッドの最後の部分は、タスクがチャネルから 1 つずつキューから取り出され、順番に待機する譲歩ループです。このようにして、結果はソース ストリーム内のタスクと同じ順序で生成されます。

この実装では、各タスクを 2 回待機する必要があります。つまり、タイプ のソースには使用できIAsyncEnumerable<ValueTask<TResult>>ませValueTask

public async static IAsyncEnumerable<TResult> AwaitResults<TResult>(
    this IAsyncEnumerable<Task<TResult>> source,
    int concurrencyLevel = 1,
    [EnumeratorCancellation]CancellationToken cancellationToken = default)
{
    if (source == null) throw new ArgumentNullException(nameof(source));
    if (concurrencyLevel < 1)
        throw new ArgumentOutOfRangeException(nameof(concurrencyLevel));

    var semaphore = new SemaphoreSlim(concurrencyLevel - 1);
    var channelCapacity = Math.Max(1000, concurrencyLevel * 10);
    var tasksChannel = Channel.CreateBounded<Task<TResult>>(channelCapacity);
    var completionCts = CancellationTokenSource.CreateLinkedTokenSource(
        cancellationToken);

    // Feeder task: fire and forget
    _ = Task.Run(async () =>
    {
        try
        {
            await foreach (var task in source
                .WithCancellation(completionCts.Token).ConfigureAwait(false))
            {
                HandleTaskCompletion(task);
                await tasksChannel.Writer.WriteAsync(task, completionCts.Token)
                    .ConfigureAwait(false);
                await semaphore.WaitAsync(completionCts.Token)
                    .ConfigureAwait(false); // Acquire before MoveNextAsync
            }
            tasksChannel.Writer.Complete();
        }
        catch (Exception ex)
        {
            tasksChannel.Writer.Complete(ex);
        }
    });

    async void HandleTaskCompletion(Task task)
    {
        try
        {
            await task.ConfigureAwait(false);
        }
        catch
        {
            // Ignore exceptions here
        }
        finally
        {
            semaphore.Release();
        }
    }

    try
    {
        while (await tasksChannel.Reader.WaitToReadAsync(cancellationToken)
            .ConfigureAwait(false))
        {
            while (tasksChannel.Reader.TryRead(out var task))
            {
                yield return await task.ConfigureAwait(false);
                cancellationToken.ThrowIfCancellationRequested();
            }
        }
    }
    finally // Happens when the caller disposes the output enumerator
    {
        completionCts.Cancel();
    }
}

重要な詳細は、最終的な yielding ループの周りの try-finally ブロックです。これは、メソッドの呼び出し元が結果のストリームの列挙を時期尚早に放棄した場合に必要です。その場合、ソース ストリームの列挙も終了する必要があり、この終了はCancellationTokenSource. これがないと、フィーダー タスクが完了せず、オブジェクトがガベージ コレクションされず、メモリ リークが発生します。

注:をキャンセルしてcancellationTokenも、操作全体が即座にキャンセルされるとは限りません。最大限の応答性cancellationTokenを得るには、個々のタスクをキャンセルするために同じものを使用する必要があります。

于 2020-02-24T11:41:04.890 に答える