編集すべてのバージョンでの潜在的なレースを修正しました:
1./1 b C++0xで概説されているように、(mutex+条件+カウンター) から構築されたセマフォを使用します セマフォはありませんか? スレッドを同期する方法は?
2.「リバース」待機を使用して、目的のワーカーによってシグナルが確認されたことを確認します
<thread>
C++11 スタイルを使用してこれを実現することをお勧めします<condition_variable>
。
2 つ (半) のデモンストレーションがあります。彼らはそれぞれ、10 個のワーカーを駆動する 1 つのマスターがあると想定しています。各ワーカーは、作業を行う前にシグナルを待ちます。
std::condition_variable
( a と連携して動作する) を使用std::mutex
して、シグナリングを行います。最初のバージョンと 2 番目のバージョンの違いは、シグナリングが行われる方法です。
- 1. ワーカーに 1 つずつ通知する:
- 1b . _ ワーカー構造体を使用
- 2. すべてのスレッドに通知し、どの受信側ワーカーが応答するかを調整します
1. ワーカーに 1 つずつ通知する:
調整はほとんど行われないため、これが最も簡単です。
#include <vector>
#include <thread>
#include <mutex>
#include <algorithm>
#include <iostream>
#include <condition_variable>
using namespace std;
class semaphore
{ // see https://stackoverflow.com/questions/4792449/c0x-has-no-semaphores-how-to-synchronize-threads
std::mutex mx;
std::condition_variable cv;
unsigned long count;
public:
semaphore() : count() {}
void notify();
void wait();
};
static void run(int id, struct master& m);
struct master
{
mutable semaphore sem;
master()
{
for (int i = 0; i<10; ++i)
threads.emplace_back(run, i, ref(*this));
}
~master() {
for(auto& th : threads) if (th.joinable()) th.join();
std::cout << "done\n";
}
void drive()
{
// do wakeups
for (unsigned i = 0; i<threads.size(); ++i)
{
this_thread::sleep_for(chrono::milliseconds(rand()%100));
sem.notify();
}
}
private:
vector<thread> threads;
};
static void run(int id, master& m)
{
m.sem.wait();
{
static mutex io_mx;
lock_guard<mutex> lk(io_mx);
cout << "signaled: " << id << "\n";
}
}
int main()
{
master instance;
instance.drive();
}
/// semaphore members
void semaphore::notify()
{
lock_guard<mutex> lk(mx);
++count;
cv.notify_one();
}
void semaphore::wait()
{
unique_lock<mutex> lk(mx);
while(!count)
cv.wait(lk);
--count;
}
1b . _ ワーカー構造体を使用
非静的メンバー関数を持つworker
クラスがある場合は、わずかな変更で同じことができることに注意してください。worker::run
struct worker
{
worker(int id) : id(id) {}
void run(master& m) const;
int id;
};
// ...
struct master
{
// ...
master()
{
for (int i = 0; i<10; ++i)
workers.emplace_back(i);
for (auto& w: workers)
threads.emplace_back(&worker::run, ref(w), ref(*this));
}
// ...
void worker::run(master& m) const
{
m.sem.wait();
{
static mutex io_mx;
lock_guard<mutex> lk(io_mx);
cout << "signaled: " << id << "\n";
}
}
警告
cv.wait()
条件変数が実際には発生しなかった偽のウェイクアップが発生する可能性がありました (たとえば、OS シグナル ハンドラーの場合)。これは、どのプラットフォームでも条件変数でよく起こることです。
次のアプローチでこれを修正します。
2. すべてのスレッドに通知し、どの受信者ワーカーを調整するか
フラグを使用して、どのスレッドがシグナルを受信することを意図しているかを通知します。
struct master
{
mutable mutex mx;
mutable condition_variable cv;
int signaled_id; // ADDED
master() : signaled_id(-1)
{
driver
もっと面白くなって、特定の (ランダムな...) 順序ですべてのワーカーにシグナルを送りたいとしましょう:
void drive()
{
// generate random wakeup order
vector<int> wakeups(10);
iota(begin(wakeups), end(wakeups), 0);
random_shuffle(begin(wakeups), end(wakeups));
// do wakeups
for (int id : wakeups)
{
this_thread::sleep_for(chrono::milliseconds(rand()%1000));
signal(id);
}
}
private:
void signal(int id) // ADDED id
{
unique_lock<mutex> lk(mx);
std::cout << "signaling " << id << "\n";
signaled_id = id; // ADDED put it in the shared field
cv.notify_all();
cv.wait(lk, [&] { return signaled_id == -1; });
}
あとは、受信スレッドが id が一致することを確認するだけです。
m.cv.wait(lk, [&] { return m.signaled_id == id; });
m.signaled_id = -1;
m.cv.notify_all();
これにより、誤ったウェイクアップがなくなります。
完全なコード リスト/ライブ デモ: