0

問題: 以前の質問と同様に、C でプログラムを作成しようとしています。このプログラムは、可変個のスレッドを使って 10 個のテキスト ファイルを検索し、最大の素数を見つけます。また、各ワーカー スレッドが見つけた最大の素数を読み取る (変更はしない) マネージャー スレッドも必要です。マネージャー スレッドは、ワーカー スレッドが読み取って利用できるように、全ワーカー スレッドが検出した中で最大の素数も公開します。ワーカー スレッドは、自分のローカルな最大素数 (Largest Prime) をグローバル配列 (privateLargestPrime、コード中では privateLocalLargest) に書き込む必要があります。その書き込みの前にロックを取得し、ワーカー スレッドが更新を終えるまでマネージャー スレッドがその値を読み取らないようにしなければなりません。

奇妙な点: ワーカー スレッドがロックを取得しようとする箇所でプログラムをステップ実行すると、実行がマネージャー スレッドに切り替わり、マネージャーがロックを取得してしまいます。その後はマネージャーがループし続け、ワーカー スレッドが飢餓状態 (スターベーション) になります。何が起きているのか分かりません。この問題について何か分かることがあれば、ぜひ教えてください。

/*
 * The Reason for Worker Initialization + Manager Initialization is that we need both types of threads to exist at the same time
 * so I just combined them into one loop, although I believe that they could have been created seperatly.
 * Basically just call pthread_Join at the end
 */

    #include <stdio.h>
    #include <stdlib.h>
    #include <math.h>
    #include <pthread.h>
    #include <time.h>
    #include <string.h>
    #include <fileTest.h>

    /* Timestamps taken with clock() in main before and after the thread run. */
    clock_t Start, End;
    /* Elapsed time in seconds, computed in main from End - Start. */
    double elapsed = 0;
    /* Only referenced from commented-out code in this file. */
    pthread_cond_t managerVar;
    /* Locked by Worker and manager around their accesses to privateLocalLargest. */
    pthread_mutex_t mutex;
    /* Read by Worker and printed by main; no active code in this file assigns it. */
    unsigned int globalLargestPrime = 0;
    int numThreads = 1;/* Number of worker threads (the manager thread is extra). */
    /* Set in main to getLineNum()/numThreads. */
    int LINES_PER_THREAD;
    /* Points at main's per-worker condition variable array (monitor). */
    pthread_cond_t *WorkerConditionaVar;
    /* Points at main's managerCon; Worker and manager wait on and signal it. */
    pthread_cond_t *ManagerConditionaVar;
    /* Points at main's WorkerSharedVar; worker `id` writes slot [id], manager reads all slots. */
    unsigned int *privateLocalLargest;/* original note: will need to be changed */
    /* Points at main's finishedArr; a worker stores 1 in its own slot just before returning. */
    int *statusArray;


    /* Data file stream opened in main and read by every Worker through fseek/fgets. */
    FILE *fileOut;
    /* Two-state "busy" marker for a shared value. */
    typedef enum{
      FREE,
      IN_USE
    }lrgstPrm;
    lrgstPrm monLargestPrime;/* Only referenced from commented-out code in this file. */
    lrgstPrm workerLargestPrime;/* Set to IN_USE and back to FREE while mutex is held. */

    /* Completion state polled by the manager loop. Finished is 0, so the
     * zero-initialised default is Finished; main sets Not_Finished before
     * creating the threads. */
    typedef enum{
      Finished,
      Not_Finished
    }Status;
    Status is_Finished;

    /* Per-worker arguments handed to Worker through pthread_create. */
    typedef struct threadFields{
      int id;/* Index into privateLocalLargest and statusArray. */
      int StartPos;/* First record index for this worker (used as the initial Seek). */
      int EndPos;/* End record index; main computes (id+1)*LINES_PER_THREAD-1. */
    }tField;



    /*
     * Trial-division primality test.
     *
     * n: value to test.
     * Returns 1 when n is prime and 0 otherwise (0 and 1 are not prime).
     */
    int ChkPrim(unsigned int n){
      unsigned int i;

      if(n < 2)
        return 0;
      /*
       * Divisors are tried up to and including floor(sqrt(n)). Writing the
       * bound as i <= n / i keeps the test in integer arithmetic and, unlike
       * the former i < sqrt(n), rejects perfect squares such as 4, 9 and 25.
       */
      for(i = 2; i <= n / i; i++){
        if(n % i == 0)
          return 0;
      }
      return 1;
    }

    /*
     * Worker thread entry point.
     *
     * threadStruct: pointer to a struct threadFields giving this worker's id
     *               and its record range. EndPos is treated as inclusive,
     *               matching main's (index+1)*LINES_PER_THREAD-1.
     *
     * Scans the 20-byte-wide records StartPos..EndPos of the shared fileOut
     * stream in batches of up to 1000, keeping the largest prime seen. After
     * every batch the value is published in privateLocalLargest[id] and the
     * worker adopts globalLargestPrime if that is larger. Stores 1 in
     * statusArray[id] before returning. Always returns NULL.
     */
    void *Worker(void *threadStruct){
      struct threadFields *info = threadStruct;
      const int id = info->id;
      int Seek = info->StartPos;
      unsigned int localLargestPrime = 0;
      char buffer[50];/* stack buffer: nothing to free on exit */

      while(Seek <= info->EndPos){
        int index;

        /* The batch also stops at EndPos so the worker never reads records
         * that belong to another worker or lie past the end of the file. */
        for(index = 0; index < 1000 && Seek <= info->EndPos; index++){
          char *line = NULL;
          unsigned int currentNum;

          /* fileOut is shared by all workers: the seek and the read must
           * happen as one step or another thread can move the position in
           * between. */
          pthread_mutex_lock(&mutex);
          if(fseek(fileOut, (long)Seek * 20, SEEK_SET) == 0)
            line = fgets(buffer, 20, fileOut);
          pthread_mutex_unlock(&mutex);
          Seek++;

          if(line == NULL)/* unreadable record: buffer holds nothing usable */
            continue;

          currentNum = atoi(buffer);
          /* Only values that would beat the current best are worth testing. */
          if(currentNum > localLargestPrime && ChkPrim(currentNum) == 1)
            localLargestPrime = currentNum;
        }

        /* Publish the batch result and pick up the global value in one
         * critical section so neither access races with the manager. */
        if(pthread_mutex_lock(&mutex) != 0){
          printf("Failed To Lock");
          continue;/* never touch shared data or unlock without the lock */
        }
        while(workerLargestPrime == IN_USE)
          pthread_cond_wait(ManagerConditionaVar, &mutex);
        workerLargestPrime = IN_USE;
        privateLocalLargest[id] = localLargestPrime;
        if(localLargestPrime < globalLargestPrime)
          localLargestPrime = globalLargestPrime;
        workerLargestPrime = FREE;
        pthread_cond_signal(ManagerConditionaVar);
        pthread_mutex_unlock(&mutex);
      }

      /* The completion mark is written under the same lock the shared data uses. */
      pthread_mutex_lock(&mutex);
      statusArray[id] = 1;
      pthread_mutex_unlock(&mutex);
      return NULL;
    }

    void *manager(){
        int index, MlocalLargestPrime;


        while(is_Finished==Not_Finished){
        /*Should Lock the Private Largest Prime from any other thread using it*/
            if(pthread_mutex_lock(&mutex) != 0)//Lock
              printf("Failed To Lock");
            while(workerLargestPrime == IN_USE)//Wait untill Workers largest prime is free
              pthread_cond_wait(ManagerConditionaVar, &mutex);
            workerLargestPrime = IN_USE;//Local Largest is in use
            //Critical Zone
            for(index =0; index < numThreads; index++)
                  if(privateLocalLargest[index] > MlocalLargestPrime)
                    MlocalLargestPrime = privateLocalLargest[index];
            //Critical Zone
            workerLargestPrime = FREE;
            pthread_cond_signal(ManagerConditionaVar);//Signal to any waiting thread that wants to touch(read) this workers privateLocalLargest
            pthread_mutex_unlock(&mutex);
    /*
       pthread_mutex_lock(&mutex);
       while(workerLargestPrime == FREE){
         workerLargestPrime = IN_USE;
         globalLargestPrime = MlocalLargestPrime;
         workerLargestPrime = FREE;
         pthread_cond_signal(&managerVar);

       }
       pthread_mutex_unlock(&mutex);
    */
       /*check if workers have finished*/
    for(index = 0; index < numThreads; index++)
      if(statusArray[index] == 0)
        is_Finished = Not_Finished;
    }
        void *i = 0;
        return i;
       }

    /*
     * Program entry point.
     *
     * Splits the records of TextFiles/dataBin.txt evenly between numThreads
     * workers, starts the workers plus one manager thread, joins them all and
     * prints the elapsed time and globalLargestPrime.
     * Returns 0 on success, 1 if the data file cannot be opened or a thread
     * cannot be created.
     */
    int main(){
      int index;
      int created = 0;
      int totalLines;

      //setFile();
      totalLines = getLineNum();
      LINES_PER_THREAD = (totalLines/numThreads);
      fileOut = fopen("TextFiles/dataBin.txt", "rb");
      if(fileOut == NULL){
        perror("TextFiles/dataBin.txt");
        return 1;
      }
      /* clock() reports processor time used by the program, not wall-clock
       * time, so the printed figure is not expected to shrink when the same
       * work is spread over more threads. */
      Start = clock();

      pthread_t threads[numThreads+1];/* workers plus one manager: was one slot short */
      pthread_cond_t monitor[numThreads];
      pthread_cond_t managerCon;
      unsigned int WorkerSharedVar[numThreads];
      int finishedArr[numThreads];
      tField threadFields[numThreads];/* plain array: nothing to free */

      WorkerConditionaVar = monitor;//Global Pointer points to the array created in main
      ManagerConditionaVar = &managerCon;
      privateLocalLargest = WorkerSharedVar;
      statusArray = finishedArr;
      is_Finished = Not_Finished;
      pthread_mutex_init(&mutex, NULL);
      pthread_cond_init(&managerCon,NULL);
      rewind(fileOut);

      /* Worker argument and shared-slot initialisation. */
      for(index = 0; index < numThreads; index++){
        pthread_cond_init(&monitor[index], NULL);
        WorkerSharedVar[index] = 0;/* the manager reads these before workers write them */
        finishedArr[index] = 0;
        threadFields[index].id = index;
        threadFields[index].StartPos = index*LINES_PER_THREAD;
        threadFields[index].EndPos = (index+1)*LINES_PER_THREAD-1;
      }
      /* The last worker also takes the lines left over by the integer division. */
      if(numThreads > 0)
        threadFields[numThreads-1].EndPos = totalLines-1;

      /* Workers first; the extra last slot is the manager thread. */
      for(index = 0; index < numThreads+1; index++){
        int rc;
        if(index == numThreads)
          rc = pthread_create(&threads[index],NULL,manager,NULL);
        else
          rc = pthread_create(&threads[index],NULL,Worker,&threadFields[index]);
        if(rc != 0){
          fprintf(stderr, "pthread_create failed for thread %d (error %d)\n", index, rc);
          break;
        }
        created++;
      }

      /* Join only the threads that were actually started. */
      for(index = 0; index < created; index++)
        pthread_join(threads[index], NULL);

      /* Destroy the mutex and condition variables. */
      for(index = 0; index < numThreads; index++)
        pthread_cond_destroy(&WorkerConditionaVar[index]);
      pthread_cond_destroy(&managerCon);
      pthread_mutex_destroy(&mutex);
      fclose(fileOut);

      End = clock();
      elapsed = ((double) (End - Start)) / CLOCKS_PER_SEC;

      printf("This is the Time %f\n", elapsed);
      printf("This is the Largest Prime Number: %u", globalLargestPrime);
      return created == numThreads+1 ? 0 : 1;
    }



  [1]: https://stackoverflow.com/questions/13672456/slightly-complicated-thread-synchronization

