3

これは私のサーバーがどのように見えるかです:

-WorkerThread(s):

  • epoll_wait を呼び出し、接続を受け入れ、fd ノンブロッキングを設定します (EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP)
  • EPOLLIN イベントで EAGAIN になるまで recv を呼び出し、すべてのデータをグローバル RecvBuffer (pthread_mutex 同期) にプッシュします。
  • EPOLLOUT イベント時: グローバル SendBuffer にアクセスし、現在準備ができている fd に送信するデータがある場合は、それを実行します (while ループで EAGAIN まで、またはすべてのデータが送信されるまで; パケット全体が送信されると、SendBuffer からポップします)。

-IOThread(s)

  • グローバル RecvBuffer からデータを取得し、それらを処理します
  • 最初にすぐに send を呼び出そうとすることで、応答を送信します。すべてのデータが送信されていない場合は、残りのデータをグローバル SendBuffer にプッシュして WorkerThread から送信します)。

問題は、サーバーがキューに入れられたすべてのデータを送信せず (SendBuffer に残っている)、クライアントの数が増えると「送信されない」データの量が増えることです。テストのために、1 つのワーカースレッドと 1 つの iothread のみを使用していますが、それ以上使用しても違いはないようです。グローバル バッファへのアクセスは、pthread_mutex で保護されています。また、私の応答データ サイズは 130k バイトです (この量のデータを送信するには、少なくとも 3 回の送信呼び出しが必要です)。反対側には、ブロッキング ソケットを使用する Windows クライアントがあります。

どうもありがとうございました!MJ

編集:

はい、デフォルトでは、送信するものがない場合でも EPOLLOUT イベントを待機しています。実装の簡素化とマニュアルページのガイドのために、私はこのようにしました。また、それに対する私の理解は次のようなものでした:

何も送信したくないときにEPOLLOUTイベントを「見逃した」としても、データを送信したいときは、EAGAINとEPOLLOUTが将来トリガーされるまでsendを呼び出すため、問題ありません(ほとんどの場合)

ここで、IN/OUT イベントを切り替えるようにコードを変更しました。

受け入れ時:

event.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
epoll_ctl (pNetServer->m_EventFD, EPOLL_CTL_ADD, infd, &event);

すべてのデータが送信されたとき:

event.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
epoll_ctl (pNetServer->m_EventFD, EPOLL_CTL_MOD, events[i].data.fd, &event);

IOThread で send を呼び出して EAGAIN に到達すると、次のようになります。

event.events = EPOLLOUT | EPOLLET | EPOLLRDHUP;
epoll_ctl (pNetServer->m_EventFD, EPOLL_CTL_MOD, events[i].data.fd, &event);

..そして、私は同じ動作をします。また、EPOLLET フラグを削除してみましたが、何も変わりませんでした

1 つの副次的な質問: EPOLL_CTL_MOD フラグを指定した epoll_ctl は、イベント メンバーを置き換えますか、それとも指定された引数で単に OR しますか?

EDIT3: すべてのデータが送信されるまで、または EAGAIN まで継続的に送信するように IOThread 関数を更新しました。また、すべてのデータを送信したとしても送信しようとしましたが、ほとんどの場合、非ソケットで errno 88 Socket operation を取得していました

EDIT4:「送信コード」のいくつかのバグを修正したので、キューに入れられたデータが送信されていません..送信が完了した直後にクライアントが recv を呼び出したときに取得し、クライアントの数とともに大きくなります。クライアントの send 呼び出しと recv 呼び出しの間に 2 秒の遅延を置くと (呼び出しのブロック)、実行中のクライアントの数に応じて、サーバー上のデータはほとんど失われません (クライアント テスト コードには、1 つの send と 1 つの recv 呼び出しを持つ単純な for ループが含まれています)。繰り返しますが、ET モードの有無にかかわらず試してみました。以下は、データの受信を担当する更新された WorkerThread 関数です。@Admins/Mods 問題が少し違うので、今すぐ新しいトピックを開く必要がありますか?

