30

私は (率直に言って素晴らしい)BlockingCollection<T>型を、高度にマルチスレッド化された高性能アプリに利用しています。

コレクション全体で多くのスループットがあり、ミクロレベルでは非常に高性能です。ただし、「バッチ」ごとに、キャンセルトークンにフラグを立てることで常に終了します。これにより、待機中のTake呼び出しで例外がスローされます。それは問題ありませんが、a) 例外には明らかなオーバーヘッドがあり、b) デバッグ時に、その特定の例外のブレークを手動でオフにしたくないため、戻り値または出力パラメーターでそれを通知することにしました。例外。

実装は複雑なようで、理論的には、例外を使用しない独自のバージョンを逆アセンブルして再作成できると思いますが、おそらくそれほど複雑ではない方法がありますか?

nullコレクションにオブジェクト (そうでない場合はプレースホルダー) を追加して、プロセスが終了する必要があることを示すこともできますが、適切に中止する手段も必要です。

では - 代替のコレクション型ですか? 私自身を再作成しますか?これを悪用する方法はありますか?

(いくつかのコンテキスト: を使用BlockingCollection<T>して手動でロックするよりも利点があるため、使用しましたQueue。スレッドプリミティブの使用が優れていることがわかります。私の場合、あちこちで数ミリ秒と最適なコアを使用することが重要です。 )

編集:私はちょうどこれのために報奨金を開きました. アナスタシアスヤルの答えが、私のコメントで提起した質問をカバーしているとは思いません。私はこれが難しい問題であることを知っています。誰でも支援できますか?

4

5 に答える 5

9

すでに行っていると思いますが、BlockingCollection の反映されたソースを調べると、残念ながら、CancellationToken が BlockingCollection に渡されてキャンセルされると、下の画像に示すように OperationCancelledException が発生するように見えます (いくつかの画像の後の回避策)

GetConsumingEnumerableTryTakeWithNoTimeValidationこの例外を発生させる BlockingCollection で 呼び出します。

ここに画像の説明を入力

回避策 #1

可能性のある戦略の 1 つは、キャンセル トークンを BlockingCollection に渡す (この例外が発生します) のではなく、プロデューサーとコンシューマーをより細かく制御できると仮定して、キャンセル トークンをプロデューサーとコンシューマーに渡すことです。

プロデューサーが生成しておらず、コンシューマーが消費していない場合は、この例外を発生させず、BlockingCollection で CancellationToken.None を渡すことによって操作を効果的にキャンセルしたことになります。

特殊なケースBlockingCollection が BoundedCapacity または Empty の場合のキャンセル

プロデューサーがブロックされました: BlockingCollection の BoundedCapacity に達すると、プロデューサー スレッドがブロックされます。したがって、キャンセルを試みて、BlockingCollection が BoundedCapacity にある場合 (つまり、コンシューマーはブロックされませんが、プロデューサーはキューにアイテムを追加できないためブロックされます)、追加のアイテムを消費できるようにする必要があります (1 つ)。これにより、プロデューサーのブロックが解除され (blockingCollection への追加時にブロックされるため)、キャンセル ロジックがプロデューサー側で開始されます。

ブロックされたコンシューマー: キューが空であるためにコンシューマーがブロックされている場合、Blocking コレクションに空の作業単位 (コンシューマー スレッドごとに 1 つ) を挿入して、コンシューマー スレッドのブロックを解除し、キャンセル ロジックが開始されるようにすることができます。消費者側。

キューにアイテムがあり、BoundedCapacity や Empty などの制限に達していない場合、プロデューサー スレッドとコンシューマー スレッドはブロックされません。

回避策 2

キャンセル作業単位の使用。

アプリケーションをキャンセルする必要がある場合、プロデューサー (おそらく 1 つのプロデューサーだけで十分ですが、他のプロデューサーは生産をキャンセルするだけです) はキャンセル作業単位を生成します (言及したように null になるか、マーカー インターフェイスを実装するクラスになる可能性があります)。コンシューマがこの作業ユニットを消費し、それが実際にはキャンセル作業ユニットであることを検出すると、キャンセル ロジックが作動します。生成されるキャンセル作業ユニットの数は、コンシューマ スレッドの数と同じである必要があります。

繰り返しになりますが、BoundedCapacity に近づくと、一部のプロデューサーがブロックされている兆候である可能性があるため、注意が必要です。プロデューサー/コンシューマーの数に応じて、すべてのプロデューサー (1 つを除く) がシャットダウンするまでコンシューマーを消費させることができます。これにより、残存するプロデューサーが存在しないことが保証されます。プロデューサーが 1 つしか残っていない場合、最後のコンシューマーはシャットダウンでき、プロデューサーはキャンセルされた作業単位の生成を停止できます。

于 2012-01-26T11:27:27.443 に答える
1

最後のアイテムにフラグを設定することで、バッチの終了を通知します(IsLastItemブールプロパティを追加するか、ラップします)。または、最後のアイテムとしてnullを送信することもできます(nullがblockingcollectionを正しく通過するかどうかはわかりません)。

'batch'の概念の必要性を取り除くことができれば、blockingcollectionから新しいデータを継続的にTake()して処理するための追加のスレッドを作成し、他に何もすることができません。

于 2012-06-19T15:26:28.610 に答える
1

少し前に行った BlockingQueue はどうですか?

http://apichange.codeplex.com/SourceControl/changeset/view/76c98b8c7311#ApiChange.Api%2fsrc%2fInfrastructure%2fBlockingQueue.cs

