マスター スレッドが起動accept()し、新しいクライアント ソケットをスレーブ スレッドに渡すサーバーの実装に問題があります (以前に固定サイズのスレッド プールを作成しました)。
これは、整数の単純な配列を使用してクライアント ソケットのスタックを表すように実装されており、accept()呼び出しが成功するといっぱいになります。スレーブ スレッドごとに 1 つのミューテックスがあり、現在のクライアント ファイル記述子が に設定されている場合、各スレーブは暗黙的に解放されます-1(要求を満たす最後に、スレッド コードでそのように設定されます)。
ユース ケースでは、シェル スクリプトを使用して、指定された数のクライアント (たとえば、6 つの同時クライアント (6 プロセス)) を起動します。TCP 接続のセットアップを確認すると、 でエラーを受信するクライアントはありませんconnect()。
ここに問題があります。6 つのクライアントを使用したこの特定のテスト シナリオでは、サーバーは予想される 6つの成功メッセージ( ) から5 つaccept()しか出力していません。よくわかりませんが、受け入れるクライアント接続の 1 つが単に失われているように感じます!> Server got connection from ...
さらに奇妙なことに、受け入れられたソケットの受信とそれをスレーブ スレッドにディスパッチする間にcontinue( ) を追加しました。//MAGICAL TESTこれにより、continueスレーブは作業を受け取りませんが、すべてのクライアント接続が期待どおりに出力されます。正確にはわかりませんが、マスタースレッドが作業をスレーブにディスパッチしている間に、受け入れキュー内のクライアント接続の 1 つがドロップされる可能性はありますか?
編集:これは、同時リクエストの数よりも少ない数のスレーブスレッドがある場合にのみ発生します。それでも、その場合に余分な接続の1つがマスタースレッドによって出力されないように見える理由がわかりません(たとえば、2つのスレーブと6つの同時要求)。
以下で定義されるスレッドローカルデータ:
#define PENDING_CONNS 35
typedef struct _ThreadData {
    pthread_t m_Thread;
    pthread_mutex_t m_Mutex;
    char* m_Task;
    int m_ClientFD;
    unsigned int m_Index;
    unsigned short int m_Networking;
    char* m_OutputDir;
} ThreadData;
static ThreadData *g_Workers = NULL;
static unsigned int g_NumWorkers = 0;
static pthread_barrier_t g_StartBarrier;
次に、マスター スレッド セクションを示します。
/*
 * Executed by the coordinating thread. Awaits and passes new client connections to free working threads.
 */
