0

私は、循環バッファーに情報を格納できるいくつかのスレッド化されたクライアントを持つプログラムを作成しました。サーバーは、循環バッファーからメッセージを読み取る (および「削除する」) ことができます。私が抱えている問題は、バッファがいっぱいになったときです。現在バッファに書き込んでいるクライアントはミューテックスを解放する必要があります (バッファからすべてのデータを削除するサーバー) が、ミューテックスを解放するときにクライアントが競合し、既にビジー状態だったクライアントが最初に送信を続行するとは限りません。だから彼のメッセージは混乱します。これを解決する方法を知っている人はいますか?または、これを行うためのより良い方法について提案がありますか。最終的には、メッセージがバッファに保存され、サーバーによってすべてのクライアントに再送信されるマルチスレッド tcp サーバーを作成したいので、何に使用するかがわかります。

#include <stdio.h>
#include <malloc.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>

struct cBuf{
    int     size;   /* maximum number of slots */
    int     start;  /* index of oldest element (last element) */
    int     end;    /* index at which to write new element (first element) */
    char    *buf;  /* buffer array */
};

struct cBuf cb;

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

/* Initialize buffer*/
void buf_Init(struct cBuf *cb, int size) {
    cb->size  = size + 1; /* empty element */
    cb->start = 0;
    cb->end   = 0; 
    cb->buf = (char *)calloc(cb->size, sizeof(char)); //pointer to beginning of allocated memory
}
/* Clean up buffer*/
void buf_Free(struct cBuf *cb) {
    free(cb->buf);
}

/* Is the the buffer full? if it equals size meaning the buffer is full: end will point to zero (ex. 7%7 is 0) */ 
int buf_IsFull(struct cBuf *cb) {
    return (cb->end + 1) % cb->size == cb->start; 
}

/* Is the buffer empty? Yes if both start & end are zero (are equal) */
int cbIsEmpty(struct cBuf *cb) {
    return cb->end == cb->start; 
}

/* Write an element, overwriting oldest element if buffer is full. App can
   choose to avoid the overwrite by checking cbIsFull(). */
/* When inserting an element acces needs to be blocked (only 1 insert at a time =>mutexes 
 * When buffer is full it should return -1 so we can block the request*/
int buf_Insert(struct cBuf *cb, char *elem) {
    if (buf_IsFull(cb)){
        //buffer is full; cannot write into the buffer
        //sleep(2);
        return 0;
    } else {
        /* When writing to the buffer, the buffer has to be locked first (will block when already locked)*/
        pthread_mutex_lock(&mutex);
        cb->buf[cb->end] = *elem; //we put the new character in the last free slot
        /* we move the index for the next slot one further */
        cb->end = (cb->end + 1) % cb->size; 
        pthread_mutex_unlock(&mutex);
        return 1;
    }
}

/* Read oldest element. Ensure !cbIsEmpty() first. */
int buf_Read(struct cBuf *cb, char *out) {
    if (!cbIsEmpty(cb)){
        /* When reading from the buffer, the buffer has to be locked first (will block when already locked)*/
        pthread_mutex_lock(&mutex);
        *out = cb->buf[cb->start];
        cb->start = (cb->start + 1) % cb->size; //are news oldest character is now the next one
        pthread_mutex_unlock(&mutex);
        return 1;
    }
    //buffer is emtpy
    return 0;
}

void * client(void *cb){
    //sleep (2);
    #ifdef DEBUG
        printf("\nDEBUG (Thread ID:%lu) - Entering thread\t",(unsigned long)pthread_self());
    #endif

    /* main thread does not need to wait and this way all resources are released when terminating*/
    pthread_detach(pthread_self());

    struct cBuf *myData;
    myData = (struct cBuf*) cb;
    int i;

    /* client will post message in the buffer */
    char input[]="Hello World!";

    //printf("I:\t");
    for (i=0; i < sizeof(input); ++ i){ 
        if (buf_Insert(myData, &input[i]) ){
            //printf("[I]%c",input[i]);
        } else {
            //the buffer is full; wait until it has been emptied + go back a step because of lost character
            i--;    
            #ifdef DEBUG
            printf("\nDEBUG (Main Thread) - Full Buffer");  
            #endif
        }
    }
    return 0;
}

int main(void) {
    int bufferSize = 20; /* arbitrary size */
    char out;
    pthread_t thread;
    int j;

    buf_Init(&cb, bufferSize);

    //we make 1 client for starters
    for (j = 0; j<4; j++){
        if(pthread_create (&thread,NULL, client, (void *) &cb) !=0){
            #ifdef DEBUG
            printf("\nDEBUG (Main Thread) - Error while creating thread");
            #endif
        } else {
            #ifdef DEBUG
            printf("\nDEBUG (Main Thread) - Thread created");
            #endif
        }
    }

    while (1){
        /* Server will read (and thus remove) the message */
        if (cbIsEmpty(&cb)){
            //sleep(2);
            #ifdef DEBUG
            printf("\nDEBUG (Main Thread) - Buffer is empty");
            #endif
        } else {
            printf("\tO:\t");
            while (buf_Read(&cb,&out)){
                printf("%c", out);
            }
            #ifdef DEBUG        
            if (buf_Read(&cb, &out) == 0){
                printf("\nDEBUG (Main Thread) - Finished reading");
            }
            #endif
            printf("\n");   
        }
    }

    //empty the buffer; free the allocated memory
    buf_Free(&cb);
    return 0;
}
4

1 に答える 1

0

あなたが説明するのは、スレッドを選択的に通知(およびチェック)するpthread_cond_tために加えて使用する典型的なユースケースです。pthread_mutex_t

于 2012-05-27T17:40:53.243 に答える