4

ACE Proactor フレームワークの上に構築されたアプリケーションを移植しています。このアプリケーションは VxWorks と Windows の両方で完全に動作しますが、Linux (CentOS 5.5、WindRiver Linux 1.4 & 3.0) ではカーネル 2.6.XX (librt を使用) では動作しません。

私は問題を非常に基本的な問題に絞り込みました。アプリケーションは、ソケットで非同期 (aio_read 経由) 読み取り操作を開始し、その後、まったく同じソケットで非同期 (aio_write 経由) 書き込みを開始します。プロトコルはアプリケーション側で初期化されているため、読み取り操作はまだ実行できません。- ソケットがブロッキング モードの場合、書き込みに到達せず、プロトコルが「ハング」します。- O_NONBLOCK ソケットを使用している場合、書き込みは成功しますが、読み取りは「EWOULDBLOCK/EAGAIN」エラーで無期限に返され、回復することはありません (AIO 操作が再開されても)。

私は複数のフォーラムを調べましたが、Linux AIO でこれが機能するか (そして私は何か間違ったことをしている)、または不可能であるかについて、決定的な答えを見つけることができませんでした。AIO を削除して別の実装を探すことは可能ですか (epoll/poll/select などを介して)?

非ブロッキング ソケットで問題をすばやく再現するためのサンプル コードを添付します。

#include <aio.h>
#include <stdio.h>
#include <stdlib.h>
#include <netdb.h>
#include <string.h> 
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <assert.h>
#include <errno.h>

#define BUFSIZE (100)

// Global variables
struct aiocb *cblist[2];
int theSocket;

void InitializeAiocbData(struct aiocb* pAiocb, char* pBuffer)
{
    bzero( (char *)pAiocb, sizeof(struct aiocb) );

    pAiocb->aio_fildes = theSocket;
    pAiocb->aio_nbytes = BUFSIZE;
    pAiocb->aio_offset = 0;
    pAiocb->aio_buf = pBuffer;
}

void IssueReadOperation(struct aiocb* pAiocb, char* pBuffer)
{
    InitializeAiocbData(pAiocb, pBuffer);

    int ret = aio_read( pAiocb );
    assert (ret >= 0);
}

void IssueWriteOperation(struct aiocb* pAiocb, char* pBuffer)
{
    InitializeAiocbData(pAiocb, pBuffer);

    int ret = aio_write( pAiocb );
    assert (ret >= 0);
}

