2

(要するに:main()のWaitForSingleObjectは以下のプログラムでハングします)。

スレッドをディスパッチし、スレッドが終了するのを待ってから再開するコードを書き込もうとしています。コストのかかる毎回スレッドを作成する代わりに、スレッドをスリープ状態にしました。メインスレッドは、CREATE_SUSPENDED状態でXスレッドを作成します。

同期は、MaximumCountとしてXを使用するセマフォを使用して行われます。セマフォのカウンタがゼロになり、スレッドがディスパッチされます。スレッドはいくつかのばかげたループを実行し、スリープ状態になる前にReleaseSemaphoreを呼び出します。次に、メインスレッドはWaitForSingleObjectをX回使用して、すべてのスレッドがジョブを終了し、スリープしていることを確認します。次に、ループしてすべてをやり直します。

時々、プログラムは終了しません。プログラムをくちばしにすると、WaitForSingleObjectがハングしていることがわかります。これは、スレッドのReleaseSemaphoreが機能しなかったことを意味します。何も印刷されていないので、おそらく何も問題はありませんでした。

たぶん、2つのスレッドがReleaseSemaphoreを同時に呼び出すべきではありませんが、それはセマフォの目的を無効にするでしょう...

私はそれを食べません...

スレッドを同期する他のソリューションはありがたいことに受け入れられています!

#define TRY  100
#define LOOP 100

HANDLE *ids;
HANDLE semaphore;

DWORD WINAPI Count(__in LPVOID lpParameter)
{ 
 float x = 1.0f;   
 while(1)
 { 
  for (int i=1 ; i<LOOP ; i++)
   x = sqrt((float)i*x);
  while (ReleaseSemaphore(semaphore,1,NULL) == FALSE)
   printf(" ReleaseSemaphore error : %d ", GetLastError());
  SuspendThread(ids[(int) lpParameter]);
 }
 return (DWORD)(int)x;
}

int main()
{
 SYSTEM_INFO sysinfo;
 GetSystemInfo( &sysinfo );
 int numCPU = sysinfo.dwNumberOfProcessors;

 semaphore = CreateSemaphore(NULL, numCPU, numCPU, NULL);
 ids = new HANDLE[numCPU];

 for (int j=0 ; j<numCPU ; j++)
  ids[j] = CreateThread(NULL, 0, Count, (LPVOID)j, CREATE_SUSPENDED, NULL);

 for (int j=0 ; j<TRY ; j++)
 {
  for (int i=0 ; i<numCPU ; i++)
  {
   if (WaitForSingleObject(semaphore,1) == WAIT_TIMEOUT)
    printf("Timed out !!!\n");
   ResumeThread(ids[i]);  
  }
  for (int i=0 ; i<numCPU ; i++)
   WaitForSingleObject(semaphore,INFINITE);
  ReleaseSemaphore(semaphore,numCPU,NULL);
 }
 CloseHandle(semaphore);
 printf("Done\n");
 getc(stdin);
}
4

5 に答える 5

5

セマフォを (少なくとも直接的に) 使用したり、メインで明示的にスレッドを起動して作業を完了させたりする代わりに、私は常にスレッドセーフ キューを使用してきました。main がワーカー スレッドに何かを実行させたい場合、実行するジョブの説明をキューにプッシュします。ワーカー スレッドはそれぞれ 1 つのジョブを実行し、次にキューから別のジョブをポップしようとし、実行するジョブがキューに存在するまで中断されます。

キューのコードは次のようになります。

#ifndef QUEUE_H_INCLUDED
#define QUEUE_H_INCLUDED

#include <windows.h>

template<class T, unsigned max = 256>
class queue { 
    HANDLE space_avail; // at least one slot empty
    HANDLE data_avail;  // at least one slot full
    CRITICAL_SECTION mutex; // protect buffer, in_pos, out_pos

    T buffer[max];
    long in_pos, out_pos;
public:
    queue() : in_pos(0), out_pos(0) { 
        space_avail = CreateSemaphore(NULL, max, max, NULL);
        data_avail = CreateSemaphore(NULL, 0, max, NULL);
        InitializeCriticalSection(&mutex);
    }

    void push(T data) { 
        WaitForSingleObject(space_avail, INFINITE);       
        EnterCriticalSection(&mutex);
        buffer[in_pos] = data;
        in_pos = (in_pos + 1) % max;
        LeaveCriticalSection(&mutex);
        ReleaseSemaphore(data_avail, 1, NULL);
    }

    T pop() { 
        WaitForSingleObject(data_avail,INFINITE);
        EnterCriticalSection(&mutex);
        T retval = buffer[out_pos];
        out_pos = (out_pos + 1) % max;
        LeaveCriticalSection(&mutex);
        ReleaseSemaphore(space_avail, 1, NULL);
        return retval;
    }

