1

Linux 環境で C プログラミング言語を使用してスレッド プールを実装します。ブロッキング タスク キューがあり、タスクをタスク キューに入れることができ、スレッド プール内のスレッドはタスク キューからタスクを取得し、タスク キューが空の場合は、タスクキューが空ではないことを示す条件信号を待っているブロックを巻きました。スレッド プール内のスレッドが実行する関数は次のとおりです。

    static void *do_worker(void *arg)
    {
        printf("new thread %d\n", pthread_self());
        ThreadPool *pool = (ThreadPool *)arg;
        while (1) {
           printf("thread %d try to get new task\n", pthread_self());
           Task task;
           //printf("before get_task\n");
           task = get_task(&pool->task_queue);
           printf("thread %d get_task\n", pthread_self());
           if (task.ftn == NULL ) {
               break;
           }
           task.ftn(task.arg);
           //printf("finish new task\n");
           //printf("I'm thread %d\n", pthread_self());
        }
        printf("thread %d exit\n", pthread_self());
        pthread_exit(NULL);
    }

通常のタスクをキューに追加すると問題ないように見えますが、毒タスクを使用してスレッドプールを破棄しようとすると、次のようになります。

    void destroy_threadpool(ThreadPool *pool)
    {
     int i;
     /* put poison to let the threads exit */
     for (i = 0; i < pool->t_cnt; i++) {
         Task null_task;
         null_task.ftn = NULL;
         null_task.arg = NULL;
         printf("insert poison task\n");
         insert_task(pool, null_task);
     }

     for (i = 0; i < pool->t_cnt; i++) {
         pthread_join(pool->tids[i], NULL);
     }
    }

うまくいきません。そして、この実装をテストするとき、最初にスレッドプール内のスレッドを空でない状態で待機中にブロックしてから、destroy_threadpool 関数を呼び出します。ただし、一部のスレッドは終了できますが、一部のスレッドは終了できず、条件の待機中に依然としてブロックされ、その結果、メイン スレッドが pthread_join でブロックされます。問題は何ですか?

get_task と put_task のコードは次のとおりです。

    Task get_task(TaskQueue *queue)
    {
     Task task;

     pthread_mutex_lock(&queue->mutex);

     while (queue->front == queue->back) {

         printf("thread %d blocks!\n", pthread_self());
         pthread_cond_wait(&queue->no_empty, &queue->mutex);

     }

     /* get the front task in the queue */
     memcpy(&task, &queue->tasks[queue->front], sizeof(Task));
     if (++queue->front == MAXTASK) {
         queue->front = 0;
     }
     pthread_cond_signal(&queue->no_full);
     pthread_mutex_unlock(&queue->mutex);

     return task;
    }

    void put_task(TaskQueue *queue, Task task)
    {
     pthread_mutex_lock(&queue->mutex);

     while (((queue->back + 1) == queue->front) || (queue->back - queue->front == MAXTASK - 1)) {
         pthread_cond_wait(&queue->no_full, &queue->mutex);
     }

     memcpy(&queue->tasks[queue->back], &task, sizeof(task));
     if (++queue->back == MAXTASK) {
         queue->back = 0;
     }

     pthread_cond_signal(&queue->no_empty);

     pthread_mutex_unlock(&queue->mutex);

    }

ThreadPool と TaskQueue の定義は次のとおりです。

    typedef struct {
        void *(*ftn)(void *arg);
        void *arg;
    } Task;

    typedef struct {
        Task tasks[MAXTASK];
        int front;
        int back;
        pthread_mutex_t mutex;
        pthread_cond_t no_empty;
        pthread_cond_t no_full;
    } TaskQueue;

    typedef struct {
        pthread_t tids[MAXTHREAD];
        int t_cnt;
        TaskQueue task_queue;
    } ThreadPool;
4

0 に答える 0