0

目的: N 個のノード (異なるマシンで実行されている) は、相互に TCP 接続を確立することによって相互に通信する必要があります。メッセージの送受信は、プロセスによって作成された 2 つのスレッドによって行われます。最初に、メイン プロセスはすべてのノードを相互に接続し、2 つのスレッドを作成して、スレッドがデータの送受信に使用できるファイル記述子のリストを与えます。以下の構造体は、メイン プロセスによって埋められ、スレッドに渡されます。

typedef struct
{
    char hostName[MAXIMUM_CHARACTERS_IN_HOSTNAME];  /* Host name of the node */
    char portNumber[MAXIMUM_PORT_LENGTH];           /* Port number of the node */
    char nodeId[MAXIMUM_NODE_ID_LENGTH];            /* Node ID of the node */
    int socketFd;                                   /* Socket file descriptor */
    int socketReady;        /* Flag to indicate if socket information is filled */
}SNodeInformation;

PS: socketFd は、接続が確立された方法 (ノードからの接続をリッスンするか、ノードに接続するか) に応じて、accept() または socket() によって受信されるソケット記述子です。

サイズ MAX_NUM_OF_NODES の SNodeInformation の配列が使用されます。

送信スレッドは nodeInformation を通過し、以下に示すように、それ自体を除くすべてのノードにメッセージ「Hello」を送信します。

void *sendMessageThread(void *pNodeInformation) {

    int i;
    int ownNodeId;
    int bytesSent = 0;
    char ownHostName[MAXIMUM_CHARACTERS_IN_HOSTNAME];

    SNodeInformation *nodeInformation = (SNodeInformation *) pNodeInformation;
    SNodeInformation *iterNodeInformation;

    printf("SendMessageThread: Send thread created\n");

    if(gethostname(ownHostName, MAXIMUM_CHARACTERS_IN_HOSTNAME) != 0) {
        perror("Error: sendMessageThread, gethostname failed\n");
        exit(1);
    }

    for(i=0, iterNodeInformation=nodeInformation ; i<MAXIMUM_NUMBER_OF_NODES ; i++, iterNodeInformation++) {
        if(strcmp((const char*) iterNodeInformation->hostName, (const char*) ownHostName) != 0)  {
            /* Send message to all nodes except yourself */
            bytesSent = send(iterNodeInformation->socketFd, "Hello", 6, 0);

            if(bytesSent == -1) {
                printf("Error: sendMessageThread, sending failed, code: %s FD %d\n",     strerror(errno), iterNodeInformation->socketFd);
            }
        }
    }

    pthread_exit(NULL);
}

以下に示すように、受信スレッドは nodeInformation を通過し、ファイル記述子セットを設定し、select を使用して着信データを待ちます。

void *receiveMessageThread(void *pNodeInformation)
{
    int i;
    int fileDescriptorMax = -1;
    int doneReceiving = 0;
    int numberOfBytesReceived = 0;
    int receiveCount = 0;
    fd_set readFileDescriptorList;
    char inMessage[6];

    SNodeInformation *nodeInformation = (SNodeInformation *) pNodeInformation;
    SNodeInformation *iterNodeInformation;

    printf("ReceiveMessageThread: Receive thread created\n");

    /* Initialize the read file descriptor */
    FD_ZERO(&readFileDescriptorList);

    for(i=0, iterNodeInformation=nodeInformation ; i<MAXIMUM_NUMBER_OF_NODES ; i++, iterNodeInformation++) {
        FD_SET(iterNodeInformation->socketFd, &readFileDescriptorList);

        if(iterNodeInformation->socketFd > fileDescriptorMax) {
            fileDescriptorMax = iterNodeInformation->socketFd;
        }
    }

    printf("ReceiveMessageThread: fileDescriptorMax:%d\n", fileDescriptorMax);

    while(!doneReceiving) {
        if (select(fileDescriptorMax+1, &readFileDescriptorList, NULL, NULL, NULL) == -1) {
            perror("Error receiveMessageThread, select failed \n");
            return -1;
        }

        for(i=0 ; i<fileDescriptorMax ; i++) {
            if (FD_ISSET(i, &readFileDescriptorList)) {
                /* Check if any FD was set */
                printf("ReceiveThread: FD set %d\n", i);

                /* Receive data from one of the nodes */
                if ((numberOfBytesReceived = recv(i, &inMessage, 6, 0)) <= 0) {
                    /* Got error or connection closed by client */
                    if (numberOfBytesReceived == 0) {
                        /* Connection closed */
                        printf("Info: receiveMessageThread, node %d hung up\n", i);
                    }
                    else {
                        perror("Error: receiveMessageThread, recv FAILED\n");
                    }

                    close(i);

                    /* Remove from Master file descriptor set */
                    FD_CLR(i, &readFileDescriptorList); 
                    doneReceiving = 1;
                }
                else {
                    /* Valid data from a node */
                    inMessage[6] = '\0';

                    if(++receiveCount == MAXIMUM_NUMBER_OF_NODES-1) {
                        doneReceiving = 1;
                    }

                    printf("ReceiveThread: %s received, count: %d\n", inMessage, rece    iveCount);
                }
            }
        }
    }
    pthread_exit(NULL);
}

