6

(注:.Net4.5ではなく.Net4を使用しているため、TPLのDataflowBlockクラスを使用できません。)

TL;DRバージョン

最終的には、無制限の出力バッファーを必要とせずに、最終出力での順序を維持する方法で、複数のスレッドを使用して順次作業項目を処理する方法を探しています。

動機

データの複数のブロックを処理するためのマルチスレッドメカニズムを提供する既存のコードがあります。1つのI/Oバウンドスレッド(「サプライヤー」)がデータのブロックを処理のためにキューに入れる責任があります。これらのデータブロックは、作業項目を構成します。

1つ以上のスレッド(「プロセッサー」)は、一度に1つの作業項目をデキューする役割を果たし、次の作業項目をデキューする前に、処理してから処理済みデータを出力キューに書き込みます。

最終的なI/Oバウンドスレッド(「コンシューマー」)は、完了した作業項目を出力キューからデキューし、それらを最終的な宛先に書き込む役割を果たします。これらの作業項目は、キューに入れられたのと同じ順序で書き込まれます(また、書き込まれる必要があります)。これは、各アイテムの優先度がソースインデックスによって定義される同時優先度キューを使用して実装しました。

私はこのスキームを使用して、大規模なデータストリームでカスタム圧縮を実行しています。この場合、圧縮自体は比較的低速ですが、非圧縮データの読み取りと圧縮データの書き込みは比較的高速です(I / Oバウンドではありますが)。

私は64Kのオーダーのかなり大きなチャンクでデータを処理するので、パイプラインのオーバーヘッドは比較的小さいです。

私の現在のソリューションはうまく機能していますが、6年前に多くの同期イベントを使用して記述された多くのカスタムコードが含まれており、設計はやや不格好なようです。したがって、私は、より近代的な.Netライブラリを使用して書き直すことができるかどうかを確認するために、学術的な演習に着手しました。

新しいデザイン

私の新しいデザインはクラスを使用しており、このMicrosoftの記事BlockingCollection<>にある程度基づいています。

特に、「複数のプロデューサーを使用した負荷分散」というタイトルのセクションを参照してください。私はそのアプローチを使用しようとしました。したがって、それぞれが共有入力BlockingCollectionから作業項目を取得し、完了した項目を独自のBlockingCollection出力キューに書き込むいくつかの処理タスクがあります。

各処理タスクには独自の出力キューがあるためBlockingCollection.TakeFromAny()、最初に使用可能な完了した作業項目をデキューするために使用しようとしています。

マルチプレクサの問題

これまでのところ良好ですが、ここで問題が発生します。Microsoftの記事は次のように述べています。

ギャップが問題です。パイプラインの次のステージである画像の表示ステージでは、画像を順番に、シーケンスにギャップなしで表示する必要があります。これがマルチプレクサの出番です。TakeFromAnyメソッドを使用して、マルチプレクサは両方のフィルタステージプロデューサキューからの入力を待機します。画像が到着すると、マルチプレクサは画像のシーケンス番号が予想されるシーケンスの次の番号であるかどうかを確認します。そうである場合、マルチプレクサはそれをDisplayImageステージに渡します。イメージがシーケンスの次のイメージでない場合、マルチプレクサは値を内部先読みバッファに保持し、先読み値を持たない入力キューに対してテイク操作を繰り返します。このアルゴリズムにより、マルチプレクサは、値をソートせずに順番を保証する方法で、着信プロデューサキューからの入力をまとめることができます。

さて、何が起こるかというと、処理タスクはほぼすべての順序で完成品を生産できるということです。マルチプレクサは、これらの項目を正しい順序で出力する役割を果たします。

でも...

処理するアイテムが1000個あると想像してください。さらに、奇妙な理由で、最初のアイテムが他のすべてのアイテムを組み合わせた処理に時間がかかることを想像してみてください。