別の C ソースもあり、そこからは 1 つの関数だけを使っています。10 個のテキスト ファイルの行数を取得するための関数です。これも投稿しておきます (必須ではありません)。

/*
 * fileTest.c
 *
 *  Created on: Dec 8, 2012
 *      Author: kevin
 *
 *  count number of lines
 *  divide by number of threads
 *  get the positions to hand to each thread
 *  to get positions, one needs to get the number of lines per thread,
 *  add number of lines to each: Seek*sizeof(char)*10, SEEK_SET.
 *  and hand out these positions to each thread
 *
 */

#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <pthread.h>
#include <time.h>
#include <string.h>

  /* filesIn: streams for the ten TextFiles/primesN.txt inputs, opened by
   * setFile and getLineNum. fileOut: stream for TextFiles/dataBin.txt, opened
   * for writing by setFile and for reading by getFile.
   * NOTE(review): the threaded program source also defines a file-scope
   * `FILE *fileOut;` - confirm the two definitions link together as intended. */
  FILE *filesIn[10], *fileOut;
  /* Running record/line counter shared by setFile and getLineNum. */
  int Seek;

/*
 * Writes 10000 pseudo-random integers in the range 0..8999, one per line, to
 * "data9.txt" in the current directory. The generator is seeded from the
 * current time. Reports the error and returns without writing if the file
 * cannot be created.
 */
