6

Stackoverflowは私にとって大きな助けになってきたので、コミュニティに何かお返しをしたいと思います。Stackoverflowで学んだことを活かし、C++のポータブルスレッドライブラリであるTinyThread++を使って、単純なスレッドプールを実装しました。私はスレッドプログラミングの初心者なので、ミューテックスなどにはまだあまり慣れていません。質問が一つありますが、コードを提示した後に尋ねるのが最適だと思います(なお、このコードはLinux上で非常にうまく動作します)。

// ThreadPool.h

// A fixed-size pool of worker threads built on TinyThread++.
//
// Usage, as the members below implement it: CreateThreads() once, then
// repeat { SubmitJob() up to one job per thread; StartJobs();
// WaitForJobsToComplete(); }. The destructor asks every worker to quit
// and joins it.
//
// SubmitJob() writes the job slots and counters without taking a mutex.
// NOTE(review): presumably the pool is meant to be driven from a single
// controlling thread -- confirm before calling it from several threads.
class ThreadPool
{
public:

    ThreadPool();
    ~ThreadPool();

    // Creates a pool of threads and gets them ready to be used
    void CreateThreads(int numOfThreads);

    // Assigns a job to a thread in the pool, but doesn't start the job
    // Each SubmitJob call will use up one thread of the pool.
    // This operation can only be undone by calling StartJobs and
    // then waiting for the jobs to complete. On completion,
    // new jobs may be submitted.
    void SubmitJob( void (*workFunc)(void *), void *workData );

    // Begins execution of all the jobs in the pool.
    void StartJobs();

    // Waits until all jobs have completed.
    // The wait will block the caller.
    // On completion, new jobs may be submitted.
    void WaitForJobsToComplete();

private:

    // What a woken worker is expected to do: run its job or terminate.
    enum typeOfWorkEnum { e_work, e_quit };

    // Per-thread mailbox shared between the controlling thread and one worker.
    class ThreadData
    {
    public:

        bool ready;  // thread has been created and is ready for work
        bool haveWorkToDo;  // set to wake the worker; cleared by the worker
        typeOfWorkEnum  typeOfWork;

        // Pointer to the work function each thread has to call.
        void (*workFunc)(void *);

        // Pointer to work data
        void *workData;

        // Every member gets a defined value so that no slot ever holds an
        // indeterminate function pointer or work type.
        ThreadData()
            : ready(false),
              haveWorkToDo(false),
              typeOfWork(e_work),
              workFunc(0),
              workData(0)
        {
        }
    };

    // Argument bundle handed to ThreadFuncWrapper through its void * parameter.
    struct ThreadArgStruct
    {
        ThreadPool *threadPoolInstance;
        int         threadId;
    };

    // Data for each thread
    ThreadData  *m_ThreadData;

    ThreadPool(ThreadPool const&); // copy ctor hidden
    ThreadPool& operator=(ThreadPool const&); // assign op. hidden

    // Static function that provides the function pointer that a thread can call
    // By including the ThreadPool instance in the void * parameter,
    // we can use it to access other data and methods in the ThreadPool instance.
    static void ThreadFuncWrapper(void *arg)
    {
        ThreadArgStruct *threadArg = static_cast<ThreadArgStruct *>(arg);
        threadArg->threadPoolInstance->ThreadFunc(threadArg->threadId);
    }

    // The function each thread calls
    void ThreadFunc( int threadId );

    // Called by the thread pool destructor
    void DestroyThreadPool();

    // Total number of threads available
    // (fixed on creation of thread pool)
    int m_numOfThreads;
    // Jobs started by StartJobs() that have not yet reported completion
    int m_NumOfThreadsDoingWork;
    // Jobs submitted since the last StartJobs()
    int m_NumOfThreadsGivenJobs;

    // List of threads
    std::vector<tthread::thread *> m_ThreadList;

    // Condition variable to signal each thread has been created and executing
    tthread::mutex              m_ThreadReady_mutex;
    tthread::condition_variable m_ThreadReady_condvar;

    // Condition variable to signal each thread to start work
    tthread::mutex              m_WorkToDo_mutex;
    tthread::condition_variable m_WorkToDo_condvar;

    // Condition variable to signal the main thread that
    // all threads in the pool have completed their work
    tthread::mutex              m_WorkCompleted_mutex;
    tthread::condition_variable m_WorkCompleted_condvar;
};

cppファイル:

//
//  ThreadPool.cpp
//

#include "ThreadPool.h"    

