9

私はマルチスレッドに精通しており、Java と Objective-C で多くのマルチスレッド プログラムを成功裏に開発してきました。しかし、メイン スレッドからの結合を使用しないと、pthreads を使用して C で次のことを達成できませんでした。

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

#define NUM_OF_THREADS 2

struct thread_data {
    int start;
    int end;
    int *arr;
};

void print(int *ints, int n);
void *processArray(void *args);

int main(int argc, const char * argv[])
{
    int numOfInts = 10;
    int *ints = malloc(numOfInts * sizeof(int));
    for (int i = 0; i < numOfInts; i++) {
        ints[i] = i;
    }
    print(ints, numOfInts); // prints [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

    pthread_t threads[NUM_OF_THREADS];
    struct thread_data thread_data[NUM_OF_THREADS];

    // these vars are used to calculate the index ranges for each thread
    int remainingWork = numOfInts, amountOfWork;
    int startRange, endRange = -1;

    for (int i = 0; i < NUM_OF_THREADS; i++) {

        amountOfWork = remainingWork / (NUM_OF_THREADS - i);
        startRange = endRange + 1;
        endRange   = startRange + amountOfWork - 1;

        thread_data[i].arr   = ints;
        thread_data[i].start = startRange;
        thread_data[i].end   = endRange;

        pthread_create(&threads[i], NULL, processArray, (void *)&thread_data[i]);

        remainingWork -= amountOfWork;      
    }

    // 1. Signal to the threads to start working


    // 2. Wait for them to finish


    print(ints, numOfInts); // should print [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

    free(ints);
    return 0;
}

void *processArray(void *args)
{
    struct thread_data *data = (struct thread_data *)args;
    int *arr  = data->arr;
    int start = data->start;
    int end   = data->end;

    // 1. Wait for a signal to start from the main thread


    for (int i = start; i <= end; i++) {
        arr[i] = arr[i] + 1;
    }

    // 2. Signal to the main thread that you're done

    pthread_exit(NULL);
}

void print(int *ints, int n)
{
    printf("[");
    for (int i = 0; i < n; i++) {
        printf("%d", ints[i]);
        if (i+1 != n)
            printf(", ");
    }
    printf("]\n");
}

上記のコードで次のことを実現したいと思います。

main() で:

  1. 作業を開始するようにスレッドに通知します。
  2. バックグラウンド スレッドが終了するまで待ちます。

processArray() では:

  1. メインスレッドからシグナルが開始されるのを待ちます
  2. 完了したことをメインスレッドに通知する

メイン スレッドで結合を使用したくありません。実際のアプリケーションでは、メイン スレッドがスレッドを 1 回作成し、バックグラウンド スレッドに何度も動作するように通知するためです。すべてのバックグラウンド スレッドの処理が終了しない限り、スレッドは続行されます。processArray関数では、次のように無限ループを配置します。

void *processArray(void *args)
{
    struct thread_data *data = (struct thread_data *)args;

    while (1)
    {
      // 1. Wait for a signal to start from the main thread

      int *arr  = data->arr;
      int start = data->start;
      int end   = data->end;          

      // Process
      for (int i = start; i <= end; i++) {
          arr[i] = arr[i] + 1;
      }

      // 2. Signal to the main thread that you're done

    }

    pthread_exit(NULL);
}

私は C と posix API が初めてなので、明らかな何かが欠けていたらすみません。しかし、ミューテックス、セマフォの配列、および両方の混合物を使用することから始めて、実際に多くのことを試しましたが、成功しませんでした。条件変数が役立つと思いますが、どのように使用できるかわかりませんでした。

御時間ありがとうございます。

問題が解決しました:

どうもありがとうございました!あなたのヒントに従って、結合を使用せずに、これを安全に機能させることができました。解決策はやや醜いものですが、仕事は完了し、パフォーマンスの向上はそれだけの価値があります (以下で説明するように)。興味のある方のために説明すると、これは私が取り組んでいる実際のアプリケーションのシミュレーションです。このアプリケーションでは、メイン スレッドがバックグラウンド スレッドに継続的に作業を与え続けています。

 #include <stdio.h>
 #include <stdlib.h>
 #include <pthread.h>

 #define NUM_OF_THREADS 5

 struct thread_data {
     int id;
     int start;
     int end;
     int *arr;
 };

 pthread_mutex_t currentlyIdleMutex = PTHREAD_MUTEX_INITIALIZER;
 pthread_cond_t  currentlyIdleCond  = PTHREAD_COND_INITIALIZER;
 int currentlyIdle;

 pthread_mutex_t workReadyMutex = PTHREAD_MUTEX_INITIALIZER;
 pthread_cond_t  workReadyCond  = PTHREAD_COND_INITIALIZER;
 int workReady;

 pthread_cond_t  currentlyWorkingCond = PTHREAD_COND_INITIALIZER;
 pthread_mutex_t currentlyWorkingMutex= PTHREAD_MUTEX_INITIALIZER;
 int currentlyWorking;

 pthread_mutex_t canFinishMutex = PTHREAD_MUTEX_INITIALIZER;
 pthread_cond_t  canFinishCond  = PTHREAD_COND_INITIALIZER;
 int canFinish;

 void print(int *ints, int n);
 void *processArray(void *args);
 int validateResult(int *ints, int num, int start);

 int main(int argc, const char * argv[])
 {
     int numOfInts = 10;
     int *ints = malloc(numOfInts * sizeof(int));
     for (int i = 0; i < numOfInts; i++) {
         ints[i] = i;
     }
 //   print(ints, numOfInts);

     pthread_t threads[NUM_OF_THREADS];
     struct thread_data thread_data[NUM_OF_THREADS];
     workReady = 0;
     canFinish = 0;
     currentlyIdle = 0;
     currentlyWorking = 0;

     // these vars are used to calculate the index ranges for each thread
     int remainingWork = numOfInts, amountOfWork;
     int startRange, endRange = -1;
     // Create the threads and give each one its data struct.
     for (int i = 0; i < NUM_OF_THREADS; i++) {

         amountOfWork = remainingWork / (NUM_OF_THREADS - i);
         startRange = endRange + 1;
         endRange   = startRange + amountOfWork - 1;

         thread_data[i].id    = i;
         thread_data[i].arr   = ints;
         thread_data[i].start = startRange;
         thread_data[i].end   = endRange;

         pthread_create(&threads[i], NULL, processArray, (void *)&thread_data[i]);
         remainingWork -= amountOfWork;
     }

     int loops = 1111111;
     int expectedStartingValue = ints[0] + loops; // used to validate the results
     // The elements in ints[] should be incremented by 1 in each loop
     while (loops-- != 0) {

         // Make sure all of them are ready
         pthread_mutex_lock(&currentlyIdleMutex);
         while (currentlyIdle != NUM_OF_THREADS) {
             pthread_cond_wait(&currentlyIdleCond, &currentlyIdleMutex);
         }
         pthread_mutex_unlock(&currentlyIdleMutex);

         // All threads are now blocked; it's safe to not lock the mutex.
         // Prevent them from finishing before authorized.
         canFinish = 0;
         // Reset the number of currentlyWorking threads
         currentlyWorking = NUM_OF_THREADS;

         // Signal to the threads to start
         pthread_mutex_lock(&workReadyMutex);
         workReady = 1;
         pthread_cond_broadcast(&workReadyCond );
         pthread_mutex_unlock(&workReadyMutex);      

         // Wait for them to finish
         pthread_mutex_lock(&currentlyWorkingMutex);
         while (currentlyWorking != 0) {
             pthread_cond_wait(&currentlyWorkingCond, &currentlyWorkingMutex);
         }
         pthread_mutex_unlock(&currentlyWorkingMutex);

         // The threads are now waiting for permission to finish
         // Prevent them from starting again
         workReady = 0;
         currentlyIdle = 0;

         // Allow them to finish
         pthread_mutex_lock(&canFinishMutex);
         canFinish = 1;
         pthread_cond_broadcast(&canFinishCond);
         pthread_mutex_unlock(&canFinishMutex);
     }

 //   print(ints, numOfInts);

     if (validateResult(ints, numOfInts, expectedStartingValue)) {
         printf("Result correct.\n");
     }
     else {
         printf("Result invalid.\n");      
     }

     // clean up
     for (int i = 0; i < NUM_OF_THREADS; i++) {
         pthread_cancel(threads[i]);
     }
     free(ints);

     return 0;
 }

 void *processArray(void *args)
 {
     struct thread_data *data = (struct thread_data *)args;
     int *arr  = data->arr;
     int start = data->start;
     int end   = data->end;

     while (1) {

         // Set yourself as idle and signal to the main thread, when all threads are idle main will start
         pthread_mutex_lock(&currentlyIdleMutex);
         currentlyIdle++;
         pthread_cond_signal(&currentlyIdleCond);
         pthread_mutex_unlock(&currentlyIdleMutex);

         // wait for work from main
         pthread_mutex_lock(&workReadyMutex);
         while (!workReady) {
             pthread_cond_wait(&workReadyCond , &workReadyMutex);
         }
         pthread_mutex_unlock(&workReadyMutex);

         // Do the work
         for (int i = start; i <= end; i++) {
             arr[i] = arr[i] + 1;
         }

         // mark yourself as finished and signal to main
         pthread_mutex_lock(&currentlyWorkingMutex);
         currentlyWorking--;
         pthread_cond_signal(&currentlyWorkingCond);
         pthread_mutex_unlock(&currentlyWorkingMutex);

         // Wait for permission to finish
         pthread_mutex_lock(&canFinishMutex);
         while (!canFinish) {
             pthread_cond_wait(&canFinishCond , &canFinishMutex);
         }
         pthread_mutex_unlock(&canFinishMutex);
     }

     pthread_exit(NULL);
 }

 int validateResult(int *ints, int n, int start)
 {
     int tmp = start;
     for (int i = 0; i < n; i++, tmp++) {
         if (ints[i] != tmp) {
             return 0;
         }
     }
     return 1;
 }

 void print(int *ints, int n)
 {
     printf("[");
     for (int i = 0; i < n; i++) {
         printf("%d", ints[i]);
         if (i+1 != n)
             printf(", ");
     }
     printf("]\n");
 }

