8

複数の非同期I/Oタスクを並行して実行する必要があるが、同時に実行されているのはXI/Oプロセスのみであることを確認する必要がある場合はどうなりますか。また、I/O処理の前後のタスクにそのような制限はありません。

これがシナリオです-1000のタスクがあるとしましょう。それぞれが入力パラメータとしてテキスト文字列を受け入れます。そのテキストを変換し(I / O処理前)、変換されたテキストをファイルに書き込みます。目標は、前処理ロジックがCPU /コアの100%を利用し、タスクのI / O部分が最大10度の並列処理(一度にファイルを書き込むために同時に開かれる最大10)で実行されるようにすることです。

C#/ .NET 4.5でそれを行う方法のサンプルコードを提供できますか?

http://blogs.msdn.com/b/csharpfaq/archive/2012/01/23/using-async-for-file-access-alan-berman.aspx

4

3 に答える 3

9

これにTPLDataflowを使用するのは良い考えだと思います。つまり、無制限の並列処理を備えた前処理ブロックと後処理ブロック、制限された並列処理を備えたファイル書き込みブロックを作成し、それらをリンクします。何かのようなもの:

var unboundedParallelismOptions =
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    };

var preProcessBlock = new TransformBlock<string, string>(
    s => PreProcess(s), unboundedParallelismOptions);

var writeToFileBlock = new TransformBlock<string, string>(
    async s =>
            {
                await WriteToFile(s);
                return s;
            },
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10 });

var postProcessBlock = new ActionBlock<string>(
    s => PostProcess(s), unboundedParallelismOptions);

var propagateCompletionOptions =
    new DataflowLinkOptions { PropagateCompletion = true };

preProcessBlock.LinkTo(writeToFileBlock, propagateCompletionOptions);
writeToFileBlock.LinkTo(postProcessBlock, propagateCompletionOptions);

// use something like await preProcessBlock.SendAsync("text") here

preProcessBlock.Complete();
await postProcessBlock.Completion;

WriteToFile()このように見える場所:

private static async Task WriteToFile(string s)
{
    using (var writer = new StreamWriter(GetFileName()))
        await writer.WriteAsync(s);
}
于 2012-05-29T16:13:31.770 に答える
1

タスクの開始へのアクセスを制御するために、Djikstraセマフォを検討する必要があるようです。

ただし、これは一般的なキュー/固定数のコンシューマーのような問題のように聞こえます。これは、それを構造化するためのより適切な方法である可能性があります。

于 2012-05-29T14:40:39.200 に答える
1

最大の並列度を設定できる拡張メソッドを作成します。SemaphoreSlimがここでの救世主になります。

    /// <summary>
    /// Concurrently Executes async actions for each item of <see cref="IEnumerable<typeparamref name="T"/>
    /// </summary>
    /// <typeparam name="T">Type of IEnumerable</typeparam>
    /// <param name="enumerable">instance of <see cref="IEnumerable<typeparamref name="T"/>"/></param>
    /// <param name="action">an async <see cref="Action" /> to execute</param>
    /// <param name="maxDegreeOfParallelism">Optional, An integer that represents the maximum degree of parallelism,
    /// Must be grater than 0</param>
    /// <returns>A Task representing an async operation</returns>
    /// <exception cref="ArgumentOutOfRangeException">If the maxActionsToRunInParallel is less than 1</exception>
    public static async Task ForEachAsyncConcurrent<T>(
        this IEnumerable<T> enumerable,
        Func<T, Task> action,
        int? maxDegreeOfParallelism = null)
    {
        if (maxDegreeOfParallelism.HasValue)
        {
            using (var semaphoreSlim = new SemaphoreSlim(
                maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value))
            {
                var tasksWithThrottler = new List<Task>();

                foreach (var item in enumerable)
                {
                    // Increment the number of currently running tasks and wait if they are more than limit.
                    await semaphoreSlim.WaitAsync();

                    tasksWithThrottler.Add(Task.Run(async () =>
                    {
                        await action(item).ContinueWith(res =>
                        {
                            // action is completed, so decrement the number of currently running tasks
                            semaphoreSlim.Release();
                        });
                    }));
                }

                // Wait for all tasks to complete.
                await Task.WhenAll(tasksWithThrottler.ToArray());
            }
        }
        else
        {
            await Task.WhenAll(enumerable.Select(item => action(item)));
        }
    }

使用例:

await enumerable.ForEachAsyncConcurrent(
    async item =>
    {
        await SomeAsyncMethod(item);
    },
    5);
于 2018-05-09T23:10:26.960 に答える