0

状況

Unix Socket ProgrammingW.Richard Stevenを読んだ後、メイン スレッドが 5 つのサブスレッドが存在するスレッド プールを作成する P2P プログラムを作成しています。次に、 で 50 個のソケットを監視しkqueue()ます。指定されたソケットでイベントが発生すると (ソケットでデータを受信するなど)、メイン スレッドはソケット記述子を共有配列にコピーし、スレッド プール内の 1 つのスレッドを呼び起こします。次に、サブスレッドがソケットからの要求を処理します。また、ミューテックス変数と条件変数の両方を使用して共有配列を保護しました。

質問

著者は、本書の 30.12 節と 30.13 節に、ソース コード「server/serv08.c」と「server/pthread08.c」を、あたかもこのコードに問題がないかのように提示しています。しかし、ある著者が提示したようなコード スニペットを書いたところ、スレッド同期がうまく機能しません。なぜメインスレッドで iput等しくなるのですか?iget

コード
--グローバル変数--

typedef struct tagThread_information
{

    int     sockfd;

} Thread_information;
Thread_information      peer_fds[MAX_THREAD];
pthread_mutex_t         peerfd_mutex;
pthread_cond_t          peerfd_cond;
pthread_mutex_t         STDOUT_mutex;
int                     iput;
int                     iget;

--メインスレッド--

void Wait_for_Handshake(download_session *pSession, int nMaxPeers)
{
struct kevent   ev[50], result[50];
int             kq, i, nfd;
int c = 1;

if( (kq = kqueue()) == -1)
{
    fprintf(stderr, "fail to initialize kqueue.\n");
    exit(0);
}

for(i = 0 ; i < nMaxPeers; i++)
{
    EV_SET(&ev[i], pSession->Peers[i].sockfd, EVFILT_READ, EV_ADD, 0, 0, 0);
    printf("socket : %d\n", (int)ev[i].ident);
}

// create thread pool. initialize mutex and conditional variable.
iput = 0;
iget = 0;
pthread_mutex_init(&STDOUT_mutex, NULL);
pthread_mutex_init(&peerfd_mutex, NULL);
pthread_cond_init(&peerfd_cond, NULL);

// Assume that MAX_THREAD is set to 5.
for(i = 0 ; i < MAX_THREAD; i++) 
    thread_make(i); 


while(1)    
{        
    nfd = kevent(kq, ev, nMaxPeers, result, nMaxPeers, NULL);

    if(nfd == -1)
    {
        fprintf(stderr, "fail to monitor kqueue. error : %d\n", errno);
        nMaxPeers           =   Update_peer(ev, pSession->nPeers);
        pSession->nPeers    =   nMaxPeers;
        continue;
    }

    for(i = 0 ; i < nfd; i++)
    {

        pthread_mutex_lock(&peerfd_mutex);
        peer_fds[iput].sockfd = (int)result[i].ident;
        if( ++iput == MAX_THREAD)
            iput = 0;
        if(iput == iget) // Here is my question.
        {
            exit(0);
        }

        pthread_cond_signal(&peerfd_cond);
        pthread_mutex_unlock(&peerfd_mutex);


    }

}

}

-- サブスレッド --

void * thread_main(void *arg)
{
    int     connfd, nbytes;
    char    buf[2048];
    for( ; ; )
    {
        /* get socket descriptor */
        pthread_mutex_lock(&peerfd_mutex);
        while( iget == iput)
            pthread_cond_wait(&peerfd_cond, &peerfd_mutex);
        connfd  =   peer_fds[iget].sockfd;
        if ( ++iget == MAX_THREAD )
            iget = 0;
        pthread_mutex_unlock(&peerfd_mutex);


    /* process a request on socket descriptor. */
    nbytes  =   (int)read(connfd, buf, 2048);

    if(nbytes == 0)
    {
        pthread_mutex_lock(&STDOUT_mutex);
        printf("\n\nthread %ld, socket : %d, nbytes : %d\n\n\n", (long int)pthread_self(), connfd, nbytes);
        printf("socket closed\n\n");
        pthread_mutex_unlock(&STDOUT_mutex);
        close(connfd);
        continue;
    }
    else if(nbytes == -1)
    {
        close(connfd);
        pthread_mutex_lock(&STDOUT_mutex);
        printf("\n\nthread %ld, socket : %d, nbytes : %d\n\n\n", (long int)pthread_self(), connfd, nbytes);
        perror("socket error : ");
        write(STDOUT_FILENO, buf, nbytes);
        printf("\n\n\n\n");
        pthread_mutex_unlock(&STDOUT_mutex);

        continue;
    }

    pthread_mutex_lock(&STDOUT_mutex);
    printf("\n\nthread %ld, socket : %d, nbytes : %d\n\n\n", (long int)pthread_self(), connfd, nbytes);
    write(STDOUT_FILENO, buf, nbytes);
    printf("\n\n\n\n");
    pthread_mutex_unlock(&STDOUT_mutex);

}
}
4

3 に答える 3

0

あなたが持っているのは、典型的な循環バッファの実装です。

循環バッファーが空の場合、ヘッドとテールのポインター/インデックスは同じ場所を指します。while (iget == iput) ...これは、「キューが空の間...」という意味のコードでテストされていることがわかります。

循環バッファーの先頭に挿入した後、head が tail を指している場合、これは問題です。バッファがオーバーフローしました。バッファがいっぱいになっているにもかかわらず、バッファが空に見えるようになったため、これは問題です。

つまり、未使用の場所が 1 つバッファ内に予約されています。バッファーに 4096 のエントリがある場合、4095 しか満たすことができません。4096 を埋めると、オーバーフローが発生します。空の循環バッファーのように見えます。

(あいまいさを解決するために余分なビットを使用して、インデックスを 0 から 8192 まで移動できるようにすれば、4096 のすべての場所を使用できます。これにより、4095 を超えてゼロにラップする代わりに、ポインターは 4096 ... 8191 に移動し続けます。もちろん、モジュロ 4096 の配列にアクセスすることを覚えておく必要があります.1 つの無駄な要素を回復するためには、複雑さが大きくなります.)

この状態が発生しないように構成されており、内部エラーを構成しているため、コードは循環バッファー オーバーフローを回避しているように見えます。プロデューサーからコンシューマーに一度に渡される記述子が多すぎると、循環バッファーがオーバーフローします。

一般に、循環バッファー コードは、バッファーがいっぱいになったときにただ保釈することはできません。挿入操作がボークしてエラーを返す必要があるか、より多くのスペースを確保するためにブロックする必要があります。したがって、これはサンプル プログラムに固有の仮定に基づく特殊なケースです。

于 2013-08-09T16:24:30.640 に答える
0

「UNIX ネットワーク プログラミング第 1 巻、第 2 版」、第 27.12 章、757 ページ、注釈から の 27 ~ 38 行目までserver/serv08.c:

iputまた、インデックスがインデックスに追いついていないことも確認しigetます。これは、配列が十分に大きくないことを示しています。

参考までに、上記の行を参照してください(ここから取得):

27 for ( ; ; ) {
28   clilen = addrlen;
29   connfd = Accept(listenfd, cliaddr, &clilen);   
30   Pthread_mutex_lock(&clifd_mutex);
31   clifd[iput] = connfd;
32   if (++iput == MAXNCLI)
33     iput = 0;
34   if (iput == iget)
35     err_quit("iput = iget = %d", iput);
36   Pthread_cond_signal(&clifd_cond);
37   Pthread_mutex_unlock(&clifd_mutex);
38 }
于 2013-08-09T16:14:03.510 に答える