期待される出力: P1 (最初に開始) と P2 を machine1 で実行し、もう 1 つを machine2 で実行する 2 つのプロセスだけで試しました。マシン内の両方のプロセスが最初に接続し、次にスレッドがメッセージ「Hello」を送受信して終了する必要があります。

観測された出力: P1 はメッセージを送信でき、P2 (受信スレッド) はメッセージ「Hello」を受信できます。しかし、P1 (受信スレッド) は P2 (送信スレッド) からメッセージを取得できません。アプリケーションコードは両方のマシンで同じですが、毎回、最初に開始されたプロセスは他のプロセスからメッセージを取得しません。ファイル記述子が設定されているかどうかを確認するために印刷を追加しましたが、P1 では表示されず、P2 でのみ表示されます。受信プロセスの送信は失敗せず、6 で返されます。ファイル記述子の最大値を確認しましたが、正しいです。

最初に P2 を開始し、次に P1 を開始すると、P1 が P2 からメッセージを受信し、P2 が P1 からのメッセージを無限に待機している間に存在することがわかります。

問題がソケット記述子の不適切な使用によるものなのか、それともスレッドによるものなのか、よくわかりません。

4

1 に答える 1

1

2 つの問題:

1 設定されているファイル記述子のループ テストには、設定されたすべてのファイル記述子が含まれているわけではありません。(このプログラミング エラーが、OP に記載されている誤動作の原因であると予想されます。)

2 に渡されたファイル記述子のセットはselect()によって変更されるselect()ため、select()再度使用する前にセットを再初期化する必要があります。(プログラミング エラーは、複数のソケットからデータを受信した場合にのみ顕著になります。)

OP のコードに対する次の mod/s を参照してください。

void *receiveMessageThread(void *pNodeInformation)
{
    ...

    printf("ReceiveMessageThread: Receive thread created\n");

    while(!doneReceiving) {
        /* Initialize the read-set of file descriptors */

        /* Issue 2 fixed from here ... */
        FD_ZERO(&readFileDescriptorList);

        for(i=0, iterNodeInformation=nodeInformation ; i<MAXIMUM_NUMBER_OF_NODES ; i++, iterNodeInformation++) {
            FD_SET(iterNodeInformation->socketFd, &readFileDescriptorList);

            if (iterNodeInformation->socketFd > fileDescriptorMax) {
                fileDescriptorMax = iterNodeInformation->socketFd;
            }
        }
        /* ... up to here. */

        printf("ReceiveMessageThread: fileDescriptorMax:%d\n", fileDescriptorMax);

        if (select(fileDescriptorMax+1, &readFileDescriptorList, NULL, NULL, NULL) == -1) {
            perror("Error receiveMessageThread, select failed \n");
            return -1;
        }

        for(i=0 ; i <= fileDescriptorMax ; i++) { /* Issue 1 fixed here. */
            ...
于 2012-10-07T16:06:38.413 に答える