2

複数のスレッドがデータを生成し、単一のスレッドがデータを消費するシステムをモデル化しようとしています。秘訣は、すべてのスレッドがプールに存在するため、専用スレッドでデータを消費したくないということです。代わりに、作業があるときにプロデューサーの 1 つがキューを空にし、別のプロデューサーが既にキューをクリアしている場合は降伏するようにします。

基本的な考え方は、作業のキューと、処理の周りのロックがあるということです。各プロデューサーはペイロードをキューにプッシュし、ロックに入ろうとします。この試行は非ブロッキングであり、true (ロックが取得された) または false (ロックが他の誰かによって保持されている) を返します。

ロックが取得されると、そのスレッドはキュー内のすべてのデータを空になるまで処理します (処理中に他のプロデューサーによって導入された新しいペイロードを含む)。すべての作業が処理されると、スレッドはロックを解除して終了します。

以下は、アルゴリズムの C++ コードです。

void Process(ITask *task) {
     // queue is a thread safe implementation of a regular queue
     queue.push(task);

     // crit_sec is some handle to a critical section like object
     // try_scoped_lock uses RAII to attempt to acquire the lock in the constructor
     //                 if the lock was acquired, it will release the lock in the
     //                 destructor
     try_scoped_lock lock(crit_sec);

     // See if this thread won the lottery. Prize is doing all of the dishes
     if (!lock.Acquired())
        return;

     // This thread got the lock, so it needs to do the work
     ITask *currTask;
     while (queue.try_pop(currTask)) {
          ... execute task ...
     }
}

一般的に、このコードは正常に動作し、以下で説明する動作を実際に目撃したことはありませんが、その実装には不安を感じます。スレッドが while ループを終了してからクリティカル セクションを解放するまでの間に競合状態が発生するのは当然のことです。

アルゴリズム全体は、ロックが保持されている場合、スレッドがキューを処理しているという前提に依存しています。

私は本質的に2つの質問についての啓発を探しています:

  1. 説明されているように競合状態があることを訂正しますか (他のレースのボーナス)
  2. このメカニズムを実装するためのパフォーマンスが高く、競合状態を引き起こさない標準パターンはありますか?
4

1 に答える 1

2

はい、競合状態があります。

スレッド A はタスクを追加し、 を取得しlock、自身を処理してから、 にタスクを要求しますqueue。拒否されます。

この時点で、スレッド B は にタスクを追加しますqueue。次に、スレッド A がロックを持っているため、ロックを取得しようとしますが、失敗します。スレッド B が終了します。

その後、スレッド A はqueue空ではなく、誰もそのタスクを処理していない状態で終了します。

ウィンドウが比較的狭いため、これを見つけるのは困難です。見つけやすくするために、whileループの後に「10 秒間のスリープ」を導入します。呼び出しコードで、タスクを挿入し、5 秒待ってから 2 番目のタスクを挿入します。さらに 10 秒後に、両方の挿入タスクが完了していることを確認しますqueue

これを修正する 1 つの方法は、に変更try_popしてtry_pop_or_unlock、それに渡すlockことです。 try_pop_or_unlock次に、空の をアトミックにチェックし、空queueの場合はロックを解除してlockfalse を返します。

もう 1 つのアプローチは、スレッド プールを改善することです。それにカウント セマフォ ベースの「消費」タスク ランチャーを追加します。

semaphore_bool bTaskActive;
counting_semaphore counter;

when (counter || !bTaskActive)
  if (bTaskActive)
    return
  bTaskActive = true
  --counter
  launch_task( process_one_off_queue, when_done( [&]{ bTaskActive=false ) );

カウンティング セマフォがアクティブな場合、または終了した消費タスクによって呼び出された場合、アクティブな消費タスクがない場合は、消費タスクを起動します。

しかし、それは私の頭のてっぺんから外れています。

于 2013-02-28T23:26:53.597 に答える