// This is the thread function for each thread.
// All threads remain in this function until
// they are asked to quit, which only happens
// when terminating the thread pool.
void ThreadPool::ThreadFunc( int threadId )
{
 ThreadData *myThreadData = &m_ThreadData[threadId]; 
 std::cout << "Hello world: Thread " << threadId << std::endl;

 // Signal that this thread is ready
 m_ThreadReady_mutex.lock();
       myThreadData->ready = true;
       m_ThreadReady_condvar.notify_one(); // notify the main thread
 m_ThreadReady_mutex.unlock();       

 while(true)
 {
    //tthread::lock_guard<tthread::mutex> guard(m);
    m_WorkToDo_mutex.lock();

    while(!myThreadData->haveWorkToDo) // check for work to do
         m_WorkToDo_condvar.wait(m_WorkToDo_mutex); // if no work, wait here 
    myThreadData->haveWorkToDo = false; // need to do this before unlocking the mutex

    m_WorkToDo_mutex.unlock();

    // Do the work
    switch(myThreadData->typeOfWork)
    {
        case e_work:
            std::cout << "Thread " << threadId << ": Woken with work to do\n";

            // Do work
            myThreadData->workFunc(myThreadData->workData);

            std::cout << "#Thread " << threadId  << ": Work is completed\n";
            break;

         case e_quit:
             std::cout << "Thread " << threadId << ": Asked to quit\n";
             return; // ends the thread
    }

    // Now to signal the main thread that my work is completed
    m_WorkCompleted_mutex.lock();
       m_NumOfThreadsDoingWork--;

      // Unsure if this 'if' would make the program more efficient
      // if(m_NumOfThreadsDoingWork == 0)
           m_WorkCompleted_condvar.notify_one(); // notify the main thread
    m_WorkCompleted_mutex.unlock();       
  }

}


// Constructs an empty pool: no threads, no per-thread data, no pending jobs.
// m_ThreadData is explicitly nulled so the pointer never holds an
// indeterminate value before CreateThreads() allocates it.
// The initializers follow the members' declaration order.
ThreadPool::ThreadPool()
    : m_ThreadData(0),
      m_numOfThreads(0),
      m_NumOfThreadsDoingWork(0),
      m_NumOfThreadsGivenJobs(0)
{
}


// Shuts the pool down. Nothing happens if no threads were ever created;
// otherwise the workers are stopped and joined before the per-thread
// data array is released.
ThreadPool::~ThreadPool()
{
    if(m_numOfThreads == 0)
        return;

    DestroyThreadPool();
    delete [] m_ThreadData;
}


void ThreadPool::CreateThreads(int numOfThreads)
{
// Check if a thread pool has already been created
if(m_numOfThreads > 0) 
   return;

m_NumOfThreadsGivenJobs = 0;
m_NumOfThreadsDoingWork = 0;
m_numOfThreads = numOfThreads;
m_ThreadData = new ThreadData[m_numOfThreads];
ThreadArgStruct threadArg;

for(int i=0; i<m_numOfThreads; ++i)
 {   
    threadArg.threadId = i;
    threadArg.threadPoolInstance = this;

    // Creates the thread and saves it in a list so we can destroy it later
    m_ThreadList.push_back( new tthread::thread( ThreadFuncWrapper, (void *)&threadArg  ) ); 

    // It takes a little time for a thread to get established.
    // Best wait until it gets established before creating the next thread.
    m_ThreadReady_mutex.lock();
    while(!m_ThreadData[i].ready)  // Check if thread is ready
        m_ThreadReady_condvar.wait(m_ThreadReady_mutex); // If not, wait here
    m_ThreadReady_mutex.unlock();    
 } 
} 


// Assigns a job to a thread, but doesn't start the job
void ThreadPool::SubmitJob(void (*workFunc)(void *), void *workData)
{
 // Check if the thread pool has been created
 if(!m_numOfThreads) 
    return;

 if(m_NumOfThreadsGivenJobs >= m_numOfThreads)
    return;

 m_ThreadData[m_NumOfThreadsGivenJobs].workFunc = workFunc;
 m_ThreadData[m_NumOfThreadsGivenJobs].workData = workData;  

 std::cout << "Submitted job " << m_NumOfThreadsGivenJobs << std::endl;

 m_NumOfThreadsGivenJobs++;  
}

