79

私が取り組んでいるプロジェクトでは、複数のスレッドを使用してファイルのコレクションを処理しています。各スレッドは処理されるファイルのリストにファイルを追加できるので、私は(私が思っていたように)スレッドセーフなキューをまとめました。関連する部分は次のとおりです。

// qMutex is a std::mutex intended to guard the queue
// populatedNotifier is a std::condition_variable intended to
//                   notify waiting threads of a new item in the queue

void FileQueue::enqueue(std::string&& filename)
{
    std::lock_guard<std::mutex> lock(qMutex);
    q.push(std::move(filename));

    // Notify anyone waiting for additional files that more have arrived
    populatedNotifier.notify_one();
}

std::string FileQueue::dequeue(const std::chrono::milliseconds& timeout)
{
    std::unique_lock<std::mutex> lock(qMutex);
    if (q.empty()) {
        if (populatedNotifier.wait_for(lock, timeout) == std::cv_status::no_timeout) {
            std::string ret = q.front();
            q.pop();
            return ret;
        }
        else {
            return std::string();
        }
    }
    else {
        std::string ret = q.front();
        q.pop();
        return ret;
    }
}

ただし、if (...wait_for(lock, timeout) == std::cv_status::no_timeout) { }ブロック内でセグフォールトが発生することがあります。gdbで検査すると、キューが空であるためにセグフォールトが発生していることがわかります。これはどのように可能ですか?通知されたときにwait_forのみ戻るというのが私の理解でした。これは、新しいアイテムをキューにプッシュした後にのみ発生するはずです。cv_status::no_timeoutFileQueue::enqueue

4

8 に答える 8

74

条件(条件変数によって監視される)をwhileループの逆条件にするのが最善です: while(!some_condition)。このループ内では、条件が失敗するとスリープ状態になり、ループの本体がトリガーされます。

このように、スレッドが目覚めた場合(おそらく誤って)、ループは続行する前に状態をチェックします。条件を対象の状態と考え、条件変数を、この状態の準備ができている可能性があるというシステムからのシグナルの詳細と考えてください。ループは、それが真実であることを実際に確認し、そうでない場合はスリープ状態になるという重労働を行います。

非同期キューのテンプレートを作成しました。これがお役に立てば幸いです。これは、私q.empty()たちが望むものの逆の条件です。キューに何かが含まれているためです。したがって、whileループのチェックとして機能します。

#ifndef SAFE_QUEUE
#define SAFE_QUEUE

#include <queue>
#include <mutex>
#include <condition_variable>

// A threadsafe-queue.
template <class T>
class SafeQueue
{
public:
  SafeQueue(void)
    : q()
    , m()
    , c()
  {}

  ~SafeQueue(void)
  {}

  // Add an element to the queue.
  void enqueue(T t)
  {
    std::lock_guard<std::mutex> lock(m);
    q.push(t);
    c.notify_one();
  }

  // Get the "front"-element.
  // If the queue is empty, wait till a element is avaiable.
  T dequeue(void)
  {
    std::unique_lock<std::mutex> lock(m);
    while(q.empty())
    {
      // release lock as long as the wait and reaquire it afterwards.
      c.wait(lock);
    }
    T val = q.front();
    q.pop();
    return val;
  }

private:
  std::queue<T> q;
  mutable std::mutex m;
  std::condition_variable c;
};
#endif
于 2013-04-18T06:00:48.270 に答える
35

規格によれcondition_variablesば、イベントが発生していなくても、スプリアスにウェイクアップすることが許可されています。誤ってウェイクアップした場合は、cv_status::no_timeout通知されていなくても(タイムアウトではなくウェイクアップしたため)戻ります。もちろん、これに対する正しい解決策は、続行する前にウェイクアップが実際に合法であったかどうかを確認することです。

詳細は、標準§30.5.1[thread.condition.condvar]で指定されています。

-この関数は、notify_one()の呼び出し、notify_all()の呼び出し、abs_timeで指定された絶対タイムアウト(30.2.4)の満了、または誤って通知された場合にブロックを解除します。

..。

戻り値: abs_timeで指定された絶対タイムアウト(30.2.4)が期限切れの場合はcv_status :: timeout、それ以外の場合はcv_status::no_timeout。

于 2013-03-07T18:01:33.463 に答える
18

これはおそらくあなたがそれを行うべき方法です:

void push(std::string&& filename)
{
    {
        std::lock_guard<std::mutex> lock(qMutex);

        q.push(std::move(filename));
    }

    populatedNotifier.notify_one();
}

bool try_pop(std::string& filename, std::chrono::milliseconds timeout)
{
    std::unique_lock<std::mutex> lock(qMutex);

    if(!populatedNotifier.wait_for(lock, timeout, [this] { return !q.empty(); }))
        return false;

    filename = std::move(q.front());
    q.pop();

    return true;    
}
于 2013-03-07T18:08:37.297 に答える
13

受け入れられた答えに加えて、正しいマルチプロデューサー/マルチコンシューマーキューを実装することは難しいと思います(ただし、C ++ 11以降は簡単です)

(非常に優れた)ロックフリーブーストライブラリを試してみることをお勧めします。「キュー」構造は、待機なし/ロックなしの保証があり、C++11コンパイラを必要とせずに必要なことを実行します。

ロックフリーライブラリは非常に新しいため、この回答を追加します(1.53以降)

于 2014-05-20T21:09:36.017 に答える
5

デキュー関数を次のように書き直します。

std::string FileQueue::dequeue(const std::chrono::milliseconds& timeout)
{
    std::unique_lock<std::mutex> lock(qMutex);
    while(q.empty()) {
        if (populatedNotifier.wait_for(lock, timeout) == std::cv_status::timeout ) 
           return std::string();
    }
    std::string ret = q.front();
    q.pop();
    return ret;
}

それは短く、あなたのように重複したコードはありません。発行するだけで、そのタイムアウトより長く待機する可能性があります。ループの前に開始時刻を覚えておく必要がないようにするには、タイムアウトを確認し、それに応じて待機時間を調整します。または、待機条件の絶対時間を指定します。

于 2013-03-07T18:10:11.570 に答える
1

この場合のGLibソリューションもあります。まだ試していませんが、良いソリューションだと思います。 https://developer.gnome.org/glib/2.36/glib-Asynchronous-Queues.html#g-async-queue-new

于 2015-02-17T19:40:57.303 に答える
1

BlockingCollectionは、キュー、スタック、および優先度コンテナーのサポートを提供するC++11スレッドセーフコレクションクラスです。説明した「空の」キューシナリオを処理します。「フル」キューと同様に。

于 2018-10-15T17:01:45.283 に答える
0

あなたはlfqueue、https: //github.com/Taymindis/lfqueueが好きかもしれません。ロックフリーの並行キューです。私は現在、複数の着信コールからキューを消費するためにそれを使用しており、魅力のように機能します。

于 2018-07-17T23:48:58.093 に答える