私の現在のスキームを使用すると、マルチプレクサは、出力することになっている次のキューを見つけるまで、すべての処理出力キューからアイテムを読み取り、バッファリングし続けます。待機しているアイテムは(上記の「想像する」によると)他のすべての作業アイテムが処理された後にのみ表示されるため、入力全体のすべての作業アイテムを効果的にバッファリングします。

データ量が多すぎるため、これを実行できません。出力キューが特定の最大サイズに達したとき(つまり、制限された出力キュー)に、処理タスクが完了した作業項目を出力しないようにする必要があります。ただし、作業項目がマルチプレクサが待機しているものである場合を除きます。

そして、それは私が少し立ち往生しているところです。これを実際に実装する方法はたくさん考えられますが、それらはすべて、私が置き換えようと考えているコードよりも優れていないほど複雑すぎるようです。

私の質問は何ですか?

私の質問は:私はこれについて正しい方法で進んでいますか?

これはよく理解されている問題だと思いましたが、私の調査では、他のすべての作業項目と比較して、作業項目に非常に長い時間がかかる場合に発生する無制限のバッファリングの問題を無視しているように見える記事しか見つかりませんでした。

これを達成するための合理的な方法を説明している記事を誰かに教えてもらえますか?

TL;DRバージョン

最終的には、無制限の出力バッファーを必要とせずに、最終出力での順序を維持する方法で、複数のスレッドを使用して順次作業項目を処理する方法を探しています。

4

4 に答える 4

2

起動時にアイテムのプールを作成します。たとえば、1000 です。それらを BlockingCollection (「プール キュー」) に格納します。

サプライヤは、プール キューからアイテムを取得し、ファイルからロードし、シーケンス番号などをロードして、プロセッサのスレッドプールに送信します。

プロセッサは処理を行い、出力をマルチプレクサに送信します。マルチプレクサは、以前のアイテムが処理されるまで、順序が乱れたアイテムを格納する役割を果たします。

マルチプレクサの出力先によって項目が完全に消費されると、サプライヤによる再利用のためにプール キューに返されます。

1 つの「遅いアイテム」が膨大な量の処理を必要とする場合、「クイック アイテム」が他のプール スレッドを通過するにつれて、マルチプレクサ内の順不同のコレクションが増加しますが、マルチプレクサは実際にはアイテムをフィードしていないためです。その出力、プール キューは補充されていません。

プールが空になると、サプライヤーはそれをブロックし、それ以上アイテムを供給できなくなります。

処理プール入力に残っている「クイック アイテム」が処理され、「スロー アイテム」を除いて処理が停止します。サプライヤはブロックされています。マルチプレクサのコレクションには [poolSize-1] 項目があります。余分なメモリは使用されず、CPU が浪費されることもありません。発生するのは「遅いアイテム」の処理だけです。

「遅い項目」が最終的に完了すると、マルチプレクサに出力されます。

マルチプレクサは、すべての [poolSize] 項目を必要な順序で出力できるようになりました。これらのアイテムが消費されると、プールが再びいっぱいになり、プールからアイテムを取得できるようになったサプライヤが実行され、ファイルを再度読み取り、アイテムをプロセッサ プールのキューに入れます。

自動調整、バウンド バッファーは不要、メモリの暴走はありません。

編集:「境界のあるバッファは必要ありません」という意味でした:)

また、GC ホールドアップはありません。アイテムは再利用されるため、GC を実行する必要はありません。

于 2013-02-22T12:55:59.740 に答える
1

ファローアップ

完全を期すために、ここに私が巻き上げたコードを示します。ソリューションの基礎を提供してくれた Martin James の回答に感謝します。

マルチプレクサにはまだ完全には満足していません (「参考文献」を参照ParallelWorkProcessor.multiplex())。動作しますが、少しぎこちないようです。

作業プールに関する Martin James のアイデアを使用して、マルチプレクサ バッファーの無制限の成長を防ぎましたが、作業プール キューの代わりに SemaphoreSlim を使用しました (同じ機能を提供しますが、使用するのが少し簡単で、使用するリソースが少ないため)。

