1

I want all threads to read from the same structure. I did it in the past by adding threads inside the loop which reads from the structure but this time I need the structure to be opened inside void "dowork" as my example shows.

I have the following code:

struct word_list {
    char word[20];
    struct word_list * next;
};

struct word_list * first_word = NULL;
//other function which loads into struct is missing cause it's not relevant


//in main()
pthread_t thread_id[MAX_THREADS];
int max_thread = 10;
for(t = 0 ; t < max_thread; t++)
{
    pthread_mutex_lock(&thrd_list);
    arg_struct *args = calloc(1, sizeof(*args));
    args->file = file;
    args->t = t;
    args->e = ex;
    pthread_mutex_unlock(&thrd_list);
    if(pthread_create(&thread_id[t],NULL,dowork,args) != 0)
    {
        t--;
        fprintf(stderr,RED "\nError in creating thread\n" NONE);
    }
}

for(t = 0 ; t < max_thread; t++)
    if(pthread_join(thread_id[t],NULL) != 0)
    {
        fprintf(stderr,RED "\nError in joining thread\n" NONE);
    }




void *dowork(void *arguments)
{
    struct word_list * curr_word = first_word;
    char myword[20];
    while( curr_word != NULL )
    {
        pthread_mutex_lock(&thrd_list);
        strncpy(myword,curr_word->word,sizeof(myword) - 1);
        pthread_mutex_unlock(&thrd_list);

        //some irrelevant code is missing

        pthread_mutex_lock(&thrd_list);
        curr_word = curr_word->next;
        pthread_mutex_unlock(&thrd_list);
    }

}

How can I read different elements from the same structure in all threads?

4

3 に答える 3

2

私が今あなたの要件を理解できたら (そして最終的に理解できたと思います)、単語リストを作業キューとして扱う必要があります。これを行うには、アイテムの「プッシャー」がキューに入れ、新しいデータが利用可能であることを「プラー」に通知できるようにする通知メカニズムが必要です。そのようなシステムは pthreads に存在します:条件変数ミューテックス、およびそれらが制御フローのために管理する述語の結合です。

使い方の一例です。各ステップで何が起こっているかを文書化しようとしましたが、理解していただければ幸いです。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>

// defined the number of threads in our queue and the number
//  of test items for this demonstration.
#define MAX_THREADS  16
#define MAX_ITEMS    128*1024

typedef struct word_list
{
    char word[20];
    struct word_list * next;

} word_list;

// predicate values for the word list
struct word_list * first_word = NULL;   // current word.
int word_shutdown = 0;                  // shutdown state

// used for protecting our list.
pthread_mutex_t wq_mtx = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t wq_cv = PTHREAD_COND_INITIALIZER;

// worker proc
void *dowork(void*);

int main()
{
    pthread_t thread_id[MAX_THREADS];
    int i=0;

    // start thread pool
    for(i=0; i < MAX_THREADS; ++i)
        pthread_create(thread_id+i, NULL, dowork, NULL);

    // add MAX_ITEMS more entries, we need to latch since the
    //  work threads are actively processing the queue as we go.
    for (i=0;i<MAX_ITEMS;++i)
    {
        word_list *node = malloc(sizeof(*node));
        sprintf(node->word, "Word-%d", i);

        // latch before updating the queue head.
        pthread_mutex_lock(&wq_mtx);
        node->next = first_word;
        first_word = node;

        // no longer need the latch. unlock and inform any
        // potential waiter.
        pthread_mutex_unlock(&wq_mtx);
        pthread_cond_signal(&wq_cv);
    }

    // wait for the condition that the queue is empty
    pthread_mutex_lock(&wq_mtx);
    while (first_word != NULL)
        pthread_cond_wait(&wq_cv, &wq_mtx);
    pthread_mutex_unlock(&wq_mtx);

    // queue is empty, but threads are all still there waiting. So
    //  do it again, just to proves the pool is still intact.
    for (i=0;i<MAX_ITEMS;++i)
    {
        word_list *node = malloc(sizeof(*node));
        sprintf(node->word, "Word-%d", i);

        // latch before updating the queue head.
        pthread_mutex_lock(&wq_mtx);
        node->next = first_word;
        first_word = node;

        // no longer need the latch. unlock and inform any
        // potential waiter.
        pthread_mutex_unlock(&wq_mtx);
        pthread_cond_signal(&wq_cv);
    }

    // again wait for the condition that the queue is empty
    pthread_mutex_lock(&wq_mtx);
    while (first_word != NULL)
        pthread_cond_wait(&wq_cv, &wq_mtx);

    // queue is empty, and we're not adding anything else. latch
    //  the mutex, set the shutdown flag, and tell all the threads.
    //  they need to terminate.
    word_shutdown = 1;
    pthread_mutex_unlock(&wq_mtx);
    pthread_cond_broadcast(&wq_cv);

    for (i=0;i<MAX_THREADS; ++i)
        pthread_join(thread_id[i], NULL);

    return EXIT_SUCCESS;
}


