0

次の方法で、AsParallel を WithDegreeOfParallelism および WithCancellation と組み合わせて使用​​しています

AsParallel().WithCancellation(cs.Token).WithDegreeOfParallelism(2)

これが私の理解です。一度に処理されるのは、着信シーケンスのうち 2 つだけです。リクエストの 1 つが完了すると、さらに多くのアイテムが処理されます。ただし、キャンセル要求が開始された場合、まだピックアップされていない着信キューのアイテムはまったく処理されません。この理解に基づいて、次のコードを作成しました。

class Employee
    {
        public int ID { get; set;}
        public string FirstName { get; set;}
        public string LastName { get; set;}
    }

    class Program
    {

        private static List<Employee> _Employees;
        static CancellationTokenSource cs = new CancellationTokenSource();
        static Random rand = new Random();

        static void Main(string[] args)
        {
            _Employees = new List<Employee>() 
            {
                new Employee() { ID = 1, FirstName = "John", LastName = "Doe" },
                new Employee() { ID = 2, FirstName = "Peter", LastName = "Saul" },
                new Employee() { ID = 3, FirstName = "Mike", LastName = "Sue" },
                new Employee() { ID = 4, FirstName = "Catherina", LastName = "Desoza" },
                new Employee() { ID = 5, FirstName = "Paul", LastName = "Smith" },
                new Employee() { ID = 6, FirstName = "Paul2", LastName = "Smith" },
                new Employee() { ID = 7, FirstName = "Paul3", LastName = "Smith" },
                new Employee() { ID = 8, FirstName = "Paul4", LastName = "Smith" },
                new Employee() { ID = 9, FirstName = "Paul5", LastName = "Smith" },
                new Employee() { ID = 10, FirstName = "Paul6", LastName = "Smith" },
                new Employee() { ID = 5, FirstName = "Paul7", LastName = "Smith" }
            };

            try
            {
                var tasks = _Employees.AsParallel().WithCancellation(cs.Token).WithDegreeOfParallelism(2).Select(x => ProcessThisEmployee(x, cs.Token)).ToArray();
                Console.WriteLine("Now waiting");
                Thread.Sleep(1000);
                cs.Cancel();
                Task.WaitAll(tasks);
            }
            catch (AggregateException ae)
            {
                // error handling code
                Console.WriteLine("something bad happened");
            }
            catch (Exception ex)
            {
                // error handling code
                Console.WriteLine("something even worst happened");
            }
            // other stuff
            Console.WriteLine("All Done");
        }

        private static async Task ProcessThisEmployee(Employee x, CancellationToken token)
        {
            if (token.IsCancellationRequested)
            {
                Console.WriteLine(string.Format("ThreadID = {0} -> Employee {1} -> Cancelled", System.Threading.Thread.CurrentThread.ManagedThreadId));
                return;
            }
            int Sleep = rand.Next(800, 2000);
            Console.WriteLine(string.Format("ThreadID = {0} -> Employee {1} -> Sleeping for {2}", System.Threading.Thread.CurrentThread.ManagedThreadId, x.ID, Sleep));
            await TaskEx.Run(() => System.Threading.Thread.Sleep(Sleep));

            Console.WriteLine(string.Format("ThreadID = {0} -> Employee {1} finished", System.Threading.Thread.CurrentThread.ManagedThreadId, x.ID));
        }

    }

実行したときの出力は次のとおりです。

ThreadID = 3 -> 従業員 1 -> 1058 の
スリープ ThreadID = 1 -> 従業員 7 -> 1187 の
スリープ ThreadID = 1 -> 従業員 8 -> 1296 の
スリープ ThreadID = 1 -> 従業員 9 -> 1614 の
スリープ ThreadID = 1 -> 従業員 10 -> 1607
ThreadID = 1 の間スリープ中 -> 従業員 5 -> 1928
ThreadID = 3 の間スリープ中 -> 従業員 2 -> 1487
ThreadID = 3 の間スリープ中 -> 従業員 3 -> 1535 の
間スリープ中 ThreadID = 3 - > 従業員 4 -> 1265
ThreadID = 3 の間スリープ中 -> 従業員 5 -> 1248
ThreadID = 3 の間スリープ中 -> 従業員 6 -> 807 の間スリープ中 ThreadID = 3
を待機中
-> 従業員 6 終了
ThreadID = 4 ->従業員 1 終了
ThreadID = 5 -> 従業員 7 終了
ThreadID = 6 -> 従業員 8 終了
ThreadID = 3 -> 従業員 5 終了
ThreadID = 4 -> 従業員 9 終了
ThreadID = 5 -> 従業員 10 終了
ThreadID = 6 -> 従業員 5 終了
ThreadID = 3 -> 従業員 4 終了
ThreadID = 7 -> 従業員 2 終了
ThreadID = 8 -> 従業員 3 終了
すべて完了

