
At the moment I read frames from a video one at a time, process each one, and write it to a file. This seems inefficient and slow, so I'd like to split the work across multiple threads.

My current code can be summarized like this:

for(long n = 0; n < totalframes; n++) {
    using(Bitmap frame = vreader.ReadVideoFrame()) {
        Process(frame); //awfully slow
        WriteToFile(frame);
    }
}

How would I, for example, load 4 frames, process them on 4 threads, wait for all of them to finish, and then write them to the file? It is important that the frames are written in exactly the same order as they appear in the video.


7 Answers


This is where you want a pipeline. I copied the code almost verbatim from Patterns of Parallel Programming and introduced additional parallelism in step 2 (I've included examples that use both parallel tasks and PLINQ). It isn't terribly complicated and it works: on my box it runs many times faster than the sequential version. You may not see the same degree of improvement in your code (I'm guessing your Process is a bit more involved than Thread.Sleep), but it will still run faster.

Clearly, a lot of the clutter here comes from trying to fit the extra parallelism to your object model. For the original, simpler sample code, see page 55 of Patterns of Parallel Programming - it's a thing of beauty, so do check it out ( http://www.microsoft.com/en-au/download/details.aspx?id=19222 ).

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace PipelineExample
{
    /// <summary>
    /// Stack Overflow question 16882318.
    /// </summary>
    public class Program
    {
        /// <summary>
        /// This is our simulated "file". In essence it will contain the
        /// ID of each Frame which has been processed and written to file.
        /// </summary> 
        private static readonly List<long> FrameFile = new List<long>();

        /// <summary>
        /// This is a modification of Stephen Toub's Pipelines
        /// example from Patterns Of Parallel Programming.
        /// </summary>
        private static void RunPipeline(VReader vreader, long totalframes)
        {
            var rawFrames = new BlockingCollection<Bitmap>();
            var processedFrames = new BlockingCollection<Bitmap>();

            // Stage 1: read raw frames.
            var readTask = Task.Run(() =>
            {
                try
                {
                    for (long n = 0; n < totalframes; n++)
                    {
                        rawFrames.Add(vreader.ReadVideoFrame());
                    }
                }
                finally { rawFrames.CompleteAdding(); }
            });

            // Stage 2: process frames in parallel.
            var processTask = Task.Run(() =>
            {
                try
                {
                    // Try both - see which performs better in your scenario.
                    Step2WithParallelTasks(rawFrames, processedFrames);
                    //Step2WithPLinq(rawFrames, processedFrames);
                }
                finally { processedFrames.CompleteAdding(); }
            });

            // Stage 3: write results to file and dispose of the frame.
            var writeTask = Task.Run(() =>
            {
                foreach (var processedFrame in processedFrames.GetConsumingEnumerable())
                {
                    WriteToFile(processedFrame);
                    processedFrame.Dispose();
                }
            });

            Task.WaitAll(readTask, processTask, writeTask);
        }

        /// <summary>
        /// Processes frames in rawFrames and adds them to
        /// processedFrames preserving the original frame order.
        /// </summary>
        private static void Step2WithPLinq(BlockingCollection<Bitmap> rawFrames, BlockingCollection<Bitmap> processedFrames)
        {
            Console.WriteLine("Executing Step 2 via PLinq.");

            var processed = rawFrames.GetConsumingEnumerable()
                .AsParallel()
                .AsOrdered()
                .Select(frame =>
                {
                    Process(frame);
                    return frame;
                });

            foreach (var frame in processed)
            {
                processedFrames.Add(frame);
            }
        }

        /// <summary>
        /// Processes frames in rawFrames and adds them to
        /// processedFrames preserving the original frame order.
        /// </summary>
        private static void Step2WithParallelTasks(BlockingCollection<Bitmap> rawFrames, BlockingCollection<Bitmap> processedFrames)
        {
            Console.WriteLine("Executing Step 2 via parallel tasks.");

            var degreesOfParallelism = Environment.ProcessorCount;
            var inbox = rawFrames.GetConsumingEnumerable();

            // Start our parallel tasks.
            while (true)
            {
                var tasks = inbox
                    .Take(degreesOfParallelism)
                    .Select(frame => Task.Run(() =>
                    {
                        Process(frame);
                        return frame;
                    }))
                    .ToArray();

                if (tasks.Length == 0)
                {
                    break;
                }

                Task.WaitAll(tasks);

                foreach (var t in tasks)
                {
                    processedFrames.Add(t.Result);
                }
            }
        }

        /// <summary>
        /// Sequential implementation - as is (for comparison).
        /// </summary>
        private static void RunSequential(VReader vreader, long totalframes)
        {
            for (long n = 0; n < totalframes; n++)
            {
                using (var frame = vreader.ReadVideoFrame())
                {
                    Process(frame);
                    WriteToFile(frame);
                }
            }
        }

        /// <summary>
        /// Main entry point.
        /// </summary>
        private static void Main(string[] args)
        {
            // Arguments.
            long totalframes = 1000;
            var vreader = new VReader();

            // We'll time our run.
            var sw = Stopwatch.StartNew();

            // Try both for comparison.
            //RunSequential(vreader, totalframes);
            RunPipeline(vreader, totalframes);

            sw.Stop();

            Console.WriteLine("Elapsed ms: {0}.", sw.ElapsedMilliseconds);

            // Validation: count, order and contents.
            if (Range(1, totalframes).SequenceEqual(FrameFile))
            {
                Console.WriteLine("Frame count and order of frames in the file are CORRECT.");
            }
            else
            {
                Console.WriteLine("Frame count and order of frames in the file are INCORRECT.");
            }

            Console.ReadLine();
        }

        /// <summary>
        /// Simulate CPU work.
        /// </summary>
        private static void Process(Bitmap frame)
        {
            Thread.Sleep(10);
        }

        /// <summary>
        /// Simulate IO pressure.
        /// </summary>
        private static void WriteToFile(Bitmap frame)
        {
            Thread.Sleep(5);
            FrameFile.Add(frame.ID);
        }

        /// <summary>
        /// Naive implementation of Enumerable.Range(int, int) for long.
        /// </summary>
        private static IEnumerable<long> Range(long start, long count)
        {
            for (long i = start; i < start + count; i++)
            {
                yield return i;
            }
        }

        private class VReader
        {
            public Bitmap ReadVideoFrame()
            {
                return new Bitmap();
            }
        }

        private class Bitmap : IDisposable
        {
            private static int MaxID;
            public readonly long ID;

            public Bitmap()
            {
                this.ID = Interlocked.Increment(ref MaxID);
            }

            public void Dispose()
            {
                // Dummy method.
            }
        }
    }
}
answered 2013-06-02T14:30:17.887

To process elements in parallel you can use the parallel LINQ methods in System.Linq, such as ParallelEnumerable.Range(). To keep the elements in order, you can use .AsOrdered():

// The query is lazy, so it must be enumerated to run. The writing happens in the
// sequential foreach, which keeps the frames in order (AsOrdered guarantees the
// order of the results, not of side effects inside Select).
var processedFrames = ParallelEnumerable.Range(0, (int)totalframes)
                                        .AsOrdered()
                                        .Select(x => vreader.ReadVideoFrame())
                                        .Select(frame => { Process(frame); return frame; });

foreach (var frame in processedFrames)
    WriteToFile(frame);
answered 2013-06-02T18:08:23.683

Probably four BackgroundWorkers. In addition to the data itself, pass each of them a number from 1 to 4. In the RunWorkerCompleted event handler, check whether all of the other three have finished... (you could use a bool[4] for that).

As far as I know, the RunWorkerCompleted handlers all run on the same thread, so you don't need to worry about two of them being called at the same time.
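
Below is a minimal sketch of this idea. It assumes a UI SynchronizationContext (e.g. a WinForms application), which is what makes the RunWorkerCompleted handlers run one at a time; ProcessBatch is a hypothetical helper, and the process/writeToFile delegates stand in for the question's Process and WriteToFile.

using System;
using System.ComponentModel;
using System.Linq;

public static class BatchProcessor
{
    // Hypothetical helper: process one batch of frames on separate BackgroundWorkers
    // and write the whole batch, in its original order, once every worker has finished.
    public static void ProcessBatch<T>(T[] frames, Action<T> process, Action<T> writeToFile)
    {
        var done = new bool[frames.Length];

        for (int i = 0; i < frames.Length; i++)
        {
            int index = i; // capture this worker's slot (the 1-4 number in the text, zero-based here)
            var worker = new BackgroundWorker();

            worker.DoWork += (s, e) => process(frames[index]);

            worker.RunWorkerCompleted += (s, e) =>
            {
                // With a UI SynchronizationContext these handlers run one at a time,
                // so the bool[] needs no locking.
                done[index] = true;

                if (done.All(d => d)) // have the other workers finished too?
                {
                    foreach (var frame in frames) // write the batch in the original order
                    {
                        writeToFile(frame);
                    }
                }
            };

            worker.RunWorkerAsync();
        }
    }
}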

answered 2013-06-02T12:23:24.727

I had a similar problem, which I asked about in this thread.

I came up with a solution that seems to work fine, although it might be overly complicated for your purposes.

It revolves around being able to supply three delegates: one to obtain a work item (in your case it would return a Bitmap), one to process that work item, and one to output that work item. You can also specify the maximum number of concurrent threads to run, which you can use to limit memory usage - see the numTasks parameter of the ParallelBlockProcessor constructor below.

Only the processing delegate is called from multiple threads.

Like you, I needed the final output to be written in the same order as the original input; I used a priority queue for that.

There may be better solutions using the TPL in .NET 4.5, but I was limited to .NET 4.

Here's the code I came up with - I think you can adapt it to your problem:

The ParallelBlockProcessor class:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Threading.Tasks;
using ConsoleApplication1;

namespace Demo
{
    public sealed class ParallelBlockProcessor<T> where T: class
    {
        public delegate T Read();            // Called by only one thread.
        public delegate T Process(T block);  // Called simultaneously by multiple threads.
        public delegate void Write(T block); // Called by only one thread.

        public ParallelBlockProcessor(Read read, Process process, Write write, int numTasks = 0)
        {
            Contract.Requires(read != null);
            Contract.Requires(process != null);
            Contract.Requires(write != null);
            Contract.Requires((0 <= numTasks) && (numTasks <= 64));

            _read    = read;
            _process = process;
            _write   = write;

            numTasks = (numTasks > 0) ? numTasks : Environment.ProcessorCount;

            _workPool   = new BlockingCollection<WorkItem>(numTasks*2);
            _inputQueue  = new BlockingCollection<WorkItem>(numTasks);
            _outputQueue = new ConcurrentPriorityQueue<int, T>();
            _processors  = new Task[numTasks];

            initWorkItems();
            startProcessors();
            Task.Factory.StartNew(enqueueBlocks);
            _dequeuer = Task.Factory.StartNew(dequeueBlocks);
        }

        private void startProcessors()
        {
            for (int i = 0; i < _processors.Length; ++i)
            {
                _processors[i] = Task.Factory.StartNew(processBlocks);
            }
        }

        private void initWorkItems()
        {
            for (int i = 0; i < _workPool.BoundedCapacity; ++i)
            {
                _workPool.Add(new WorkItem());
            }
        }

        private void enqueueBlocks()
        {
            int index = 0;

            while (true)
            {
                T data = _read();

                if (data == null)
                {
                    _inputQueue.CompleteAdding();
                    _outputQueue.Enqueue(index, null); // Special terminator WorkItem.
                    break;
                }

                WorkItem workItem = _workPool.Take();
                workItem.Data = data;
                workItem.Index = index++;

                _inputQueue.Add(workItem);
            }
        }

        private void dequeueBlocks()
        {
            int index = 0; // Next required index.
            int last = int.MaxValue;

            while (true)
            {
                KeyValuePair<int, T> workItem;
                _outputQueue.WaitForNewItem();   // There will always be at least one item - the sentinel item.

                while (_outputQueue.TryPeek(out workItem))
                {
                    if (workItem.Value == null) // The sentinel item has a null value to indicate that it's the sentinel.
                    {
                        last = workItem.Key;  // The sentinel's key is the index of the last block + 1.
                    }
                    else if (workItem.Key == index) // Is this block the next one that we want?
                    {
                        // Even if new items are added to the queue while we're here, the new items will be lower priority.
                        // Therefore it is safe to assume that the item we will dequeue now is the same one we peeked at.

                        _outputQueue.TryDequeue(out workItem);
                        Contract.Assume(workItem.Key == index);
                        _workPool.Add(new WorkItem()); // Free up a work pool item.     
                        _write(workItem.Value);
                        ++index;
                    }
                    else // If it's not the block we want, we know we'll get a new item at some point.
                    {
                        _outputQueue.WaitForNewItem();
                    }

                    if (index == last)
                    {
                        return;
                    }
                }
            }
        }

        private void processBlocks()
        {
            foreach (var block in _inputQueue.GetConsumingEnumerable())
            {
                var processedData = _process(block.Data);
                _outputQueue.Enqueue(block.Index, processedData);
            }
        }

        public bool WaitForFinished(int maxMillisecondsToWait) // Can be Timeout.Infinite.
        {
            return _dequeuer.Wait(maxMillisecondsToWait);
        }

        private sealed class WorkItem // Note: This is mutable.
        {
            public T   Data  { get; set; }
            public int Index { get; set; }
        }

        private readonly Task[] _processors;

        private readonly Task _dequeuer;

        private readonly BlockingCollection<WorkItem> _workPool;
        private readonly BlockingCollection<WorkItem> _inputQueue;
        private readonly ConcurrentPriorityQueue<int, T> _outputQueue;

        private readonly Read    _read;
        private readonly Process _process;
        private readonly Write   _write;
    }
}

The priority queue (adapted from Microsoft's):

using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Threading;

namespace ConsoleApplication1
{
    /// <summary>Provides a thread-safe priority queue data structure.</summary> 
    /// <typeparam name="TKey">Specifies the type of keys used to prioritize values.</typeparam> 
    /// <typeparam name="TValue">Specifies the type of elements in the queue.</typeparam> 

    [SuppressMessage("Microsoft.Naming", "CA1711:IdentifiersShouldNotHaveIncorrectSuffix")]
    [SuppressMessage("Microsoft.Naming", "CA1710:IdentifiersShouldHaveCorrectSuffix")]
    [DebuggerDisplay("Count={Count}")] 

    public sealed class ConcurrentPriorityQueue<TKey, TValue> : 
        IProducerConsumerCollection<KeyValuePair<TKey,TValue>>  
        where TKey : IComparable<TKey> 
    { 
        /// <summary>Initializes a new instance of the ConcurrentPriorityQueue class.</summary> 
        public ConcurrentPriorityQueue() {} 

        /// <summary>Initializes a new instance of the ConcurrentPriorityQueue class that contains elements copied from the specified collection.</summary> 
        /// <param name="collection">The collection whose elements are copied to the new ConcurrentPriorityQueue.</param> 

        [SuppressMessage("Microsoft.Design", "CA1006:DoNotNestGenericTypesInMemberSignatures")]

        public ConcurrentPriorityQueue(IEnumerable<KeyValuePair<TKey, TValue>> collection) 
        { 
            if (collection == null) throw new ArgumentNullException("collection"); 
            foreach (var item in collection) _minHeap.Insert(item); 
        } 

        /// <summary>Adds the key/value pair to the priority queue.</summary> 
        /// <param name="priority">The priority of the item to be added.</param> 
        /// <param name="value">The item to be added.</param> 
        public void Enqueue(TKey priority, TValue value) 
        { 
            Enqueue(new KeyValuePair<TKey, TValue>(priority, value)); 
        } 

        /// <summary>Adds the key/value pair to the priority queue.</summary> 
        /// <param name="item">The key/value pair to be added to the queue.</param> 
        public void Enqueue(KeyValuePair<TKey, TValue> item) 
        {
            lock (_syncLock)
            {
                _minHeap.Insert(item);
                _newItem.Set();
            }
        }

        /// <summary>Waits for a new item to appear.</summary>
        public void WaitForNewItem()
        {
            _newItem.WaitOne();
        }

        /// <summary>Attempts to remove and return the next prioritized item in the queue.</summary> 
        /// <param name="result"> 
        /// When this method returns, if the operation was successful, result contains the object removed. If 
        /// no object was available to be removed, the value is unspecified. 
        /// </param> 
        /// <returns> 
        /// true if an element was removed and returned from the queue successfully; otherwise, false. 
        /// </returns> 
        public bool TryDequeue(out KeyValuePair<TKey, TValue> result) 
        { 
            result = default(KeyValuePair<TKey, TValue>); 
            lock (_syncLock) 
            { 
                if (_minHeap.Count > 0) 
                { 
                    result = _minHeap.Remove(); 
                    return true; 
                } 
            } 
            return false; 
        } 

        /// <summary>Attempts to return the next prioritized item in the queue.</summary> 
        /// <param name="result"> 
        /// When this method returns, if the operation was successful, result contains the object. 
        /// The queue was not modified by the operation. 
        /// </param> 
        /// <returns> 
        /// true if an element was returned from the queue successfully; otherwise, false. 
        /// </returns> 
        public bool TryPeek(out KeyValuePair<TKey, TValue> result) 
        { 
            result = default(KeyValuePair<TKey, TValue>); 
            lock (_syncLock) 
            { 
                if (_minHeap.Count > 0) 
                { 
                    result = _minHeap.Peek(); 
                    return true; 
                } 
            } 
            return false; 
        } 

        /// <summary>Empties the queue.</summary> 
        public void Clear() { lock(_syncLock) _minHeap.Clear(); } 

        /// <summary>Gets whether the queue is empty.</summary> 
        public bool IsEmpty { get { return Count == 0; } } 

        /// <summary>Gets the number of elements contained in the queue.</summary> 
        public int Count 
        { 
            get { lock (_syncLock) return _minHeap.Count; } 
        } 

        /// <summary>Copies the elements of the collection to an array, starting at a particular array index.</summary> 
        /// <param name="array"> 
        /// The one-dimensional array that is the destination of the elements copied from the queue. 
        /// </param> 
        /// <param name="index"> 
        /// The zero-based index in array at which copying begins. 
        /// </param> 
        /// <remarks>The elements will not be copied to the array in any guaranteed order.</remarks> 
        public void CopyTo(KeyValuePair<TKey, TValue>[] array, int index) 
        { 
            lock (_syncLock) _minHeap.Items.CopyTo(array, index); 
        } 

        /// <summary>Copies the elements stored in the queue to a new array.</summary> 
        /// <returns>A new array containing a snapshot of elements copied from the queue.</returns> 
        public KeyValuePair<TKey, TValue>[] ToArray() 
        { 
            lock (_syncLock) 
            { 
                var clonedHeap = new MinBinaryHeap(_minHeap); 
                var result = new KeyValuePair<TKey, TValue>[_minHeap.Count]; 
                for (int i = 0; i < result.Length; i++) 
                { 
                    result[i] = clonedHeap.Remove(); 
                } 
                return result; 
            } 
        } 

        /// <summary>Attempts to add an item in the queue.</summary> 
        /// <param name="item">The key/value pair to be added.</param> 
        /// <returns> 
        /// true if the pair was added; otherwise, false. 
        /// </returns> 
        bool IProducerConsumerCollection<KeyValuePair<TKey, TValue>>.TryAdd(KeyValuePair<TKey, TValue> item) 
        { 
            Enqueue(item); 
            return true; 
        } 

        /// <summary>Attempts to remove and return the next prioritized item in the queue.</summary> 
        /// <param name="item"> 
        /// When this method returns, if the operation was successful, result contains the object removed. If 
        /// no object was available to be removed, the value is unspecified. 
        /// </param> 
        /// <returns> 
        /// true if an element was removed and returned from the queue successfully; otherwise, false. 
        /// </returns> 
        bool IProducerConsumerCollection<KeyValuePair<TKey, TValue>>.TryTake(out KeyValuePair<TKey, TValue> item) 
        { 
            return TryDequeue(out item); 
        } 

        /// <summary>Returns an enumerator that iterates through the collection.</summary> 
        /// <returns>An enumerator for the contents of the queue.</returns> 
        /// <remarks> 
        /// The enumeration represents a moment-in-time snapshot of the contents of the queue. It does not 
        /// reflect any updates to the collection after GetEnumerator was called. The enumerator is safe to 
        /// use concurrently with reads from and writes to the queue. 
        /// </remarks> 
        public IEnumerator<KeyValuePair<TKey, TValue>> GetEnumerator() 
        { 
            var arr = ToArray(); 
            return ((IEnumerable<KeyValuePair<TKey, TValue>>)arr).GetEnumerator(); 
        } 

        /// <summary>Returns an enumerator that iterates through a collection.</summary> 
        /// <returns>An IEnumerator that can be used to iterate through the collection.</returns> 
        IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); } 

        /// <summary>Copies the elements of the collection to an array, starting at a particular array index.</summary> 
        /// <param name="array"> 
        /// The one-dimensional array that is the destination of the elements copied from the queue. 
        /// </param> 
        /// <param name="index"> 
        /// The zero-based index in array at which copying begins. 
        /// </param> 
        void ICollection.CopyTo(Array array, int index) 
        { 
            lock (_syncLock) ((ICollection)_minHeap.Items).CopyTo(array, index); 
        } 

        /// <summary> 
        /// Gets a value indicating whether access to the ICollection is synchronized with the SyncRoot. 
        /// </summary> 
        bool ICollection.IsSynchronized { get { return true; } } 

        /// <summary> 
        /// Gets an object that can be used to synchronize access to the collection. 
        /// </summary> 
        object ICollection.SyncRoot { get { return _syncLock; } } 

        /// <summary>Implements a binary heap that prioritizes smaller values.</summary> 
        private sealed class MinBinaryHeap 
        { 
            private readonly List<KeyValuePair<TKey, TValue>> _items; 

            /// <summary>Initializes an empty heap.</summary> 
            public MinBinaryHeap() 
            { 
                _items = new List<KeyValuePair<TKey, TValue>>(); 
            } 

            /// <summary>Initializes a heap as a copy of another heap instance.</summary> 
            /// <param name="heapToCopy">The heap to copy.</param> 
            /// <remarks>Key/Value values are not deep cloned.</remarks> 
            public MinBinaryHeap(MinBinaryHeap heapToCopy) 
            { 
                _items = new List<KeyValuePair<TKey, TValue>>(heapToCopy.Items); 
            } 

            /// <summary>Empties the heap.</summary> 
            public void Clear() { _items.Clear(); } 

            /// <summary>Adds an item to the heap.</summary> 
            public void Insert(KeyValuePair<TKey,TValue> entry) 
            { 
                // Add the item to the list, making sure to keep track of where it was added. 
                _items.Add(entry); 
                int pos = _items.Count - 1; 

                // If the new item is the only item, we're done. 
                if (pos == 0) return; 

                // Otherwise, perform log(n) operations, walking up the tree, swapping 
                // where necessary based on key values 
                while (pos > 0) 
                { 
                    // Get the next position to check 
                    int nextPos = (pos-1) / 2; 

                    // Extract the entry at the next position 
                    var toCheck = _items[nextPos]; 

                    // Compare that entry to our new one.  If our entry has a smaller key, move it up. 
                    // Otherwise, we're done. 
                    if (entry.Key.CompareTo(toCheck.Key) < 0) 
                    { 
                        _items[pos] = toCheck; 
                        pos = nextPos; 
                    } 
                    else break; 
                } 

                // Make sure we put this entry back in, just in case 
                _items[pos] = entry; 
            } 

            /// <summary>Returns the entry at the top of the heap.</summary> 
            public KeyValuePair<TKey, TValue> Peek() 
            { 
                // Returns the first item 
                if (_items.Count == 0) throw new InvalidOperationException("The heap is empty."); 
                return _items[0]; 
            } 

            /// <summary>Removes the entry at the top of the heap.</summary> 
            public KeyValuePair<TKey, TValue> Remove() 
            { 
                // Get the first item and save it for later (this is what will be returned). 
                if (_items.Count == 0) throw new InvalidOperationException("The heap is empty."); 
                KeyValuePair<TKey, TValue> toReturn = _items[0]; 

                // Remove the first item if there will only be 0 or 1 items left after doing so.   
                if (_items.Count <= 2) _items.RemoveAt(0); 
                // A reheapify will be required for the removal 
                else 
                { 
                    // Remove the first item and move the last item to the front. 
                    _items[0] = _items[_items.Count - 1]; 
                    _items.RemoveAt(_items.Count - 1); 

                    // Start reheapify 
                    int current = 0, possibleSwap = 0; 

                    // Keep going until the tree is a heap 
                    while (true) 
                    { 
                        // Get the positions of the node's children 
                        int leftChildPos = 2 * current + 1; 
                        int rightChildPos = leftChildPos + 1; 

                        // Should we swap with the left child? 
                        if (leftChildPos < _items.Count) 
                        { 
                            // Get the two entries to compare (node and its left child) 
                            var entry1 = _items[current]; 
                            var entry2 = _items[leftChildPos]; 

                            // If the child has a lower key than the parent, set that as a possible swap 
                            if (entry2.Key.CompareTo(entry1.Key) < 0) possibleSwap = leftChildPos; 
                        } 
                        else break; // if can't swap this, we're done 

                        // Should we swap with the right child?  Note that now we check with the possible swap 
                        // position (which might be current and might be left child). 
                        if (rightChildPos < _items.Count) 
                        { 
                            // Get the two entries to compare (node and its left child) 
                            var entry1 = _items[possibleSwap]; 
                            var entry2 = _items[rightChildPos]; 

                            // If the child has a lower key than the parent, set that as a possible swap 
                            if (entry2.Key.CompareTo(entry1.Key) < 0) possibleSwap = rightChildPos; 
                        } 

                        // Now swap current and possible swap if necessary 
                        if (current != possibleSwap) 
                        { 
                            var temp = _items[current]; 
                            _items[current] = _items[possibleSwap]; 
                            _items[possibleSwap] = temp; 
                        } 
                        else break; // if nothing to swap, we're done 

                        // Update current to the location of the swap 
                        current = possibleSwap; 
                    } 
                } 

                // Return the item from the heap 
                return toReturn; 
            } 

            /// <summary>Gets the number of objects stored in the heap.</summary> 
            public int Count { get { return _items.Count; } } 

            internal List<KeyValuePair<TKey, TValue>> Items { get { return _items; } } 
        }

        private readonly AutoResetEvent _newItem = new AutoResetEvent(false);
        private readonly object _syncLock = new object();
        private readonly MinBinaryHeap _minHeap = new MinBinaryHeap();
    } 
}

The test program:

using System;
using System.Diagnostics;
using System.Threading;

namespace Demo
{
    public static class Program
    {
        private static void Main(string[] args)
        {
            _rng = new Random(34324);

            int threadCount = 8;
            int maxBlocks = 200;
            ThreadPool.SetMinThreads(threadCount + 2, 4);  // Kludge!

            var stopwatch = new Stopwatch();

            _numBlocks = maxBlocks;
            stopwatch.Restart();
            var processor = new ParallelBlockProcessor<byte[]>(read, process, write, threadCount);
            processor.WaitForFinished(Timeout.Infinite);

            Console.WriteLine("\n\nFinished in " + stopwatch.Elapsed + "\n\n");
        }

        private static byte[] read()
        {
            if (_numBlocks-- == 0)
            {
                return null;
            }

            var result = new byte[128];
            result[0] = (byte)_numBlocks;
            Console.WriteLine("Supplied input: " + _numBlocks);
            return result;
        }

        private static byte[] process(byte[] data)
        {
            if (data[0] == 190)/*!*/
            {
                Thread.Sleep(5000);
            }

            Thread.Sleep(10 + _rng.Next(50));
            Console.WriteLine("Processed: " + data[0]);
            return data;
        }

        private static void write(byte[] data)
        {
            Console.WriteLine("Received output: " + data[0]);
        }

        private static Random _rng;
        private static int _numBlocks;
    }
}
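
Adapting this to the question might look roughly like the sketch below (untested; totalframes, vreader, Process and WriteToFile are the question's members, the read delegate returns null to signal the end of the video, and the write delegate disposes each frame since the original using block is gone):

// Sketch only - assumes this runs where the question's vreader, totalframes,
// Process and WriteToFile are in scope, and Bitmap is the question's frame type.
// The read delegate is only ever called from one thread, so the counter is safe.
long framesRead = 0;

var processor = new ParallelBlockProcessor<Bitmap>(
    read:     () => framesRead++ < totalframes ? vreader.ReadVideoFrame() : null,
    process:  frame => { Process(frame); return frame; },
    write:    frame => { WriteToFile(frame); frame.Dispose(); },
    numTasks: Environment.ProcessorCount);

processor.WaitForFinished(Timeout.Infinite);
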
answered 2013-06-02T12:29:36.740