1

タスク並列ライブラリを使い始めたばかりです。問題のタスクは、結果を可能な限り並行して処理することですが、結果の順序を維持することです。

また、フラグが設定されてこれ以上アイテムを受け付けないことを示すまで、いつでもアイテムを追加できます。

また、すべての結果が完了したら、一部のクライアントに通知する必要があります (これは、アイテムが受け入れられなくなった場合にのみ発生します)。

以下の単純化されたサンプルを思いつきました。これは、すべてのテストでうまくいくようです。

class Program
{
    static void Main(string[] args)
    {
        for (int i = 1; i < random.Next(5, 21); ++i)
        {
            AddItem(i);
        }

        finishedAddingItems = true;

        completion.Task.Wait();
        Console.WriteLine("Finished");
        Console.ReadKey();
    }

    static TaskCompletionSource<bool> completion = 
                            new TaskCompletionSource<bool>();

    static bool finishedAddingItems = false;

    static Random random = new Random();

    class QueueResult
    {
        public int data;
        public int IsFinished;
    }

    static ConcurrentQueue<QueueResult> queue = 
                           new ConcurrentQueue<QueueResult>();

    static object orderingLockObject = new object();

    static void AddItem(int i)
    {
        var queueItem = new QueueResult { data = i, IsFinished = 0 };

        queue.Enqueue(queueItem);

        Task.Factory
            .StartNew(() => 
            { 
                for (int busy = 0; 
                     busy <= random.Next(9000000, 90000000); 
                     ++busy) 
                { }; 
                Interlocked.Increment(ref queueItem.IsFinished); 
            })
            .ContinueWith(t =>
            {
                QueueResult result;

                //the if check outside the lock is to avoid tying up resources
                //needlessly, since only one continuation can actually process
                //the queue at a time.
                if (queue.TryPeek(out result) 
                    && result.IsFinished == 1)
                {
                    lock (orderingLockObject)
                    {
                        while (queue.TryPeek(out result) 
                               && result.IsFinished == 1)
                        {
                            Console.WriteLine(result.data);
                            queue.TryDequeue(out result);
                        }

                        if (finishedAddingItems && queue.Count == 0)
                        {
                            completion.SetResult(true);
                        }
                    }
                }
            });
    }
}

しかし、アイテムの処理に失敗する可能性のある潜在的な競合状態があるかどうかを自分で納得させるのに苦労していますか?

4

1 に答える 1

2

IsFinishedasvolatileを宣言せず、ロックの外で直接アクセスしているため、コードが正しく機能しない可能性があると思います。いずれにせよ、ダブルチェック ロックを正しく使用するのは難しいため、本当に必要でない限り使用しないでください。

また、あなたのコードも非常に混乱しており (1 つのクラスにすべてがあり、int代わりにbool、不要なを使用しているContinueWith()など)、少なくとも 1 つ以上のスレッド セーフの問題が含まれています (Randomはスレッド セーフではありません)。

以上のことから、TPL のより高度な部分について学ぶことをお勧めします。あなたの場合、PLINQ は適切なソリューションのように思えます。

var source = Enumerable.Range(1, random.Next(5, 21)); // or some other collection

var results = source.AsParallel()
                    .AsOrdered()
                    .WithMergeOptions(ParallelMergeOptions.NotBuffered)
                    .Select(i => /* perform your work here */);

foreach (int i in results)
    Console.WriteLine(i);
于 2013-01-26T18:57:18.657 に答える