    ~queue() { 
        DeleteCriticalSection(&mutex);
        CloseHandle(data_avail);
        CloseHandle(space_avail);
    }
};

#endif

そして、それを使用するスレッド内のコードの大まかな同等物は、次のようになります。私はあなたのスレッド関数が何をしていたのかを正確に整理しませんでしたが、それは平方根を合計するものでした.今のところ、スレッドが実際に行うことよりもスレッド同期に興味があるようです.

編集:(コメントに基づく):main()いくつかのタスクが完了するのを待つ必要がある場合は、さらに作業を行ってから、さらにタスクを割り当てる必要があります。通常、各タスクにイベント(たとえば)を入れて処理するのが最善です。スレッド関数はイベントを設定します。これを行うために修正されたコードは次のようになります (キュー コードは影響を受けないことに注意してください)。

#include "queue.hpp"

#include <iostream>
#include <process.h>
#include <math.h>
#include <vector>

struct task { 
    int val;
    HANDLE e;

    task() : e(CreateEvent(NULL, 0, 0, NULL)) { }
    task(int i) : val(i), e(CreateEvent(NULL, 0, 0, NULL)) {}
};

void process(void *p) { 
    queue<task> &q = *static_cast<queue<task> *>(p);

    task t;
    while ( -1 != (t=q.pop()).val) {
        std::cout << t.val << "\n";
        SetEvent(t.e);
    }
}

int main() { 
    queue<task> jobs;

    enum { thread_count = 4 };
    enum { task_count = 10 };

    std::vector<HANDLE> threads;
    std::vector<HANDLE> events;

    std::cout << "Creating thread pool" << std::endl;
    for (int t=0; t<thread_count; ++t)
        threads.push_back((HANDLE)_beginthread(process, 0, &jobs));
    std::cout << "Thread pool Waiting" << std::endl;

    std::cout << "First round of tasks" << std::endl;

    for (int i=0; i<task_count; ++i) {
        task t(i+1);
        events.push_back(t.e);
        jobs.push(t);
    }

    WaitForMultipleObjects(events.size(), &events[0], TRUE, INFINITE);

    events.clear();

    std::cout << "Second round of tasks" << std::endl;

    for (int i=0; i<task_count; ++i) {
        task t(i+20);
        events.push_back(t.e);
        jobs.push(t);
    }

    WaitForMultipleObjects(events.size(), &events[0], true, INFINITE);
    events.clear();

    for (int j=0; j<thread_count; ++j)
        jobs.push(-1);

    WaitForMultipleObjects(threads.size(), &threads[0], TRUE, INFINITE);

    return 0;
}
于 2010-03-03T22:07:07.083 に答える
3

次の場合に問題が発生します。

メインスレッドはワーカースレッドを再開します。

  for (int i=0 ; i<numCPU ; i++)
  {
   if (WaitForSingleObject(semaphore,1) == WAIT_TIMEOUT)
    printf("Timed out !!!\n");
   ResumeThread(ids[i]);  
  }

ワーカースレッドは作業を行い、セマフォを解放します。

  for (int i=1 ; i<LOOP ; i++)
   x = sqrt((float)i*x);
  while (ReleaseSemaphore(semaphore,1,NULL) == FALSE)

メインスレッドはすべてのワーカースレッドを待機し、セマフォをリセットします。

  for (int i=0 ; i<numCPU ; i++)
   WaitForSingleObject(semaphore,INFINITE);
  ReleaseSemaphore(semaphore,numCPU,NULL);

メインスレッドは次のラウンドに入り、ワーカースレッドを再開しようとします(ワーカースレッドはまだイベントが一時停止されていないことに注意してください!ここから問題が発生します...必ずしも一時停止されていないスレッドを再開しようとしていますまだ):

  for (int i=0 ; i<numCPU ; i++)
  {
   if (WaitForSingleObject(semaphore,1) == WAIT_TIMEOUT)
    printf("Timed out !!!\n");
   ResumeThread(ids[i]);  
  }

最後に、ワーカースレッドは一時停止します(ただし、次のラウンドはすでに開始されているはずです)。

  SuspendThread(ids[(int) lpParameter]);

そして、すべてのワーカーが現在中断されているため、メインスレッドは永久に待機します。

  for (int i=0 ; i<numCPU ; i++)
   WaitForSingleObject(semaphore,INFINITE);

生産者/消費者問題を正しく解決する方法を示すリンクは次のとおりです。

http://en.wikipedia.org/wiki/Producer-consumer_problem

また、クリティカルセクションはセマフォやミューテックスよりもはるかに高速だと思います。また、ほとんどの場合(imo)理解しやすいです。

于 2010-03-03T22:23:28.980 に答える
3

コードはわかりませんが、スレッドの同期は明らかに悪いです。スレッドが特定の順序で SuspendThread() を呼び出すと仮定します。WaitForSingleObject() 呼び出しが成功しても、どのスレッドが ReleaseSemaphore()を呼び出したかはわかりません。したがって、中断されていないスレッドで ReleaseThread() を呼び出します。これにより、プログラムはすぐにデッドロックします。