ワーカー タスクは、完了した項目を同時優先キューに書き込みます。これにより、次に出力するアイテムを簡単かつ効率的に見つけることができます。

Microsoft のサンプル同時優先キューを使用し、新しいアイテムがキューに入れられるたびに通知される自動リセット イベントを提供するように変更しました。

これが ParallelWorkProcessor クラスです。これを使用するには、3 つのデリゲートを提供します。1 つは作業項目を提供し、1 つは作業項目を処理し、もう 1 つは完了した作業項目を出力します。

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    public sealed class ParallelWorkProcessor<T> where T: class // T is the work item type.
    {
        public delegate T    Read();           // Called by only one thread.
        public delegate T    Process(T block); // Called simultaneously by multiple threads.
        public delegate void Write(T block);   // Called by only one thread.

        public ParallelWorkProcessor(Read read, Process process, Write write, int numWorkers = 0)
        {
            _read    = read;
            _process = process;
            _write   = write;

            numWorkers = (numWorkers > 0) ? numWorkers : Environment.ProcessorCount;

            _workPool    = new SemaphoreSlim(numWorkers*2);
            _inputQueue  = new BlockingCollection<WorkItem>(numWorkers);
            _outputQueue = new ConcurrentPriorityQueue<int, T>();
            _workers     = new Task[numWorkers];

            startWorkers();
            Task.Factory.StartNew(enqueueWorkItems);
            _multiplexor = Task.Factory.StartNew(multiplex);
        }

        private void startWorkers()
        {
            for (int i = 0; i < _workers.Length; ++i)
            {
                _workers[i] = Task.Factory.StartNew(processBlocks);
            }
        }

        private void enqueueWorkItems()
        {
            int index = 0;

            while (true)
            {
                T data = _read();

                if (data == null) // Signals end of input.
                {
                    _inputQueue.CompleteAdding();
                    _outputQueue.Enqueue(index, null); // Special sentinel WorkItem .
                    break;
                }

                _workPool.Wait();
                _inputQueue.Add(new WorkItem(data, index++));
            }
        }

        private void multiplex()
        {
            int index = 0; // Next required index.
            int last = int.MaxValue;

            while (index != last)
            {
                KeyValuePair<int, T> workItem;
                _outputQueue.WaitForNewItem(); // There will always be at least one item - the sentinel item.

                while ((index != last) && _outputQueue.TryPeek(out workItem))
                {
                    if (workItem.Value == null) // The sentinel item has a null value to indicate that it's the sentinel.
                    {
                        last = workItem.Key;  // The sentinel's key is the index of the last block + 1.
                    }
                    else if (workItem.Key == index) // Is this block the next one that we want?
                    {
                        // Even if new items are added to the queue while we're here, the new items will be lower priority.
                        // Therefore it is safe to assume that the item we will dequeue now is the same one we peeked at.

                        _outputQueue.TryDequeue(out workItem);
                        Contract.Assume(workItem.Key == index); // This *must* be the case.
                        _workPool.Release();                    // Allow the enqueuer to queue another work item.
                        _write(workItem.Value);
                        ++index;
                    }
                    else // If it's not the block we want, we know we'll get a new item at some point.
                    {
                        _outputQueue.WaitForNewItem();
                    }
                }
            }
        }

        private void processBlocks()
        {
            foreach (var block in _inputQueue.GetConsumingEnumerable())
            {
                var processedData = _process(block.Data);
                _outputQueue.Enqueue(block.Index, processedData);
            }
        }

        public bool WaitForFinished(int maxMillisecondsToWait) // Can be Timeout.Infinite.
        {
            return _multiplexor.Wait(maxMillisecondsToWait);
        }

        private sealed class WorkItem
        {
            public WorkItem(T data, int index)
            {
                Data  = data;
                Index = index;
            }

            public T   Data  { get; private set; }
            public int Index { get; private set; }
        }

        private readonly Task[] _workers;
        private readonly Task _multiplexor;
        private readonly SemaphoreSlim _workPool;
        private readonly BlockingCollection<WorkItem> _inputQueue;
        private readonly ConcurrentPriorityQueue<int, T> _outputQueue;
        private readonly Read    _read;
        private readonly Process _process;
        private readonly Write   _write;
    }
}