int RunServer(char* port) {
    // Listen on sock_fd, new connection on new_fd.
    int sockfd, new_fd;
    struct addrinfo hints, *servinfo, *p;
    // Connector's address information.
    struct timeval timeout;
    struct sockaddr_storage their_addr;
    socklen_t sin_size;
    //  struct sigaction sa; //TODO: possible signal handling later.
    int yes = 1;
    char s[INET6_ADDRSTRLEN];
    int rv;
    memset(&hints, 0, sizeof hints);
    hints.ai_family = AF_UNSPEC;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags = AI_PASSIVE; // use my IP
    if ((rv = getaddrinfo(NULL, port, &hints, &servinfo)) != 0) {
        fprintf(stderr, "> Server: getaddrinfo: %s\n", gai_strerror(rv));
        return -1;
    }
    // Loop through all the results and bind to the first we can
    for (p = servinfo; p != NULL; p = p->ai_next) {
        if ((sockfd = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) == -1) {
            perror("> Server: socket");
            continue;
        }
        if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) == -1) {
            perror("> Server: setsockopt");
            return -1;
        }
        if (bind(sockfd, p->ai_addr, p->ai_addrlen) == -1) {
            close(sockfd);
            perror("> Server: bind");
            continue;
        }
        break;
    }
    if (p == NULL) {
        fprintf(stderr, "> Server: failed to bind\n");
        return -1;
    }
    // All done with this structure.
    freeaddrinfo(servinfo);
    //PENDING_CONNS defined as 25
    if (listen(sockfd, PENDING_CONNS) == -1) {
        perror("> Server: listen");
        return -1;
    }
    printf("> Server: waiting for connections...\n");
    fd_set_blocking(sockfd, 0); // NON BLOCKING
    int requestStack[1000];
    int stackTop = 0;
    // Main loop of coordinating thread.
    while (1) {
        sin_size = sizeof their_addr;
        new_fd = accept(sockfd, (struct sockaddr *) &their_addr, &sin_size);
        if (new_fd == -1) {
            //queue was probably empty...
        }
        else {
            inet_ntop(their_addr.ss_family,
                GetInAddress((struct sockaddr *) &their_addr), s, sizeof s);
            printf("> Server: got connection from %s\n", s);
            requestStack[stackTop++] = new_fd;
            printf("> Server: stack top is now %i with value %i.\n", stackTop, requestStack[stackTop-1]);
        }
        // MAGICAL TEST: continue;
        // Linear search for a free worker thread...
        unsigned int i;
        int rc;
        for (i = 0; i < g_NumWorkers && stackTop > 0; i++) {
            rc = pthread_mutex_trylock(&(g_Workers[i].m_Mutex));
            if (rc == EBUSY)  // If it was already locked, skip worker thread.
                continue;
            else if (!rc) { // If we managed to lock the thread's mutex...
                if (g_Workers[i].m_ClientFD == -1) { // Check if it is currently out of work.
                    g_Workers[i].m_ClientFD = requestStack[--stackTop];
                    printf("> Server: master thread assigned fd %i to thread %u\n", requestStack[stackTop], i);
                }
                rc = pthread_mutex_unlock(&(g_Workers[i].m_Mutex));
                if (rc) {
                    printf("> Server: main thread mutex unlock failed on thread %i with error %i!\n", i, rc);
                    return -1;
                }
            }
            else
                printf("> Server: main thread try-lock failed on thread %i with error %i!\n", i, rc);
        }
    }
    return 0;
}
コードが冗長で、man ページから基本的な情報が抜けていた可能性があることをお詫びします。
リクエストに応じて、スレーブ スレッド関数を次に示します。
static void *WorkRoutine(void *args) {
    ThreadData * const threadData = (ThreadData*) args;
    pthread_mutex_t * const tMutex = &(threadData->m_Mutex);
    const unsigned int tIndex = threadData->m_Index;
    int clientFD = threadData->m_ClientFD;
    // Each thread initializes its own mutex.
    int rc = pthread_mutex_init(tMutex, NULL);
    if (!rc)
        printf("> Slave %u: mutex %p initialization was sucessful!\n", tIndex, tMutex);
    if (rc && rc != PTHREAD_BARRIER_SERIAL_THREAD) {
        printf("> Slave %u: failed mutex initialization!\n", tIndex);
        perror("pthread_mutex_init");
        exit(-1); //TODO: implement graceful error exit.
    }
    // Synchronization point: all threads including the master thread wait for all slave mutexes to be initialized.
    rc = pthread_barrier_wait(&g_StartBarrier);
    if (rc && rc != PTHREAD_BARRIER_SERIAL_THREAD) {
        printf("> Slave %u: failed initial barrier synchronization!\n", tIndex);
        perror("pthread_barrier_wait");
        exit(-1); //TODO: implement graceful error exit.
    }
    while (1) { //TODO replace 1 with state var that is turned off when the master thread receives a request to shutdown server.
        rc = pthread_mutex_lock(tMutex);
        if (rc) {
            printf("> Slave %u: mutex lock returned an error: %i!\n", tIndex, rc);
            perror("pthread_mutex_lock()");
            continue;
        }
        clientFD = threadData->m_ClientFD;
        if (!threadData->m_Task && clientFD == -1) {
            rc = pthread_mutex_unlock(tMutex);
            int yieldStatus = sched_yield();
            if (yieldStatus) {
                printf("> Slave %u: error while attempting to yield!\n", tIndex);
                perror("sched_yield()\n");
            }
            continue;
            //printf("> Slave %u: yielding.\n", tIndex);
        }
        else if (!threadData->m_Task && clientFD != -1) {
            // Read client request.
            threadData->m_Task = (char*) calloc(FILE_BUFFER, sizeof(char)); //TODO: stop allocating buffer on every task.
            printf("> Slave %u going to read from %i into %p\n", tIndex, clientFD, threadData->m_Task);
            MessageHeader h;
            GetHeader(&h, clientFD);
            if (h.m_ID != BeginSession) {
                //protocol implementation error
                exit(-1);
            }
            printf("> Slave %u: expecting client command of length %i\n", tIndex, h.m_ContentSize);
            int n = GetContent((void*) threadData->m_Task, h.m_ContentSize * sizeof(char), clientFD);
            printf("> Slave %u: client of file descriptor %i sent [%s] at a total of %i bytes\n", tIndex, clientFD, threadData->m_Task, n);
            short int remoteOperation;
            int baseArgvSize = 10;
            char **argv = (char**) calloc(baseArgvSize, sizeof(char*)); //TODO: stop allocating table on every task.
            int argc = CmdToTable(threadData->m_Task, &argv, &baseArgvSize);
            int localLen = strlen("local");
            int remoteLen = strlen("remote");
            if(!strncmp(threadData->m_Task, "remote", remoteLen)) {
                remoteOperation = 1;
                g_Workers[tIndex].m_Networking = 1;
            }
            else if(!strncmp(threadData->m_Task, "local", localLen)) {
                remoteOperation = 0;
                g_Workers[tIndex].m_Networking = 0;
                g_Workers[tIndex].m_OutputDir = (char*) calloc(FILE_BUFFER, sizeof(char));
                MessageHeader hPath;
                GetHeader(&hPath, clientFD);
                if (hPath.m_ID != SendMessage) {
                    //protocol implementation error
                    exit(-1);
                }
                int cnt = GetContent((void*) g_Workers[tIndex].m_OutputDir, hPath.m_ContentSize * sizeof(char), clientFD);
                g_Workers[tIndex].m_OutputDir[hPath.m_ContentSize * sizeof(char)] = '/';
                printf("> Slave %u: received message with %i bytes.\n", tIndex, cnt);
                printf("> Slave %u: client output going to %s\n", tIndex, g_Workers[tIndex].m_OutputDir);
            }
            else {
                printf("> Slave %u: received bogus client command: %s\n", tIndex, threadData->m_Task);
                return NULL;
            }
            printf("> Slave %u: remote mode - %i.\n", tIndex, g_Workers[tIndex].m_Networking);
            // Debug print the table built from the client command.
            printf("> Slave %u: table has %i entries.\n", tIndex, argc);
            int i;
            for (i = 0; i < argc; i++) {
                printf("> Slave %u: argument %i is %s.\n", tIndex, i, argv[i]);
            }
            // Prepare the input files the client will send.
            char**filenames = (char**) calloc(argc, sizeof(char*));
            FILE**files = (FILE**) calloc(argc, sizeof(FILE*));
            unsigned int reqIDIndex = 0;
            unsigned int reqIDLen = strlen(argv[reqIDIndex]);
            if(!remoteOperation)
                continue;
            // Check which files need to be sent from the client.
            for (i = reqIDIndex + 1; i < argc; i++) {
                unsigned int argSize = strlen(argv[i]);
                if (strstr(argv[i], FILETYPE_SEPARATOR) != NULL
                        && strstr(argv[i], INDEX_FILETYPE) == NULL) {
                    // If the current argument has a file type and isn't an index type.
                    filenames[i] = calloc(reqIDLen + argSize + 2, sizeof(char)); // +2 to account for terminator char and hyphen between reqID and filename
                    printf("> Slave %u: arg %s check with req %s\n", tIndex, argv[i], argv[reqIDIndex]);
                    int noReqYet = strncmp(argv[i], argv[reqIDIndex], reqIDLen);
                    if (noReqYet) {
                        printf("> Slave %u: no prepended request yet!\n", tIndex);
                        strncpy(filenames[i], argv[reqIDIndex], reqIDLen);
                        filenames[i][reqIDLen] = '-';
                    }
                    // If the argument is a path to a file, get the filename and discard the path (that was local to the client).
                    char*nameStart = strrchr(argv[i], '/'); //TODO: make a define for 92: backslash ascii char number
                    printf("> Slave %u: going to remove backslash from %s\n", tIndex, argv[i]);
                    if (nameStart != NULL )
                        nameStart++;
                    else
                        nameStart = argv[i];
                    printf("> Slave %u: got %s after removing backslash\n", tIndex, nameStart);
                    printf("noReqYet: %i\n", noReqYet);
                    if (!noReqYet)
                        strncpy(filenames[i], nameStart, strlen(nameStart));
                    else
                        strncpy(filenames[i] + reqIDLen + 1, nameStart,
                                strlen(nameStart)); //+1 to account for the hyphen.
                    // When freeing the argv table, only need to free(argv). There is no need to free argv[i] elements.
                    argv[i] = filenames[i];
                }
                else if (strstr(argv[i], FILETYPE_SEPARATOR) != NULL && strstr(argv[i], INDEX_FILETYPE) != NULL ) {
                    // If the current argument is the index type.
                    char*nameStart = strrchr(argv[i], '/');
                    if (nameStart != NULL ) {
                        nameStart++;
                        argv[i] = nameStart;
                    }
                }
            }
            // Debug print the needed files.
            printf("> Slave %u: awaiting files:", tIndex);
            for (i = 0; i < argc; i++) {
                if (filenames[i] != NULL ) {
                    printf(" %s", filenames[i]);
                }
            }
            printf("\n");
            // Await each input file from the client.
            for (i = 0; i < argc; i++) {
                if (filenames[i] != NULL ) {
                    files[i] = GetFile(clientFD, filenames[i]); // Note: GetFile invokes fclose!
                    if (files[i])
                        printf("> Slave %u: locally stored file %s\n", tIndex, filenames[i]);
                }
            }
        }
        printf("> Slave %u: going to invoke SatisfyRequest with arguments: ", tIndex);
        for (i = reqIDIndex; i < argc; i++) {
            if (argv[i] != NULL )
                printf("%s ", argv[i]);
        }
        printf("\n");
        // Process the request against the index.
        int res = SatisfyRequest(argc, argv, tIndex, clientFD);
        char* dummy;
        if (!res)
            dummy = "Your request succeeded.";
        else
            dummy = "Sorry, there was an error.";
        // Terminate session with the client.
        SendMsg(EndSession, (void*)dummy, strlen(dummy), clientFD);
        printf("> Slave %u: task result delivered to client of file descriptor %i.\n", tIndex, clientFD);
        close(clientFD);
        free(threadData->m_Task);
        free(argv);
        if(!remoteOperation) {
            free(g_Workers[tIndex].m_OutputDir);
        }
        // Free received file names and file pointer array.
        for (i = 0; i < argc; i++) {
            if (filenames[i])
                free(filenames[i]);
        }
        free(filenames);
        free(files);
        // Reset thread-local data.
        threadData->m_Task = NULL;
        threadData->m_ClientFD = -1;
        clientFD = -1;
        printf("> Slave %u: task finished.\n", tIndex);
        rc = pthread_mutex_unlock(tMutex);
        if(!rc)
            continue;
    }
    if (rc) {
        printf("> Slave %u: mutex unlock returned an error: %i!\n", tIndex, rc);
        perror("pthread_mutex_unlock()");
    }
    return NULL;
}
これは、コードのクライアント側の部分です。
char*cmdBuffer = (char*)calloc(FILE_BUFFER, sizeof(char));
char*cmdRealPath = (char*)calloc(FILE_BUFFER, sizeof(char));
int auxNameTableSz = 25;
char**auxNameTable = (char**)calloc(auxNameTableSz, sizeof(char*));
char* ip = argv[2];
char* port = argv[3];
char* networkMode = argv[4];
//  char* reqID = argv[5];
const int cmdStartIndex = 6;
short int remoteOperation;
if(!strcmp(networkMode, "remote")) {
    remoteOperation = 1;
    auxNameTableSz = 0;
}
else if(!strcmp(networkMode, "local")) {
    // If the client is on the server's machine, need to convert argument files to absolute paths.
    remoteOperation = 0;
    int i = cmdStartIndex;
    int j = 0;
    while(i < argc) {
        if(strrchr(argv[i], '/') || strrchr(argv[i], '.')) {
            argv[i] = realpath(argv[i], NULL);
            auxNameTable[j++] = argv[i];
        }
        i++;
    }
    auxNameTableSz = j;
}
else {
    printf("> Client: argument four must be either \"global\" or \"local\"!");
    free(cmdBuffer);
    free(cmdRealPath);
    FreePathsTable(&auxNameTable, auxNameTableSz);
    DeallocateGlobals();
    return 0;
}
// Concatenate client arguments into a single string.
MakeCommand(argc, argv, cmdBuffer);
printf("> Client: [Command]: %s\n", cmdBuffer);
// Establish a (TCP) connection with the server.
if(PrepareConnection(ip, port)) {
    // Failed to prepare connection.
    free(cmdBuffer);
    free(cmdRealPath);
    FreePathsTable(&auxNameTable, auxNameTableSz);
  DeallocateGlobals();
    return 0;
}
if(ConnectToServer()) {
    // Failed to establish a stream socket connection.
    free(cmdBuffer);
    free(cmdRealPath);
    FreePathsTable(&auxNameTable, auxNameTableSz);
    DeallocateGlobals();
    return 0;
}
// Send the client command to the server.
printf("> Client: sending command of %i bytes to server: [%s].\n", (int)strlen(cmdBuffer), cmdBuffer);
int count=0;
if((count=WriteToServer(BeginSession, cmdBuffer)) == -1) {
    // Command send failed.
    free(cmdBuffer);
    free(cmdRealPath);
    FreePathsTable(&auxNameTable, auxNameTableSz);
    DeallocateGlobals();
    return 0;
}
printf("> Client sent command to the server at %i bytes.\n", count);
if(remoteOperation) {
        // Send the correct input files to the server.
        SendInputFiles(argc, argv, cmdStartIndex);
}
else {
    // Send the real path of the client's working directory to the server.
    char *localDir = realpath(".", NULL);
    count=WriteToServer(SendMessage, localDir);
printf("> Client sent the real path to the server at %i bytes.\n", count);
    free(localDir);
}
// Await reply from the server.
char *buffer = (char*) calloc(FILE_BUFFER, sizeof(char));
count = ReceiveFromServer(buffer);
char *backPtr = buffer;
if(remoteOperation) {
    printf("> Client: got output list [%s] totalling %i bytes.\n", buffer, count);
    // Receive the files produced by the server if the client and server are not on the same machine.
    ReceiveOutputFiles(buffer);
    // Await session termination.
        buffer = backPtr;
        memset(buffer, 0, sizeof(char)*FILE_BUFFER);
        count = ReceiveFromServer(buffer);
}
else {
}
printf("> Client: server response: [%s]-[%s]\n", buffer, cmdBuffer);