1

並列タスクのワークフロー

http://i.stack.imgur.com/luRnu.png

私が直面している問題について助けを得たいと思っています。問題は、ファイルのフォルダーを検索するために並列タスクを実行していることです。各タスクでは、ファイルを識別し、それをファイルの配列に追加する必要があります。次に、すべてのタスクが完了してファイルが集まるのを待ってから、結果の並べ替えを実行します。次に、ファイルごとに 1 つのタスクを実行してファイルを読み取り、一致するパターンを取得することにより、並べ替えられたファイルを個別に処理します。最終段階では、すべての結果を人間が読める形式にまとめて、ユーザー フレンドリーな方法で表示します。

問題は、UI スレッドをブロックしない適切な方法でタスクを連鎖させたいということです。プログラムがどの段階であっても、すべてキャンセルできるようにしたいと思います。

要約すると:

ステージ 1: フォルダーを検索してファイルを見つけます。各タスクは、フォルダー ツリーを再帰的に検索します。

ステージ 2: 見つかったすべてのファイルを並べ替え、重複をクリーンアップする

ステージ 3: ファイルを個別に処理する新しいタスクを開始します。各タスクはファイルを開き、一致するパターンを検索します。

ステージ 4: すべてのファイル検索の結果を 1 つの巨大な結果セットに集約し、人間が読みやすいようにします。

     List<Task> myTasks = new List<Task>();

// ==== stage 1 ======
        for(int i = 0; i < 10; i++) {
           string directoryName = directories[i];

           Task t = new Task(() =>
           {
              FindFiles(directoryName);
           });

           myTasks.Add(t);
            t.Start();
        }

// ==== stage 2 ====
        Task sortTask = Task.Factory.ContinueWhenAll(myTasks.ToArray(), (t) =>
        {
           if(_fileResults.Count > 1) {
              // sort the files and remove any duplicates
           }
        });

        sortTask.Wait();

// ==== stage 3 ====
        Task tt = new Task(() =>
        {
             Parallel.For(0, _fileResults.Count, new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount, CancellationToken = token, TaskScheduler = _taskScheduler },
                    (i, loopstate) => {
              // 1. open file
              // 2. read file
              // 3. read file line by line
              }
        }

// == stage 4 === 
        tt.ContinueWith((t) =>
        {
           // 1. aggregate the file results into one giant result set
           // 2. display the giant result set in human readable format
        }, token, TaskContinuationOptions.OnlyOnRanToCompletion, TaskScheduler.FromCurrentSynchronizationContext());

      tt.start();
4

2 に答える 2

0

この機能の使用を検討しましたかasync/await- 質問の内容からすると、この機能はお客様のニーズにぴったりです。これを使用して問題を簡単に試してみます。

try
{
    List<Task<File[]>> stage1Tasks = new List<Task<File[]>>();

    // ==== stage 1 ======
    for (int i = 0; i < 10; i++)
    {
        string directoryName = directories[i];

        Task<File[]> t = Task.Run(() =>
        {
            return FindFiles(directoryName);
        },
            token);

        stage1Tasks.Add(t);
    }

    File[][] files = await Task.WhenAll(stage1Tasks).ConfigureAwait(false);

    // Flatten files.
    File[] _fileResults = files.SelectMany(x => x).ToArray();

    // ==== stage 2 ====
    Task<File[]> sortFilesTask = Task.Run(() =>
    {
        if (_fileResults.Count > 1)
        {
            // sort the files and remove any duplicates
            return _fileResults.Reverse().ToArray();
        }
    },
        token);

    File[] _sortedFileResults = await sortFilesTask.ConfigureAwait(false);

    // ==== stage 3 ====
    Task<SomeResult[]> tt = Task.Run(() =>
    {
        SomeResult[] results = new SomeResult[_sortedFileResults.Length];
        Parallel.ForEach(_sortedFileResults,
            new ParallelOptions {
                MaxDegreeOfParallelism = Environment.ProcessorCount,
                CancellationToken = token,
                TaskScheduler = _taskScheduler
            },
            (i, loopstate) =>
            {
                // 1. open file
                // 2. read file
                // 3. read file line by line
                results[i] = new SomeResult( /* here goes your results for each file */);
            });
        return results;
    },
        token);

    SomeResult[] theResults = await tt.ConfigureAwait(false);


    // == stage 4 === 
    // 1. aggregate the file results into one giant result set
    // 2. display the giant result set in human readable format
    // ....

}
catch (TaskCanceledException)
{
    // some task has been cancelled...
}
于 2016-01-07T13:55:50.707 に答える