void ThreadPool::StartJobs()
{
// Check that the thread pool has been created
// and some jobs have been assigned
if(!m_numOfThreads || !m_NumOfThreadsGivenJobs) 
   return;

// Set 'haveworkToDo' flag for all threads 
m_WorkToDo_mutex.lock();
   for(int i=0; i<m_NumOfThreadsGivenJobs; ++i)
   {
       m_ThreadData[i].typeOfWork = e_work;  // forgot to do this !
       m_ThreadData[i].haveWorkToDo = true;
   }
   m_NumOfThreadsDoingWork = m_NumOfThreadsGivenJobs;

   // Reset this counter so we can resubmit jobs later
   m_NumOfThreadsGivenJobs = 0;

   // Notify all threads they have work to do
   m_WorkToDo_condvar.notify_all();
   m_WorkToDo_mutex.unlock();
}


void ThreadPool::WaitForJobsToComplete()
{
  // Check that a thread pool has been created
  if(!m_numOfThreads) 
   return;

 m_WorkCompleted_mutex.lock();
 while(m_NumOfThreadsDoingWork > 0)  // Check if all threads have completed their work
   m_WorkCompleted_condvar.wait(m_WorkCompleted_mutex); // If not, wait here
 m_WorkCompleted_mutex.unlock();    
}


void ThreadPool::DestroyThreadPool()
{
std::cout << "Ask threads to quit\n";
m_WorkToDo_mutex.lock();
   for(int i=0; i<m_numOfThreads; ++i)
   {
     m_ThreadData[i].haveWorkToDo = true;
     m_ThreadData[i].typeOfWork = e_quit;
   }
   m_WorkToDo_condvar.notify_all();
m_WorkToDo_mutex.unlock();

// As each thread terminates, catch them here
for(int i=0; i<m_numOfThreads; ++i)
 {
     tthread::thread *t = m_ThreadList[i];

     // Wait for thread to complete
     t->join();
 }
 m_numOfThreads = 0;
}

使用例:(これは、平方の逆数を合計することによってpi-squared / 6を計算します)実際、この使用例は、同じ計算を10回並行して実行します。より実用的な使用法は、各スレッドが合計された項の異なるセットを計算することです。プールジョブが完了したら、すべてのスレッド結果を追加して最終結果を取得します。

// Input/output record for one LongCalculation job.
struct CalculationDataStruct
{
    int inputVal;      // in: number of series terms to add up
    double outputVal;  // out: the resulting partial sum
};

// Job function: stores in outputVal the sum of 1/k^2 for k = 1..inputVal.
//
// theSums must point to a CalculationDataStruct; a null pointer is ignored.
// A non-positive inputVal yields 0.0.
void LongCalculation( void *theSums )
{
    CalculationDataStruct *sums = static_cast<CalculationDataStruct *>(theSums);
    if(!sums)
        return;

    const int terms = sums->inputVal;

    // The accumulator must start at zero; left uninitialized, the result
    // would be indeterminate.
    double sum = 0.0;

    // Add exactly 'terms' terms, matching what the caller reports.
    // Counting from 0 and using k = i + 1 avoids overflowing the loop
    // counter when terms is INT_MAX.
    for(int i=0; i<terms; ++i)
    {
        const double k = double(i) + 1.0;
        sum += 1.0/( k*k );
    }
    sums->outputVal = sum;
}


// Example driver: runs LongCalculation on ten pool threads, each with a
// different number of terms, then prints every result.
int main(int argc, char** argv)
{
    // A compile-time constant, so 'sums' below is an ordinary fixed-size
    // array rather than a non-standard variable-length array.
    const int numThreads = 10;

    // Create pool
    ThreadPool threadPool;
    threadPool.CreateThreads(numThreads);

    // Create thread workspace
    CalculationDataStruct sums[numThreads];

    // Set up jobs
    for(int i=0; i<numThreads; i++)
    {
        sums[i].inputVal = 3000*(i+1);
        sums[i].outputVal = 0.0; // defined value even if the job never runs
        threadPool.SubmitJob(LongCalculation, &sums[i]);
    }

    // Run the jobs
    threadPool.StartJobs();
    threadPool.WaitForJobsToComplete();

    // Print results
    for(int i=0; i<numThreads; i++)
        std::cout << "Sum of " << sums[i].inputVal << " terms is " << sums[i].outputVal << std::endl;

    return 0;
}

質問:ThreadPool::ThreadFuncメソッドに、次のifステートメントを

if(m_NumOfThreadsDoingWork == 0)

含めた場合、パフォーマンスは向上するでしょうか?また、批判やコードの改善方法についての提案も歓迎します。同時に、このコードが他の人にも役立つことを願っています。

4

1 に答える 1

1

