5

を受信するまで待機状態になるようにいくつかのスレッドを設定しようとしていますpthread_cond_broadcast()

ジョブの完了後、スレッドを待機状態に戻してほしい。

また、 を呼び出したプロセスが、pthread_cond_broadcast()続行する前にすべてのスレッドが待機状態に戻るまで待機するようにします。この場合、ブロードキャストを呼び出すのは main() 関数です。pthread_cond_wait()ブロードキャストを呼び出した後にmain(0 を実行して、これを実行しようとしています。

void* Work::job(void* id)
{
    int idx = (long)id;

    while(1)
    {
        pthread_mutex_lock(&job_lock);

        while(!jobs_complete)
        {
            // wait for main to broadcast
            pthread_cond_wait(&can_work, &job_lock);
            pthread_mutex_unlock(&job_lock);

            // work here

            pthread_mutex_lock(&job_lock);
            ++jobs_completed;

            if(jobs_completed == NUM_THREADS)
            {
                jobs_complete = true;
                pthread_cond_signal(&jobs_done);
                pthread_mutex_unlock(&job_lock);
            }
            pthread_mutex_unlock(&job_lock);
        }

        pthread_mutex_unlock(&job_lock);
    }

    return NULL;
}

NUM_THREADSは 4、job_lockpthread_mutex_tcan_workjobs_donepthread_cond_tjobs_completedbooljobs_completeintです。

// work

jobs_completed = false;
jobs_complete = 0;
pthread_mutex_lock(&job_lock);
pthread_cond_broadcast(&can_work);
pthread_cond_wait(&jobs_complete);
pthread_mutex_unlock(&job_lock);

// work that depends on jobs_complete

現在、私はこれを呼び出しpthread_cond_broadcast()pthread_cond_wait()すぐに実行していますが、これはデッドロックのようです。

これをどのように行うべきか、またはどこが間違っているかを誰かが説明できますか? 助けていただければ幸いです。

ありがとう!

4

2 に答える 2

4

はこれを投稿しているだけです (これはほとんどすべての C コードですが、pthreads もそうです。少し余裕を持ってお願いします) 。明らかに、これらのほとんどを適切なクラスなどに適切にカプセル化する必要があります。これがうまくいけば、条件変数、ミューテックス、および述語管理と通知との関係がどのように機能するかがわかります。

お役に立てば幸いです。すてきな一日を。

#include <iostream>
#include <unistd.h>
#include <pthread.h>
using namespace std;

// our global condition variable and mutex
pthread_cond_t cv = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;

// our predicate values.
bool finished = false;
int jobs_waiting = 0;
int jobs_completed = 0;

// our thread proc
static void *worker_proc(void* p)
{
    intptr_t id = (intptr_t)p;  // our id
    size_t n_completed = 0;     // our job completion count

    // always latch prior to eval'ing predicate vars.
    pthread_mutex_lock(&mtx);
    while (!finished)
    {
        // wait for finish or work-waiting predicate
        while (!finished && jobs_waiting == 0)
            pthread_cond_wait(&cv, &mtx);

        // we own the mutex, so we're free to look at, modify
        //  etc. the values(s) that we're using for our predicate
        if (finished)
            break;

        // must be a job_waiting, reduce that number by one, then
        //  unlock the mutex and start our work. Note that we're
        //  changing the predicate (jobs_waiting is part of it) and
        //  we therefore need to let anyone that is monitoring know.
        --jobs_waiting;
        pthread_cond_broadcast(&cv);
        pthread_mutex_unlock(&mtx);

        // DO WORK HERE (this just runs a lame summation)
        for (int i=0,x=0;i<1048576; x += ++i);
        ++n_completed;

        // finished work latch mutex and setup changes
        pthread_mutex_lock(&mtx);
        ++jobs_completed;
        pthread_cond_broadcast(&cv);
    }

    // final report
    cout << id << ": jobs completed = " << n_completed << endl;

    // we always exit owning the mutex, so unlock it now. but
    //  let anyone else know they should be quitting as well.
    pthread_cond_broadcast(&cv);
    pthread_mutex_unlock(&mtx);
    return p;
}

// sets up a batch of work and waits for it to finish.
void run_batch(int num)
{
    pthread_mutex_lock(&mtx);
    jobs_waiting = num;
    jobs_completed = 0;
    pthread_cond_broadcast(&cv);

    // wait or all jobs to complete.
    while (jobs_completed != num)
        pthread_cond_wait(&cv, &mtx);

    // we own this coming out, so let it go.
    pthread_mutex_unlock(&mtx);
}

// main entry point.
int main()
{
    // number of threads in our crew
    static const size_t N = 7;
    pthread_t thrds[N] = {0};

    // startup thread crew.
    intptr_t id = 0;
    for (size_t i=0; i<N; ++i)
        pthread_create(thrds + i, NULL, worker_proc, (void*)(++id));

    // run through batches. each batch is one larger
    //  than the prior batch. this should result in some
    //  interesting job-counts per-thread.
    for (int i=0; i<64; ++i)
        run_batch(i);

    // flag for shutdown state.
    pthread_mutex_lock(&mtx);
    finished = true;
    pthread_cond_broadcast(&cv);
    pthread_mutex_unlock(&mtx);
    for (size_t i=0; i<N; pthread_join(thrds[i++], NULL));

    return 0;
}

サンプル出力 #1

3: jobs completed = 256
6: jobs completed = 282
5: jobs completed = 292
2: jobs completed = 242
1: jobs completed = 339
4: jobs completed = 260
7: jobs completed = 409

サンプル出力 #2

6: jobs completed = 882
1: jobs completed = 210
4: jobs completed = 179
5: jobs completed = 178
2: jobs completed = 187
7: jobs completed = 186
3: jobs completed = 194

サンプル出力 #3

1: jobs completed = 268
6: jobs completed = 559
3: jobs completed = 279
5: jobs completed = 270
2: jobs completed = 164
4: jobs completed = 317
7: jobs completed = 159

固定バッチサイズ

同じコードですが、これを変更します。

for (int i=0; i<64; ++i)
    run_batch(i);

これに:

for (int i=0; i<64; ++i)
    run_batch(N);

これは、おそらくあなたが本当に探しているものにさらに近いでしょう。

サンプル出力 #1

4: jobs completed = 65
2: jobs completed = 63
5: jobs completed = 66
3: jobs completed = 63
1: jobs completed = 64
7: jobs completed = 63
6: jobs completed = 64

サンプル出力 #2

3: jobs completed = 65
5: jobs completed = 62
1: jobs completed = 67
7: jobs completed = 63
2: jobs completed = 65
6: jobs completed = 61
4: jobs completed = 65

サンプル出力 #3

2: jobs completed = 58
4: jobs completed = 61
5: jobs completed = 69
7: jobs completed = 68
3: jobs completed = 61
1: jobs completed = 64
6: jobs completed = 67
于 2013-03-28T09:36:17.323 に答える
1

関数の最後にを 3 回連続して呼び出す可能性pthread_mutex_unlockがあるため、未定義の動作が発生します。実際には、内側の 2 つは必要ありません。jobs_completeである場合true、スレッドはループを終了してロックを解放します。それ以外の場合は、ループしてcan_work条件の待機に必要になります。

また、そこで

 pthread_cond_wait(&jobs_complete);

あなたはおそらく次のことを意味します:

pthread_cond_wait(&jobs_complete,&job_lock);

その上、その関数はではなくpthread_cond_t *と を期待するので、それでもそのコードは明らかに壊れています。pthread_mutex_t *int

条件変数でのシグナルまたはブロードキャストは、変数で既に待機しているスレッドにのみ影響することに注意してください。シグナルは将来の待機のために保持されません。そのため、ブロック中にスレッドがループしてjobs_complete再び待機する場合、作業を再開するには、スレッドに再度シグナルを送信する必要があります。

別のこと: job_completeasint およびjob_completedasのタイプについて言及しboolていますが、コードは一致していないようです:

        if(jobs_completed == NUM_THREADS)
        {
            jobs_complete = true;

私のアドバイスは次のとおりです。セマフォとバリアの抽象モデルについて学び、可能であれば既存の実装 (boostまたはstdC++11) を使用するか、pthreadAPI を使用して再実装してください。これらは、cond 変数を操作するよりもはるかに簡単に状況を処理するのに役立ちます。既存のソリューションについては、まさにこの Web サイトを参照してください。たとえば、この質問は非常によく似た問題を扱っており、私が提供したソリューションは、要件に合わせて pthread API を使用するように簡単に変更できます。

于 2013-03-28T07:48:12.100 に答える