1

Webサービス用の比較的単純な「プロキシ」アプリを作成しています。一般的な考え方は、TCPサーバー(非同期接続付き)がクライアントからデータを読み取り(文字列)、そのデータを(コールバック読み取り関数の一部として)2つのキュー(Q1とQ2)のいずれかに配置することです。別のスレッドがこれらのキューのデータを読み取り、それをWebサービスに渡します。第1四半期のデータは、第2四半期にある可能性のあるすべてのデータよりも優先される必要があります。

私は生産者/消費者パターンについて読んでいますが、それは私がキューに関して実装しようとしていることとほぼ同じようです。エンキューとデキューの操作は異なるスレッドで行われるため、キューがスレッドセーフであり、何らかのロックメカニズムをサポートしている必要があることは明らかですか?これは.NET4.0アプリケーションであり、新しいBlockingCollectionクラスとConcurrentQueueクラスのドキュメントを見ましたが、違いが何であるか、またはこのシナリオでそれらをどのように実装するかは正確にはわかりません。誰かがそれについてもう少し光を当てることができますか?ありがとうございました!

4

2 に答える 2

3

次のクラスのようにします。Enqueue()アイテムを作成するときに呼び出して、キューの1つに追加します。このメソッドは常に(ほぼ)すぐに戻ります。別のスレッドではDequeue()、アイテムを消費する準備ができたときに呼び出します。最初に優先度の高いキューから取得しようとします。キューのいずれにも現在利用可能なアイテムがない場合、呼び出しはブロックされます。制作が終わったら、と呼びますComplete()。その呼び出しが行われ、両方のキューが空になった後、次の呼び出し(または現在ブロックされている呼び出し)がDequeue()スローしInvalidOperationExceptionます。

プロデューサーがコンシューマーよりも長期間高速である可能性がある場合は、キューをバインドする必要があります(new BlockingCollection<T>(capacity))。ただし、この場合、優先度の低いアイテムと優先度の高いアイテムの両方を生成するスレッドが1つしかない場合、優先度の高いアイテムは優先度の低いアイテムを待機しなければならない可能性があります。これを修正するには、優先度の高いアイテム用に1つのスレッドを作成し、優先度の低いアイテム用に1つのスレッドを作成します。または、優先度の高いキューのみをバインドして、一度に100万個の優先度の低いアイテムを取得しないことを期待することもできます。

class Worker<T>
{
    BlockingCollection<T> m_highPriorityQueue = new BlockingCollection<T>();
    BlockingCollection<T> m_lowPriorityQueue = new BlockingCollection<T>();

    public void Enqueue(T item, bool highPriority)
    {
        BlockingCollection<T> queue;
        if (highPriority)
            queue = m_highPriorityQueue;
        else
            queue = m_lowPriorityQueue;

        queue.Add(item);
    }

    public T Dequeue()
    {
        T result;

        if (!m_highPriorityQueue.IsCompleted)
        {
            if (m_highPriorityQueue.TryTake(out result))
                return result;
        }

        if (!m_lowPriorityQueue.IsCompleted)
        {
            if (m_lowPriorityQueue.TryTake(out result))
                return result;
        }

        if (m_highPriorityQueue.IsCompleted && m_lowPriorityQueue.IsCompleted)
            throw new InvalidOperationException("All work is done.");
        else
        {
            try
            {
                BlockingCollection<T>.TakeFromAny(
                    new[] { m_highPriorityQueue, m_lowPriorityQueue },
                    out result);
            }
            catch (ArgumentException ex)
            {
                throw new InvalidOperationException("All work is done.", ex);
            }

            return result;
        }
    }

    public void Complete()
    {
        m_highPriorityQueue.CompleteAdding();
        m_lowPriorityQueue.CompleteAdding();
    }
}
于 2011-03-31T21:10:23.633 に答える
1

BlockingCollectionは、デフォルトでConcurrentQueueを使用します。アプリケーションに適しているはずです。メールボックスと非同期ブロックでF#を使用した方が簡単な場合があります。以前、一般的な実装のサンプル投稿を作成しました。

F#AgentまたはMailboxProcessorを使用したMap / Reduce

于 2011-03-31T18:11:16.003 に答える