3

この実行ループ (疑似コード) を無期限に実行する 4 つのスレッドを持つ pthread ベースのマルチスレッド プログラムがあります。

while(keepRunning)
{
   pthread_barrier_wait(&g_stage_one_barrier);

   UpdateThisThreadsStateVariables();  

   pthread_barrier_wait(&g_stage_two_barrier);

   DoComputationsThatReadFromAllThreadsStateVariables();
}

これは、ステージ 1 で各スレッドが独自の状態変数を更新するという点で非常にうまく機能します。ステージ 1 では、他のスレッドが他のスレッドの状態変数を読み取っていないため、問題ありません。次に、ステージ 2 では、スレッドが互いの状態を読み取る限り自由に使用できますが、ステージ 2 ではローカル状態変数を変更するスレッドがないため、事実上読み取り専用であるため、問題ありません。

唯一残っている問題は、アプリケーションを終了するときにこれらのスレッドをクリーンかつ確実にシャットダウンするにはどうすればよいかということです。(「クリーンかつ確実に」とは、潜在的なデッドロックや競合状態を導入することなく、理想的には pthread_barrier_wait() 呼び出しからスレッドを強制的に除外するために UNIX シグナルを送信する必要がないことを意味します)

私の main() スレッドはもちろん、スレッドごとに keepRunning を false に設定できますが、各スレッドに対して pthread_barrier_wait() を返すにはどうすればよいでしょうか? AFAICT pthread_barrier_wait() を返す唯一の方法は、4 つのスレッドすべての実行場所を pthread_barrier_wait() 内に同時に配置することですが、一部のスレッドが既に終了している可能性がある場合、これを行うのは困難です。

pthread_barrier_destroy() を呼び出すことは私がやりたいことのように思えますが、スレッドがバリアで待機している可能性がある間にそれを行うのは未定義の動作です。

この問題に対する適切な既知の解決策はありますか?

4

4 に答える 4

2

同じバリアで同期する追加のスレッドを持つことができますが、「シャットダウン マスター」としてのみ存在します。ワーカー スレッドは、質問にある正確なコードを使用し、「シャットダウン マスター」スレッドは次のようになります。

while (keepRunning)
{
    pthread_barrier_wait(&g_stage_one_barrier);

    pthread_mutex_lock(&mkr_lock);
    if (!mainKeepRunning)
        keepRunning = 0;
    pthread_mutex_unlock(&mkr_lock);

    pthread_barrier_wait(&g_stage_two_barrier);
}

メインスレッドが他のスレッドをシャットダウンしたい場合は、次のようにします。

pthread_mutex_lock(&mkr_lock);
mainKeepRunning = 0;
pthread_mutex_unlock(&mkr_lock);

(つまり、keepRunning変数は、ステージ 2 では読み取り専用の共有スレッド状態の一部になり、ステージ 1 ではシャットダウン マスター スレッドによって所有されます)。

もちろん、専用のスレッドを使用するのではなく、他のスレッドの 1 つを「シャットダウン マスター スレッド」として選択することもできます。

于 2015-03-04T04:23:14.453 に答える
2

2 つのフラグがあり、次のようなものを使用すると機能するはずです。

for (;;)
{
    pthread_barrier_wait(&g_stage_one_barrier);           +
                                                          |
    UpdateThisThreadsStateVariables();                    |
                                                          |
    pthread_mutex_lock(&shutdownMtx);                     | Zone 1
    pendingShutdown = !keepRunning;                       |
    pthread_mutex_unlock(&shutdownMtx);                   |
                                                          |
    pthread_barrier_wait(&g_stage_two_barrier);           +
                                                          |
    if (pendingShutdown)                                  |
        break;                                            | Zone 2
                                                          |
    DoComputationsThatReadFromAllThreadsStateVariables(); |
}

shutdownMtx表示されていませんが、の設定も保護する必要がありkeepRunningます。

ロジックは、pendingShutdownが に設定されるまでに、すべてのスレッドがゾーン 1true内にある必要があるということです。(これは、一部のスレッドのみが であることを確認した場合でも当てはまります。そのため、レースを続けても問題ありません。)したがって、それらはすべて に達し、ゾーン 2に入るとすべてブレイクアウトします。keepRunningfalsekeepRunningpthread_barrier_wait(&g_stage_two_barrier)

PTHREAD_BARRIER_SERIAL_THREADまた、forによって返されるスレッドの 1 つだけをチェックしてpthread_barrier_wait()、そのスレッドでのみロックと更新を行うpendingShutdownこともできます。これにより、パフォーマンスが向上する可能性があります。

于 2015-03-04T00:26:55.210 に答える
1

要件の矛盾があります: バリア セマンティクスではすべてのスレッドがin継続する必要があり、シャットダウンではスレッドが実行ブロック間で共有されている場合 (異なるバリア内にある可能性があります)、終了する必要があります。

cancelバリアを extern呼び出しをサポートするカスタム実装に置き換えることをお勧めします。

例 (実行されない可能性がありますが、アイデアは...):

struct _barrier_entry
{
  pthread_cond_t cond;
  volatile bool released;
  volatile struct _barrier_entry *next;
};

typedef struct
{
  volatile int capacity;
  volatile int count;
  volatile struct _barrier_entry *first;
  pthread_mutex_t lock;
} custom_barrier_t;

初期化:

int custom_barrier_init(custom_barrier_t *barrier, int capacity)
{
   if (NULL == barrier || capacity <= 0)
   {
     errno = EINVAL;
     return -1;
   }
   barrier->capacity = capacity;
   barrier->count = 0;
   barrier->first = NULL;
   return pthread_mutex_init(&barrier->lock, NULL);
   return -1;
}

ヘルパー:

static void _custom_barrier_flush(custom_barrier_t *barrier)
{
   struct _barrier_entry *ptr;
   for (ptr = barrier->first; NULL != ptr;)
   {
     struct _barrier_entry *next = ptr->next;
     ptr->released = true;
     pthread_cond_signal(&ptr->cond);
     ptr = next;
   }
   barrier->first = NULL;
   barrier->count = 0;
}

ブロッキング待機:

int custom_barrier_wait(custom_barrier_t *barrier)
{
   struct _barrier_entry entry;
   int result;
   pthread_cond_init(&barrier->entry, NULL);
   entry->next = NULL;
   entry->released = false;

   pthread_mutex_lock(&barrier->lock);
   barrier->count++;
   if (barrier->count == barrier->capacity)
   {
     _custom_barrier_flush(barrier);
     result = 0;
   }
   else
   {
     entry->next = barrier->first;
     barrier->first = entry;
     while (true)
     {
       pthread_cond_wait(&entry->cond, &barrier->lock);
       if (entry->released)
       {
         result = 0;
         break;
       }
       if (barrier->capacity < 0)
       {
         errno = ECANCELLED;
         result = -1;
         break;
       }
     }
   }
   pthread_mutex_unlock(&barrier->lock);
   pthread_cond_destroy(&entry->cond);
   return result;
}

キャンセル:

 int custom_barrier_cancel(custom_barrier_t *barrier)
 {
   pthread_mutex_lock(barrier->lock);
   barrier->capacity = -1;
   _custom_barrier_flush(barrier);
   pthread_mutex_unlock(barrier->lock);
   return 0;
 }

ECANCELLEDしたがって、呼び出し後にエラーが発生するまで、スレッドコードはループ内で実行できますcustom_barrier_wait

于 2015-03-04T00:27:30.053 に答える