コンシューマーの数を動的に調整して、複数のスレッドでキューを処理する方法を見つけようとしています。基本的に、タスクは非常によく知られています。複数のプロデューサーがメッセージを作成してキューに送信し、複数のコンシューマーがキューからのメッセージを処理します。今、私は System.Collections.Queue.Synchronized、System.Collections.Concurrent.ConcurrentQueue、System.Collections.Concurrent.BlockingCollection などのさまざまなコンポーネントを利用してそれを行うさまざまな方法を考えましたが、それを適切に行う方法を決定することはできません効率を最大限に高めますので、皆様からのご意見を参考に、明るいアイデアをお待ちしております。
詳細は次のとおりです。
- 場合によっては、メッセージ レートが非常に集中することが予想されますが、処理は比較的簡単です。
- 何人のコンシューマを用意すればよいかわかりません。
- エンキューされたメッセージの量に応じて、現在のコンシューマーをブロックするのではなく、現在のコンシューマーの数を調整するプロセスが必要です (つまり、100 のメッセージごとに追加のコンシューマー fe を入力したいということです。エンキューされたメッセージの数が、入力に必要な数よりも 50 少ない場合、メッセージの量が 300 を超えると 3 番目のコンシューマーに入力され、250 に減少すると停止する必要があります)。
これがアイデアです。ここで、ConcurrentQueue を Enqueue メソッドをカプセル化するクラスにラップし、エンキュー後にメッセージの数をチェックして、追加のコンシューマーを開始するかどうかを決定することを考えました。そして、コンシューマは、ループ内でそれを停止するかどうかを決定するチェックを行う必要があります。もっと面白い解決策を提案してくれると思います。
ところで、私がまだ処理方法がわからない状況の 1 つは、理論的には、最後のメッセージがキューに入れられ、同時に最後のコンシューマが停止したときです。もう 1 つの状況は停止に関するものです。複数のコンシューマが同時に停止チェックに到達すると停止します。これらの状況にどのように対処すればよいですか?
私が何を意味するかを示すために、次のサンプルを検討してください。
class MessageController
{
private BlockingCollection<IMessage> messageQueue = new BlockingCollection<IMessage>();
int amountOfConsumers;
public void Enqueue(IMessage message)
{
messageQueue.Add(message); // point two
if (Math.Floor((double)messageQueue.Count / 100)+1 > amountOfConsumers) // point three
{
Task.Factory.StartNew(() =>
{
IMessage msg;
while ((messageQueue.Count > 0) && (Math.Floor((double)((messageQueue.Count + 50) / 100)) + 1 >= amountOfConsumers)) //point one
{
msg = messageQueue.Take();
//process msg...
}
ConsumerQuit(); // point four
});
Interlocked.Increment(ref amountOfConsumers);
}
}
public void ConsumerQuit()
{
Interlocked.Decrement(ref amountOfConsumers);
}
}
したがって、特定のコード行を指すことができるようになったとき、これらは質問です。
- 最後のコンシューマーがエンキューされたメッセージがないことを発見したとき (@ ポイント 1)、ConsumerQuit メソッドを呼び出す前に、最後のメッセージが到着してエンキューされ、追加のコンシューマーのチェックが行われ、それが判明します (@ ポイント 3)。まだコンシューマーが動作しているため、1 つのメッセージに対して 1 つのコンシューマーで十分です。何も起こらず、ConsumerQuit が最終的に呼び出され、メッセージがキューにスタックします。
ConsumerTask | LastMessageThread ------------------------------------------------------ @point one(messageQueue.Count=0) | @point two no time | @point three(amountOfConsumers=1) @point four | ended; ended; | ended;
- いくつかのコンシューマーは、そのうちの 1 つを停止する必要があるときに「ポイント 1」チェックに同時に達しました (fe messageQueue.Count は 249)。そのうちの 1 つで ConsumerQuit が呼び出される前に、他のいくつかのコンシューマーもこのチェックを行うため、いくつかは停止します。 .
ConsumerTask1 | ConsumerTask2| ConsumerTask3 | ConsumerTask4| ------------------------------------------------------------------------------ @point one(.Count=249;amount=4)| no time | no time | @point one | no time | @point one | processing msg| @point four | @point four | no time | @point one | ended; | ended; | @point four | processing msg| ended; | ended; | ended; | ... | ended; |
ここで、最後のメッセージがすでにキューに入れられている場合、249 個のメッセージを単独で処理する必要がある 1 つのコンシューマー タスクが残っていますが、最悪の場合、最後のメッセージの後、数百のメッセージがスタックする可能性があるため、すべてのタスクが停止する可能性があります。