4

コンシューマーの数を動的に調整して、複数のスレッドでキューを処理する方法を見つけようとしています。基本的に、タスクは非常によく知られています。複数のプロデューサーがメッセージを作成してキューに送信し、複数のコンシューマーがキューからのメッセージを処理します。今、私は System.Collections.Queue.Synchronized、System.Collections.Concurrent.ConcurrentQueue、System.Collections.Concurrent.BlockingCollection などのさまざまなコンポーネントを利用してそれを行うさまざまな方法を考えましたが、それを適切に行う方法を決定することはできません効率を最大限に高めますので、皆様からのご意見を参考に、明るいアイデアをお待ちしております。
詳細は次のとおりです。

  • 場合によっては、メッセージ レートが非常に集中することが予想されますが、処理は比較的簡単です。
  • 何人のコンシューマを用意すればよいかわかりません。
  • エンキューされたメッセージの量に応じて、現在のコンシューマーをブロックするのではなく、現在のコンシューマーの数を調整するプロセスが必要です (つまり、100 のメッセージごとに追加のコンシューマー fe を入力したいということです。エンキューされたメッセージの数が、入力に必要な数よりも 50 少ない場合、メッセージの量が 300 を超えると 3 番目のコンシューマーに入力され、250 に減少すると停止する必要があります)。

これがアイデアです。ここで、ConcurrentQueue を Enqueue メソッドをカプセル化するクラスにラップし、エンキュー後にメッセージの数をチェックして、追加のコンシューマーを開始するかどうかを決定することを考えました。そして、コンシューマは、ループ内でそれを停止するかどうかを決定するチェックを行う必要があります。もっと面白い解決策を提案してくれると思います。

ところで、私がまだ処理方法がわからない状況の 1 つは、理論的には、最後のメッセージがキューに入れられ、同時に最後のコンシューマが停止したときです。もう 1 つの状況は停止に関するものです。複数のコンシューマが同時に停止チェックに到達すると停止します。これらの状況にどのように対処すればよいですか?

私が何を意味するかを示すために、次のサンプルを検討してください。

class MessageController
{
    private BlockingCollection<IMessage> messageQueue = new BlockingCollection<IMessage>();

    int amountOfConsumers;

    public void Enqueue(IMessage message)
    {
        messageQueue.Add(message); // point two

        if (Math.Floor((double)messageQueue.Count / 100)+1 > amountOfConsumers) // point three
        {
            Task.Factory.StartNew(() =>
            {
                IMessage msg;
                while ((messageQueue.Count > 0) && (Math.Floor((double)((messageQueue.Count + 50) / 100)) + 1 >= amountOfConsumers)) //point one
                {
                    msg = messageQueue.Take();
                    //process msg...
                }

                ConsumerQuit(); // point four
            });

            Interlocked.Increment(ref amountOfConsumers);
        }
    }

    public void ConsumerQuit()
    {
        Interlocked.Decrement(ref amountOfConsumers);
    }
}

したがって、特定のコード行を指すことができるようになったとき、これらは質問です。

  • 最後のコンシューマーがエンキューされたメッセージがないことを発見したとき (@ ポイント 1)、ConsumerQuit メソッドを呼び出す前に、最後のメッセージが到着してエンキューされ、追加のコンシューマーのチェックが行われ、それが判明します (@ ポイント 3)。まだコンシューマーが動作しているため、1 つのメッセージに対して 1 つのコンシューマーで十分です。何も起こらず、ConsumerQuit が最終的に呼び出され、メッセージがキューにスタックします。
ConsumerTask                       | LastMessageThread
------------------------------------------------------
@point one(messageQueue.Count=0)   | @point two
no time                            | @point three(amountOfConsumers=1)
@point four                        | ended;
ended;                             | ended;
  • いくつかのコンシューマーは、そのうちの 1 つを停止する必要があるときに「ポイント 1」チェックに同時に達しました (fe messageQueue.Count は 249)。そのうちの 1 つで ConsumerQuit が呼び出される前に、他のいくつかのコンシューマーもこのチェックを行うため、いくつかは停止します。 .