void CNetServer::WorkerThread(void* param)
{
    CNetServer* pNetServer =(CNetServer*)param;
    struct epoll_event event;
    struct epoll_event *events;
    int s = 0;

//  events = (epoll_event*)calloc (MAXEVENTS, sizeof event);

    while (1)
    {
        int n, i;

//      printf ("BLOCKING NOW! epoll_wait thread %d\n",pthread_self());
        n = pNetServer->m_epollCtrl.Wait(-1);
//      printf ("epoll_wait thread %d\n",pthread_self());
        pthread_mutex_lock(&g_mtx_WorkerThd);
        for (i = 0; i < n; i++)
        {
            if ((pNetServer->m_epollCtrl.Event(i)->events & EPOLLERR))
            {
                // An error has occured on this fd, or the socket is not ready for reading (why were we notified then?)

            //  g_SendBufferArray.RemoveAll( 0 );

                char szFileName[30] = {0};
                sprintf( (char*)szFileName,"fd_%d.txt",pNetServer->m_epollCtrl.Event(i)->data.fd );
                remove(szFileName);

            /*  printf( "\n\n\n");
                printf( "\tDATA LEFT COUNT:%d\n",g_SendBufferArray.size());
                for (int k=0;k<g_SendBufferArray.size();k++)
                    printf( "\tSD: %d DATA LEFT:%d\n",g_SendBufferArray[i]->sd,g_SendBufferArray[i]->nBytesSent );
*/

            //  fprintf (stderr, "epoll error\n");
            //  fflush(stdout);
                close (pNetServer->m_epollCtrl.Event(i)->data.fd);
                continue;
            }
            else if (pNetServer->m_ListenSocket == pNetServer->m_epollCtrl.Event(i)->data.fd)
            {
                // We have a notification on the listening socket, which   means one or more incoming connections. 
                while (1)
                {
                    struct sockaddr in_addr;
                    socklen_t in_len;
                    int infd;
                    char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];

                    in_len = sizeof in_addr;
                    infd = accept (pNetServer->m_ListenSocket, &in_addr, &in_len);
                    if (infd == -1)
                    {
                        if ((errno == EAGAIN) ||
                            (errno == EWOULDBLOCK))
                        {
                            // We have processed all incoming connections.
                            break;
                        }
                        else
                        {
                            perror ("accept");
                            break;
                        }
                    }

                    s = getnameinfo (&in_addr, in_len,
                        hbuf, sizeof hbuf,
                        sbuf, sizeof sbuf,
                        NI_NUMERICHOST | NI_NUMERICSERV);
                    if (s == 0)
                    {
                        printf("Accepted connection on descriptor %d "
                            "(host=%s, port=%s) thread %d\n", infd, hbuf, sbuf,pthread_self());
                    }

                    // Make the incoming socket non-blocking and add it to the list of fds to monitor.
                    CEpollCtrl::SetNonBlock(infd,true);
                    if ( !pNetServer->m_epollCtrl.Add( infd, EPOLLIN, NULL ))
                    {
                        perror ("epoll_ctl");
                        abort ();
                    }

                }
                continue;
            }
            if( (pNetServer->m_epollCtrl.Event(i)->events & EPOLLOUT) )
            {

                pNetServer->DoSend( pNetServer->m_epollCtrl.Event(i)->data.fd );
            }
            if( pNetServer->m_epollCtrl.Event(i)->events & EPOLLIN )
            {
                printf("EPOLLIN TRIGGERED FOR SD: %d\n",pNetServer->m_epollCtrl.Event(i)->data.fd);
                // We have data on the fd waiting to be read. 
                int done = 0;
                ssize_t count = 0;
                char buf[512];
                while (1)
                {
                    count = read (pNetServer->m_epollCtrl.Event(i)->data.fd, buf, sizeof buf);
                    printf("recv sd %d size %d thread %d\n",pNetServer->m_epollCtrl.Event(i)->data.fd,count,pthread_self());
                    if (count == -1)
                    {
                        // If errno == EAGAIN, that means we have read all data. So go back to the main loop.
                        if ( errno != EAGAIN )
                        {
                            perror ("read");
                            done = 1;
                        }
                        break;
                    }
                    else if (count == 0)
                    {
                        //connection is closed by peer.. do a cleanup and close
                        done = 1;
                        break;
                    }
                    else if (count > 0)
                    {
                        static int nDataCounter = 0;
                        nDataCounter+=count;
                        printf("RECVDDDDD %d\n",nDataCounter);
                        CNetServer::s_pRecvContainer->OnData( pNetServer->m_epollCtrl.Event(i)->data.fd, buf, count );
                    }
                }

                if (done)
                {
                    printf ("Closed connection on descriptor %d\n",pNetServer->m_epollCtrl.Event(i)->data.fd);
                    // Closing the descriptor will make epoll remove it from the set of descriptors which are monitored. 
                    close (pNetServer->m_epollCtrl.Event(i)->data.fd);
                }
            }

        }
//      

        pNetServer->IOThread( (void*)pNetServer );

        pthread_mutex_unlock(&g_mtx_WorkerThd);
    }

}