もう 1 つの悪い仮定は、WFSO が返された後に、スレッドが既に SuspendThread を呼び出しているということです。通常はい、常にではありません。スレッドは、RS 呼び出しの直後に横取りされる可能性があります。中断されていないスレッドで ReleaseThread() を再度呼び出します。通常、プログラムがデッドロックするまでに 1 日ほどかかります。

そして、1 つの ReleaseSemaphore 呼び出しが多すぎると思います。間違いなく、それを解き放とうとしている。

Suspend/ReleaseThread() でスレッド化を制御することはできません。試してはいけません。

于 2010-03-03T21:59:57.433 に答える
0

問題は、信号を送るよりも頻繁に待っていることです。

ループはfor (int j=0 ; j<TRY ; j++)セマフォを 8 回待機しますが、4 つのスレッドはそれぞれ 1 回だけシグナルを送信し、ループ自体は 1 回シグナルを送信します。初めてループを通過するときは、セマフォに 4 の初期カウントが与えられるため、これは問題ではありません。2 回目以降は、待機しているシグナルが多すぎます。これは、最初の 4 回の待機で時間を制限し、エラーで再試行しないという事実によって軽減されます。そのため、うまくいく場合もあれば、待機がハングする場合もあります。

次の(テストされていない)変更が役立つと思います。

セマフォをゼロ カウントに初期化します。

semaphore = CreateSemaphore(NULL, 0, numCPU, NULL);

スレッド再開ループの待機を取り除きます (つまり、以下を削除します)。

   if (WaitForSingleObject(semaphore,1) == WAIT_TIMEOUT)  
      printf("Timed out !!!\n");  

try ループの最後から不要なシグナルを削除します (つまり、以下を削除します)。

ReleaseSemaphore(semaphore,numCPU,NULL);
于 2010-03-03T22:30:24.427 に答える
0

ここに実用的な解決策があります。

メイン プログラムでスレッドを使用して (その後、複数のコアを使用して) ジョブをむしゃむしゃ食べ、すべてのスレッドが完了するのを待ってから、再開して他の処理を行うようにしたかったのです。スレッドが死んで新しいスレッドを作成するのは遅いので、やりたくありませんでした。私の質問では、スレッドを一時停止することでそれを行おうとしていましたが、これは自然に思えました。しかし、nobugz が指摘したように、「Suspend/ReleaseThread() でスレッドを制御できます」。

解決策には、私がスレッドの制御に使用していたようなセマフォが含まれます。実際には、メイン スレッドを制御するためにもう 1 つのセマフォが使用されます。これで、スレッドを制御するスレッドごとに 1 つのセマフォと、メインを制御する 1 つのセマフォができました。

解決策は次のとおりです。

#include <windows.h>
#include <stdio.h>
#include <math.h>
#include <process.h>

#define TRY  500000
#define LOOP 100

HANDLE *ids;
HANDLE *semaphores;
HANDLE allThreadsSemaphore;

DWORD WINAPI Count(__in LPVOID lpParameter)
{   
    float x = 1.0f;         
    while(1)
    {   
        WaitForSingleObject(semaphores[(int)lpParameter],INFINITE);
        for (int i=1 ; i<LOOP ; i++)
            x = sqrt((float)i*x+rand());
        ReleaseSemaphore(allThreadsSemaphore,1,NULL);
    }
    return (DWORD)(int)x;
}

int main()
{
    SYSTEM_INFO sysinfo;
    GetSystemInfo( &sysinfo );
    int numCPU = sysinfo.dwNumberOfProcessors;

    ids = new HANDLE[numCPU];
    semaphores = new HANDLE[numCPU]; 

    for (int j=0 ; j<numCPU ; j++)
    {
        ids[j] = CreateThread(NULL, 0, Count, (LPVOID)j, NULL, NULL);
        // Threads blocked until main releases them one by one
        semaphores[j] = CreateSemaphore(NULL, 0, 1, NULL);
    }
    // Blocks main until threads finish
    allThreadsSemaphore = CreateSemaphore(NULL, 0, numCPU, NULL);

    for (int j=0 ; j<TRY ; j++)
    {
        for (int i=0 ; i<numCPU ; i++) // Let numCPU threads do their jobs
            ReleaseSemaphore(semaphores[i],1,NULL);
        for (int i=0 ; i<numCPU ; i++) // wait for numCPU threads to finish
            WaitForSingleObject(allThreadsSemaphore,INFINITE);
    }
    for (int j=0 ; j<numCPU ; j++)
        CloseHandle(semaphores[j]);
    CloseHandle(allThreadsSemaphore);
    printf("Done\n");
    getc(stdin);
}
于 2010-03-04T14:23:19.587 に答える