私が達成しようとしているのは、コンシューマ プロデューサー メソッドを持つことです。多くのプロデューサーが存在する可能性がありますが、コンシューマーは 1 つだけです。スケーラビリティーのために専用のコンシューマーを配置することはできません。そのため、消費するデータがあり、現在アクティブなコンシューマーが存在しない場合、プロデューサに消費プロセスを開始させるという考え方です。
1. Many threads can be producing messages. (Asynchronous)
2. Only one thread can be consuming messages. (Synchronous)
3. We should only have a consumer in process if there is data to be consumed
4. A continuous consumer that waits for data would not be efficient if we add many of these classes.
私の例では、データを送信する一連のメソッドがあります。複数のスレッドがデータを書き込むことができますWrite()
が、それらのスレッドの 1 つだけがループしてデータを送信しますSendNewData()
。データを書き込むことができるループが 1 つだけである理由は、データの順序が同期している必要がありAsyncWrite()
、制御できない場合は、一度に 1 つずつ実行することによってのみ順序を保証できるAyncWrite()
からです。
私が抱えている問題は、生成するためにスレッドが呼び出されるとWrite()
、データをキューに入れInterlocked.CompareExchance
、消費者がいるかどうかを確認することです。別のスレッドが既に消費しているループ内にあることがわかった場合、このコンシューマーがデータを送信すると想定します。ループ スレッド コンシューマーが「レース ポイント A」にある場合、これは問題です。このコンシューマーは、送信するメッセージがこれ以上ないことを既に確認しており、消費プロセスをシャットダウンしようとしているためです。
コードの大部分をロックせずにこの競合状態を防ぐ方法はありますか。実際のシナリオには多くのキューがあり、これよりも少し複雑です。
実際のコードList<INetworkSerializable>
では、実際には byte[] BufferPool です。このブロックを読みやすくするために、例として List を使用しました。
これらのクラスが一度に何千もアクティブになっているため、SendNewData を専用スレッドで継続的にループさせる余裕はありません。ループ スレッドは、送信するデータがある場合にのみアクティブにする必要があります。
public void Write(INetworkSerializable messageToSend)
{
Queue.Enqueue(messageToSend);
// Check if there are any current consumers. If not then we should instigate the consuming.
if (Interlocked.CompareExchange(ref RunningWrites, 1, 0) == 0)
{ //We are now the thread that consumes and sends data
SendNewData();
}
}
//Only one thread should be looping here to keep consuming and sending data synchronously.
private void SendNewData()
{
INetworkSerializable dataToSend;
List<INetworkSerializable> dataToSendList = new List<INetworkSerializable>();
while (true)
{
if (!Queue.TryDequeue(out dataToSend))
{
//Race Point A
if (dataToSendList.IsEmpty)
{
//All data is sent, return so that another thread can take responsibility.
Interlocked.Decrement(ref RunningWrites);
return;
}
//We have data in the list to send but nothing more to consume so lets send the data that we do have.
break;
}
dataToSendList.Add(dataToSend);
}
//Async callback is WriteAsyncCallback()
WriteAsync(dataToSendList);
}
//Callback after WriteAsync() has sent the data.
private void WriteAsyncCallback()
{
//Data was written to sockets, now lets loop back for more data
SendNewData();
}