まず、C++11 の " std::thread " と "std::mutex"を調べることをお勧めします。Intel の「 Threading Building Blocks 」を調査することもできます。これは、作業分散の多くのパターンを提供します。移植可能なクロスプラットフォームの C++ カプセル化 API については、一般的にOpenThreads ライブラリを使用しました。最後に、 ZeroMQなどのメッセージ パッシング ライブラリを使用して、mutex を使用せずにスケーラブルで分散された作業負荷を構築できます。

現在のコードを見ると、私の最大の懸念は、作業をスレッドに割り当てるために使用される変数をロックで保護していないように見えることです。これは、SubmitJob と StartJobs を分離したためだと思います。

しかし、結局のところ、ThreadPool はスレッド セーフではありません。

また、作業の種類などを含む少し複雑な API です。おそらく、「作業」の概念を抽象化する必要があります。これは私がそれを行った例です。コードの大部分を ThreadPool クラスにカプセル化することをお勧めします。また、終了方法 (NULL ジョブ) はちょっと人為的です。おそらく pthread_cancel を使用したいと思うでしょうが、これはこのデモンストレーションにかなり役立ちました。

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>

#include <queue>

// Running count of Job objects constructed so far; also the source of job ids.
static int jobNo = 0;

// A demo unit of work: it takes the next job number when constructed and
// pretends to work for half a second when executed.
class Job {
public:
    int m_i;  // this job's sequence number

    // Claims the next job number and announces the new job.
    Job() {
        jobNo += 1;
        m_i = jobNo;
        printf("Created job %d.\n", m_i);
    }

    // Announces the job, then sleeps for 500 ms.
    void Execute() {
        printf("Job %d executing.\n", m_i);
        usleep(500 * 1000);
    }
};

// Pending jobs, plus the mutex and condition variable that AddJob and
// QueueWorker use around every access to the queue.
std::queue<Job*> queue;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

// Appends job to the queue and signals the condition variable, both
// while holding the mutex.
void AddJob(Job* job)
{
    pthread_mutex_t* const queueLock = &mutex;

    pthread_mutex_lock(queueLock);

    queue.push(job);
    pthread_cond_signal(&cond);

    pthread_mutex_unlock(queueLock);
}

void* QueueWorker(void* /*threadInfo*/) {
    Job* job = NULL;
    for (;;) {
        pthread_mutex_lock(&mutex);
        while ( queue.empty() ) {
            // unlock the mutex until the cond is signal()d or broadcast() to.
            // if this call succeeds, we will have the mutex locked again on the other side.
            pthread_cond_wait(&cond, &mutex);
        }
        // take the first task and then release the lock.
        job = queue.front();
        queue.pop();
        pthread_mutex_unlock(&mutex);

        if ( job == NULL ) {
            // in this demonstration, NULL ends the run, so forward to any other threads.
            AddJob(NULL);
            break;
        }
        job->Execute();
        delete job;
    }
    return NULL;
}

// Demo driver: starts two queue workers, feeds them ten jobs (five after
// random delays, five back-to-back), queues the NULL stop marker and
// waits for both workers to finish.
//
// Returns 0 on success, or EXIT_FAILURE if a worker thread cannot be created.
int main(int argc, const char* argv[]) {
    pthread_t worker1, worker2;

    // pthread_create reports failure through its return value. Joining a
    // thread that was never created would use an uninitialized pthread_t.
    if (pthread_create(&worker1, NULL, &QueueWorker, NULL) != 0) {
        fprintf(stderr, "Failed to create worker thread 1.\n");
        return EXIT_FAILURE;
    }
    if (pthread_create(&worker2, NULL, &QueueWorker, NULL) != 0) {
        fprintf(stderr, "Failed to create worker thread 2.\n");
        // Stop the worker that is already running before bailing out.
        AddJob(NULL);
        pthread_join(worker1, NULL);
        return EXIT_FAILURE;
    }

    srand(static_cast<unsigned int>(time(NULL)));

    // queue 5 jobs with delays.
    for ( size_t i = 0; i < 5; ++i ) {
        long delay = (rand() % 800) * 1000;  // microseconds, below 0.8 s
        printf("Producer sleeping %fs\n", static_cast<float>(delay) / (1000*1000));
        usleep(delay);
        Job* job = new Job();
        AddJob(job);
    }
    // 5 more without delays.
    for ( size_t i = 0; i < 5; ++i ) {
        AddJob(new Job);
    }
    // null to end the run.
    AddJob(NULL);

    printf("Done with jobs.\n");
    pthread_join(worker1, NULL);
    pthread_join(worker2, NULL);

    return 0;
}
于 2013-05-29T05:43:45.130 に答える