1

スレッドが相互にシグナルを送信できるようにする方法に頭を悩ませようとしているのが最も難しいです。

私のデザイン:

main 関数は、他の多数のワーカー スレッドを調整する単一のマスター スレッドを作成します。ワーカー スレッドはメインでプログラムされた間隔で生成および終了するため、メイン関数もワーカーを作成します。マスター スレッドは、これらのワーカー スレッドにシグナルを送信し、それらすべてを signal_broadcast できる必要があります。また、ワーカー スレッドがマスターにシグナルを送り返す (pthread_cond_signal) 必要があります。各スレッドには pthread_mutex と pthread_cond が必要なので、これらの変数を使用して Worker クラスと Master クラスを作成しました。今、これが私が立ち往生しているところです。C++ では、メンバー関数を pthread_create(...) ハンドラーとして渡すことができないため、内部で静的ハンドラーを作成し、それ自体へのポインターを渡して、クラス データを使用するために reinterpret_cast する必要がありました...

void Worker::start() {
pthread_create(&thread, NULL, &Worker::run, this);
}

void* Worker::run(void *ptr) {
Worker* data = reinterpret_cast<Worker*>(ptr);
}

このおそらく間違ったセットアップの問題は、ワーカー ポインターの配列をマスター スレッドに渡したときに、キャストが何らかのコピーを行ったと思われるため、ワーカーの別の参照を通知することです。それで、 static_cast と同じ動作を試しました。

マスターとワーカーが互いに pthread_cond_wait(...) と pthread_cond_signal(...) できるような設計が必要です。

編集 1

追加した:

private:
    Worker(const Worker&);

まだ動作していません。

4

2 に答える 2

2

編集すべてのバージョンでの潜在的なレースを修正しました:

1./1 b C++0xで概説されているように、(mutex+条件+カウンター) から構築されたセマフォを使用します セマフォはありませんか? スレッドを同期する方法は?
2.「リバース」待機を使用して、目的のワーカーによってシグナルが確認されたことを確認します

<thread>C++11 スタイルを使用してこれを実現することをお勧めします<condition_variable>

2 つ (半) のデモンストレーションがあります。彼らはそれぞれ、10 個のワーカーを駆動する 1 つのマスターがあると想定しています。各ワーカーは、作業を行う前にシグナルを待ちます。

std::condition_variable( a と連携して動作する) を使用std::mutexして、シグナリングを行います。最初のバージョンと 2 番目のバージョンの違いは、シグナリングが行われる方法です。

  • 1. ワーカーに 1 つずつ通知する:
  • 1b . _ ワーカー構造体を使用
  • 2. すべてのスレッドに通知し、どの受信側ワーカーが応答するかを調整します

1. ワーカーに 1 つずつ通知する:

調整はほとんど行われないため、これが最も簡単です。

#include <vector>
#include <thread>
#include <mutex>
#include <algorithm>
#include <iostream>
#include <condition_variable>

using namespace std;

class semaphore 
{ // see https://stackoverflow.com/questions/4792449/c0x-has-no-semaphores-how-to-synchronize-threads
    std::mutex mx;
    std::condition_variable cv;
    unsigned long count;
public:
    semaphore() : count() {} 
    void notify();
    void wait();
};

static void run(int id, struct master& m);

struct master
{
    mutable semaphore sem;

    master()
    {
        for (int i = 0; i<10; ++i)
            threads.emplace_back(run, i, ref(*this));
    }

    ~master() {
        for(auto& th : threads) if (th.joinable()) th.join(); 
        std::cout << "done\n";
    }

    void drive()
    {
        // do wakeups
        for (unsigned i = 0; i<threads.size(); ++i)
        {
            this_thread::sleep_for(chrono::milliseconds(rand()%100));
            sem.notify();
        }
    }

  private:
    vector<thread> threads;
};

static void run(int id, master& m)
{
    m.sem.wait();
    {
        static mutex io_mx;
        lock_guard<mutex> lk(io_mx);
        cout << "signaled: " << id << "\n";
    }
}

