8

C/Linux でマルチスレッド UDP サーバーを開発したいと考えています。サービスは単一のポート x で実行されているため、単一の UDP ソケットをバインドする可能性しかありません。高負荷の下で動作するために、n 個のスレッド (静的に定義)、たとえば CPU ごとに 1 つのスレッドがあります。epoll_wait を使用して作業をスレッドに配信できるため、スレッドは必要に応じて 'EPOLLET | エポロンショット」。コード例を添付しました:

static int epfd;
static sig_atomic_t sigint = 0;

...

/* Thread routine with epoll_wait */
static void *process_clients(void *pevents)
{
    int rc, i, sock, nfds;
    struct epoll_event ep, *events = (struct epoll_event *) pevents;

    while (!sigint) {
        nfds = epoll_wait(epfd, events, MAX_EVENT_NUM, 500);

        for (i = 0; i < nfds; ++i) {
           if (events[i].data.fd < 0)
                continue;

           sock = events[i].data.fd;

           if((events[i].events & EPOLLIN) == EPOLLIN) {
               printf("Event dispatch!\n");
               handle_request(sock); // do a recvfrom
           } else
               whine("Unknown poll event!\n");

           memset(&ep, 0, sizeof(ep));
           ep.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
           ep.data.fd = sock;

           rc = epoll_ctl(epfd, EPOLL_CTL_MOD, sock, &ep);
           if(rc < 0)
               error_and_die(EXIT_FAILURE, "Cannot add socket to epoll!\n");
       }
    }

    pthread_exit(NULL);
}

int main(int argc, char **argv)
{
    int rc, i, cpu, sock, opts;
    struct sockaddr_in sin;
    struct epoll_event ep, *events;
    char *local_addr = "192.168.1.108";
    void *status;
    pthread_t *threads = NULL;
    cpu_set_t cpuset;

    threads = xzmalloc(sizeof(*threads) * MAX_THRD_NUM);
    events = xzmalloc(sizeof(*events) * MAX_EVENT_NUM);

    sock = socket(PF_INET, SOCK_DGRAM, 0);
    if (sock < 0)
        error_and_die(EXIT_FAILURE, "Cannot create socket!\n");

    /* Non-blocking */
    opts = fcntl(sock, F_GETFL);
    if(opts < 0)
        error_and_die(EXIT_FAILURE, "Cannot fetch sock opts!\n");
    opts |= O_NONBLOCK;
    rc = fcntl(sock, F_SETFL, opts);
    if(rc < 0)
        error_and_die(EXIT_FAILURE, "Cannot set sock opts!\n");

    /* Initial epoll setup */
    epfd = epoll_create(MAX_EVENT_NUM);
    if(epfd < 0)
        error_and_die(EXIT_FAILURE, "Error fetching an epoll descriptor!\n");

    memset(&ep, 0, sizeof(ep));
    ep.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
    ep.data.fd = sock;

    rc = epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &ep);
    if(rc < 0)
        error_and_die(EXIT_FAILURE, "Cannot add socket to epoll!\n");

    /* Socket binding */
    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = inet_addr(local_addr);
    sin.sin_port = htons(port_xy);

    rc = bind(sock, (struct sockaddr *) &sin, sizeof(sin));
    if (rc < 0)
        error_and_die(EXIT_FAILURE, "Problem binding to port! "
                      "Already in use?\n");

    register_signal(SIGINT, &signal_handler);

    /* Thread initialization */
    for (i = 0, cpu = 0; i < MAX_THRD_NUM; ++i) {
        rc = pthread_create(&threads[i], NULL, process_clients, events);
        if (rc != 0)
            error_and_die(EXIT_FAILURE, "Cannot create pthread!\n");

        CPU_ZERO(&cpuset);
        CPU_SET(cpu, &cpuset);

        rc = pthread_setaffinity_np(threads[i], sizeof(cpuset), &cpuset);
        if (rc != 0)
            error_and_die(EXIT_FAILURE, "Cannot create pthread!\n");

        cpu = (cpu + 1) % NR_CPUS_ON;
    }

    printf("up and running!\n");

    /* Thread joining */
    for (i = 0; i < MAX_THRD_NUM; ++i) {
        rc = pthread_join(threads[i], &status);
        if (rc != 0)
            error_and_die(EXIT_FAILURE, "Error on thread exit!\n");
    }

    close(sock);
    xfree(threads);
    xfree(events);

    printf("shut down!\n");

    return 0;
}

これは epoll でこのシナリオを処理する適切な方法ですか? 今回はソケットのイベントキューがブロックされているため、_handle_request_ 関数はできるだけ早く戻る必要がありますか?!

返信ありがとうございます。

4

2 に答える 2

8

単一の UDP ソケットしか使用していないため、epoll を使用しても意味がありません。代わりにブロッキング recvfrom を使用してください。

ここで、処理する必要があるプロトコルに応じて (各 UDP パケットを個別に処理できる場合)、(スレッド プール内の) 複数のスレッドから同時に recvfrom を実際に呼び出すことができます。OS は、1 つのスレッドだけが UDP パケットを受信するようにします。このスレッドは、handle_request で実行する必要があることは何でも実行できます。

ただし、UDP パケットを特定の順序で処理する必要がある場合、プログラムを並列化する機会はあまりないでしょう...

于 2010-10-18T19:36:28.440 に答える
-1

いいえ、これは思い通りにはいきません。ワーカー スレッドが epoll インターフェイス経由で到着するイベントを処理するには、別のアーキテクチャが必要です。

設計例 (これを行うにはいくつかの方法があります) 用途: SysV/POSIX セマフォ。

  • マスタースレッドに n 個のサブスレッドとセマフォを生成させてから、ソケット (または何でも) の epolling をブロックします。

  • セマフォのダウン時に各サブスレッドをブロックします。

  • マスター スレッドがブロックを解除すると、イベントが何らかのグローバル構造に格納され、イベントごとにセマフォが 1 回アップされます。

  • サブスレッドはブロックを解除し、イベントを処理し、セマフォが 0 に戻ると再びブロックします。

すべてのスレッド間で共有されるパイプを使用して、セマフォと非常によく似た機能を実現できます。これにより、セマフォの代わりにブロックselect()でき、他のイベント (タイムアウト、他のパイプなど) でスレッドを起動するために使用できます。

この制御を逆にして、ワーカーがタスクを要求したときにマスター スレッドを起動させることもできます。ただし、上記のアプローチはあなたの場合に適していると思います。

于 2010-10-18T16:14:41.610 に答える