例外なくうまくいくはずです。現在のキューは、破棄時にイベントを単純に閉じますが、これは望ましくない可能性があります。null をエンキューして、すべてのアイテムが処理されるまで待ちたい場合があります。これとは別に、それはあなたのニーズに合うはずです。

using System.Collections.Generic;
using System.Collections;
using System.Threading;
using System;

namespace ApiChange.Infrastructure
{

    /// <summary>
    /// A blocking queue which supports end markers to signal that no more work is left by inserting
    /// a null reference. This constrains the queue to reference types only. 
    /// </summary>
    /// <typeparam name="T"></typeparam>
    public class BlockingQueue<T> : IEnumerable<T>, IEnumerable, IDisposable where T : class
    {
        /// <summary>
        /// The queue used to store the elements
        /// </summary>
        private Queue<T> myQueue = new Queue<T>();
        bool myAllItemsProcessed = false;
        ManualResetEvent myEmptyEvent = new ManualResetEvent(false);

        /// <summary>
        /// Deques an element from the queue and returns it.
        /// If the queue is empty the thread will block. If the queue is stopped it will immedieately
        /// return with null.
        /// </summary>
        /// <returns>An object of type T</returns>      
        public T Dequeue()
        {
            if (myAllItemsProcessed)
                return null;

            lock (myQueue)
            {
                while (myQueue.Count == 0) 
                {
                    if(!Monitor.Wait(myQueue, 45))
                    {
                        // dispatch any work which is not done yet
                        if( myQueue.Count > 0 )
                            continue;
                    }

                    // finito 
                    if (myAllItemsProcessed)
                    {
                        return null;
                    }
                }

                T result = myQueue.Dequeue();
                if (result == null)
                {
                    myAllItemsProcessed = true;
                    myEmptyEvent.Set();
                }
                return result;
            }
        }

        /// <summary>
        /// Releases the waiters by enqueuing a null reference which causes all waiters to be released. 
        /// The will then get a null reference as queued element to signal that they should terminate.
        /// </summary>
        public void ReleaseWaiters()
        {
            Enqueue(null);
        }

        /// <summary>
        /// Waits the until empty. This does not mean that all items are already process. Only that
        /// the queue contains no more pending work. 
        /// </summary>
        public void WaitUntilEmpty()
        {
            myEmptyEvent.WaitOne();
        }

        /// <summary>
        /// Adds an element of type T to the queue. 
        /// The consumer thread is notified (if waiting)
        /// </summary>
        /// <param name="data_in">An object of type T</param>      
        public void Enqueue(T data_in)
        {
            lock (myQueue)
            {
                myQueue.Enqueue(data_in);
                Monitor.PulseAll(myQueue);
            }
        }

        /// <summary>
        /// Returns an IEnumerator of Type T for this queue
        /// </summary>
        /// <returns></returns>    
        IEnumerator<T> IEnumerable<T>.GetEnumerator()
        {
            while (true)
            {
                T item = Dequeue();
                if (item == null)
                    break;
                else
                    yield return item;
            }
        }

        /// <summary>
        /// Returns a untyped IEnumerator for this queue
        /// </summary>
        /// <returns></returns>     
        IEnumerator IEnumerable.GetEnumerator()
        {
            return ((IEnumerable<T>)this).GetEnumerator();
        }


        #region IDisposable Members

        /// <summary>
        /// Closes the EmptyEvent WaitHandle.
        /// </summary>
        public void Dispose()
        {
            myEmptyEvent.Close();
        }

        #endregion
    }
}
于 2012-06-16T10:52:29.073 に答える
0

キーレン、

私の調査から、私は個人的に、あなたが望んでいたことを正確に行う ProducerConsumer パターンのスレッドセーフなタイプを知りません。これを競争力のあるソリューションとは主張しませんが、 default の代わりに任意の組み込み型またはカスタム型を自由に提供できるように、BlockingCollection<T>少数で装飾することを提案します。extension methodCancellationToken

ステージ 1:

以下は、underlingTryAddWithNoTimeValidationメソッドを使用してキューに追加するデフォルト メソッドのリストです。

public void Add(T item){
      this.TryAddWithNoTimeValidation(item, -1, new CancellationToken());
}

public void Add(T item, CancellationToken cancellationToken){
      this.TryAddWithNoTimeValidation(item, -1, cancellationToken);
    }

public bool TryAdd(T item){
      return this.TryAddWithNoTimeValidation(item, 0, new CancellationToken());
    }

public bool TryAdd(T item, TimeSpan timeout){
      BlockingCollection<T>.ValidateTimeout(timeout);
      return this.TryAddWithNoTimeValidation(item, (int) timeout.TotalMilliseconds, new CancellationToken());
    }

public bool TryAdd(T item, int millisecondsTimeout){
      BlockingCollection<T>.ValidateMillisecondsTimeout(millisecondsTimeout);
      return this.TryAddWithNoTimeValidation(item, millisecondsTimeout, new           CancellationToken());
}

public bool TryAdd(T item, int millisecondsTimeout, CancellationToken cancellationToken){
 BlockingCollection<T>.ValidateMillisecondsTimeout(millisecondsTimeout);
 return this.TryAddWithNoTimeValidation(item, millisecondsTimeout, cancellationToken);
}

これで、興味のあるメソッドの一部またはすべてに拡張を提供できます。

ステージ 2:

TryAddWithNoTimeValidationデフォルトの代わりに実装を参照するようになりました。

例外TryAddWithNoTimeValidationをスローすることなく安全に続行できる代替バージョンを提供できます。OperationCancellation

于 2012-06-20T13:23:17.603 に答える