6

固定サイズの FIFO キューにプロデューサー/コンシューマー パターンを実装する必要があります。ConcurrentQueue のラッパー クラスがこれで機能する可能性があると思いますが、完全にはわかりません (以前に ConcurrentQueue を使用したことがありません)。これのひねりは、キューが固定数のアイテム (私の場合は文字列) のみを保持する必要があることです。私のアプリケーションには、1 つのプロデューサー タスク/スレッドと 1 つのコンシューマー タスク/スレッドがあります。私の消費者タスクが実行されると、その時点でキューに存在するすべてのアイテムをキューから取り出して処理する必要があります。

価値のあることとして、消費者によるキューに入れられたアイテムの処理は、100% 信頼できない Web アプリに SOAP 経由でアイテムをアップロードすることに他なりません。接続を確立できない場合、または SOAP 呼び出しの呼び出しが失敗した場合は、それらの項目を破棄して、キューに戻って詳細を確認する必要があります。SOAP のオーバーヘッドのため、1 回の SOAP 呼び出しで送信できるキューからの項目数を最大化しようとしました。

場合によっては、消費者がアイテムを削除して処理するよりも、生産者がアイテムを追加する方が速い場合があります。キューがすでにいっぱいで、プロデューサーが別のアイテムを追加する必要がある場合、キューのサイズが固定されたままになるように、新しいアイテムをキューに入れ、最も古いアイテムをデキューする必要があります。基本的に、生成された最新のアイテムを常にキューに保持する必要があります (コンシューマーが現在前のアイテムを処理しているため、一部のアイテムが消費されないことを意味する場合でも)。

キュー内のアイテムが固定されている場合にプロデューサーが数を維持することに関して、この質問から1つの潜在的なアイデアを見つけました。

新しいエンキュー時に古い値を自動的にデキューする固定サイズのキュー

私は現在、次のような Enqueue() メソッドを使用して ConcurrentQueue の周りにラッパー クラス (その回答に基づく) を使用しています。

public class FixedSizeQueue<T>
{
    readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>();

    public int Size { get; private set; }

    public FixedSizeQueue(int size)
    {
        Size = size;
    }

    public void Enqueue(T obj)
    {
        // add item to the queue
        queue.Enqueue(obj);

        lock (this) // lock queue so that queue.Count is reliable
        {
            while (queue.Count > Size) // if queue count > max queue size, then dequeue an item
            {
                T objOut;
                queue.TryDequeue(out objOut);
            }
        }
    }
}

次のように、キューにサイズ制限のあるこのクラスのインスタンスを作成します。

FixedSizeQueue<string> incomingMessageQueue = new FixedSizeQueue<string>(10); // 10 item limit

プロデューサー タスクを開始すると、キューがいっぱいになり始めます。私の Enqueue() メソッドのコードは、アイテムを追加するとキューの数が最大サイズを超えたときにキューから最も古いアイテムを削除することに関して、適切に機能しているようです。ここで、アイテムをデキューして処理する消費者タスクが必要ですが、ここで私の頭が混乱します。ある時点でキューのスナップショットを取得し、処理のためにすべての項目をデキューするコンシューマ用の Dequeue メソッドを実装する最良の方法は何ですか (プロデューサーは、このプロセス中にまだ項目をキューに追加している可能性があります)。

4

2 に答える 2

7

簡単に言うと、ConcurrentQueue には「ToArray」メソッドがあり、入力するとコレクションがロックされ、キュー内の現在のすべてのアイテムの「スナップショット」が生成されます。コンシューマーに処理対象のブロックを与えたい場合は、キューに登録するメソッドと同じオブジェクトをロックし、ToArray() を呼び出してから、while(!queue.IsEmpty) queue.TryDequeue(out trash)抽出した配列を返す前に、ループをスピンしてキューをクリアします。

これはあなたのGetAll()方法です:

public T[] GetAll()
{
    lock (syncObj) // so that we don't clear items we didn't get with ToArray()
    {
        var result = queue.ToArray();
        T trash;
        while(!queue.IsEmpty) queue.TryDequeue(out trash);
    }
}

キューをクリアする必要があるため、2 つの操作を単純に組み合わせることができます。(queue.Count を使用して) 適切なサイズの配列を作成し、キューが空でない間にアイテムをデキューして配列に入れ、戻る前に。

さて、それが特定の質問に対する答えです。CodeReview.SE の帽子をかぶって、いくつかのことを指摘しなければなりません。

  • 絶対に使用しないlock(this)でください。他のどのオブジェクトがあなたのオブジェクトをロック フォーカスとして使用している可能性があるかはわかりません。そのため、オブジェクトが内部からロックされるとブロックされます。ベスト プラクティスは、プライベート スコープのオブジェクト インスタンスをロックすることです。通常は、ロックするためだけに作成されたものです。private readonly object syncObj = new object();

  • とにかくラッパーの重要なセクションをロックしているのでList<T>、並行コレクションの代わりに通常のコレクションを使用します。アクセスが高速になり、より簡単にクリーンアップできるため、ConcurrentQueue で許可されているよりもはるかに簡単に実行できるようになります。エンキューするには、同期オブジェクト Insert() をインデックス 0 の前にロックし、RemoveRange() を使用して、インデックス Size からリストの現在の Count までの項目を削除します。デキューするには、同じ同期オブジェクトをロックし、myList.ToArray() を呼び出し (Linq 名前空間から。ConcurrentQueue とほぼ同じことを行います)、配列を返す前に myList.Clear() を呼び出します。これ以上簡単なことはありません:

    public class FixedSizeQueue<T>
    {
    private readonly List<T> queue = new List<T>();
    private readonly object syncObj = new object();
    
    public int Size { get; private set; }
    
    public FixedSizeQueue(int size) { Size = size; }
    
    public void Enqueue(T obj)
    {
        lock (syncObj)
        {
            queue.Insert(0,obj)
            if(queue.Count > Size) 
               queue.RemoveRange(Size, Count-Size);
        }
    }
    
    public T[] Dequeue()
    {
        lock (syncObj)
        {
            var result = queue.ToArray();
            queue.Clear();
            return result;
        }
    }
    }
    
  • このモデルを使用して、エンキューされたアイテムを捨てていることを理解しているようです。それは通常は良いことではありませんが、疑いの余地はありません。ただし、BlockingCollection を使用して、これを達成するロスレスな方法があると言えます。BlockingCollection は、ほとんどの System.Collections.Concurrent クラスを含む IProducerConsumerCollection をラップし、キューの最大容量を指定できるようにします。コレクションは、空のキューからデキューしようとするスレッド、または満杯のキューに追加しようとするスレッドをブロックします。アイテムが追加または削除されて、何かを取得するか挿入する余地ができるまでブロックされます。これは、最大サイズのプロデューサー/コンシューマー キューを実装するための最良の方法です。消費者が取り組むものです。この道を行くと、消費者が捨てなければならないものだけが捨てられます。コンシューマーは、プロデューサーが入れたすべての行を見て、それぞれについて独自の決定を下します。

于 2012-09-13T16:55:35.800 に答える
5

lockと一緒に使用したくありませんthislock(this) {…} が悪い理由を参照してください。詳細については。

このコード

// if queue count > max queue size, then dequeue an item
while (queue.Count > Size) 
{
    T objOut;
    queue.TryDequeue(out objOut);
}

商品の入手可能性について何らかの方法で待つか、消費者に通知する必要があることを示唆しています。この場合、BlockingCollection<T>代わりに使用することを検討してください。

于 2012-09-13T16:43:02.147 に答える