1

パイプで接続された 2 つのプロセスがあります。

1 つのプロセスには、パイプにメッセージを書き込む複数のスレッドがあります。

他のプロセスはパイプを読み取り、メッセージを処理します。

問題は、プロセスがパイプを読み取るときに、すべてのメッセージを次々に取得することです。一度に 1 つのメッセージだけを読む方法はありますか?

最初は、ファイル記述子を直接使用して、書き込み関数と読み取り関数を使用しました。次に、fdopen、fread、fwrite を使用してそれらをファイルとして扱ってみましたが、それでもすべてのデータが同時に読み取られます。

メッセージのサイズはその都度変わるので、一定の文字数を読んで修正することはできません。

4

1 に答える 1

1

むかしむかし、POSIX が世界のどこかで知られている概念になる前に、少なくとも 1 つのバージョンの Unix が、パイプ バッファに残されたスペースのサイズよりも少ない書き込みが、対応するアトミック チャンクで読み取られるようなものを維持していました。パイプに書き込まれたパケットのサイズに、十分なデータを読み取ろうとしたという制約が適用されます。残念ながら (または、「明らかに」という意味かもしれません)、これが事実であったことを証明することはできません。関連するハードウェアと O/S に四半世紀以上アクセスできていません。

ただし、反例による証明は、Mac OS X が読み取り側をそのように処理しないことを示しています (ただしwrite()、要求された書き込みサイズが十分に小さい場合、POSIX は呼び出しがアトミックであることを保証します)。これは私にとってちょっとした驚きでした。

反例 — コード

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>

static void childish(int fd)
{
    char buffer[1024];
    int  nbytes;
    int  pid = getpid();
    while ((nbytes = read(fd, buffer, sizeof(buffer))) > 0)
    {
        printf("%.5d: %.4d <<%.*s>>\n", pid, nbytes, nbytes, buffer);
        fflush(stdout);
    }
    printf("%.5d: exiting\n", pid);
    exit(0);
}

static void parental(int fd)
{
    char message[] = "The Quick Brown Fox Jumped Over The Lazy Dog";
    int  pid = getpid();
    for (int i = 0; i < 20; i++)
    {
        int  nbytes = rand() % (sizeof(message) - 1);
        while (nbytes == 0)
            nbytes = rand() % (sizeof(message) - 1);
        if (write(fd, message, nbytes) != nbytes)
            break;
        printf("%.5d: %.4d <<%.*s>>\n", pid, nbytes, nbytes, message);
        fflush(stdout);
    }
    printf("%.5d: exiting\n", pid);
    exit(0);
}

int main(void)
{
    int fd[2];
    pipe(fd);
    pid_t pid = fork();
    if (pid < 0)
        fprintf(stderr, "failed to fork\n");
    else if (pid == 0)
    {
        close(fd[1]);
        childish(fd[0]);
    }
    else
    {
        close(fd[0]);
        parental(fd[1]);
    }
    return EXIT_FAILURE;  // Failed to fork
}

反例 — データ

Mac OS X 10.8.3 でテスト済み。

