0

ここで何がうまくいかないのか本当に理解できないので、誰かが私が見逃したものを見つけてくれることを願っています.

Javaで開発したクライアントを受け入れるユーザーデーモンを書いています。今のところ、このクライアントはユーザー名パスワードのみを接続して送信します。私はcygwinの下でコードを開発しましたが、そこで動作します。デーモンが自己紹介を送信し、次にクライアントがユーザー名とパスワードを送信すると、デーモンはクライアントを切断するか、OK を送信して応答します (まだ行われていません)。

cygwin を使用してこれをテストすると、ローカルホストで動作します。コードを HPUX に移植したところ、クライアントは接続でき、デーモンからの導入も受信できます。クライアントがユーザー名とパスワードを送信すると、もう機能しません。デーモンは 1 バイトしか受信せず、再度読み取ろうとすると、EAGAIN の結果として -1 が返され、他には何も返されません。クライアントにはエラーは表示されず、デーモン側にもエラーはありません。gdb を使用してコードをステップ実行すると、メッセージが完全に受信されます。:(

私が使用するコードはこれです。さらに情報が必要な場合は、追加できます。

int TCPSocket::receive(SocketMessage &oMessage, int nBytes)
{
    int max = getSendBufferSize();

    if(nBytes != -1 && nBytes < max)
        max = nBytes;

    SocketMessage sb(max);
    int rcv_bytes = 0;
    int total = 0;
    int error = 0;

    while(1)
    {
        rcv_bytes = ::recv(getSocketId(), &sb[0], sb.size(), 0);
        error = errno;
        FILE_LOG(logDEBUG4) << "Received on socket: " << mSocketId << " bytes: " << rcv_bytes << "  expected:" << sb.size() << " total: " << total << " errno: " << error;

        if(rcv_bytes == -1)
        {
            if(error == EAGAIN || error == EWOULDBLOCK)
                return total;

            throw SocketException(this, SocketException::RECEIVE, error, "Socket", "Client connection error!", __FILE__, __LINE__);
        }

        //if(rcv_bytes == 0)
        //  throw SocketException(this, SocketException::RECEIVE, error, "Socket", "Client connection has been closed!");

        total += rcv_bytes;
        oMessage.insert(oMessage.end(), sb.begin(), sb.begin()+rcv_bytes);
    }

    return total;
}

ログの出力は次のとおりです。

16:16:04.391 DEBUG4: Received on socket: 4 bytes: 1  expected:32768 total: 0 errno: 2
16:16:04.391 DEBUG4: Received on socket: 4 bytes: -1  expected:32768 total: 1 errno: 11

残りの 30 バイトはどこにあり、なぜ返されないのでしょうか?

アップデート

これは、実際にデータを受け取るコードの一部にすぎません。ソケット クラス自体は、プロトコルなしで raw ソケットのみを処理します。プロトコルは別のクラスで実装されます。この受信関数は、ネットワーク上で利用可能なすべてのバイトを取得し、それをバッファー (SocketMessage) に入れることになっています。バイト数が複数のメッセージであるか、単一のメッセージの一部であるかは問題ではありません。これは、制御クラスが (おそらく) 部分的なメッセージ ストリームから実際のメッセージ ブロックを構築するためです。最初の呼び出しが 1 バイトしか受信しないことは問題ではありません。メッセージが完了していない場合、呼び出し元はさらにデータが到着してメッセージが完了するまでループ内で待機するためです。複数のクライアントが存在する可能性があるため、非ブロッキング ソケットを使用しています。個別のスレッドを扱いたくありませんでした。私のサーバーは接続を多重化しています。ここでの問題は、受信が 1 バイトしか受信しないのに、もっと多くのバイトが必要であることを知っていることです。エラーコード EAGAIN が処理され、次に受信が開始されると、さらにバイトが取得されます。ネットワークが 1 バイトしか送信しなかったとしても、残りのメッセージは次に到着するはずですが、そうではありません。あたかもそこに何もないかのようにデータ ブロックを受信するためにソケットを待機する選択。dbg で同じコードを実行してステップ実行すると、機能します。同じクライアントに再度接続すると、突然、より多くのバイトが受信されます。localhost を使用して cygwin で同じコードを使用すると、正常に動作します。エラーコード EAGAIN が処理され、次に受信が開始されると、さらにバイトが取得されます。ネットワークが 1 バイトしか送信しなかったとしても、残りのメッセージは次に到着するはずですが、そうではありません。あたかもそこに何もないかのようにデータ ブロックを受信するためにソケットを待機する選択。dbg で同じコードを実行してステップ実行すると、機能します。同じクライアントに再度接続すると、突然、より多くのバイトが受信されます。localhost を使用して cygwin で同じコードを使用すると、正常に動作します。エラーコード EAGAIN が処理され、次に受信が開始されると、さらにバイトが取得されます。ネットワークが 1 バイトしか送信しなかったとしても、残りのメッセージは次に到着するはずですが、そうではありません。あたかもそこに何もないかのようにデータ ブロックを受信するためにソケットを待機する選択。dbg で同じコードを実行してステップ実行すると、機能します。同じクライアントに再度接続すると、突然、より多くのバイトが受信されます。localhost を使用して cygwin で同じコードを使用すると、正常に動作します。同じクライアントに再度接続すると、突然、より多くのバイトが受信されます。localhost を使用して cygwin で同じコードを使用すると、正常に動作します。同じクライアントに再度接続すると、突然、より多くのバイトが受信されます。localhost を使用して cygwin で同じコードを使用すると、正常に動作します。

アップデート

これが完全なコードです。

Main.cpp

    mServerSocket = new TCPSocket(getListeningPort());
    mServerSocket->bindSocket();
    mServerSocket->setSocketBlocking(false);
    mServerSocket->listenToClient(0);

    setupSignals();
    while(isSIGTERM() == false)
    {
        try
        {
            max = prepareListening();

            //memset(&ts, 0, sizeof(struct timespec));
            pret = pselect(max+1, &mReaders, &mWriters, &mExceptions, NULL, &mSignalMask);
            error_code = errno;
            if(pret == 0)
            {
                // Timeout occured, but we are not interested in that for now.
                // Currently this shouldn't happen anyway.
                continue;
            }
            processRequest(pret, error_code);

        }
        catch (SocketException &excp)
        {
            removeClientConnection(findClientConnection(excp.getTCPSocket()));
        }
    } // while sigTERM


BaseSocket.cpp:

#ifdef UNIX

    #include <sys/socket.h>
    #include <sys/types.h>
    #include <sys/ioctl.h>
    #include <unistd.h>
    #include <fcntl.h>
    #include <errno.h>

#endif

#include "support/exceptions/socket_exception.h"
#include "support/logging/simple_log.h"
#include "support/network/base_socket.h"

using namespace std;

BaseSocket::BaseSocket(void)
{
    mSocketId = -1;
    mSendBufferSize = MAX_SEND_LEN;
}

BaseSocket::BaseSocket(int pNumber)
{
    mSocketId = -1;
    mPortNumber = pNumber;
    mBlocking = 1;
    mSendBufferSize = MAX_SEND_LEN;

    try
    {
        if ((mSocketId = ::socket(AF_INET, SOCK_STREAM, 0)) == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::CONSTRUCTOR, errno, "Socket", "unix: error in socket constructor", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
    }

    /*
     set the initial address of client that shall be communicated with to
     any address as long as they are using the same port number.
     The clientAddr structure is used in the future for storing the actual
     address of client applications with which communication is going
     to start
     */
    mClientAddr.sin_family = AF_INET;
    mClientAddr.sin_addr.s_addr = htonl(INADDR_ANY);
    mClientAddr.sin_port = htons(mPortNumber);
    updateSendBufferSize(MAX_SEND_LEN);
}

void BaseSocket::updateSendBufferSize(int nNewSize)
{
    mSendBufferSize = getSendBufferSize();
    if(mSendBufferSize > nNewSize)
        mSendBufferSize = nNewSize;
}

BaseSocket::~BaseSocket(void)
{
    close();
}

void BaseSocket::setSocketId(int socketFd)
{
    mSocketId = socketFd;
}

int BaseSocket::getSocketId()
{
    return mSocketId;
}

// returns the port number
int BaseSocket::getPortNumber()
{
    return mPortNumber;
}

void BaseSocket::setDebug(int debugToggle)
{
    try
    {
        if (setsockopt(mSocketId, SOL_SOCKET, SO_DEBUG, (char *) &debugToggle, sizeof(debugToggle)) == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error set debug", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
    }
}

void BaseSocket::setReuseAddr(int reuseToggle)
{
    try
    {
        if (setsockopt(mSocketId, SOL_SOCKET, SO_REUSEADDR, (char *) &reuseToggle,
              sizeof(reuseToggle)) == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error set reuse address", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
    }
}

void BaseSocket::setKeepAlive(int aliveToggle)
{
    try
    {
        if (setsockopt(mSocketId, SOL_SOCKET, SO_KEEPALIVE, (char *) &aliveToggle,
              sizeof(aliveToggle)) == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error set keep alive", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
    }
}

void BaseSocket::setLingerSeconds(int seconds)
{
    struct linger lingerOption;

    if (seconds > 0)
    {
        lingerOption.l_linger = seconds;
        lingerOption.l_onoff = 1;
    }
    else
        lingerOption.l_onoff = 0;

    try
    {
        if (setsockopt(mSocketId, SOL_SOCKET, SO_LINGER, (char *) &lingerOption,
              sizeof(struct linger)) == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error set linger seconds", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
    }
}

void BaseSocket::setLingerOnOff(bool lingerOn)
{
    struct linger lingerOption;

    if (lingerOn)
        lingerOption.l_onoff = 1;
    else
        lingerOption.l_onoff = 0;

    try
    {
        if (setsockopt(mSocketId, SOL_SOCKET, SO_LINGER, (char *) &lingerOption,
              sizeof(struct linger)) == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error set linger on/off", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
    }
}

void BaseSocket::setSendBufferSize(int sendBufSize)
{
    if (setsockopt(mSocketId, SOL_SOCKET, SO_SNDBUF, (char *) &sendBufSize, sizeof(sendBufSize)) == -1)
    {
#ifdef UNIX
        throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error send buffer size", __FILE__, __LINE__);
#endif
    }
    updateSendBufferSize(sendBufSize);
}

void BaseSocket::setReceiveBufferSize(int receiveBufSize)
{
    if (setsockopt(mSocketId, SOL_SOCKET, SO_RCVBUF, (char *) &receiveBufSize, sizeof(receiveBufSize)) == -1)
    {
#ifdef UNIX
        throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error set receive buffer size", __FILE__, __LINE__);
#endif
    }
}

int BaseSocket::isSocketBlocking()
{
    return mBlocking;
}

void BaseSocket::setSocketBlocking(int blockingToggle)
{
    if (blockingToggle)
    {
        if (isSocketBlocking())
            return;
        else
            mBlocking = 1;
    }
    else
    {
        if (!isSocketBlocking())
            return;
        else
            mBlocking = 0;
    }

    try
    {
#ifdef UNIX
        int flags;
        if (-1 == (flags = fcntl(mSocketId, F_GETFL, 0)))
            flags = 0;

        if(mBlocking)
            fcntl(mSocketId, F_SETFL, flags & (~O_NONBLOCK));
        else
            fcntl(mSocketId, F_SETFL, flags | O_NONBLOCK);

        /*if (ioctl(socketId, FIONBIO, (char *) &blocking) == -1)
        {
            throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error set socke blocking");
        }*/
#endif
    }
    catch (SocketException& excp)
    {
        excp.response();
    }
}

int BaseSocket::getDebug()
{
    int myOption;
    int myOptionLen = sizeof(myOption);

    try
    {
        if (getsockopt(mSocketId, SOL_SOCKET, SO_DEBUG, (void *) &myOption, &myOptionLen) == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error get debug", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
        return -1;
    }

    return myOption;
}

int BaseSocket::getReuseAddr()
{
    int myOption;
    int myOptionLen = sizeof(myOption);

    try
    {
        if (getsockopt(mSocketId, SOL_SOCKET, SO_REUSEADDR, (void *) &myOption, &myOptionLen) == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error get reuse address", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
        return -1;
    }

    return myOption;
}

int BaseSocket::getKeepAlive()
{
    int myOption;
    int myOptionLen = sizeof(myOption);

    try
    {
        if (getsockopt(mSocketId, SOL_SOCKET, SO_KEEPALIVE, (void *) &myOption, &myOptionLen) == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error get keep alive", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
        return -1;
    }
    return myOption;
}

int BaseSocket::getLingerSeconds()
{
    struct linger lingerOption;
    int myOptionLen = sizeof(struct linger);

    try
    {
        if (getsockopt(mSocketId, SOL_SOCKET, SO_LINGER, (void *) &lingerOption, &myOptionLen) == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error get linger seconds", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
        return -1;
    }

    return lingerOption.l_linger;
}

bool BaseSocket::getLingerOnOff()
{
    struct linger lingerOption;
    int myOptionLen = sizeof(struct linger);

    try
    {
        if (getsockopt(mSocketId, SOL_SOCKET, SO_LINGER, (void *) &lingerOption, &myOptionLen) == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error get linger on/off", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
    }

    if (lingerOption.l_onoff == 1)
        return true;
    else
        return false;
}

int BaseSocket::getSendBufferSize()
{
    int sendBuf;
    int myOptionLen = sizeof(sendBuf);

    try
    {
        if (getsockopt(mSocketId, SOL_SOCKET, SO_SNDBUF, (void *)&sendBuf, &myOptionLen) == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error get send buffer size", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
        return -1;
    }
    return sendBuf;
}

int BaseSocket::getReceiveBufferSize()
{
    int rcvBuf;
    int myOptionLen = sizeof(rcvBuf);

    try
    {
        if (getsockopt(mSocketId, SOL_SOCKET, SO_RCVBUF, (void *) &rcvBuf, &myOptionLen) == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error get receive buffer size", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
        return -1;
    }
    return rcvBuf;
}

ostream &operator<<(ostream& io, BaseSocket& s)
{
    string flagStr = "";

    io << endl;
    io << "Summary of socket settings:" << endl;
    io << "   Socket Id:     " << s.getSocketId() << endl;
    io << "   port #:        " << s.getPortNumber() << endl;
    io << "   debug:         " << (flagStr = s.getDebug() ? "true" : "false")
          << endl;
    io << "   reuse addr:    " << (flagStr = s.getReuseAddr() ? "true" : "false")
          << endl;
    io << "   keep alive:    " << (flagStr = s.getKeepAlive() ? "true" : "false")
          << endl;
    io << "   send buf size: " << s.getSendBufferSize() << endl;
    io << "   recv bug size: " << s.getReceiveBufferSize() << endl;
    io << "   blocking:      "
          << (flagStr = s.isSocketBlocking() ? "true" : "false") << endl;
    io << "   linger on:     "
          << (flagStr = s.getLingerOnOff() ? "true" : "false") << endl;
    io << "   linger seconds: " << s.getLingerSeconds() << endl;
    io << endl;
    return io;
}

void BaseSocket::close(void)
{
    ::close(mSocketId);
}

TCPSocket.cpp:


#ifdef UNIX
    #include <sys/socket.h>
    #include <sys/types.h>
    #include <sys/ioctl.h>
    #include <unistd.h>
    #include <fcntl.h>
    #include <errno.h>
#endif

#include <sstream>

#include "support/logging/log.h"
#include "support/exceptions/socket_exception.h"
#include "support/logging/simple_log.h"
#include "support/network/tcp_socket.h"

using namespace std;

const int MSG_HEADER_LEN = 6;

TCPSocket::TCPSocket()
: BaseSocket()
{
}

TCPSocket::TCPSocket(int portId)
: BaseSocket(portId)
{
}

TCPSocket::~TCPSocket()
{
}

void TCPSocket::initialize()
{
}

void TCPSocket::bindSocket()
{
    try
    {
        if (bind(mSocketId, (struct sockaddr *) &mClientAddr, sizeof(struct sockaddr_in)) == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::BIND, 0, "Socket", "unix: error calling bind()", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
    }
}

void TCPSocket::connectToServer(string& serverNameOrAddr, hostType hType)
{
    /* 
     when this method is called, a client socket has been built already,
     so we have the socketId and portNumber ready.

     a HostInfo instance is created, no matter how the server's name is
     given (such as www.yuchen.net) or the server's address is given (such
     as 169.56.32.35), we can use this HostInfo instance to get the
     IP address of the server
     */

    HostInfo serverInfo(serverNameOrAddr, hType);

    // Store the IP address and socket port number
    struct sockaddr_in serverAddress;

    serverAddress.sin_family = AF_INET;
    serverAddress.sin_addr.s_addr = inet_addr(
          serverInfo.getHostIPAddress().c_str());
    serverAddress.sin_port = htons(mPortNumber);

    // Connect to the given address
    try
    {
        if (connect(mSocketId, (struct sockaddr *) &serverAddress, sizeof(serverAddress)) == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::CONNECT, 0, "Socket", "unix: error calling connect()", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
    }
}

TCPSocket *TCPSocket::acceptClient(string& clientHost)
{
    int newSocket; // the new socket file descriptor returned by the accept system call

    // the length of the client's address
    int clientAddressLen = sizeof(struct sockaddr_in);
    struct sockaddr_in clientAddress;    // Address of the client that sent data

    // Accepts a new client connection and stores its socket file descriptor
    try
    {
        if ((newSocket = accept(mSocketId, (struct sockaddr *) &clientAddress, &clientAddressLen)) == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::ACCEPT, 0, "Socket", "unix: error calling accept()", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
        return NULL;
    }

    // Get the host name given the address
    char *sAddress = inet_ntoa((struct in_addr) clientAddress.sin_addr);
    HostInfo clientInfo(sAddress, ADDRESS);
    clientHost += clientInfo.getHostName();

    // Create and return the new TCPSocket object
    TCPSocket* retSocket = new TCPSocket();
    retSocket->setSocketId(newSocket);
    return retSocket;
}

void TCPSocket::listenToClient(int totalNumPorts)
{
    try
    {
        if (listen(mSocketId, totalNumPorts) == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::LISTEN, 0, "Socket", "unix: error calling listen()", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
    }
}

ostream &operator<<(ostream &oStream, const TCPSocket &oSocket)
{
    oStream << oSocket.mSocketId;
    return oStream;
}

int TCPSocket::send(SocketMessage const &oBuffer, int nSize)
{
    int numBytes;  // the number of bytes sent
    int error = errno;

    if(nSize == -1)
        nSize = oBuffer.size();

    if((unsigned int)nSize > oBuffer.size())
    {
        std::stringstream ss;
        ss << "Invalid Buffersize! Requested: " << (unsigned int)nSize << " Provided: " << oBuffer.size();
        std::string s;
        ss >> s;
        FILE_LOG(logERROR) << s;
        throw SocketException(this, SocketException::SEND, 0, "Socket", s, __FILE__, __LINE__);
    }

    // Sends the message to the connected host
    try
    {
        FILE_LOG(logDEBUG4) << "Sending on socket: "<< mSocketId << " bytes:" << nSize;
        numBytes = ::send(mSocketId, &oBuffer[0], nSize, 0);
        error = errno;
        FILE_LOG(logDEBUG4) << "Sent on socket: "<< mSocketId << " bytes:" << nSize << " errno: " << error;
        if(numBytes == -1)
        {
#ifdef UNIX
            if(error == EAGAIN || error == EWOULDBLOCK)
            {
                return -1;
            }
            else
                throw SocketException(this, SocketException::SEND, error, "Socket", "unix: error calling send()", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
    }

    return numBytes;
}

int TCPSocket::receive(SocketMessage &oMessage, int nBytes)
{
    int max = getSendBufferSize();

    if(nBytes != -1 && nBytes < max)
        max = nBytes;

    SocketMessage sb(max);
    int rcv_bytes = 0;
    int total = 0;
    int error = 0;

    while(1)
    {
        rcv_bytes = ::recv(getSocketId(), &sb[0], sb.size(), 0);
        error = errno;
        FILE_LOG(logDEBUG4) << "Received on socket: " << getSocketId() << " bytes: " << rcv_bytes << "  expected:" << sb.size() << " total: " << total << " errno: " << error;

        if(rcv_bytes == -1)
        {
            if(error == EAGAIN || error == EWOULDBLOCK)
                return total;

            throw SocketException(this, SocketException::RECEIVE, error, "Socket", "Client connection error!", __FILE__, __LINE__);
        }

        // Socket has been closed.
        if(rcv_bytes == 0)
            return total;

        total += rcv_bytes;
        oMessage.insert(oMessage.end(), sb.begin(), sb.begin()+rcv_bytes);
    }

    return total;
}

void TCPSocket::close(void)
{
    BaseSocket::close();
}
4

3 に答える 3

1

Nagle Algorithmがここで作動していませんか? TCP_NODELAYソケット オプションを設定して無効にしていない場合、一定量のデータ ( MSS ) が利用可能になるまでデータが送信されない可能性があります。

于 2013-05-14T17:45:14.543 に答える