4

IOCP を使用する既存の Windows C++ コードを Linux に移植しようとしています。epoll_wait高い同時実行性を達成するために使用することに決めたので、受信したデータをいつ処理しようとするかという理論的な問題にすでに直面しています。

2 つのスレッドが を呼び出しepoll_wait、Linux が最初のスレッドのブロックを解除し、すぐに 2 番目のスレッドのブロックを解除するような 2 つの結果メッセージを受信したとします。

例 :

Thread 1 blocks on epoll_wait
Thread 2 blocks on epoll_wait
Client sends a chunk of data 1
Thread 1 deblocks from epoll_wait, performs recv and tries to process data
Client sends a chunk of data 2
Thread 2 deblocks, performs recv and tries to process data.

このシナリオは考えられますか?つまり、それは発生することができますか?

recv/processing コードでの同期の実装を避けるために、それを防ぐ方法はありますか?

4

5 に答える 5

5

複数のスレッドが同じ epoll ハンドルのセットから読み取る場合は、epoll ハンドルをワンショット レベル トリガー モードにすることをお勧めしますEPOLLONESHOT。これにより、1 つのスレッドがトリガーされたハンドルを監視した後、 を使用epoll_ctlしてハンドルを再装備するまで、他のスレッドはそれを監視しません。

読み取りパスと書き込みパスを個別に処理する必要がある場合は、読み取りスレッド プールと書き込みスレッド プールを完全に分割することをお勧めします。読み取りイベント用に 1 つの epoll ハンドルを持ち、書き込みイベント用に 1 つの epoll ハンドルを持ち、スレッドをいずれか一方に排他的に割り当てます。さらに、読み取りパスと書き込みパスに個別のロックを設定します。もちろん、ソケットごとの状態を変更する限り、読み取りスレッドと書き込みスレッド間の相互作用には注意する必要があります。

その分割アプローチを使用する場合は、ソケット クロージャの処理方法を検討する必要があります。ほとんどの場合、読み取りパスと書き込みパスの両方に対して、追加の共有データ ロックと、共有データ ロックの下に設定された「クロージャの確認」フラグが必要になります。その後、読み取りスレッドと書き込みスレッドは応答するために競合する可能性があり、最後に応答したスレッドが共有データ構造をクリーンアップします。つまり、次のようなものです。

void OnSocketClosed(shareddatastructure *pShared, int writer)
{
  epoll_ctl(myepollhandle, EPOLL_CTL_DEL, pShared->fd, NULL);
  LOCK(pShared->common_lock);
  if (writer)
    pShared->close_ack_w = true;
  else
    pShared->close_ack_r = true;

  bool acked = pShared->close_ack_w && pShared->close_ack_r;
  UNLOCK(pShared->common_lock);

  if (acked)
    free(pShared);
}
于 2011-04-04T17:31:58.160 に答える
3

ここでは、処理しようとしている状況は次のようなものであると想定しています。

一度にデータを受信したい複数の(おそらく非常に多くの)ソケットがあります。

スレッドAの最初の接続からデータを最初に受信したときにデータの処理を開始し、スレッドAでデータを処理し終えるまで、この接続からのデータが他のスレッドで処理されないようにします。

これを行っているときに、別の接続でデータを受信した場合は、スレッドBがそのデータを選択して処理すると同時に、スレッドBがこの接続を処理するまで他のユーザーがこの接続を処理できないようにします。

このような状況では、複数のスレッドで同じepoll fdを使用してepoll_wait()を使用することが、かなり効率的なアプローチであることがわかります(必ずしも最も効率的であるとは言えません)。

ここでの秘訣は、EPOLLONESHOTフラグを使用して個々の接続fdsをepollfdに追加することです。これにより、epoll_wait()からfdが返されると、epollに再度監視するように特に指示するまで、fdは監視されなくなります。これにより、このスレッドが接続を再度監視するようにマークするまで、他のスレッドが同じ接続を処理できないため、この接続を処理するスレッドが干渉を受けないことが保証されます。

epoll_ctl()およびEPOLL_CTL_MODを使用して、EPOLLINまたはEPOLLOUTを再度モニターするようにfdを設定できます。

複数のスレッドでこのようなepollを使用することの重要な利点は、1つのスレッドが接続を終了し、それをepoll監視セットに追加すると、前の処理スレッドが戻る前であっても、epoll_wait()に残っている他のスレッドがすぐに監視することです。 epoll_wait()に。ちなみに、別のスレッドがその接続をすぐに取得する場合(したがって、この接続のデータ構造をフェッチして前のスレッドのキャッシュをフラッシュする必要がある場合)、キャッシュデータの局所性が不足するため、これも不利になる可能性があります。何が最も効果的に機能するかは、正確な使用パターンによって異なります。

異なるスレッドで同じ接続で後で受信したメッセージを処理しようとしている場合、epollを使用するこのスキームは適切ではなく、ワーカースレッドにフィードする効率的なキューにフィードするリスニングスレッドを使用するアプローチの方が適している場合があります。