86504: 0043 <<The Quick Brown Fox Jumped Over The Lazy Do>>
86504: 0001 <<T>>
86504: 0033 <<The Quick Brown Fox Jumped Over T>>
86504: 0006 <<The Qu>>
86504: 0030 <<The Quick Brown Fox Jumped Ove>>
86504: 0036 <<The Quick Brown Fox Jumped Over The >>
86504: 0024 <<The Quick Brown Fox Jump>>
86504: 0022 <<The Quick Brown Fox Ju>>
86504: 0031 <<The Quick Brown Fox Jumped Over>>
86504: 0037 <<The Quick Brown Fox Jumped Over The L>>
86504: 0028 <<The Quick Brown Fox Jumped O>>
86504: 0017 <<The Quick Brown F>>
86504: 0032 <<The Quick Brown Fox Jumped Over >>
86504: 0038 <<The Quick Brown Fox Jumped Over The La>>
86504: 0019 <<The Quick Brown Fox>>
86504: 0007 <<The Qui>>
86504: 0023 <<The Quick Brown Fox Jum>>
86504: 0005 <<The Q>>
86504: 0020 <<The Quick Brown Fox >>
86504: 0004 <<The >>
86504: exiting
86505: 0456 <<The Quick Brown Fox Jumped Over The Lazy DoTThe Quick Brown Fox Jumped Over TThe QuThe Quick Brown Fox Jumped OveThe Quick Brown Fox Jumped Over The The Quick Brown Fox JumpThe Quick Brown Fox JuThe Quick Brown Fox Jumped OverThe Quick Brown Fox Jumped Over The LThe Quick Brown Fox Jumped OThe Quick Brown FThe Quick Brown Fox Jumped Over The Quick Brown Fox Jumped Over The LaThe Quick Brown FoxThe QuiThe Quick Brown Fox JumThe QThe Quick Brown Fox The >>
86505: exiting

アトミック書き込み — 非アトミック読み取り

このコードはwritev()、メッセージの長さとメッセージの両方をパイプに書き込むために使用します。必然的に、2 回の読み取りを使用してデータを取得し、長さを取得してからメッセージを取得します。これは単一のリーダーで機能します。複数のリーダーを使用する場合、あるリーダーがファイル記述子にアクセスせず、別のリーダーが長さを読み取ってもデータを読み取らないように、リーダー間で調整する必要があります。このコードは、writev()システム コールを使用して、1 回のシステム コールで長さとデータを書き込みます。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/uio.h>
#include <unistd.h>

static void childish(int fd)
{
    int  nbytes;
    int  pid = getpid();
    while (read(fd, &nbytes, sizeof(nbytes)) == sizeof(nbytes))
    {
        char buffer[1024];
        int  actual;
        if ((actual = read(fd, buffer, nbytes)) != nbytes)
        {
            fprintf(stderr, "%.5d: short read (wanted %d, actual %d)\n", pid, nbytes, actual);
            break;
        }
        printf("%.5d: %.4d <<%.*s>>\n", pid, nbytes, nbytes, buffer);
        fflush(stdout);
    }
    printf("%.5d: exiting\n", pid);
    exit(0);
}

static void parental(int fd)
{
    char message[] = "The Quick Brown Fox Jumped Over The Lazy Dog";
    int nbytes = 0;
    struct iovec req[2];
    req[0].iov_base = &nbytes;
    req[0].iov_len  = sizeof(nbytes);
    req[1].iov_base = message;
    req[1].iov_len  = 0;
    int  pid = getpid();
    for (int i = 0; i < 20; i++)
    {
        do
        {
            nbytes = rand() % (sizeof(message) - 1);
        } while (nbytes == 0);
        req[1].iov_len = nbytes;
        if (writev(fd, req, 2) != (int)(nbytes + sizeof(nbytes)))
            break;
        printf("%.5d: %.4d <<%.*s>>\n", pid, nbytes, nbytes, message);
        fflush(stdout);
    }
    printf("%.5d: exiting\n", pid);
    exit(0);
}

int main(void)
{
    int fd[2];
    pipe(fd);
    pid_t pid = fork();
    if (pid < 0)
        fprintf(stderr, "failed to fork\n");
    else if (pid == 0)
    {
        close(fd[1]);
        childish(fd[0]);
    }
    else
    {
        close(fd[0]);
        parental(fd[1]);
    }
    return EXIT_FAILURE;  // Failed to fork
}

出力例

