2

この質問からepoll_ctl(2)、別のスレッドがブロックしているときに呼び出すことができることがわかりましたepoll_wait(2)。まだ質問があります。

フラグを使用epollすると、EPOLLONESHOT1 つのイベントのみが発生し、fd を使用して再準備する必要がありますepoll_ctl(2)。これは、1 つのスレッドのみが fd から読み取り、結果を適切に処理するために必要です。

以下は、想定される問題をある程度視覚化したタイムラインです。

Thread1:                       Thread2:                  Kernel:
-----------------------------------------------------------------------
epoll_wait();
                                                         Receives chunk
dispatch chunk to thread 2
epoll_wait();                  Handle chunk
                               Still handle chunk        Receives chunk
                               Rearm fd for epoll
?

チャンクが受信された後に fd がリアームされると、疑問符はどうなりますか? epollイベントを発生させますEPOLLINか、それともソケットが読み取り可能であっても無期限にブロックしますか? 私のアーキテクチャはまったく賢明ですか?

4

1 に答える 1

3

あなたのアーキテクチャは合理的であり、動作します。epollファイル記述子を読み取り可能としてマークし、EPOLLINイベントを発生させます。

これに関するドキュメントは少なく、微妙です。の Q/A セクションでman 7 epollは、これについて簡単に説明しています。

Q8 ファイル記述子に対する操作は、収集済みでまだ報告されていないイベントに影響しますか?

A8 既存のファイル記述子に対して 2 つの操作を実行できます。この場合、削除は意味がありません。変更すると、使用可能な I/O が再読み取りされます。

既存のファイル記述子 (既存のファイル記述子は、過去に epoll セットに追加されたファイル記述子です。これには、再準備を待機しているファイル記述子が含まれます) に対して実行できる 2 つの操作は、削除と変更です。マンページに記載されているように、ここでは削除は意味がなく、変更するとファイル記述子の条件が再評価されます。

ただし、実際の実験に勝るものはありません。次のプログラムは、このエッジ ケースをテストします。

#include <stdio.h>
#include <pthread.h>
#include <signal.h>
#include <stdlib.h>
#include <assert.h>
#include <semaphore.h>
#include <sys/epoll.h>
#include <unistd.h>

static pthread_t tids[2];
static int epoll_fd;
static char input_buff[512];
static sem_t chunks_sem;

void *dispatcher(void *arg) {
    struct epoll_event epevent;

    while (1) {
        printf("Dispatcher waiting for more chunks\n");
        if (epoll_wait(epoll_fd, &epevent, 1, -1) < 0) {
            perror("epoll_wait(2) error");
            exit(EXIT_FAILURE);
        }

        ssize_t n;
        if ((n = read(STDIN_FILENO, input_buff, sizeof(input_buff)-1)) <= 0) {
            if (n < 0)
                perror("read(2) error");
            else
                fprintf(stderr, "stdin closed prematurely\n");
            exit(EXIT_FAILURE);
        }

        input_buff[n] = '\0';
        sem_post(&chunks_sem);
    }

    return NULL;
}

void *consumer(void *arg) {
    sigset_t smask;
    sigemptyset(&smask);
    sigaddset(&smask, SIGUSR1);

    while (1) {
        sem_wait(&chunks_sem);
        printf("Consumer received chunk: %s", input_buff);
        /* Simulate some processing... */
        sleep(2);
        printf("Consumer finished processing chunk.\n");
        printf("Please send SIGUSR1 after sending more data to stdin\n");

        int signo;
        if (sigwait(&smask, &signo) < 0) {
            perror("sigwait(3) error");
            exit(EXIT_FAILURE);
        }

        assert(signo == SIGUSR1);

        struct epoll_event epevent;
        epevent.events = EPOLLIN | EPOLLONESHOT;
        epevent.data.fd = STDIN_FILENO;

        if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, STDIN_FILENO, &epevent) < 0) {
            perror("epoll_ctl(2) error when attempting to readd stdin");
            exit(EXIT_FAILURE);
        }

        printf("Readded stdin to epoll fd\n");
    }
}

int main(void) {

    sigset_t sigmask;
    sigfillset(&sigmask);
    if (pthread_sigmask(SIG_SETMASK, &sigmask, NULL) < 0) {
        perror("pthread_sigmask(3) error");
        exit(EXIT_FAILURE);
    }

    if ((epoll_fd = epoll_create(1)) < 0) {
        perror("epoll_create(2) error");
        exit(EXIT_FAILURE);
    }

    struct epoll_event epevent;
    epevent.events = EPOLLIN | EPOLLONESHOT;
    epevent.data.fd = STDIN_FILENO;

    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, STDIN_FILENO, &epevent) < 0) {
        perror("epoll_ctl(2) error");
        exit(EXIT_FAILURE);
    }

    if (sem_init(&chunks_sem, 0, 0) < 0) {
        perror("sem_init(3) error");
        exit(EXIT_FAILURE);
    }

    if (pthread_create(&tids[0], NULL, dispatcher, NULL) < 0) {
        perror("pthread_create(3) error on dispatcher");
        exit(EXIT_FAILURE);
    }

    if (pthread_create(&tids[1], NULL, consumer, NULL) < 0) {
        perror("pthread_create(3) error on consumer");
        exit(EXIT_FAILURE);
    }

    size_t i;
    for (i = 0; i < sizeof(tids)/sizeof(tids[0]); i++) {
        if (pthread_join(tids[i], NULL) < 0) {
            perror("pthread_join(3) error");
            exit(EXIT_FAILURE);
        }
    }

    return 0;
}

これは次のように機能します: ディスパッチャ スレッドはstdinepoll セットに追加し、読み取り可能になるたびにepoll_wait(2)入力をフェッチするために使用します。stdin入力が到着すると、ディスパッチャーはワーカー スレッドを起動します。ワーカー スレッドは入力を出力し、2 秒間スリープして処理時間をシミュレートします。その間、ディスパッチャはメイン ループに戻り、epoll_wait(2)再びブロックします。

stdinワーカー スレッドは、送信して指示するまでリアームしませんSIGUSR1。そのため、さらに何かを に書き込んでから、プロセスstdinに送信SIGUSR1します。ワーカー スレッドはシグナルを受信し、再起動します。stdinこれはその時点ですでに読み取り可能であり、ディスパッチャはすでにepoll_wait(2).

出力から、ディスパッチャーが正しく起動され、すべてが魅力的に機能することがわかります。

Dispatcher waiting for more chunks
testing 1 2 3 // Input
Dispatcher waiting for more chunks // Dispatcher notified worker and is waiting again
Consumer received chunk: testing 1 2 3
Consumer finished processing chunk.
Please send SIGUSR1 after sending more data to stdin
hello world // Input
Readded stdin to epoll fd // Rearm stdin; dispatcher is already waiting
Dispatcher waiting for more chunks // Dispatcher saw new input and is now waiting again
Consumer received chunk: hello world
Consumer finished processing chunk.
Please send SIGUSR1 after sending more data to stdin
于 2015-07-21T11:56:26.600 に答える