void CNetServer::IOThread(void* param)
{

    BYTEARRAY* pbPacket = new BYTEARRAY;
    int fd;
    struct epoll_event event;
    CNetServer* pNetServer =(CNetServer*)param;

    printf("IOThread startin' !\n");

    for (;;)
    {
        bool bGotIt = CNetServer::s_pRecvContainer->GetPacket( pbPacket, &fd );

        if( bGotIt )
        {

            //process packet here
            printf("Got 'em packet yo !\n");
            BYTE* re = new BYTE[128000];
            memset((void*)re,0xCC,128000);
            buffer_t* responsebuff = new buffer_t( fd, re, 128000 ) ;

            pthread_mutex_lock(&g_mtx_WorkerThd);

            while( 1 )
            {
                    int s;
                    int nSent = send( responsebuff->sd, ( responsebuff->pbBuffer + responsebuff->nBytesSent ),responsebuff->nSize - responsebuff->nBytesSent,0 );
                    printf ("IOT: Trying to send nSent: %d buffsize: %d \n",nSent,responsebuff->nSize - responsebuff->nBytesSent);

                    if (nSent == -1)
                    {

                        if (errno == EAGAIN || errno == EWOULDBLOCK )
                        {
                                g_vSendBufferArray.push_back( *responsebuff );
                                printf ("IOT: now waiting for EPOLLOUT\n");
                                event.data.fd = fd;
                                event.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP;
                                s = epoll_ctl (pNetServer->m_EventFD, EPOLL_CTL_MOD, fd, &event);
                                break;
                                if (s == -1)
                                {
                                    perror ("epoll_ctl");
                                    abort ();
                                }

                        }
                        else
                        {
                            printf( "%d\n",errno );
                            perror ("send");
                            break;
                        }
                        printf ("IOT: WOOOOT\n");
                        break;
                    }
                    else if (nSent == responsebuff->nSize - responsebuff->nBytesSent)
                    {
                        printf ("IOT:all is sent! wOOhOO\n");
                        responsebuff->sd = 0;
                        responsebuff->nBytesSent += nSent;
                        delete responsebuff;
                        break;
                    }
                    else if (nSent < responsebuff->nSize - responsebuff->nBytesSent)
                    {
                        printf ("IOT: partial send!\n");
                        responsebuff->nBytesSent += nSent;

                    }

            }
            delete [] re;

            pthread_mutex_unlock(&g_mtx_WorkerThd);

        }
    }

}
4

1 に答える 1

5
  1. エポレットの使用を中止してください。正しくすることはほとんど不可能です。

  2. 送信するものが何もない場合は、EPOLLOUT イベントを要求しないでください。

  3. 接続で送信するデータがある場合は、次のロジックに従います

    。A) その接続の送信キューにデータが既にある場合は、新しいデータを追加するだけです。あなたは終わった。

    B) すぐにデータを送信してみてください。全部送れば完了です。

    C) この接続の送信キューに残ったデータを保存します。ここで、この接続の EPOLLOUT を要求します。

于 2012-08-03T13:14:35.623 に答える