pthread_cancelクリーンアップに十分かどうかはわかりませんが!障壁に関しては、 @ Jeremyが言及したように、一部の OS に限定されていなければ、非常に役立ちました。

ベンチマーク:

これらの多くの条件が実際にアルゴリズムの速度を低下させていないことを確認したかったので、このベンチマークを設定して 2 つのソリューションを比較しました。

 #include <stdio.h>
 #include <stdlib.h>
 #include <pthread.h>
 #include <unistd.h>
 #include <sys/time.h>
 #include <sys/resource.h>

 #define NUM_OF_THREADS 5
 struct thread_data {
     int start;
     int end;
     int *arr;
 };
 pthread_mutex_t currentlyIdleMutex = PTHREAD_MUTEX_INITIALIZER;
 pthread_cond_t  currentlyIdleCond  = PTHREAD_COND_INITIALIZER;
 int currentlyIdle;
 pthread_mutex_t workReadyMutex = PTHREAD_MUTEX_INITIALIZER;
 pthread_cond_t  workReadyCond  = PTHREAD_COND_INITIALIZER;
 int workReady;
 pthread_cond_t  currentlyWorkingCond = PTHREAD_COND_INITIALIZER;
 pthread_mutex_t currentlyWorkingMutex= PTHREAD_MUTEX_INITIALIZER;
 int currentlyWorking;
 pthread_mutex_t canFinishMutex = PTHREAD_MUTEX_INITIALIZER;
 pthread_cond_t  canFinishCond  = PTHREAD_COND_INITIALIZER;
 int canFinish;

 void *processArrayMutex(void *args);
 void *processArrayJoin(void *args);
 double doItWithMutex(pthread_t *threads, struct thread_data *data, int loops);
 double doItWithJoin(pthread_t *threads, struct thread_data *data, int loops);

 int main(int argc, const char * argv[])
 {
     int numOfInts = 10;
     int *join_ints = malloc(numOfInts * sizeof(int));
     int *mutex_ints = malloc(numOfInts * sizeof(int));
     for (int i = 0; i < numOfInts; i++) {
         join_ints[i] = i;
         mutex_ints[i] = i;
     }

     pthread_t join_threads[NUM_OF_THREADS];
     pthread_t mutex_threads[NUM_OF_THREADS];
     struct thread_data join_thread_data[NUM_OF_THREADS];
     struct thread_data mutex_thread_data[NUM_OF_THREADS];
     workReady = 0;
     canFinish = 0;
     currentlyIdle = 0;
     currentlyWorking = 0;

     int remainingWork = numOfInts, amountOfWork;
     int startRange, endRange = -1;
     for (int i = 0; i < NUM_OF_THREADS; i++) {
         amountOfWork = remainingWork / (NUM_OF_THREADS - i);
         startRange = endRange + 1;
         endRange   = startRange + amountOfWork - 1;

         join_thread_data[i].arr   = join_ints;
         join_thread_data[i].start = startRange;
         join_thread_data[i].end   = endRange;
         mutex_thread_data[i].arr   = mutex_ints;
         mutex_thread_data[i].start = startRange;
         mutex_thread_data[i].end   = endRange;

         pthread_create(&mutex_threads[i], NULL, processArrayMutex, (void *)&mutex_thread_data[i]);
         remainingWork -= amountOfWork;
     }

     int numOfBenchmarkTests = 100;
     int numberOfLoopsPerTest= 1000;

     double join_sum = 0.0, mutex_sum = 0.0;
     for (int i = 0; i < numOfBenchmarkTests; i++)
     {
         double joinTime = doItWithJoin(join_threads, join_thread_data, numberOfLoopsPerTest);
         double mutexTime= doItWithMutex(mutex_threads, mutex_thread_data, numberOfLoopsPerTest);

         join_sum += joinTime;
         mutex_sum+= mutexTime;      
     }

     double join_avg = join_sum / numOfBenchmarkTests;
     double mutex_avg= mutex_sum / numOfBenchmarkTests;

     printf("Join average : %f\n", join_avg);
     printf("Mutex average: %f\n", mutex_avg);

     double diff = join_avg - mutex_avg;
     if (diff > 0.0)
         printf("Mutex is %.0f%% faster.\n", 100 * diff / join_avg);
     else if (diff < 0.0)
         printf("Join  is %.0f%% faster.\n", 100 * diff / mutex_avg);
     else
         printf("Both have the same performance.");

     free(join_ints);
     free(mutex_ints);

     return 0;
 }

 // From https://stackoverflow.com/a/2349941/408286
 double get_time()
 {
     struct timeval t;
     struct timezone tzp;
     gettimeofday(&t, &tzp);
     return t.tv_sec + t.tv_usec*1e-6;
 }

 double doItWithMutex(pthread_t *threads, struct thread_data *data, int num_loops)
 {
     double start = get_time();

     int loops = num_loops;
     while (loops-- != 0) {
         // Make sure all of them are ready
         pthread_mutex_lock(&currentlyIdleMutex);
         while (currentlyIdle != NUM_OF_THREADS) {
             pthread_cond_wait(&currentlyIdleCond, &currentlyIdleMutex);
         }
         pthread_mutex_unlock(&currentlyIdleMutex);

         // All threads are now blocked; it's safe to not lock the mutex.
         // Prevent them from finishing before authorized.
         canFinish = 0;
         // Reset the number of currentlyWorking threads
         currentlyWorking = NUM_OF_THREADS;

         // Signal to the threads to start
         pthread_mutex_lock(&workReadyMutex);
         workReady = 1;
         pthread_cond_broadcast(&workReadyCond );
         pthread_mutex_unlock(&workReadyMutex);

         // Wait for them to finish
         pthread_mutex_lock(&currentlyWorkingMutex);
         while (currentlyWorking != 0) {
             pthread_cond_wait(&currentlyWorkingCond, &currentlyWorkingMutex);
         }
         pthread_mutex_unlock(&currentlyWorkingMutex);

         // The threads are now waiting for permission to finish
         // Prevent them from starting again
         workReady = 0;
         currentlyIdle = 0;

         // Allow them to finish
         pthread_mutex_lock(&canFinishMutex);
         canFinish = 1;
         pthread_cond_broadcast(&canFinishCond);
         pthread_mutex_unlock(&canFinishMutex);
     }

     return get_time() - start;
 }

 double doItWithJoin(pthread_t *threads, struct thread_data *data, int num_loops)
 {
     double start = get_time();

     int loops = num_loops;
     while (loops-- != 0) {
         // create them
         for (int i = 0; i < NUM_OF_THREADS; i++) {
             pthread_create(&threads[i], NULL, processArrayJoin, (void *)&data[i]);
         }
         // wait
         for (int i = 0; i < NUM_OF_THREADS; i++) {
             pthread_join(threads[i], NULL);
         }
     }

     return get_time() - start;
 }

 void *processArrayMutex(void *args)
 {
     struct thread_data *data = (struct thread_data *)args;
     int *arr  = data->arr;
     int start = data->start;
     int end   = data->end;

     while (1) {

         // Set yourself as idle and signal to the main thread, when all threads are idle main will start
         pthread_mutex_lock(&currentlyIdleMutex);
         currentlyIdle++;
         pthread_cond_signal(&currentlyIdleCond);
         pthread_mutex_unlock(&currentlyIdleMutex);

         // wait for work from main
         pthread_mutex_lock(&workReadyMutex);
         while (!workReady) {
             pthread_cond_wait(&workReadyCond , &workReadyMutex);
         }
         pthread_mutex_unlock(&workReadyMutex);

         // Do the work
         for (int i = start; i <= end; i++) {
             arr[i] = arr[i] + 1;
         }

         // mark yourself as finished and signal to main
         pthread_mutex_lock(&currentlyWorkingMutex);
         currentlyWorking--;
         pthread_cond_signal(&currentlyWorkingCond);
         pthread_mutex_unlock(&currentlyWorkingMutex);

         // Wait for permission to finish
         pthread_mutex_lock(&canFinishMutex);
         while (!canFinish) {
             pthread_cond_wait(&canFinishCond , &canFinishMutex);
         }
         pthread_mutex_unlock(&canFinishMutex);
     }

     pthread_exit(NULL);
 }

 void *processArrayJoin(void *args)
 {
     struct thread_data *data = (struct thread_data *)args;
     int *arr  = data->arr;
     int start = data->start;
     int end   = data->end;

     // Do the work
     for (int i = start; i <= end; i++) {
         arr[i] = arr[i] + 1;
     }

     pthread_exit(NULL);
 }