int main()
{
    master instance;
    instance.drive();
}

/// semaphore members
void semaphore::notify()
{
    lock_guard<mutex> lk(mx);
    ++count;
    cv.notify_one();
}

void semaphore::wait()
{
    unique_lock<mutex> lk(mx);
    while(!count)
        cv.wait(lk);
    --count;
}

1b . _ ワーカー構造体を使用

非静的メンバー関数を持つworkerクラスがある場合は、わずかな変更で同じことができることに注意してください。worker::run

struct worker
{
    worker(int id) : id(id) {}

    void run(master& m) const;

    int id;
};

// ...
struct master
{
    // ...

    master()
    {
        for (int i = 0; i<10; ++i)
            workers.emplace_back(i);

        for (auto& w: workers)
            threads.emplace_back(&worker::run, ref(w), ref(*this));
    }

// ...

void worker::run(master& m) const
{
    m.sem.wait();
    {
        static mutex io_mx;
        lock_guard<mutex> lk(io_mx);
        cout << "signaled: " << id << "\n";
    }
}

警告

  • cv.wait()条件変数が実際には発生しなかった偽のウェイクアップが発生する可能性がありました (たとえば、OS シグナル ハンドラーの場合)。これは、どのプラットフォームでも条件変数でよく起こることです。

次のアプローチでこれを修正します。

2. すべてのスレッドに通知し、どの受信者ワーカーを調整するか

フラグを使用して、どのスレッドがシグナルを受信することを意図しているかを通知します。

struct master
{
    mutable mutex mx;
    mutable condition_variable cv;
    int signaled_id;               // ADDED

    master() : signaled_id(-1)
    {

driverもっと面白くなって、特定の (ランダムな...) 順序ですべてのワーカーにシグナルを送りたいとしましょう:

    void drive()
    {
        // generate random wakeup order
        vector<int> wakeups(10);
        iota(begin(wakeups), end(wakeups), 0);
        random_shuffle(begin(wakeups), end(wakeups));

        // do wakeups
        for (int id : wakeups)
        {
            this_thread::sleep_for(chrono::milliseconds(rand()%1000));
            signal(id);
        }
    }

  private:
    void signal(int id)                // ADDED id
    {
        unique_lock<mutex> lk(mx);

        std::cout << "signaling " << id << "\n";

        signaled_id = id;              // ADDED put it in the shared field
        cv.notify_all();

        cv.wait(lk, [&] { return signaled_id == -1; });
    }

あとは、受信スレッドが id が一致することを確認するだけです。

m.cv.wait(lk, [&] { return m.signaled_id == id; });
m.signaled_id = -1;
m.cv.notify_all();

これにより、誤ったウェイクアップがなくなります。

完全なコード リスト/ライブ デモ:

于 2013-07-20T00:02:26.520 に答える
0

正確な状況は明らかではありませんが、コンテナを使用して で作成された「ワーカー」インスタンスを保持し、mainそれらを「マスター」に渡しているようです。このような場合は、いくつかの救済策があります。実装に適したものを選択する必要があります。

  • コンテナへの参照をmainマスターに渡します。
  • ワーカーへの (スマート) ポインターを保持するようにコンテナーを変更します。
  • コンテナを「マスター」自体の一部にして、渡す必要がないようにします。
  • Worker クラスに適切なデストラクタ、コピー コンストラクタ、および代入演算子を実装します (つまり、Rule of Threeに従います)。

技術的に言えば、pthread_create()は C API であるため、渡される関数ポインタには C リンケージ ( extern "C") が必要です。C++ クラスのメソッドに C リンケージを持たせることはできないため、外部関数を定義する必要があります。

extern "C" { static void * worker_run (void *arg); }

class Worker { //...
};

static void * worker_run (void *arg) {
    return Worker::run(arg);
}
于 2013-07-19T22:43:01.240 に答える