void createText(){
  FILE *fOUT = fopen("data9.txt", "w");
  int i;

  if(fOUT == NULL){
    perror("data9.txt");
    return;
  }
  srand((unsigned int)time(NULL));
  for(i=0; i<10000; i++)
    fprintf(fOUT, "%d\n",rand()%9000);
  /* fclose flushes the buffered output, so a failure here means lost data. */
  if(fclose(fOUT) != 0)
    perror("data9.txt");
}

/*
 * Builds TextFiles/dataBin.txt from the ten TextFiles/primesN.txt inputs.
 *
 * Every whitespace-separated token of every input is written at byte offset
 * record*20, giving fixed-width 20-byte records that readers can seek to
 * directly. The global Seek is left holding the number of records written.
 * Inputs that cannot be opened are reported and skipped.
 */
void setFile(){
  static const char *const inputNames[10] = {
    "TextFiles/primes1.txt", "TextFiles/primes2.txt",
    "TextFiles/primes3.txt", "TextFiles/primes4.txt",
    "TextFiles/primes5.txt", "TextFiles/primes6.txt",
    "TextFiles/primes7.txt", "TextFiles/primes8.txt",
    "TextFiles/primes9.txt", "TextFiles/primes10.txt"
  };
  char buffer[50];
  int index;

  Seek = 0;
  fileOut = fopen("TextFiles/dataBin.txt", "wb");//write in bin
  if(fileOut == NULL){
    perror("TextFiles/dataBin.txt");
    return;
  }

  for(index = 0; index < 10; index++){//Run through 10 files
    filesIn[index] = fopen(inputNames[index], "r");//read Text
    if(filesIn[index] == NULL){
      perror(inputNames[index]);
      continue;
    }
    /* Looping on the fscanf result (not feof) avoids writing the last token
     * twice; %49s keeps the token inside the 50-byte buffer. */
    while(fscanf(filesIn[index], "%49s", buffer) == 1){
      if(fseek(fileOut, (long)Seek*20, SEEK_SET) != 0){
        perror("TextFiles/dataBin.txt");
        break;
      }
      /* At most 19 characters so a token never spills into the next record. */
      fprintf(fileOut, "%.19s", buffer);
      Seek++;
    }
    fclose(filesIn[index]);
    filesIn[index] = NULL;
  }

  if(fclose(fileOut) != 0)
    perror("TextFiles/dataBin.txt");
  fileOut = NULL;
}