出力は次のとおりです。

Join average : 0.153074
Mutex average: 0.071588
Mutex is 53% faster.

ありがとうございました。あなたの助けに本当に感謝します!

4

4 に答える 4

4

使用できる同期メカニズムがいくつかあります (条件変数など)。pthread_barrier最も簡単なのは、 a を使用してスレッドの開始を同期することだと思います。

ループの反復ごとにすべてのスレッドを「同期」させたいと仮定すると、バリアを再利用できます。より柔軟なものが必要な場合は、条件変数がより適切かもしれません。

スレッドが終了する時が来たと判断した場合 (スレッドが無限ループから抜け出す方法を示していません。そのために単純な共有変数が使用される場合があります。共有変数はアトミック型または保護されている可能性があります)ミューテックスを使用する場合)、スレッドはすべてのスレッドが完了するまで待機するためにmain()使用する必要があります。pthread_join()

于 2012-09-05T13:33:13.473 に答える
3

とは異なる同期手法を使用する必要があることはjoin明らかです。

残念ながら、多くのオプションがあります。1 つは「同期バリア」です。これは基本的に、到達する各スレッドがすべて到達するまでブロックするものです (事前にスレッド数を指定します)。を見てくださいpthread_barrier

もう 1 つは、条件変数/ミューテックスのペアを使用することです ( pthread_cond_*)。各スレッドが終了すると、mutex が取得され、カウントがインクリメントされ、condvar に通知されます。メインスレッドは、カウントが期待値に達するまで condvar を待機します。コードは次のようになります。