そして、ここに私のテストコードがあります:

using System;
using System.Diagnostics;
using System.Threading;

namespace Demo
{
    public static class Program
    {
        private static void Main(string[] args)
        {
            _rng = new Random(34324);

            int threadCount = 8;
            _maxBlocks = 200;
            ThreadPool.SetMinThreads(threadCount + 2, 4); // Kludge to prevent slow thread startup.

            var stopwatch = new Stopwatch();

            _numBlocks = _maxBlocks;
            stopwatch.Restart();
            var processor = new ParallelWorkProcessor<byte[]>(read, process, write, threadCount);
            processor.WaitForFinished(Timeout.Infinite);

            Console.WriteLine("\n\nFinished in " + stopwatch.Elapsed + "\n\n");
        }

        private static byte[] read()
        {
            if (_numBlocks-- == 0)
            {
                return null;
            }

            var result = new byte[128];
            result[0] = (byte)(_maxBlocks-_numBlocks);
            Console.WriteLine("Supplied input: " + result[0]);
            return result;
        }

        private static byte[] process(byte[] data)
        {
            if (data[0] == 10) // Hack for test purposes. Make it REALLY slow for this item!
            {
                Console.WriteLine("Delaying a call to process() for 5s for ID 10");
                Thread.Sleep(5000);
            }

            Thread.Sleep(10 + _rng.Next(50));
            Console.WriteLine("Processed: " + data[0]);
            return data;
        }

        private static void write(byte[] data)
        {
            Console.WriteLine("Received output: " + data[0]);
        }

        private static Random _rng;
        private static int _numBlocks;
        private static int _maxBlocks;
    }
}
于 2013-02-24T21:24:36.183 に答える
1

手動のプロデューサー/コンシューマー バッファリングを使用する代わりに、.AsParallel().AsOrdered()PLINQ の代替手段を使用することを検討しましたか? 意味的には、これはまさにあなたが望むものです - 並行して処理され、出力で順序付けられたアイテムのシーケンスです。あなたのコードは次のように単純に見えるかもしれません...

var orderedOutput = 
    ReadSequentialBlocks()
    .AsParallel()
    .AsOrdered()
    .Select(ProcessBlock)
foreach(var item in orderedOutput)
    Sink(item);

デフォルトの並列度はマシンのプロセッサー数ですが、調整することができます。自動出力バッファがあります。デフォルトのバッファリングが消費するリソースが多すぎる場合は、オフにすることができます。

.WithMergeOptions(ParallelMergeOptions.NotBuffered)

ただし、私は確かに最初にプレーンな飾りのないバージョンを試してみたいと思います. 最後に、単純な自動多重化が必要であるが、ゼロより大きいが自動化されていないバッファーが必要な場合は、常に PLINQ クエリを使用して、BlockingCollection<>別のスレッドで消費する列挙型で読み取られる固定サイズを埋めることができます。

于 2013-02-22T13:38:21.287 に答える
1

あなたは記事を誤解していると思います。説明によると、無制限のバッファはありません。各キューのルックアリード バッファには最大で 1 つの値があります。次の値ではない値をデキューする場合は、それを保存してから、バッファに値がないキューでのみ待機します。(複数の入力バッファがある場合、ロジックはより複雑になるか、2 つのキュー マルチプレクサのツリーが必要になります。)

これを制限された容量を指定した s と組み合わせるとBlockingCollection、希望どおりの動作が得られます。1 つのプロデューサーが遅すぎる場合、遅いスレッドが追いつくまで他のプロデューサーは一時停止します。

于 2013-02-22T12:44:29.063 に答える