0
#include "cs241.c"

#define THREAD_COUNT 10

int list_s;
int connections[THREAD_COUNT];
char space[THREAD_COUNT];

int done = 0;

pthread_mutex_t muxlock = PTHREAD_MUTEX_INITIALIZER;

int *numbers;
int numbers_count;

void *listener(void *arg) {
    int n = *(int *) arg;

    FILE *f = fdopen(connections[n], "r");
    if (f == NULL)
        printf("Could not open file\n");
    char *line = NULL;
    size_t *len = malloc(sizeof(int));
    while(getline(&line, len, f) != -1) {
        printf("%s", line);
        if (strcmp("END", line) == 0) {
                            pthread_mutex_lock(&muxlock);
            done = 1;
            pthread_mutex_unlock(&muxlock);                     
        }
    }

    space[n] = 't';
    fclose(f);
    free(len);
    close(connections[n]);
    return NULL;
}

void initialize() {
    int n;
    for (n = 0; n < THREAD_COUNT; n++) {
        space[n] = 't';
    }
}

int check() {
    int index;
    for (index = 0; index < THREAD_COUNT; index++) {
        if (space[index] == 't') {
            space[index] = 'f';
            return index;
        }
    }
    return 0;
}   

int main(int argc, char *argv[]) {
    int port = 0;
    int binder;
    int lis;
    int i = 0;
    int *j = malloc(sizeof(int));
    initialize();
    pthread_t threads[THREAD_COUNT];

    if ((argc != 2) || (sscanf(argv[1], "%d", &port) != 1)) {
        fprintf(stderr, "Usage: %s [PORT]\n", argv[0]);
        exit(1);
    }

    if (port < 1024) {
        fprintf(stderr, "Port must be greater than 1024\n");
        exit(1);
    }

    // set the initial conditions for the numbers array.
    numbers = malloc(sizeof(int));
    numbers_count = 0;

    struct sockaddr_in servaddr; // socket address structure
    // set all bytes in socket address structure to zero, and fill in the relevant data members
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(port);

    list_s = socket(AF_INET, SOCK_STREAM, 0);
    if (list_s < 0) {
        printf("Could not create socket\n");
        exit(EXIT_FAILURE);
    }
    binder = bind(list_s, (struct sockaddr *) &servaddr, sizeof(servaddr));
    if (binder < 0) {
        printf("Could not bind socket\n");
        exit(EXIT_FAILURE);
    }
    lis = listen(list_s, SOMAXCONN);
    if (lis < 0) {
        printf("Could not listen on socket\n");
        exit(EXIT_FAILURE);
    }
    SET_NON_BLOCKING(list_s);
    while (done != 1) {
        connections[i] = accept(list_s, NULL, NULL);
        if (connections[i] < 0) {
            if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
                continue;
            printf("Could not accept connection\n");
            exit(EXIT_FAILURE);
        }
        i = check();
        *j = i;
                    SET_BLOCKING(list_s);
        pthread_create(&threads[i], NULL, listener, j);
    }

    // Verify the array.
    //verify(numbers, numbers_count);

    free(j);
    close(list_s);

    exit(EXIT_SUCCESS);

}

そのため、main() に while ループがあり、'done' = 1 のときに終了する必要があります。これは、listener() が 'END' を受け取ったときに設定されます。最初の問題は、最初の反復で 'END' を送信すると、while ループが終了せず、別の 'END' が送信された後にのみ終了することです。

ソケットのブロックを解除およびブロックするために別のファイルに 2 つのマクロ SET_NON_BLOCKING および SET_BLOCKING があるため、接続がない場合は接続を待ちます。次の問題は、これらのマクロを使用しない場合です。listener() の getline() は、ストリームから出力されるすべてを読み取ることができません。それらを使用すると、ストリームをまったく開くことができません。

問題のいくつかは、スレッドに 'j' を渡すことにあると思います。2 番目のスレッドが開始すると、最初のスレッドが読み取れる前に 'j' が上書きされます。しかし、私は数日以上そこにいて、どこにも行けません。どんな助けでも大歓迎です。ありがとう!

また、ソケットのブロックとスレッドのロックが適切な場所にあるかどうかを尋ねたいと思いますか?

4

2 に答える 2

1

accept()リスナー スレッドを生成した直後にメイン スレッドがブロックされるため、おそらく END を 2 回送信する必要があります。ブロックされているため、accept()それを見ることはできませんdone == 1

これは、非ブロック モードのままにすることで修正できます。これを行う場合、タイトなループで回転するのを避けるために、おそらくスリープ状態にする必要があります。別の代替手段は、EINTR を設定する原因となる受け入れをウェイクアップするためのシグナルを送信することです。

接続インデックスをリスナー スレッドに渡すという点では、値を上書きするスレッド間で競合が発生していることはおそらく正しいでしょう。intは と同じかそれよりも少ないバイト数を必要とするため、実際void *には をint直接渡すことができます。例えば:

pthread_create(&threads[i], NULL, listener, (void *)i);

そしてリスナーで:

int n = (int)arg;

これは一種のハックです。より完全な解決策は、malloc を使用してスレッドごとに個別の引数構造体を割り当てることです。リスナー スレッドは、引数を解放する責任があります。

struct params *p = malloc(sizeof(struct params));
p.index = i;

pthread_create(&threads[i], NULL, listener, p);

次にリスナーで:

struct params *p = args;
if (p == NULL) {
  // handle error
  return NULL;
}
int n = p->index;
free(p);
于 2012-12-02T19:22:37.333 に答える
1

connections[]およびspace[]配列の管理方法には、かなりの問題があるようです。

  • への接続を受け入れますconnections[i]が、スレッドiに渡すのは によって返されるものであり、まったく異なる.listenercheck()i
  • 各スレッドには、スレッド パラメーターの同じアドレスが渡されますj。一度だけ割り当てられ、スレッドの作成ごとに同じ割り当てが使用されます。2 つ以上のスレッドがほぼ同時に開始されると、それらのスレッドの一部に渡されるはずのインデックスが失われます。
  • 終了するとlistener、接続が利用可能であるとマークされますが(それが意図していると思います)、接続を閉じるためにspace[n] = 't';使用します。connections[n]これは、同期では実行されません。

その他のコメント:

  • なぜあなたは動的に割り当てているのlenですか?
  • doneinmain()のループのチェックはwhileミューテックスで保護する必要があります
  • 転送されるデータのプロトコルが正確にはわかりませんが、使用しているのでgetline()、改行で終了する一連の行であると想定しています。その場合、getline()ドキュメントからのこのビットの情報に注意してください。「バッファはヌルで終了しており、見つかった場合は改行文字が含まれています」。"END"行に改行が含まれている場合、strcmp("END",line)は返されません0
于 2012-12-02T19:32:38.450 に答える