// the work crew will start by locking the mutex, then entering the
//  work loop, looking for entries or a shutdown state
void *dowork(void *arguments)
{
    int n_processed = 0;
    while (1)
    {
        pthread_mutex_lock(&wq_mtx);
        while (first_word == NULL && word_shutdown == 0)
            pthread_cond_wait(&wq_cv, &wq_mtx);

        // we own the mutex, and thus current access to the predicate
        //  values it protects.
        if (first_word != NULL)
        {
            // pull the item off the queue. once we do that we own the
            //  item, so we can unlatch and let another waiter know there
            //  may be more data on the queue.
            word_list *p = first_word;
            first_word = p->next;
            if (p->next)
                pthread_cond_signal(&wq_cv);
            pthread_mutex_unlock(&wq_mtx);

            //
            // TODO: process item here.
            //
            ++n_processed;
            free(p);
        }
        else if (word_shutdown != 0)
            break;
    }

    // we still own the mutex. report on how many items we received, then
    //  one more signal to let someone (anyone, actually) know we're done.
    pthread_t self = pthread_self();
    printf("%p : processed %d items.\n",self, n_processed);
    pthread_mutex_unlock(&wq_mtx);
    pthread_cond_signal(&wq_cv);
    return NULL;
}

サンプル出力: MAX_THREADS = 4 (出力は異なります)

0x100387000 : processed 64909 items.
0x100304000 : processed 64966 items.
0x1000b5000 : processed 64275 items.
0x100281000 : processed 67994 items.

出力例: MAX_THREADS = 8

0x100304000 : processed 31595 items.
0x1000b5000 : processed 33663 items.
0x100593000 : processed 34298 items.
0x10040a000 : processed 32304 items.
0x10048d000 : processed 32406 items.
0x100387000 : processed 31878 items.
0x100281000 : processed 32317 items.
0x100510000 : processed 33683 items.

サンプル出力: MAX_THREADS = 16

0x10079f000 : processed 17239 items.
0x101081000 : processed 16530 items.
0x101104000 : processed 16662 items.
0x100699000 : processed 16562 items.
0x10040a000 : processed 16672 items.
0x100593000 : processed 15158 items.
0x10120a000 : processed 17365 items.
0x101187000 : processed 14184 items.
0x100387000 : processed 16332 items.
0x100616000 : processed 16497 items.
0x100281000 : processed 16632 items.
0x100304000 : processed 16222 items.
0x100510000 : processed 17188 items.
0x10048d000 : processed 15367 items.
0x1000b5000 : processed 16912 items.
0x10071c000 : processed 16622 items.

そして、完全なグローバル最適化が有効になっているという理由だけで

サンプル出力: MAX_THREADS = 32、MAX_ITEMS = 4194304

0x109c58000 : processed 260000 items.
0x109634000 : processed 263433 items.
0x10973a000 : processed 262125 items.
0x10921c000 : processed 261201 items.
0x108d81000 : processed 262325 items.
0x109a4c000 : processed 262318 items.
0x108f8d000 : processed 263107 items.
0x109010000 : processed 261382 items.
0x109946000 : processed 262299 items.
0x109199000 : processed 261930 items.
0x10929f000 : processed 263506 items.
0x109093000 : processed 262362 items.
0x108e87000 : processed 262069 items.
0x108e04000 : processed 261890 items.
0x109acf000 : processed 261875 items.
0x1097bd000 : processed 262040 items.
0x109840000 : processed 261686 items.
0x1093a5000 : processed 262547 items.
0x109b52000 : processed 261980 items.
0x109428000 : processed 264259 items.
0x108f0a000 : processed 261620 items.
0x1095b1000 : processed 263062 items.
0x1094ab000 : processed 261811 items.
0x1099c9000 : processed 262709 items.
0x109116000 : processed 261628 items.
0x109bd5000 : processed 260905 items.
0x10952e000 : processed 262741 items.
0x1098c3000 : processed 260608 items.
0x109322000 : processed 261970 items.
0x1000b8000 : processed 262061 items.
0x100781000 : processed 262669 items.
0x1096b7000 : processed 262490 items.

うーん。そして、私はこれのいずれにも使用volatileしませんでした. 宝くじを買う時が来たに違いない。

とにかく、pthread、特にミューテックスと条件変数の制御とそれらの相互作用について調査することをお勧めします。これがお役に立てば幸いです。

于 2013-09-13T18:13:06.157 に答える
2

これを正しく理解しているかどうか見てみましょう。

  • struct word_listある種のリンクされたリストを記述します
  • そのリストの要素をスレッド間で広げたいとします。

これが必要な場合は、リストから要素を 1 つずつポップし、残りへのポインターを書き戻します。

volatile struct word_list * first_word = NULL; // important to make it volatile

void *dowork(void *arguments)
{
    struct word_list * curr_word;
    char myword[20];
    do {
        // gain exclusive access to the control structures
        pthread_mutex_lock(&thrd_list);

        // get the next element
        curr_word = first_word;
        if (curr_word == NULL) {
            pthread_mutex_unlock(&thrd_list);
            break;
        }

        // notify the remaining threads what the next element is
        first_word = curr_word->next;

        pthread_mutex_unlock(&thrd_list);

        // do whatever you have to do

    } while (1);

}

volatile struct word_list * next_word変更したくない場合は、追加のグローバルを作成しますfirst_word。そうしないとvolatile、コンパイラが最適化を実行して、奇妙な結果が生じる可能性があります。

于 2013-09-13T15:36:38.393 に答える