2

いくつかの調査を行った後、Concurrent コレクションから 2 つのアイテムを効果的に削除する方法に関するフィードバックに頼っています。私の状況には、現在 BlockingCollection に配置されている UDP 経由の受信メッセージが含まれます。コレクションに 2 人のユーザーが含まれたら、安全に 2 人のユーザーを取得して処理する必要があります。以下にリストされているいくつかのアイデアを含む、いくつかの異なる手法を見てきました。私の現在の実装は以下のとおりですが、Users が 2 つのグループで処理されることを保証しながら、これを行うためのよりクリーンな方法があると考えています。これが、このシナリオでの唯一の制限です。

現在の実装:

    private int userQueueCount = 0;
    public BlockingCollection<User> UserQueue = new BlockingCollection<User>();

    public void JoinQueue(User u)
    {
           UserQueue.Add(u);
           Interlocked.Increment(ref userQueueCount);

           if (userQueueCount > 1)
           {
               IEnumerable<User> users = UserQueue.Take(2);
               if(users.Count==2) {
                 Interlocked.Decrement(ref userQueueCount);
                 Interlocked.Decrement(ref userQueueCount);
                 ... do some work with users but if only one 
                 is removed I'll run into problems
               }

           }
    }

私がやりたいのはこのようなことですが、現在、整合性を確保するために本番環境でこれをテストすることはできません.

 Parallel.ForEach(UserQueue.Take(2), (u) => { ... });

またはさらに良い:

    public void JoinQueue(User u)
    {
           UserQueue.Add(u);
           // if needed? increment
           Interlocked.Increment(ref userQueueCount);
           UserQueue.CompleteAdding();
    }

次に、これをどこかに実装します。

        Task.Factory.StartNew(() =>
        {
            while (userQueueCount > 1) OR (UserQueue.Count > 1) If it's safe?
            {
                IEnumerable<User> users = UserQueue.Take(2);
                ... do stuff
            }

        });

これに関する問題は、条件 (Count > 1) と、UserQueue に処理するアイテムが少なくとも 2 つあることを確認している Take(2) の間で保証できるかどうかわからないことです。着信 UDP メッセージは並行して処理されるため、ブロッキング/同時コレクションからアイテムを 2 つずつ安全にプルする方法が必要です。

これを行うためのより良い/安全な方法はありますか?

改訂されたコメント: この質問の意図した目標は、.Net 4.0 の並行コレクションから項目を処理する安定した/スレッドセーフな方法を実現することです。きれいである必要はありません。並列環境で順序付けされていない 2 のペアでアイテムを処理するタスクで安定している必要があります。

4

4 に答える 4

2

これが私が大まかなコードで行うことです:

ConcurrentQueuequeue = new ConcurrentQueue(); //can use a BlockingCollection too (as it's just a blocking ConcurrentQueue by default anyway)

public void OnUserStartedGame(User joiningUser)
{
   User waitingUser;
   if (this.gameQueue.TryDequeue(out waitingUser)) //if there's someone waiting, we'll get him
      this.MatchUsers(waitingUser, joiningUser);
   else
      this.QueueUser(joiningUser); //it doesn't matter if there's already someone in the queue by now because, well, we are using a queue and it will sort itself out.
}

private void QueueUser(User user)
{
   this.gameQueue.Enqueue(user);
}

private void MatchUsers(User first, User second)
{
   //not sure what you do here
}

基本的な考え方は、誰かがゲームを開始したいと思っていて、キューに誰かがいる場合は、それらを一致させてゲームを開始するというものです。誰もいない場合は、キューに追加します。せいぜい一度に1人のユーザーしかキューに入れませんが、そうでない場合でも、他のユーザーがゲームを開始すると、待機中のユーザーは徐々に削除され、キューが空になるまで新しいユーザーは追加されないため、それほど悪くはありません。また。

于 2012-06-04T20:58:20.737 に答える
1

何らかの理由でユーザーのペアをコレクションに入れることができなかった場合は、ConcurrentQueueを使用して、一度に2つのアイテムをTryDequeueしようとします(1つしか取得できない場合は、元に戻します)。必要に応じて待ちます。

于 2012-06-04T20:22:20.197 に答える
1

ここでの最も簡単な解決策は、ロックを使用することだと思います。すべてのコンシューマーに対して 1 つのロックを使用します (プロデューサーはロックを使用しません)。これにより、ユーザーを常に正しい順序で取得できます。

User firstUser;
User secondUser;

lock (consumerLock)
{
    firstUser = userQueue.Take();
    secondUser = userQueue.Take();
}

Process(firstUser, secondUser);

もう 1 つのオプションは、1 人のユーザー用と 2 人のユーザー用の 2 つのキューを用意し、それらを最初のキューから 2 番目のキューに転送するプロセスを用意することです。

別のスレッドを浪費してもかまわない場合は、次の 2 つBlockingCollectionの sを使用してこれを行うことができます。

while (true)
{
    var firstUser = incomingUsers.Take();
    var secondUser = incomingUsers.Take();

    userPairs.Add(Tuple.Create(firstUser, secondUser));
}

ここでロックについて心配する必要はありません。シングル ユーザーのキューにはコンシューマーが 1 つしかないため、ペアのコンシューマーはシンプルTake()を安全に使用できるようになりました。

スレッドを浪費することに関心があり、TPL Dataflow を使用できる場合は、受信アイテムをn 個のアイテムBatchBlock<T>のバッチに結合する を使用できます。ここで、 nはブロックの作成時に構成されているため、2 に設定できます。

于 2012-06-05T09:27:55.517 に答える