4

1 つのプロデューサー、複数のコンシューマー、マルチスレッド アプリケーションを実装するための最良のシナリオを探しています。現在、共有バッファーに 1 つのキューを使用していますが、1 つのプロデューサーと 1 つのコンシューマーの場合よりもはるかに遅くなります。私は次のようにする予定です:

Queue<item>[] buffs = new Queue<item>[N];
object[] _locks = new object[N];
static void Produce()
{
    int curIndex = 0;
    while(true)
    {
        // Produce item;
        lock(_locks[curIndex])
        {
            buffs[curIndex].Enqueue(curItem);
            Monitor.Pulse(_locks[curIndex]);
        }
        curIndex = (curIndex+1)%N;
    }
}

static void Consume(int myIndex)
{
    item curItem;
    while(true)
    {
        lock(_locks[myIndex])
        {
            while(buffs[myIndex].Count == 0)
                Monitor.Wait(_locks[myIndex]);
            curItem = buffs[myIndex].Dequeue();
        }
        // Consume item;
    }
}

static void main()
{
    int N = 100;
    Thread[] consumers = new Thread[N];
    for(int i = 0; i < N; i++)
    {
        consumers[i] = new Thread(Consume);
        consumers[i].Start(i);
    }
    Thread producer = new Thread(Produce);
    producer.Start();
}
4

3 に答える 3

6

BlockingCollectionを使用する

BlockingCollection<item> _buffer = new BlockingCollection<item>();

static void Produce()
{
    while(true)
    {
        // Produce item;
        _buffer.Add(curItem);
    }

    // eventually stop producing
    _buffer.CompleteAdding();
}

static void Consume(int myIndex)
{
    foreach (var curItem in _buffer.GetConsumingEnumerable())
    {
        // Consume item;
    }
}

static void main()
{
    int N = 100;
    Thread[] consumers = new Thread[N];
    for(int i = 0; i < N; i++)
    {
        consumers[i] = new Thread(Consume);
        consumers[i].Start(i);
    }
    Thread producer = new Thread(Produce);
    producer.Start();
}

最初からスレッド数を指定したくない場合は、代わりに Parallel.ForEach を使用できます。

static void Consume(item curItem)
{
    // consume item
}

void Main()
{
    Thread producer = new Thread(Produce);
    producer.Start();

    Parallel.ForEach(_buffer.GetConsumingPartitioner(), Consumer)
}
于 2013-10-02T14:05:20.407 に答える
1

より多くのスレッドを使用しても役に立ちません。パフォーマンスが低下することさえあります。ThreadPoolすべての作業項目がプロデューサーによって作成された 1 つの項目である場合に使用することをお勧めします。ただし、それは、生産されたアイテムが生産された順序で消費されることを保証するものではありません。


別の方法として、コンシューマーの数を 4 に減らし、次のように動作方法を変更することもできます。

プロデューサーは新しい作業をキューに追加します。すべてのワーカー スレッドに対して 1 つのグローバル キューしかありません。次に、次のような新しい作業があることを示すフラグを設定します。

ManualResetEvent workPresent = new ManualResetEvent(false);
Queue<item> workQueue = new Queue<item>();

static void Produce()
{
    while(true)
    {
        // Produce item;
        lock(workQueue)
        {
            workQueue.Enqueue(newItem);
            workPresent.Set();
        }
    }
}

コンシューマーは、作業がキューに追加されるのを待ちます。1 つの消費者だけがその仕事をするようになります。次に、キューからすべての作業を取得し、フラグをリセットします。それが完了するまで、プロデューサーは新しい作業を追加できません。

static void Consume()
{
    while(true)
    {
        if (WaitHandle.WaitOne(workPresent))
        {
            workPresent.Reset();

            Queue<item> localWorkQueue = new Queue<item>();
            lock(workQueue)
            {
                while (workQueue.Count > 0)
                    localWorkQueue.Enqueue(workQueue.Dequeue());
            }

            // Handle items in local work queue
            ...
        }
    }
}    

ただし、これの結果は少し予測できません。1 つのスレッドがすべての作業を行っていて、他のスレッドは何もしていない可能性があります。

于 2013-10-02T07:44:53.880 に答える