于 2011-04-04T17:32:46.993 に答える
2

複数のスレッドから epoll_wait() を呼び出すことは悪い考えであることを指摘する以前の回答はほぼ間違いなく正しいですが、同じハンドルで複数のスレッドから呼び出されたときに何が起こるかを試してみるために、質問に十分に興味をそそられました。同じソケットを待っています。次のテストコードを書きました。

#include <netinet/in.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

struct thread_info {
  int number;
  int socket;
  int epoll;
};

void * thread(struct thread_info * arg)
{
    struct epoll_event events[10];
    int s;
    char buf[512];

    sleep(5 * arg->number);
    printf("Thread %d start\n", arg->number);

    do {
        s = epoll_wait(arg->epoll, events, 10, -1);

        if (s < 0) {
            perror("wait");
            exit(1);
        } else if (s == 0) {
            printf("Thread %d No data\n", arg->number);
            exit(1);
        }
        if (recv(arg->socket, buf, 512, 0) <= 0) {
            perror("recv");
            exit(1);
        }
        printf("Thread %d got data\n", arg->number);
    } while (s == 1);

    printf("Thread %d end\n", arg->number);

    return 0;
}

int main()
{
    pthread_attr_t attr;
    pthread_t threads[2];
    struct thread_info thread_data[2];
    int s;
    int listener, client, epollfd;
    struct sockaddr_in listen_address;
    struct sockaddr_storage client_address;
    socklen_t client_address_len;
    struct epoll_event ev;

    listener = socket(AF_INET, SOCK_STREAM, 0);

    if (listener < 0) {
        perror("socket");
        exit(1);
    }

    memset(&listen_address, 0, sizeof(struct sockaddr_in));
    listen_address.sin_family = AF_INET;
    listen_address.sin_addr.s_addr = INADDR_ANY;
    listen_address.sin_port = htons(6799);

    s = bind(listener,
             (struct sockaddr*)&listen_address,
             sizeof(listen_address));

    if (s != 0) {
        perror("bind");
        exit(1);
    }

    s = listen(listener, 1);

    if (s != 0) {
        perror("listen");
        exit(1);
    }

    client_address_len = sizeof(client_address);
    client = accept(listener,
                    (struct sockaddr*)&client_address,
                    &client_address_len);

    epollfd = epoll_create(10);
    if (epollfd == -1) {
        perror("epoll_create");
        exit(1);
    }

    ev.events = EPOLLIN;
    ev.data.fd = client;
    if (epoll_ctl(epollfd, EPOLL_CTL_ADD, client, &ev) == -1) {
        perror("epoll_ctl: listen_sock");
        exit(1);
    }

    thread_data[0].number = 0;
    thread_data[1].number = 1;
    thread_data[0].socket = client;
    thread_data[1].socket = client;
    thread_data[0].epoll = epollfd;
    thread_data[1].epoll = epollfd;

    s = pthread_attr_init(&attr);
    if (s != 0) {
        perror("pthread_attr_init");
        exit(1);
    }

    s = pthread_create(&threads[0],
                       &attr,
                       (void*(*)(void*))&thread,
                       &thread_data[0]);

    if (s != 0) {
        perror("pthread_create");
        exit(1);
    }

    s = pthread_create(&threads[1],
                       &attr,
                       (void*(*)(void*))&thread,
                       &thread_data[1]);

    if (s != 0) {
        perror("pthread_create");
        exit(1);
    }

    pthread_join(threads[0], 0);
    pthread_join(threads[1], 0);

    return 0;
}

データが到着し、両方のスレッドが epoll_wait() で待機している場合、1 つだけが返されますが、後続のデータが到着すると、データを処理するためにウェイクアップするスレッドは 2 つのスレッド間で事実上ランダムになります。どのスレッドが起こされたかに影響を与える方法を見つけることができませんでした。

epoll_wait を呼び出す 1 つのスレッドが最も理にかなっている可能性が高く、イベントはワーカー スレッドに渡されて IO をポンピングします。

于 2011-04-04T20:51:46.807 に答える
1

epoll とコアごとのスレッドを使用する高性能ソフトウェアは、それぞれがすべての接続のサブセットを処理する複数の epoll ハンドルを作成すると思います。このようにして作業は分割されますが、あなたが説明した問題は回避されます。

于 2011-04-04T16:50:54.723 に答える
0

通常、epoll単一のスレッドが単一の非同期ソースでデータをリッスンしている場合に使用されます。ビジー待機 (手動でのポーリング) を避けるためepollに、データの準備ができたときに通知するために使用します ( のようselectに)。

複数のスレッドが 1 つのデータ ソースから読み取ることは標準的な方法ではありません。少なくとも、私はそれを悪い方法だと考えています。

複数のスレッドを使用したいが、入力ソースが 1 つしかない場合は、スレッドの 1 つをリッスンしてデータをキューに入れるように指定し、他のスレッドがキューから個々のピースを読み取れるようにします。

于 2011-04-04T16:47:51.950 に答える