Douglas Niehaus のクラスpthreads
に従って使用する方法を学ぼうとしています。
今、私は生産者と消費者の問題の真っ只中にいます。これは私がこれまでに到達した解決策です:
プロデューサー:
void *producer (void *parg) {
queue *fifo;
int item_produced;
pcdata *mydata;
int my_tid;
int *total_produced;
mydata = (pcdata *) parg;
fifo = mydata->q;
total_produced = mydata->count;
my_tid = mydata->tid;
/*
* Continue producing until the total produced reaches the
* configured maximum
*/
while (1) {
/*
* Do work to produce an item. Tthe get a slot in the queue for
* it. Finally, at the end of the loop, outside the critical
* section, announce that we produced it.
*/
do_work(PRODUCER_CPU, PRODUCER_BLOCK);
pthread_mutex_lock(fifo->mutex);
/*
* If the queue is full, we have no place to put anything we
* produce, so wait until it is not full.
*/
while(fifo->full && *total_produced != WORK_MAX) {
pthread_cond_wait(fifo->notFull,fifo->mutex);
}
if(*total_produced >= WORK_MAX) {
printf("PRODUCER %d is exitting because the production reached the MAX\n",my_tid);
pthread_cond_signal(fifo->notFull);
pthread_mutex_unlock(fifo->mutex);
break;
}
/*
* OK, so we produce an item. Increment the counter of total
* widgets produced, and add the new widget ID, its number, to the
* queue.
*/
item_produced = (*total_produced)++;
queueAdd (fifo, item_produced);
pthread_cond_signal(fifo->notEmpty);
pthread_mutex_unlock(fifo->mutex);
/*
* Announce the production outside the critical section
*/
printf("prod %d:\t %d.\n", my_tid, item_produced);
}
printf("prod %d:\texited\n", my_tid);
return (NULL);
}
消費者:
void *consumer (void *carg)
{
queue *fifo;
int item_consumed;
pcdata *mydata;
int my_tid;
int *total_consumed;
mydata = (pcdata *) carg;
fifo = mydata->q;
total_consumed = mydata->count;
my_tid = mydata->tid;
/*
* Continue producing until the total consumed by all consumers
* reaches the configured maximum
*/
while (1) {
/*
* If the queue is empty, there is nothing to do, so wait until it
* si not empty.
*/
pthread_mutex_lock(fifo->mutex);
while(fifo->empty && *total_consumed != WORK_MAX) {
pthread_cond_wait(fifo->notEmpty,fifo->mutex);
}
if(*total_consumed >= WORK_MAX) {
printf("CONSUMER %d is exitting because the compsuption reached the MAX\n",my_tid);
pthread_cond_signal(fifo->notEmpty);
pthread_mutex_unlock(fifo->mutex);
break;
}
/*
* Remove the next item from the queue. Increment the count of the
* total consumed. Note that item_consumed is a local copy so this
* thread can retain a memory of which item it consumed even if
* others are busy consuming them.
*/
queueRemove (fifo, &item_consumed);
(*total_consumed)++;
pthread_cond_signal(fifo->notFull);
pthread_mutex_unlock(fifo->mutex);
/*
* Do work outside the critical region to consume the item
* obtained from the queue and then announce its consumption.
*/
do_work(CONSUMER_CPU,CONSUMER_CPU);
printf ("con %d:\t %d.\n", my_tid, item_consumed);
}
printf("con %d:\texited\n", my_tid);
return (NULL);
}
これは、プロデューサーとコンシューマーの数がかなり少ない場合 (たとえば、100 人のコンシューマーと 50 人のプロデューサー、またはその逆) でうまく機能するようです。しかし、2000 を超えるプロデューサーやコンシューマーでコードを試すと、実行がハングします。デッドロックに達したと思いますが、それを見つけることができません。
どんな助けでも大歓迎です。
ノート:
内のシグナルif-block
:
if(*total_consumed >= WORK_MAX) { ... }
そしてif-block
:
if(*total_consumed >= WORK_MAX) {...}
それらなしで実行すると、5 つを超えるプロデューサーやコンシューマーでハングアップするため、私が追加したものです。
私の推論は、作業制限に達した場合、他のプロデューサーにも通知して終了できるようにする必要があるということです。そして、同じことが消費者にも当てはまります。
注 2:
次のコードは、すでに学生に提供されています。
typedef struct {
int buf[QUEUESIZE]; /* Array for Queue contents, managed as circular queue */
int head; /* Index of the queue head */
int tail; /* Index of the queue tail, the next empty slot */
int full; /* Flag set when queue is full */
int empty; /* Flag set when queue is empty */
pthread_mutex_t *mutex; /* Mutex protecting this Queue's data */
pthread_cond_t *notFull; /* Used by producers to await room to produce*/
pthread_cond_t *notEmpty; /* Used by consumers to await something to consume*/
} queue;
typedef struct {
queue *q;
int *count;
int tid;
} pcdata;
/******************************************************/
queue *queueInit (void) {
queue *q;
/*
* Allocate the structure that holds all queue information
*/
q = (queue *)malloc (sizeof (queue));
if (q == NULL) return (NULL);
/*
* Initialize the state variables. See the definition of the Queue
* structure for the definition of each.
*/
q->empty = 1;
q->full = 0;
q->head = 0;
q->tail = 0;
/*
* Allocate and initialize the queue mutex
*/
q->mutex = (pthread_mutex_t *) malloc (sizeof (pthread_mutex_t));
pthread_mutex_init (q->mutex, NULL);
/*
* Allocate and initialize the notFull and notEmpty condition
* variables
*/
q->notFull = (pthread_cond_t *) malloc (sizeof (pthread_cond_t));
pthread_cond_init (q->notFull, NULL);
q->notEmpty = (pthread_cond_t *) malloc (sizeof (pthread_cond_t));
pthread_cond_init (q->notEmpty, NULL);
return (q);
}
そして、main:
int main (int argc, char *argv[]) {
// More initializations
pthread_t *con;
int cons;
int *concount;
queue *fifo;
int i;
pthread_t *pro;
int *procount;
int pros;
pcdata *thread_args;
fifo = queueInit ();
if (fifo == NULL) {
fprintf (stderr, "main: Queue Init failed.\n");
exit (1);
}
// More initializations and creation of the threads:
for (i=0; i<pros; i++){
/*
* Allocate memory for each producer's arguments
*/
thread_args = (pcdata *)malloc (sizeof (pcdata));
if (thread_args == NULL) {
fprintf (stderr, "main: Thread_Args Init failed.\n");
exit (1);
}
/*
* Fill them in and then create the producer thread
*/
thread_args->q = fifo;
thread_args->count = procount;
thread_args->tid = i;
pthread_create (&pro[i], NULL, producer, thread_args);
}
}
}