これが私の問題です(私の理解によると)。

  1. 一部の従業員の ProcessThisEmployee はキャンセルされるためまったく呼び出されないことを期待していましたが、すべての従業員に対して呼び出されます
  2. ProcessThisEmployee メソッドが呼び出されても、次のコード パスを通過しますが、これも発生していません。

    if ( token.IsCancellationRequested )
    {
        Console.WriteLine(string.Format("ThreadID = {0} -> Employee {1} -> Cancelled",System.Threading.Thread.CurrentThread.ManagedThreadId));
        return;
    }
    

それで、ProcessThisEmployee を変更し、基本的に次のように Sleep の後に token.IsCancellationRequested メッセージを移動しました。

private static async Task ProcessThisEmployee(Employee x, CancellationToken token)
{

    int Sleep = rand.Next(800, 2000);
    Console.WriteLine(string.Format("ThreadID = {0} -> Employee {1} -> Sleeping for {2}", System.Threading.Thread.CurrentThread.ManagedThreadId, x.ID, Sleep));
    await TaskEx.Run(() => System.Threading.Thread.Sleep(Sleep));
    if (token.IsCancellationRequested)
    {
        Console.WriteLine(string.Format("ThreadID = {0} -> Employee {1} -> Cancelled", System.Threading.Thread.CurrentThread.ManagedThreadId));
        return;
    }
    Console.WriteLine(string.Format("ThreadID = {0} -> Employee {1} finished", System.Threading.Thread.CurrentThread.ManagedThreadId, x.ID));
}

これで、次の出力が得られます。

ThreadID = 3 -> Employee 1 -> Sleeping for 1330  
ThreadID = 1 -> Employee 7 -> Sleeping for 1868  
ThreadID = 3 -> Employee 2 -> Sleeping for 903  
ThreadID = 3 -> Employee 3 -> Sleeping for 1241  
ThreadID = 3 -> Employee 4 -> Sleeping for 1367  
ThreadID = 3 -> Employee 5 -> Sleeping for 1007  
ThreadID = 3 -> Employee 6 -> Sleeping for 923  
ThreadID = 1 -> Employee 8 -> Sleeping for 1032  
ThreadID = 1 -> Employee 9 -> Sleeping for 1948  
ThreadID = 1 -> Employee 10 -> Sleeping for 1456  
ThreadID = 1 -> Employee 5 -> Sleeping for 1737  
Now waiting  
ThreadID = 5 -> Employee 2 finished  
ThreadID = 3 -> Employee 6 finished  
something bad happened  
All Done  

私の質問は、このワークフローについて何を誤解しているのかということです。私は基本的に、長時間実行される操作を行わずに、できるだけ早く操作をキャンセルしたいと考えています (この場合のスリープは単なる例ですが、非常に高価なものになる可能性があります)。

4

1 に答える 1

2

そのコードにはいくつかの問題があります。

1.)ToArray()シーケンスを実体化します。つまり、ソース シーケンスからのすべての入力がSelect(...).

その後呼び出すので、開始時にすぐにcs.Cancel()トリガーされませんtoken.IsCancellationRequestedProcessThisEmployee

2.)良さそうに見えますが、最初の return または最初の await に達するとすぐに戻る非同期メソッドであるWithDegreeOfParallelism(2).Select(x => ProcessThisEmployee(x, cs.Token))ため、実際にはやりたいことを実際には行っていません。ProcessThisEmployee

おそらくやりたいことは、ProcessThisEmployee2 度の並列処理だけで長期実行メソッドを実行することです。Tasks実際に行うことは、 2 度の並列処理のみを使用して一連の を作成することです。その後、タスク自体はすべて同時に実行されます。

コンテキストがわからないため、特定のケースでこれを修正する方法がわかりません。しかし、おそらくこれはすでにあなたを少し助けています.


コメントへの返信の更新: ToArray を実行しています。ProcessThisEmployee は非同期メソッドです。これは、このコードがライブラリの一部になり、WPF アプリケーションから使用できるためです。エンドユーザーは UI の更新を提供したいかもしれないので、操作が完了するまでブロックしたくありません。 (john smith)

本質的に非同期ではないもの、つまり主にファイル、ネットワーク、またはデータベースへのアクセスに対して async ラッパーを作成しないでください。ライブラリを使用している開発者が非同期コンテキストで何かを呼び出したい場合でも、await Task.Run(...). 詳細については、同期メソッドの非同期ラッパーを公開する必要があるかどうかに関するこの記事を参照してください。

私の目には、PLINQ は、既に動作している LINQ クエリがあり、そのクエリが並列処理に適しているため高速化したい場合に最も役立ちます。

あなたのケースで最も簡単な方法は、2 つのスレッドを使用する作業キューである可能性があります。これらの例がウェブにあると確信しています。

于 2015-01-14T14:58:46.777 に答える