86798: 0043 <<The Quick Brown Fox Jumped Over The Lazy Do>>
86798: 0001 <<T>>
86798: 0033 <<The Quick Brown Fox Jumped Over T>>
86798: 0006 <<The Qu>>
86798: 0030 <<The Quick Brown Fox Jumped Ove>>
86798: 0036 <<The Quick Brown Fox Jumped Over The >>
86798: 0024 <<The Quick Brown Fox Jump>>
86798: 0022 <<The Quick Brown Fox Ju>>
86798: 0031 <<The Quick Brown Fox Jumped Over>>
86798: 0037 <<The Quick Brown Fox Jumped Over The L>>
86798: 0028 <<The Quick Brown Fox Jumped O>>
86798: 0017 <<The Quick Brown F>>
86798: 0032 <<The Quick Brown Fox Jumped Over >>
86798: 0038 <<The Quick Brown Fox Jumped Over The La>>
86798: 0019 <<The Quick Brown Fox>>
86798: 0007 <<The Qui>>
86798: 0023 <<The Quick Brown Fox Jum>>
86798: 0005 <<The Q>>
86798: 0020 <<The Quick Brown Fox >>
86798: 0004 <<The >>
86798: exiting
86799: 0043 <<The Quick Brown Fox Jumped Over The Lazy Do>>
86799: 0001 <<T>>
86799: 0033 <<The Quick Brown Fox Jumped Over T>>
86799: 0006 <<The Qu>>
86799: 0030 <<The Quick Brown Fox Jumped Ove>>
86799: 0036 <<The Quick Brown Fox Jumped Over The >>
86799: 0024 <<The Quick Brown Fox Jump>>
86799: 0022 <<The Quick Brown Fox Ju>>
86799: 0031 <<The Quick Brown Fox Jumped Over>>
86799: 0037 <<The Quick Brown Fox Jumped Over The L>>
86799: 0028 <<The Quick Brown Fox Jumped O>>
86799: 0017 <<The Quick Brown F>>
86799: 0032 <<The Quick Brown Fox Jumped Over >>
86799: 0038 <<The Quick Brown Fox Jumped Over The La>>
86799: 0019 <<The Quick Brown Fox>>
86799: 0007 <<The Qui>>
86799: 0023 <<The Quick Brown Fox Jum>>
86799: 0005 <<The Q>>
86799: 0020 <<The Quick Brown Fox >>
86799: 0004 <<The >>
86799: exiting

この例では、プロセスが厳密に連続して実行されていることに失望しています。これはマルチコア マシンなので、期待したものではありませんでした。ループ制限を 20 から 2000 に変更すると、実行がインターリーブされ、送信側と受信側でデータの同期が保たれました。

長さには 4 バイトintの値を使用しました。明らかに、手持ちのデータの場合、1 バイトを使用するだけで十分でした (文字列の長さが 44 文字しかないので、 1 バイトunsigned charでもかまいません)。signed char

ジェネレーターをシードしていないことに注意してください。そのrand()ため、出力は実行されるたびにプロセス ID とは別に決定論的です。

また、POSIX 仕様を読んでも、writev()セグメントがパイプ上で 1 つのユニットとして扱われることが保証されているとは断言できません。そうでない場合parental()は、長さと関連するデータ量を含むコード内のバッファーを作成し、通常の呼び出しにフォールバックする必要がありwrite()ます。これを行うのはまったく難しくありません。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

static void childish(int fd)
{
    char nbytes;
    int  pid = getpid();
    while (read(fd, &nbytes, sizeof(nbytes)) == sizeof(nbytes))
    {
        char buffer[1024];
        int  actual;
        if ((actual = read(fd, buffer, nbytes)) != nbytes)
        {
            fprintf(stderr, "%.5d: short read (wanted %d, actual %d)\n", pid, nbytes, actual);
            break;
        }
        printf("%.5d: %.4d <<%.*s>>\n", pid, nbytes, nbytes, buffer);
        fflush(stdout);
    }
    printf("%.5d: exiting\n", pid);
    exit(0);
}

static void parental(int fd)
{
    char message[] = "\000The Quick Brown Fox Jumped Over The Lazy Dog";
    int  pid = getpid();
    for (int i = 0; i < 20; i++)
    {
        int nbytes;
        do
        {
            nbytes = rand() % (sizeof(message) - 1);
        } while (nbytes == 0);
        message[0] = nbytes;
        if (write(fd, message, nbytes + 1) != (nbytes + 1))
            break;
        printf("%.5d: %.4d <<%.*s>>\n", pid, nbytes, nbytes, message+1);
        fflush(stdout);
    }
    printf("%.5d: exiting\n", pid);
    exit(0);
}

