1

タスクのキューがあり、各タスクに共有リソースアクセスを制御するロックオブジェクト(syncObject)があるとします。キューには、syncObjectの同じインスタンスを共有する複数のタスクを含めることができます。また、タスクをデキューしてキュー順に処理する必要があるN個の同時スレッドがあります。これは、キューの順序でsyncObjectのロックを取得することを意味します。

コードの説明:

abstract class Task
{
    public readonly Object SyncObject = new Object();
}

Queue<Task> taskQueue = new Queue<Task>();
Object queueLock = new Object();

void TakeItemThreadedMethod()
{
    Task task;
    lock(queueLock) task = taskQueue.Dequeue();
    //Between this lines is my problem,
    //Other thread can dequeue next task and it may share same syncObject and
    //acquire lock on it before this thread, thought this task was first in queue
    lock(task.SyncObject)
    {
        //Do some work here
    }
}

同じSyncObjectをキューにある順序で共有するタスクの処理を開始する方法(Task.SyncObjectロックを取得する)。

4

3 に答える 3

2

キューに個々のタスクを含めるべきではないように思われますが、各サブキューが「同期ロックを共有するすべてのタスク」であるタスクのキューです。

したがって、プロセッサは次のようになります。

  • メインキューからサブキューを削除します
  • 最初のタスクをサブキューからデキューして処理します
  • 終了したら、サブキューをメインキューの最後に戻します(または、実際には、スケジューリングをどのように機能させるかを決定します)。

これにより、サブキューごとに一度に1つのタスクのみが実行されるようになります。

おそらく、ロックからサブキューへのマップが必要になるでしょう。そうすれば、作業を作成するものはすべて、それを適切なサブキューに追加できます。その機能がまったく必要であると仮定すると、マップからサブキューを削除するタイミングをアトミックに検討する必要があります(メインキューに戻さないでください)。

編集:上記の最適化として、共有同期ロックとして使用しているものにサブキュー自体を配置することができます。「次に実行する単一のタスク」または「タスクのキュー」のいずれかへの参照を持つことができます-キューを遅延して作成するだけです。次に、同期ロック(実際にはもうロックとして使用する必要はありません)をキューに配置すると、各コンシューマーは次のタスクの実行を要求するだけです。使用可能なタスクが1つしかない場合は、それが返されます(そして、「次のタスク」変数がnullに設定されます)。使用可能なタスクが複数ある場合、最初のタスクはデキューされます。

プロデューサーが新しいタスクを追加すると、「最初のタスク」変数が実行するタスクに設定されます(以前はnullだった場合)。キューがなかったがすでにタスクであった場合はキューが作成されます。すでに存在する場合は追加するだけです。これにより、不要なキュー作成の非効率性が解決されます。

繰り返しになりますが、トリッキーな部分は、共有リソースロックをアトミックに破棄する方法を検討することです-最後のアイテムを処理したにのみそうする必要があるためですが、同様に、タスクを追加したためにタスクを見逃したくないためです間違った時間。それほど悪くはないはずですが、同様に慎重に考える必要があります。

于 2012-11-01T07:15:47.840 に答える
2

このアプローチはどうですか?

  • キューの代わりにリストを使用する
  • 「ロック解除された」タスクが見つかるまで、各ワーカースレッドをキューから順番にループさせます

(未テスト)のようなもの:

abstract class Task
{
    public readonly Object SyncObject = new Object();
}

List<Task> taskList = new List<Task>();

void TakeItemThreadedMethod()
{
    Task task = null;
    bool found = false;

    try
    {
        // loop until found an task whose SyncObject is free
        while (!found)
        {
            lock (taskList)
            {
                for (int i = 0; i < taskList.Count; i++)
                {
                    object syncObj = taskList[i].SyncObject;
                    if (found = Monitor.TryEnter(syncObj))
                    {
                        for (int x = 0; x < taskList.Count; x++)
                        {
                            if (Object.ReferenceEquals(
                                    syncObj, taskList[x].SyncObject))
                            {
                                task = taskList[x];
                                taskList.RemoveAt(x);
                                break;
                            }
                        }
                        break;
                    }
                }
            }
        }

        // process the task...
        DoWork(task);
    }
    finally
    {
        if (found) Monitor.Exit(task.SyncObject);
    }
}

void QueueTask(Task task)
{
    lock (taskList)
    {
        taskList.Add(task);
    }
}
于 2012-11-01T08:37:43.213 に答える
1

Matthew Brindleyによって提案されたQueuedLockクラスを使用しましたが、わずかな変更を加えて、Enter関数をTakeTicketに分割し、どのブロックを入力しました。

これで、キュー全体をブロックすることなく、共有QueueLock内でTakeTicketを使用できます。

変更されたコード:

abstract class Task
{
    public readonly QueuedLock SyncObject = new QueuedLock();
}

Queue<Task> taskQueue = new Queue<Task>();
Object queueLock = new Object();

void TakeItemThreadedMethod()
{
    Task task;
    int ticket;
    lock(queueLock) 
    {
        task = taskQueue.Dequeue();
        ticket = task.SyncObject.TakeTicket();
    }
    task.SyncObject.Enter(ticket);
    //Do some work here
    task.SyncObject.Exit();
}
于 2012-11-01T08:28:52.090 に答える