/*
 * Single-threaded scan of TextFiles/dataBin.txt: reads the 20-byte records in
 * order, prints the seek value each time a larger prime is found, and finally
 * prints the largest prime. Reports the error and returns if the file cannot
 * be opened.
 */
void getFile(){
  int ChkPrim(unsigned int n);/* defined in the threaded program source */
  int record = 0;
  int currentSeek = 0;
  int currentNum = 0;
  int localLargestPrime = 0;
  char buffer[50];/* stack buffer: nothing to free */

  fileOut = fopen("TextFiles/dataBin.txt", "rb");
  if(fileOut == NULL){
    perror("TextFiles/dataBin.txt");
    return;
  }

  /* Stop on the first record that cannot be read instead of re-parsing a
   * stale buffer; 20 matches the record width used when the file is written. */
  while(fseek(fileOut, (long)record*20, SEEK_SET) == 0
        && fgets(buffer, 20, fileOut) != NULL){
    record++;
    currentNum = atoi(buffer);
    if(currentNum > localLargestPrime && ChkPrim((unsigned int)currentNum) == 1){
      localLargestPrime = currentNum;
      currentSeek = record*20;
      printf("the current seek is: %d\n", currentSeek);
    }
  }

  fclose(fileOut);
  fileOut = NULL;
  printf("This is the largest Prime: %d\n", localLargestPrime);
}