int main(void)
{
    int fd[2];
    pipe(fd);
    pid_t pid = fork();
    if (pid < 0)
        fprintf(stderr, "failed to fork\n");
    else if (pid == 0)
    {
        close(fd[1]);
        childish(fd[0]);
    }
    else
    {
        close(fd[0]);
        parental(fd[1]);
    }
    return EXIT_FAILURE;  // Failed to fork
}

コードの記述にスレッドを追加する

書き込みを行うためのスレッドを作成することは、それほど難しいことではありません。コードはまだ を使用してrand()いますが、rand()スレッドセーフであることが保証されていないため、期待どおりに機能しない可能性があります。一方、このコードは単にrand()可変サイズのメッセージを生成するために使用しています。完全に機能することは重要ではありません。

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

static void childish(int fd)
{
    char nbytes;
    int  pid = getpid();
    while (read(fd, &nbytes, sizeof(nbytes)) == sizeof(nbytes))
    {
        char buffer[1024];
        int  actual;
        if ((actual = read(fd, buffer, nbytes)) != nbytes)
        {
            fprintf(stderr, "%.5d: short read (wanted %d, actual %d)\n", pid, nbytes, actual);
            break;
        }
        printf("%.5d: %.4d R <<%.*s>>\n", pid, nbytes, nbytes, buffer);
        fflush(stdout);
    }
    printf("%.5d: exiting\n", pid);
    exit(0);
}

static void *p_thread(void *data)
{
    int fd = *(int *)data;
    char message[] = "\000The Quick Brown Fox Jumped Over The Lazy Dog";
    int  pid = getpid();
    for (int i = 0; i < 20; i++)
    {
        int nbytes;
        do
        {
            nbytes = rand() % (sizeof(message) - 1);
        } while (nbytes == 0);
        message[0] = nbytes;
        if (write(fd, message, nbytes + 1) != (nbytes + 1))
            break;
        printf("%.5d: %.4d W <<%.*s>>\n", pid, nbytes, nbytes, message+1);
        fflush(stdout);
    }
    printf("%.5d: thread exiting\n", pid);
    return(0);
}

static void parental(int fd)
{
    enum { NUM_THREADS = 3 };
    pthread_t thr[NUM_THREADS];
    int  pid = getpid();
    for (int i = 0; i < NUM_THREADS; i++)
    {
        if (pthread_create(&thr[i], 0, p_thread, (void *)&fd) != 0)
        {
            fprintf(stderr, "%.5d: failed to create thread number %d\n", pid, i);
            exit(EXIT_FAILURE);
        }
    }
    for (int i = 0; i < NUM_THREADS; i++)
    {
        if (pthread_join(thr[i], 0) != 0)
        {
            fprintf(stderr, "%.5d: failed to join thread number %d\n", pid, i);
            exit(EXIT_FAILURE);
        }
    }
    printf("%.5d: master thread exiting\n", pid);
    exit(EXIT_SUCCESS);
}

int main(void)
{
    int fd[2];
    pipe(fd);
    pid_t pid = fork();
    if (pid < 0)
        fprintf(stderr, "failed to fork\n");
    else if (pid == 0)
    {
        close(fd[1]);
        childish(fd[0]);
    }
    else
    {
        close(fd[0]);
        parental(fd[1]);
    }
    return EXIT_FAILURE;  // Failed to fork
}

Note that p_thread(), the thread function, is almost a copy of the previous parental function, but the new parental() function coordinates the creation and termination of three threads. The code in childish() and main() did not need changing at all (though I added the R to the print in childish() to match the W in the code in p_thread()).

于 2013-05-12T18:56:02.673 に答える