3

複数のクライアントが異なる長さのメッセージをバッファに書き込む循環バッファを作成しました。サーバーはそれらを読み取ります。それはコードを消費者/生産者の問題に基づいていました。問題は、バッファがいっぱいになり、サーバーがバッファからすべてのデータを削除すると、クライアントは書き込み操作を再開するように通知されますが、代わりに別のクライアント (別のスレッド内) がバッファにメッセージを書き始めることです。メッセージが順不同で到着しないように、バッファーがいっぱいになる前に既に書き込みを行っていたクライアントが操作を再開するようにします。

これは私のコードです(多くのテストコードを削除しました)

#include <stdio.h>
#include <malloc.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>

#define BUFFER_SIZE 8
#define NUM_THREADS 4

struct cBuf{
    char    *buf; 
    int     size;   
    int     start;
    int     end;   
    pthread_mutex_t mutex;
    pthread_cond_t  buffer_full;
    pthread_cond_t  buffer_empty;
};

struct cBuf cb;


void buf_Init(struct cBuf *cb, int size) {
    int i;
    cb->size  = size + 1; 
    cb->start = 0;
    cb->end   = 0; 
    cb->buf = (char *)calloc(cb->size, sizeof(char)); 
    for (i=0;i<size;i++) cb->buf[i]='_';

}

void buf_Free(struct cBuf *cb) {
    free(cb->buf);
}

int buf_IsFull(struct cBuf *cb) {
    return (cb->end + 1) % cb->size == cb->start; 
}

int buf_IsEmpty(struct cBuf *cb) {
    return cb->end == cb->start; 
}

int buf_Insert(struct cBuf *cb, char *elem) {

    int i,j;

    pthread_mutex_lock(&(cb->mutex));
    for (i=0; i < strlen(elem); ++ i){
        if (buf_IsFull(cb)==1) printf("\nProducer (buf_Insert) is waiting because of full buffer");
        while(buf_IsFull(cb)){  
            pthread_cond_signal(&(cb->buffer_full));            
            pthread_cond_wait(&(cb->buffer_empty),&(cb->mutex));
        } 
        cb->buf[cb->end] = elem[i]; 
        cb->end = (cb->end + 1) % cb->size;     
        printf("%c [INPUT]",elem[i]);
    }

    pthread_cond_signal(&(cb->buffer_full));
    pthread_mutex_unlock(&(cb->mutex));     
    return 0;       
}

int buf_Read(struct cBuf *cb, char *out) {
    int i,j;

    pthread_mutex_lock(&(cb->mutex));
    if (buf_IsEmpty(cb))printf("\nConsumer (buf_Read) is waiting because of empty buffer\n");
    while(buf_IsEmpty(cb)){
        pthread_cond_wait(&(cb->buffer_full),&(cb->mutex));
    }
    for (i=0;i<BUFFER_SIZE-1;i++){
    printf("\n");
        if (cb->start == cb->end) break;        
        out[i] = cb->buf[cb->start];
        cb->buf[cb->start] = '_';
        cb->start = (cb->start + 1) % cb->size; 
        printf("%c [OUTPUT]",out[i]);
    }
    pthread_cond_signal(&(cb->buffer_empty));
    pthread_mutex_unlock(&(cb->mutex)); 
    return 0;
}

void * client(void *cb){

    pthread_detach(pthread_self());

    struct cBuf *myData;
    myData = (struct cBuf*) cb;
    char input[]="Hello World!";
    if (buf_Insert(myData, input)){
        //succes on return 0
        printf("\n");
    } 

    return 0;
}

int main(void) {
    char out[60];
    pthread_t thread;
    int i;
    /* Initialise conditioners*/
    pthread_cond_init(&(cb.buffer_full),NULL);
    pthread_cond_init(&(cb.buffer_empty),NULL);

    buf_Init(&cb, BUFFER_SIZE);

    for (i = 0; i<NUM_THREADS; i++){
            if(pthread_create (&thread,NULL, client, (void *) &cb) !=0){
        } else {

        }
    }

    while (1){
        if (buf_Read(&cb,out)){
        } 
    }

    //empty the buffer; free the allocated memory
    buf_Free(&cb);
    return 0;
}
4

1 に答える 1

3

Producer/consumerのコメントで既に説明しましたが、 buffer が producer からの入力よりも小さい場合、デッドロックになっているようですが、それらはコメントであるため、ここに答えがあります:

キューに部分的なメッセージを入れてはいけません。絶対に書かないようにしてください。

メッセージの書き込みを開始する前に十分なスペースがあるかどうかを確認し、ない場合はすぐに buffer_empty を待つか、キューを変更して割り当てられたデータに共有ポインターを送信することができます (コンシューマーまたは参照カウントに所有権を渡す) または何か、そのため、各メッセージはキュー内の 1 つのスロットのみを占有し、残りのためにメモリを割り当てます。何が最善かは、メッセージの正確な性質によって異なります。部分的なメッセージがない限り、何でも構いません。

どの特定のライターがメッセージを終了し、それだけで目を覚ます必要があるかを記録することは可能ですが、非常に複雑になります。同期はそのままでは困難です。追加の要件を課して、これ以上困難にしないでください。

実際、これが宿題でない限り (同期がどのように機能するかを学ぶために行うという意味で)、既製のメッセージ キューを探してください。SysV-IPC ソケットまたはデータグラム モードの unix-domain ソケットは、頭に浮かぶ 2 つのオプションです。そうするライブラリを探すこともできます。

于 2012-05-28T14:09:05.803 に答える