/*
 * Counts the newline characters in the ten TextFiles/primesN.txt inputs.
 *
 * Returns the total, which is also left in the global Seek. Inputs that
 * cannot be opened are reported and contribute nothing to the total. Every
 * stream opened here is closed again before returning.
 */
int getLineNum(){
  static const char *const inputNames[10] = {
    "TextFiles/primes1.txt", "TextFiles/primes2.txt",
    "TextFiles/primes3.txt", "TextFiles/primes4.txt",
    "TextFiles/primes5.txt", "TextFiles/primes6.txt",
    "TextFiles/primes7.txt", "TextFiles/primes8.txt",
    "TextFiles/primes9.txt", "TextFiles/primes10.txt"
  };
  int index;

  Seek = 0;
  for(index = 0; index < 10; index++){
    int c;/* int, not char: fgetc's EOF must stay distinct from every byte value */

    filesIn[index] = fopen(inputNames[index], "r");//read Text
    if(filesIn[index] == NULL){
      perror(inputNames[index]);
      continue;
    }
    while((c = fgetc(filesIn[index])) != EOF)
      if(c == '\n')
        Seek++;
    fclose(filesIn[index]);
    filesIn[index] = NULL;
  }

  return Seek;
}

ここにリンクの説明を入力

4

3 に答える 3

0

分かりました。条件変数でおかしなことをしていました (しかも数が多すぎました!)。そこで、ここに自分の回答を投稿します:

/*
 * The Reason for Worker Initialization + Manager Initialization is that we need both types of threads to exist at the same time
 * so I just combined them into one loop, although I believe that they could have been created seperatly.
 * Basically just call pthread_Join at the end
 */

#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <pthread.h>
#include <time.h>
#include <string.h>
#include "fileTest.h"

/* Timestamps taken with clock() in main before thread creation and after the joins. */
clock_t Start, End;
/* Elapsed time in seconds, computed in main from End - Start. */
double elapsed = 0;
/* Not referenced by any other code in this program. */
pthread_cond_t managerVar;
/* Locked by Worker and manager around their accesses to the shared values. */
pthread_mutex_t mutex;
/* Written by manager, read by Worker, printed by main. */
unsigned int globalLargestPrime = 0;
int *numThreads;/* Points at main's local thread count parsed from argv[1]. */
/* Set in main to getLineNum()/(*numThreads). */
int LINES_PER_THREAD;
/* Points at the first element of main's monitor array; Worker and manager
 * wait on and signal that first element only. */
pthread_cond_t *WorkerConditionaVar;
/* Points at main's managerCon. */
pthread_cond_t *ManagerConditionaVar;
/* Points at main's WorkerSharedVar; worker `id` writes slot [id], manager reads all slots. */
unsigned int *privateLocalLargest;/* original note: will need to be changed */
/* Points at main's finishedArr; a worker stores 1 in its own slot just before returning. */
int *statusArray;


/* Data file stream opened in main and read by every Worker through fseek/fgets. */
FILE *fileOut;
/* Two-state "busy" marker for a shared value. */
typedef enum{
  FREE,
  IN_USE
}lrgstPrm;
/* Both flags are only tested in the wait loops; no code in this program
 * assigns them, so they keep their zero-initialised value FREE. */
