1

私が達成しようとしているのは、コンシューマ プロデューサー メソッドを持つことです。多くのプロデューサーが存在する可能性がありますが、コンシューマーは 1 つだけです。スケーラビリティーのために専用のコンシューマーを配置することはできません。そのため、消費するデータがあり、現在アクティブなコンシューマーが存在しない場合、プロデューサに消費プロセスを開始させるという考え方です。

 1. Many threads can be producing messages. (Asynchronous)
 2. Only one thread can be consuming messages. (Synchronous)
 3. We should only have a consumer in process if there is data to be consumed 
 4. A continuous consumer that waits for data would not be efficient if we add many of these classes.

私の例では、データを送信する一連のメソッドがあります。複数のスレッドがデータを書き込むことができますWrite()が、それらのスレッドの 1 つだけがループしてデータを送信しますSendNewData()。データを書き込むことができるループが 1 つだけである理由は、データの順序が同期している必要がありAsyncWrite()、制御できない場合は、一度に 1 つずつ実行することによってのみ順序を保証できるAyncWrite()からです。

私が抱えている問題は、生成するためにスレッドが呼び出されるとWrite()、データをキューに入れInterlocked.CompareExchance、消費者がいるかどうかを確認することです。別のスレッドが既に消費しているループ内にあることがわかった場合、このコンシューマーがデータを送信すると想定します。ループ スレッド コンシューマーが「レース ポイント A」にある場合、これは問題です。このコンシューマーは、送信するメッセージがこれ以上ないことを既に確認しており、消費プロセスをシャットダウンしようとしているためです。

コードの大部分をロックせずにこの競合状態を防ぐ方法はありますか。実際のシナリオには多くのキューがあり、これよりも少し複雑です。

実際のコードList<INetworkSerializable>では、実際には byte[] BufferPool です。このブロックを読みやすくするために、例として List を使用しました。

これらのクラスが一度に何千もアクティブになっているため、SendNewData を専用スレッドで継続的にループさせる余裕はありません。ループ スレッドは、送信するデータがある場合にのみアクティブにする必要があります。

public void Write(INetworkSerializable messageToSend)
{
   Queue.Enqueue(messageToSend);

   // Check if there are any current consumers. If not then we should instigate the consuming.
   if (Interlocked.CompareExchange(ref RunningWrites, 1, 0) == 0)
   { //We are now the thread that consumes and sends data
     SendNewData();
   }
}

//Only one thread should be looping here to keep consuming and sending data synchronously.
private void SendNewData()
{
    INetworkSerializable dataToSend;
    List<INetworkSerializable> dataToSendList = new List<INetworkSerializable>();

    while (true)
    {
        if (!Queue.TryDequeue(out dataToSend))
        {
           //Race Point A
           if (dataToSendList.IsEmpty)
           {
              //All data is sent, return so that another thread can take responsibility.
              Interlocked.Decrement(ref RunningWrites);
              return;
           }

           //We have data in the list to send but nothing more to consume so lets send the data that we do have.             
           break;
        }

        dataToSendList.Add(dataToSend);
    }

    //Async callback is WriteAsyncCallback()
    WriteAsync(dataToSendList);
}

//Callback after WriteAsync() has sent the data.
private void WriteAsyncCallback()
{
    //Data was written to sockets, now lets loop back for more data
    SendNewData();
}
4

3 に答える 3

1

BlockingCollectionで簡単に実装できるプロデューサー/コンシューマーパターンの方が良いようです。

var toSend = new BlockingCollection<something>();

// producers
toSend.Add(something);

// when all producers are done
toSend.CompleteAdding();


// consumer -- this won't end until CompleteAdding is called
foreach(var item in toSend.GetConsumingEnumerable())
   Send(item);

CompleteAddingをいつ呼び出すかを知っているというコメントに対処するために、何千ものプロデューサーをタスクとして起動し、それらすべてのタスクが完了するのを待ってから(Task.WaitAll)、CompleteAddingを呼び出します。必要に応じて、より適切な制御を提供するCancellationTokensを取り込む優れたオーバーロードがあります。

また、TPLは、ブロックされたスレッドをスケジュールするのに非常に優れています。

より完全なコード:

var toSend = new BlockingCollection<int>();            
Parallel.Invoke(() => Produce(toSend), () => Consume(toSend));

...

private static void Consume(BlockingCollection<int> toSend)
{
    foreach (var value in toSend.GetConsumingEnumerable())
    {
        Console.WriteLine("Sending {0}", value);
    }
}

private static void Produce(BlockingCollection<int> toSend)
{
    Action<int> generateToSend = toSend.Add;

    var producers = Enumerable.Range(0, 1000)
                              .Select(n => new Task(value => generateToSend((int) value), n))
                              .ToArray();

    foreach(var p in producers)
    {
        p.Start();
    }

    Task.WaitAll(producers);
    toSend.CompleteAdding();
}
于 2013-02-28T20:38:59.743 に答える
1

このバリアントを確認してください。コードには説明的なコメントがいくつかあります。また、メソッドをもうWriteAsyncCallback呼び出さないことにも注意してくださいSendNewData

private int _pendingMessages;

    private int _consuming;

    public void Write(INetworkSerializable messageToSend)
    {
        Interlocked.Increment(ref _pendingMessages);
        Queue.Enqueue(messageToSend);

        // Check if there is anyone consuming messages
        // if not, we will have to become a consumer and process our own message, 
        // and any other further messages until we have cleaned the queue
        if (Interlocked.CompareExchange(ref _consuming, 1, 0) == 0)
        {
            // We are now the thread that consumes and sends data
            SendNewData();
        }
    }

    // Only one thread should be looping here to keep consuming and sending data synchronously.
    private void SendNewData()
    {
        INetworkSerializable dataToSend;
        var dataToSendList = new List<INetworkSerializable>();
        int messagesLeft;

        do
        {
            if (!Queue.TryDequeue(out dataToSend))
            {
                // there is one possibility that we get here while _pendingMessages != 0:
                // some other thread had just increased _pendingMessages from 0 to 1, but haven't put a message to queue.
                if (dataToSendList.Count == 0)
                {
                    if (_pendingMessages == 0)
                    {
                        _consuming = 0;
                        // and if we have no data this mean that we are safe to exit from current thread.
                        return;
                    }
                }
                else
                {
                    // We have data in the list to send but nothing more to consume so lets send the data that we do have.             
                    break;
                }
            }

            dataToSendList.Add(dataToSend);
            messagesLeft = Interlocked.Decrement(ref _pendingMessages);
        }
        while (messagesLeft > 0);

        // Async callback is WriteAsyncCallback()
        WriteAsync(dataToSendList);
    }

    private void WriteAsync(List<INetworkSerializable> dataToSendList)
    {
        // some code
    }

    // Callback after WriteAsync() has sent the data.
    private void WriteAsyncCallback()
    {
        // ...
        SendNewData();
    }
于 2013-03-01T10:59:21.777 に答える
0

コンシューマーではないことを宣言した後、次のコードを追加して Queue をダブルチェックすることで、競合状態を防ぐことができます。

if (dataToSend.IsEmpty)
{
     //Declare that we are no longer the consumer.
     Interlocked.Decrement(ref RunningWrites);

     //Double check the queue to prevent race condition A
     if (Queue.IsEmpty)
         return;
     else
     {   //Race condition A occurred. There is data again.

         //Let's try to become a consumer.
         if (Interlocked.CompareExchange(ref RunningWrites, 1, 0) == 0)
               continue;

         //Another thread has nominated itself as the consumer. Our job is done.
         return;
     }                                    
}

break;
于 2013-03-01T19:08:32.267 に答える