int main()
{
    int ret;
    int nPort = 11111;
    char* szServer = "10.10.9.123";

    // Connect to the remote server
    theSocket = socket(AF_INET, SOCK_STREAM, 0);
    assert (theSocket >= 0);

    struct hostent *pServer;
    struct sockaddr_in serv_addr;
    pServer = gethostbyname(szServer);

    bzero((char *) &serv_addr, sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_port = htons(nPort);
    bcopy((char *)pServer->h_addr, (char *)&serv_addr.sin_addr.s_addr, pServer->h_length);

    assert (connect(theSocket, (const sockaddr*)(&serv_addr), sizeof(serv_addr)) >= 0);

    // Set the socket to be non-blocking
    int oldFlags = fcntl(theSocket, F_GETFL) ;
    int newFlags = oldFlags | O_NONBLOCK;

    fcntl(theSocket, F_SETFL, newFlags);
    printf("Socket flags: before=%o, after=%o\n", oldFlags, newFlags);

    // Construct the AIO callbacks array
    struct aiocb my_aiocb1, my_aiocb2;
    char* pBuffer = new char[BUFSIZE+1];

    bzero( (char *)cblist, sizeof(cblist) );
    cblist[0] = &my_aiocb1;
    cblist[1] = &my_aiocb2;

    // Start the read and write operations on the same socket
    IssueReadOperation(&my_aiocb1, pBuffer);
    IssueWriteOperation(&my_aiocb2, pBuffer);

    // Wait for I/O completion on both operations
    int nRound = 1;
    printf("\naio_suspend round #%d:\n", nRound++);
    ret = aio_suspend( cblist, 2, NULL );
    assert (ret == 0);

    // Check the error status for the read and write operations
    ret = aio_error(&my_aiocb1);
    assert (ret == EWOULDBLOCK);

    // Get the return code for the read
    {
        ssize_t retcode = aio_return(&my_aiocb1);
        printf("First read operation results: aio_error=%d, aio_return=%d  -  That's the first EWOULDBLOCK\n", ret, retcode);
    }

    ret = aio_error(&my_aiocb2);
    assert (ret == EINPROGRESS);
    printf("Write operation is still \"in progress\"\n");

    // Re-issue the read operation
    IssueReadOperation(&my_aiocb1, pBuffer);

    // Wait for I/O completion on both operations
    printf("\naio_suspend round #%d:\n", nRound++);
    ret = aio_suspend( cblist, 2, NULL );
    assert (ret == 0);

    // Check the error status for the read and write operations for the second time
    ret = aio_error(&my_aiocb1);
    assert (ret == EINPROGRESS);
    printf("Second read operation request is suddenly marked as \"in progress\"\n");

    ret = aio_error(&my_aiocb2);
    assert (ret == 0);

    // Get the return code for the write
    {
        ssize_t retcode = aio_return(&my_aiocb2);
        printf("Write operation has completed with results: aio_error=%d, aio_return=%d\n", ret, retcode);
    }

    // Now try waiting for the read operation to complete - it'll just busy-wait, receiving "EWOULDBLOCK" indefinitely
    do
    {
        printf("\naio_suspend round #%d:\n", nRound++);
        ret = aio_suspend( cblist, 1, NULL );
        assert (ret == 0);

        // Check the error of the read operation and re-issue if needed
        ret = aio_error(&my_aiocb1);
        if (ret == EWOULDBLOCK)
        {
            IssueReadOperation(&my_aiocb1, pBuffer);
            printf("EWOULDBLOCK again on the read operation!\n");
        }
    }
    while (ret == EWOULDBLOCK);
}

Yotamさん、よろしくお願いします。

4

1 に答える 1

3

まず、O_NONBLOCKAIO は混ざりません。AIO は、対応するreadorwriteがブロックされていない場合O_NONBLOCK非同期操作の完了を報告しaioます。aio_return()EWOULDBLOCK

次に、2 つの同時未処理の aio リクエストに同じバッファを使用しないでください。バッファーは、aio 要求が発行されaio_error()てから完了したことが通知されるまでの間、完全にオフリミットであると見なす必要があります。

3 番目に、適切な結果を得るために、同じファイル記述子への AIO 要求がキューに入れられます。これは、読み取りが完了するまで書き込みが行われないことを意味します。最初にデータを書き込む必要がある場合は、逆の順序で AIO を発行する必要があります。以下は、設定せずにO_NONBLOCK正常に動作します。

struct aiocb my_aiocb1, my_aiocb2;
char pBuffer1[BUFSIZE+1], pBuffer2[BUFSIZE+1] = "Some test message";

const struct aiocb *cblist[2] = { &my_aiocb1, &my_aiocb2 };

// Start the read and write operations on the same socket
IssueWriteOperation(&my_aiocb2, pBuffer2);
IssueReadOperation(&my_aiocb1, pBuffer1);

// Wait for I/O completion on both operations
int nRound = 1;
int aio_status1, aio_status2;
do {
    printf("\naio_suspend round #%d:\n", nRound++);
    ret = aio_suspend( cblist, 2, NULL );
    assert (ret == 0);

    // Check the error status for the read and write operations
    aio_status1 = aio_error(&my_aiocb1);
    if (aio_status1 == EINPROGRESS)
        puts("aio1 still in progress.");
    else
        puts("aio1 completed.");

    aio_status2 = aio_error(&my_aiocb2);

    if (aio_status2 == EINPROGRESS)
        puts("aio2 still in progress.");
    else
        puts("aio2 completed.");
} while (aio_status1 == EINPROGRESS || aio_status2 == EINPROGRESS);

 // Get the return code for the read
ssize_t retcode;
retcode = aio_return(&my_aiocb1);
printf("First operation results: aio_error=%d, aio_return=%d\n", aio_status1, retcode);

retcode = aio_return(&my_aiocb1);
printf("Second operation results: aio_error=%d, aio_return=%d\n", aio_status1, retcode);

あるいは、読み取りと書き込みが相互に順序付けられていることを気にしない場合は、 を使用dup()してソケット用に 2 つのファイル記述子を作成し、1 つを読み取り用に、もう 1 つを書き込み用に使用できます。それぞれの AIO 操作がキューに入れられます。別々に。

于 2011-01-07T01:23:34.417 に答える