lrgstPrm managerLargestPrime;
lrgstPrm workerLargestPrime;

/* Completion state polled by the manager loop. Finished is 0, so the
 * zero-initialised default is Finished; main sets Not_Finished before
 * creating the threads. */
typedef enum{
  Finished,
  Not_Finished
}Status;
Status is_Finished;

/* Per-worker arguments handed to Worker through pthread_create. */
typedef struct threadFields{
  int id;/* Index into privateLocalLargest and statusArray. */
  int StartPos;/* First record index for this worker (used as the initial Seek). */
  int EndPos;/* End record index; main computes (id+1)*LINES_PER_THREAD-1. */
}tField;



/*
 * Trial-division primality test.
 *
 * n: value to test.
 * Returns 1 when n is prime and 0 otherwise (0 and 1 are not prime).
 */
int ChkPrim(unsigned int n){
  unsigned int i;

  if(n < 2)
    return 0;
  /*
   * Divisors are tried up to and including floor(sqrt(n)). Writing the bound
   * as i <= n / i keeps the test in integer arithmetic and, unlike the former
   * i < sqrt(n), rejects perfect squares such as 4, 9 and 25.
   */
  for(i = 2; i <= n / i; i++){
    if(n % i == 0)
      return 0;
  }
  return 1;
}

/*
 * Worker thread entry point.
 *
 * threadStruct: pointer to a struct threadFields giving this worker's id and
 *               its record range. EndPos is treated as inclusive, matching
 *               main's (index+1)*LINES_PER_THREAD-1.
 *
 * Scans the 20-byte-wide records StartPos..EndPos of the shared fileOut
 * stream in batches of up to 1000, keeping the largest prime seen. After
 * every batch the value is published in privateLocalLargest[id] and the
 * worker adopts globalLargestPrime if that is larger. Stores 1 in
 * statusArray[id] before returning. Always returns NULL.
 */
void *Worker(void *threadStruct){
  struct threadFields *info = threadStruct;
  const int id = info->id;
  int Seek = info->StartPos;
  unsigned int localLargestPrime = 0;
  char buffer[50];/* stack buffer: nothing to free on exit */

  while(Seek <= info->EndPos){
    int index;

    /* The batch also stops at EndPos so the worker never reads records that
     * belong to another worker or lie past the end of the file. */
    for(index = 0; index < 1000 && Seek <= info->EndPos; index++){
      char *line = NULL;
      unsigned int currentNum;

      /* fileOut is shared by all workers: the seek and the read must happen
       * as one step or another thread can move the position in between. */
      pthread_mutex_lock(&mutex);
      if(fseek(fileOut, (long)Seek * 20, SEEK_SET) == 0)
        line = fgets(buffer, 20, fileOut);
      pthread_mutex_unlock(&mutex);
      Seek++;

      if(line == NULL)/* unreadable record: buffer holds nothing usable */
        continue;

      currentNum = atoi(buffer);
      /* Only values that would beat the current best are worth testing. */
      if(currentNum > localLargestPrime && ChkPrim(currentNum) == 1)
        localLargestPrime = currentNum;
    }

    /* Block the manager's read of this worker's slot while it is written. */
    pthread_mutex_lock(&mutex);
    while(workerLargestPrime == IN_USE)
      pthread_cond_wait(WorkerConditionaVar,&mutex);
    privateLocalLargest[id] = localLargestPrime;
    pthread_cond_signal(WorkerConditionaVar);
    pthread_mutex_unlock(&mutex);

    /* Wait until the manager's shared value is free, then pick it up. */
    pthread_mutex_lock(&mutex);
    while(managerLargestPrime == IN_USE)
      pthread_cond_wait(ManagerConditionaVar,&mutex);
    if(localLargestPrime < globalLargestPrime)
      localLargestPrime = globalLargestPrime;
    pthread_cond_signal(ManagerConditionaVar);
    pthread_mutex_unlock(&mutex);
  }

  /* The completion mark is written under the same lock the shared data uses. */
  pthread_mutex_lock(&mutex);
  statusArray[id] = 1;
  pthread_mutex_unlock(&mutex);
  return NULL;
}