// thread has finished
mutex_lock
++global_count
// optional optimization: only execute the next line when global_count >= N
cond_signal
mutex_unlock

// main is waiting for N threads to finish
mutex_lock
while (global_count < N) {
    cond_wait
}
mutex_unlock

もう 1 つは、スレッドごとにセマフォを使用する方法です。スレッドが終了すると、独自のセマフォを送信し、メイン スレッドは各スレッドに順番に参加するのではなく、各セマフォを順番に待機します。

また、次のジョブのスレッドを再起動するために同期が必要です。これは、最初のオブジェクトと同じタイプの 2 番目の同期オブジェクトである可能性があります。ただし、1 人のポスターと N 人のウェイターが逆ではなく、1 人いるという事実のために詳細が変更されています。その周り。または、両方の目的で同じオブジェクトを (注意して) 再利用することもできます。

これらのことを試してもコードが機能しない場合は、試したコードについて具体的な質問をしてください。それらのすべては、タスクに適しています。

于 2012-09-05T14:10:51.130 に答える
2

間違った抽象化レベルで作業しています。この問題はすでに解決されています。ワーク キュー + スレッド プールを再実装しています。

OpenMPはあなたの問題に適しているようです。#pragma注釈をスレッド化されたコードに変換します。やろうとしていることをかなりダイレクトに表現できると思います。