ConsumerTask1                  | ConsumerTask2| ConsumerTask3 | ConsumerTask4|
------------------------------------------------------------------------------
@point one(.Count=249;amount=4)| no time      | no time       | @point one   |
no time                        | @point one   | processing msg| @point four  |
@point four                    | no time      | @point one    | ended;       |
ended;                         | @point four  | processing msg| ended;       |
ended;                         | ended;       | ...           | ended;       |

ここで、最後のメッセージがすでにキューに入れられている場合、249 個のメッセージを単独で処理する必要がある 1 つのコンシューマー タスクが残っていますが、最悪の場合、最後のメッセージの後、数百のメッセージがスタックする可能性があるため、すべてのタスクが停止する可能性があります。

4

2 に答える 2

1

パフォーマンスについてはわかりませんが、最終的に解決策を思いついたようです。次のコードを検討してください。フィードバックをお待ちしております。たとえそれらが完全に異なっていて、アプローチの大きな変更を必要とするとしても、私はまだいくつかの他の解決策やアイデアを見たいと思っています. これが目的です:「コンシューマーの数を動的に調整して、複数のスレッドでキューを処理する方法」

class MessageController
{
    private BlockingCollection<IMessage> messageQueue = new BlockingCollection<IMessage>();

    private ManualResetEvent mre = new ManualResetEvent(true);

    private int amountOfConsumers;

    object o = new object();

    public void Enqueue(IMessage message)
    {
        messageQueue.Add(message);

        mre.WaitOne();
        if (Math.Floor((double)messageQueue.Count / 100)+1 > amountOfConsumers)
        {
            Interlocked.Increment(ref amountOfConsumers);

            var task = Task.Factory.StartNew(() =>
            {
                IMessage msg;
                bool repeat = true;

                while (repeat)
                {
                    while ((messageQueue.Count > 0) && (Math.Floor((double)((messageQueue.Count + 50) / 100)) + 1 >= amountOfConsumers))
                    {
                        msg = messageQueue.Take();
                        //process msg...
                    }

                    lock (o)
                    {
                        mre.Reset();

                        if ((messageQueue.Count == 0) || (Math.Ceiling((double)((messageQueue.Count + 51) / 100)) < amountOfConsumers))
                        {
                            ConsumerQuit();
                            repeat = false;
                        }

                        mre.Set();
                    }
                }
            });
        }
    }

    public void ConsumerQuit()
    {
        Interlocked.Decrement(ref amountOfConsumers);
    }
}
于 2013-03-05T19:05:41.537 に答える
1

私の最初の考えは、あなたがこれを逆に設計していることです。

並列処理を見ると、1 つのタスクにスレッドを追加することで常に効率が上がるとは限りません。最適な数が、使用しているマシンのコア数以下である場合があります。これは、ロックの競合とスレッドの切り替えによりオーバーヘッドが増えるためです。

コンシューマーを追加すると、消費率が実際には増加するのではなく減少することがわかる場合があります。

考慮すべきことの 1 つは、メッセージの処理にかかる時間です。この時間がタスクの生成にかかる時間よりも大幅に長い場合、各メッセージを処理する新しいタスクを作成するだけの単一のコンシューマーを使用しないのはなぜでしょうか?

class MessageController
{
    private BlockingCollection<IMessage> messageQueue = new BlockingCollection<IMessage>();

    public void Enqueue(IMessage message)
    {
        messageQueue.Add(message);
    }

    public void Consume()
    {
        //This loop will not exit until messageQueue.CompleteAdding() is called
        foreach (var item in messageQueue.GetConsumingEnumerable())
        {
            IMessage message = item;
            Task.Run(() => ProcessMessage(message);
        }
    }
}
于 2013-03-05T19:06:57.113 に答える