void *manager(){
    int index;
    int ManagerLocalLargest = 0;


    while(is_Finished==Not_Finished){
/*I need to wait here until it is free to read the workers Shared variable (PrivateLocalLargest)*/
      pthread_mutex_lock(&mutex);
      while(workerLargestPrime == IN_USE)
        pthread_cond_wait(WorkerConditionaVar,&mutex);
      //Critical Zone
      for(index = 0; index < *numThreads; index++)
        if(privateLocalLargest[index] > ManagerLocalLargest)
          ManagerLocalLargest = privateLocalLargest[index];
      //Critical Zone
      pthread_cond_signal(WorkerConditionaVar);
      pthread_mutex_unlock(&mutex);

/*Here is where I block the worker thread read while I Write*/
      pthread_mutex_lock(&mutex);
            while(managerLargestPrime == IN_USE)
              pthread_cond_wait(ManagerConditionaVar,&mutex);
            //Critical Zone
            for(index = 0; index < *numThreads; index++)
              if(privateLocalLargest[index] > globalLargestPrime)
                globalLargestPrime = privateLocalLargest[index];
            //Critical Zone
            pthread_cond_signal(ManagerConditionaVar);
            pthread_mutex_unlock(&mutex);
/*Here is where I block the worker thread read while I Write*/

   /*check if workers have finished*/
for(index = 0; index < *numThreads; index++){
  is_Finished = Finished;
  if(statusArray[index] != 1){
    is_Finished = Not_Finished;
    }
  }
}
    return NULL;
   }

 int main(int argc, char *argv[]){
  //setFile();
int argument;
switch(argc){

case 1:
  printf("You didn't Type the number of threads you wanted... \n");
  printf("argument format: [# of Threads]\n");
  return -1;
  break;
case 2:
  if(strcmp(argv[1],"--help") == 0){
      printf("argument format: [# of Threads]\n");
      return 0;
  }
  else
    argument = atoi(argv[1]);
  break;
}
  printf("The number of threads is %d\n", argument);
  numThreads = &argument;
  LINES_PER_THREAD = (getLineNum()/(*numThreads));
  fileOut = fopen("TextFiles/dataBin.txt", "rb");
  //pthread_t managerThread;
  pthread_t threads[*numThreads];
  pthread_cond_t monitor[*numThreads];
  pthread_cond_t managerCon;
  WorkerConditionaVar = monitor;//Global Pointer points to the array created in main
  ManagerConditionaVar = &managerCon;
  unsigned int WorkerSharedVar[*numThreads];
  privateLocalLargest = WorkerSharedVar;
  pthread_mutex_init(&mutex, NULL);
  int finishedArr[*numThreads];
  statusArray = finishedArr;
  is_Finished = Not_Finished;
  int index;


      /*Worker Initialization + Manager Initialization*/
      pthread_cond_init(&managerCon,NULL);
      /*Worker Thread Struct Initalization*/
      tField *threadFields[*numThreads];//sets number of thread structs
      rewind(fileOut);
      for(index = 0; index < *numThreads; index++){//run through threads; inizilize the Struct for workers
        privateLocalLargest[index] = 0;
          pthread_cond_init(&monitor[index], NULL);//Initialize all the conditional variables
          threadFields[index] = malloc(sizeof(tField));
          threadFields[index]->id = index;
          threadFields[index]->StartPos = index*LINES_PER_THREAD;// Get Position for start of block
          threadFields[index]->EndPos = (index+1)*LINES_PER_THREAD-1;// Get Position for end of block
    }
      /*Worker Thread Struct Initalization*/
      Start = clock();
      for(index = 0; index<*numThreads+1; index++)
        if(index == *numThreads)//Last Thread is Manager Thread
          pthread_create(&threads[index],NULL,manager,NULL);//Create Manager
        else//Worker Threads
          pthread_create(&threads[index],NULL,Worker,(void *)threadFields[index]);//Pass struct to each worker

      for(index = 0; index<*numThreads+1; index++)
        pthread_join(threads[index], NULL);
      /*Worker Initialization + Manager Initialization*/


  /*Destroy the mutexes & conditional signals*/
      for(index = 0; index < *numThreads; index++){
         pthread_cond_destroy(&WorkerConditionaVar[index]);
         }
      pthread_cond_destroy(&managerCon);

  pthread_mutex_destroy(&mutex);
End = clock();
elapsed = ((double) (End - Start)) / CLOCKS_PER_SEC;

printf("This is the Time %f\n", elapsed);
printf("This is the Largest Prime Number: %u\n", globalLargestPrime);
return 0;
}