libdispatchを使用すると、あなたがしようとしていることはdispatch_apply、同時キューをターゲットにすることとして表現されます。これは、すべての子タスクが完了するのを暗黙的に待機します。OS X では、移植性のない pthread ワークキュー インターフェイスを使用して実装されています。FreeBSD では、pthread のグループを直接管理していると思います。

移植性の問題で未加工の pthread を使用する場合は、pthread バリアを使用しないでください。バリアは、基本的な POSIX スレッドを超える追加の拡張機能です。たとえば、OS X はサポートしていません。詳細については、POSIXを参照してください。

すべての子スレッドが完了するまでメイン スレッドをブロックするには、条件変数で保護されたカウントを使用するか、さらに単純に、パイプと読み取りバイト数がスレッド数と一致するブロッキング読み取りを使用します。各スレッドは、作業の完了時に 1 バイトを書き込み、メイン スレッドから新しい作業を取得するまでスリープします。各スレッドが「I'm done!」と書き込むと、メイン スレッドのブロックが解除されます。バイト。

子スレッドへの作業の受け渡しは、作業記述子を保護するミューテックスと、新しい作業を通知する条件を使用して行うことができます。すべてのスレッドが取得する作業記述子の単一の配列を使用できます。合図で、それぞれがミューテックスを取得しようとします。ミューテックスを取得すると、いくつかの作業をキューから取り出し、キューが空でない場合は新たにシグナルを送信し、その作業を処理した後、マスター スレッドに完了をシグナルします。

この「作業キュー」を再利用して、結果キューの長さがスレッドの数と一致するまでメイン スレッドを待機させ、結果をキューに入れることでメイン スレッドのブロックを解除できます。パイプ アプローチは、ブロッキングreadを使用してこのカウントを行うだけです。

于 2012-09-05T14:31:14.927 に答える
1

すべてのスレッドに作業を開始するように指示するには、ゼロに初期化されたグローバル整数変数と同じくらい簡単で、スレッドはゼロ以外になるまで単に待機します。while (1)この方法では、スレッド関数にループは必要ありません。

それらがすべて完了するまで待機するpthread_joinのは、参加しているスレッドが完了するまで実際にブロックされるため、最も簡単です。また、スレッドの後にシステムのものをクリーンアップする必要があります (そうしないと、スレッドからの戻り値が残りのプログラムのために保存されます)。スレッドのすべての配列があるので、pthread_tそれらを 1 つずつループするだけです。プログラムのその部分は他に何もせず、すべてのスレッドが完了するまで待機する必要があるため、順番に待機するだけで問題ありません。

于 2012-09-05T13:32:15.290 に答える