状況
Unix Socket ProgrammingのW.Richard Stevenを読んだ後、メイン スレッドが 5 つのサブスレッドが存在するスレッド プールを作成する P2P プログラムを作成しています。次に、 で 50 個のソケットを監視しkqueue()
ます。指定されたソケットでイベントが発生すると (ソケットでデータを受信するなど)、メイン スレッドはソケット記述子を共有配列にコピーし、スレッド プール内の 1 つのスレッドを呼び起こします。次に、サブスレッドがソケットからの要求を処理します。また、ミューテックス変数と条件変数の両方を使用して共有配列を保護しました。
質問
著者は、本書の 30.12 節と 30.13 節に、ソース コード「server/serv08.c」と「server/pthread08.c」を、あたかもこのコードに問題がないかのように提示しています。しかし、ある著者が提示したようなコード スニペットを書いたところ、スレッド同期がうまく機能しません。なぜメインスレッドで iput
等しくなるのですか?iget
コード
--グローバル変数--
typedef struct tagThread_information
{
int sockfd;
} Thread_information;
Thread_information peer_fds[MAX_THREAD];
pthread_mutex_t peerfd_mutex;
pthread_cond_t peerfd_cond;
pthread_mutex_t STDOUT_mutex;
int iput;
int iget;
--メインスレッド--
void Wait_for_Handshake(download_session *pSession, int nMaxPeers)
{
struct kevent ev[50], result[50];
int kq, i, nfd;
int c = 1;
if( (kq = kqueue()) == -1)
{
fprintf(stderr, "fail to initialize kqueue.\n");
exit(0);
}
for(i = 0 ; i < nMaxPeers; i++)
{
EV_SET(&ev[i], pSession->Peers[i].sockfd, EVFILT_READ, EV_ADD, 0, 0, 0);
printf("socket : %d\n", (int)ev[i].ident);
}
// create thread pool. initialize mutex and conditional variable.
iput = 0;
iget = 0;
pthread_mutex_init(&STDOUT_mutex, NULL);
pthread_mutex_init(&peerfd_mutex, NULL);
pthread_cond_init(&peerfd_cond, NULL);
// Assume that MAX_THREAD is set to 5.
for(i = 0 ; i < MAX_THREAD; i++)
thread_make(i);
while(1)
{
nfd = kevent(kq, ev, nMaxPeers, result, nMaxPeers, NULL);
if(nfd == -1)
{
fprintf(stderr, "fail to monitor kqueue. error : %d\n", errno);
nMaxPeers = Update_peer(ev, pSession->nPeers);
pSession->nPeers = nMaxPeers;
continue;
}
for(i = 0 ; i < nfd; i++)
{
pthread_mutex_lock(&peerfd_mutex);
peer_fds[iput].sockfd = (int)result[i].ident;
if( ++iput == MAX_THREAD)
iput = 0;
if(iput == iget) // Here is my question.
{
exit(0);
}
pthread_cond_signal(&peerfd_cond);
pthread_mutex_unlock(&peerfd_mutex);
}
}
}
-- サブスレッド --
void * thread_main(void *arg)
{
int connfd, nbytes;
char buf[2048];
for( ; ; )
{
/* get socket descriptor */
pthread_mutex_lock(&peerfd_mutex);
while( iget == iput)
pthread_cond_wait(&peerfd_cond, &peerfd_mutex);
connfd = peer_fds[iget].sockfd;
if ( ++iget == MAX_THREAD )
iget = 0;
pthread_mutex_unlock(&peerfd_mutex);
/* process a request on socket descriptor. */
nbytes = (int)read(connfd, buf, 2048);
if(nbytes == 0)
{
pthread_mutex_lock(&STDOUT_mutex);
printf("\n\nthread %ld, socket : %d, nbytes : %d\n\n\n", (long int)pthread_self(), connfd, nbytes);
printf("socket closed\n\n");
pthread_mutex_unlock(&STDOUT_mutex);
close(connfd);
continue;
}
else if(nbytes == -1)
{
close(connfd);
pthread_mutex_lock(&STDOUT_mutex);
printf("\n\nthread %ld, socket : %d, nbytes : %d\n\n\n", (long int)pthread_self(), connfd, nbytes);
perror("socket error : ");
write(STDOUT_FILENO, buf, nbytes);
printf("\n\n\n\n");
pthread_mutex_unlock(&STDOUT_mutex);
continue;
}
pthread_mutex_lock(&STDOUT_mutex);
printf("\n\nthread %ld, socket : %d, nbytes : %d\n\n\n", (long int)pthread_self(), connfd, nbytes);
write(STDOUT_FILENO, buf, nbytes);
printf("\n\n\n\n");
pthread_mutex_unlock(&STDOUT_mutex);
}
}