また、スレッド数と条件変数をハードコーディングする必要があるという問題を解決しました。これで、パラメーターとして入力するだけで済みます。すべてのサポートに感謝します。

PS。2 つのスレッドを使用してもプロセスが高速化されないことに気付きました (そうなると思います)。私の PC はデュアル コアです。Mutex ロックとすべてのブロッキングが原因である可能性があります。また、スレッドが多いほど、データの処理に時間がかかることに気付きました...誰かがこれを見て、洞察を得ることができる場合は、私に連絡するか、コメントを書いてください。ありがとう(他のcファイルは同じままでした)。

于 2012-12-13T19:16:16.297 に答える
0

あなたの問題を考えると、ロックなしでできるし、そうすべきだと思います。

グローバル配列を使用して、ワーカー スレッドからマネージャー スレッドを更新します。各スレッドのワーカーは個別の配列インデックスに書き込むため、配列インデックスごとにライターは 1 つだけです。メインスレッドは、同じ配列からの読み取りを続けることができます。

これまでに見つかった最大の素数に 1 つのグローバル変数を使用します (すべてのスレッドで共有)。この変数では、メイン スレッドが唯一のライターであり、ワーカー スレッドはすべてリーダーです。

唯一の変数であるため、一貫性は問題になりません。一緒に更新する必要がある変数がさらにある場合は、ロックの取得について心配する必要があります。

お役に立てれば。

于 2012-12-12T06:28:34.420 に答える
0

`globalLargestPrime` へのアクセスの同期をやりすぎているようです。しかし、それを修正しようとする代わりに、各スレッドの値をマネージャーに伝えるもっと良い方法があるかもしれません。スレッド関数が、見つけた `unsigned int` の値を `void*` にキャストして返すようにするだけです。そうすれば、マネージャーは `pthread_join()` で各スレッドの終了を待つだけで、これらの値を収集できます。

次の擬似コードのようなもの:

// Pseudo-code sketch of a worker that hands its result back through the
// thread's return value instead of through shared variables.
void *Worker(void *threadStruct)
{
    unsigned int largest_prime;

    // do whatever you need to do to find the largest prime in the set of numbers
    //  this thread has to deal with
    //
    // Note that nothing here should require synchronization, since the data should be
    //  completely independent of other threads

    // The value itself is carried in the pointer; nothing is allocated.
    // NOTE(review): an integer-to-pointer cast like this is normally written
    //  via uintptr_t to avoid size-mismatch warnings - confirm for your target.
    return (void*) largest_prime;
}

 // Pseudo-code sketch of a manager that collects each worker's result from
 // pthread_join() and prints the largest one. Not compilable as written.
 void *manager()
 {
    unsigned int largest_prime = 0;

    // do whatever to spin up the threads and keep track of them in a
    //  pthread_t[] array...

    // now wait for the threads to finish up and deal with the value
    //  each thread has found:
    for each (pthread* p in the pthread_t[]) { // remember - pseudo code
        void* result = 0;

        // get the result that thread found
        pthread_join( p, &result);
        // Reverse of the cast the worker applied to its return value.
        unsigned int thread_prime = (unsigned int) result;

        if (largest_prime < thread_prime) {
            largest_prime = thread_prime;
        }
    }

    printf("largest prime: %u\n", largest_prime);
 }

これで、面倒な同期はすべて `pthread_join()` によって処理されます。

于 2012-12-11T23:00:11.490 に答える