ここで何がうまくいかないのか本当に理解できないので、誰かが私が見逃したものを見つけてくれることを願っています.
Javaで開発したクライアントを受け入れるユーザーデーモンを書いています。今のところ、このクライアントはユーザー名パスワードのみを接続して送信します。私はcygwinの下でコードを開発しましたが、そこで動作します。デーモンが自己紹介を送信し、次にクライアントがユーザー名とパスワードを送信すると、デーモンはクライアントを切断するか、OK を送信して応答します (まだ行われていません)。
cygwin を使用してこれをテストすると、ローカルホストで動作します。コードを HPUX に移植したところ、クライアントは接続でき、デーモンからの導入も受信できます。クライアントがユーザー名とパスワードを送信すると、もう機能しません。デーモンは 1 バイトしか受信せず、再度読み取ろうとすると、EAGAIN の結果として -1 が返され、他には何も返されません。クライアントにはエラーは表示されず、デーモン側にもエラーはありません。gdb を使用してコードをステップ実行すると、メッセージが完全に受信されます。:(
私が使用するコードはこれです。さらに情報が必要な場合は、追加できます。
int TCPSocket::receive(SocketMessage &oMessage, int nBytes)
{
int max = getSendBufferSize();
if(nBytes != -1 && nBytes < max)
max = nBytes;
SocketMessage sb(max);
int rcv_bytes = 0;
int total = 0;
int error = 0;
while(1)
{
rcv_bytes = ::recv(getSocketId(), &sb[0], sb.size(), 0);
error = errno;
FILE_LOG(logDEBUG4) << "Received on socket: " << mSocketId << " bytes: " << rcv_bytes << " expected:" << sb.size() << " total: " << total << " errno: " << error;
if(rcv_bytes == -1)
{
if(error == EAGAIN || error == EWOULDBLOCK)
return total;
throw SocketException(this, SocketException::RECEIVE, error, "Socket", "Client connection error!", __FILE__, __LINE__);
}
//if(rcv_bytes == 0)
// throw SocketException(this, SocketException::RECEIVE, error, "Socket", "Client connection has been closed!");
total += rcv_bytes;
oMessage.insert(oMessage.end(), sb.begin(), sb.begin()+rcv_bytes);
}
return total;
}
ログの出力は次のとおりです。
16:16:04.391 DEBUG4: Received on socket: 4 bytes: 1 expected:32768 total: 0 errno: 2
16:16:04.391 DEBUG4: Received on socket: 4 bytes: -1 expected:32768 total: 1 errno: 11
残りの 30 バイトはどこにあり、なぜ返されないのでしょうか?
アップデート
これは、実際にデータを受け取るコードの一部にすぎません。ソケット クラス自体は、プロトコルなしで raw ソケットのみを処理します。プロトコルは別のクラスで実装されます。この受信関数は、ネットワーク上で利用可能なすべてのバイトを取得し、それをバッファー (SocketMessage) に入れることになっています。バイト数が複数のメッセージであるか、単一のメッセージの一部であるかは問題ではありません。これは、制御クラスが (おそらく) 部分的なメッセージ ストリームから実際のメッセージ ブロックを構築するためです。最初の呼び出しが 1 バイトしか受信しないことは問題ではありません。メッセージが完了していない場合、呼び出し元はさらにデータが到着してメッセージが完了するまでループ内で待機するためです。複数のクライアントが存在する可能性があるため、非ブロッキング ソケットを使用しています。個別のスレッドを扱いたくありませんでした。私のサーバーは接続を多重化しています。ここでの問題は、受信が 1 バイトしか受信しないのに、もっと多くのバイトが必要であることを知っていることです。エラーコード EAGAIN が処理され、次に受信が開始されると、さらにバイトが取得されます。ネットワークが 1 バイトしか送信しなかったとしても、残りのメッセージは次に到着するはずですが、そうではありません。あたかもそこに何もないかのようにデータ ブロックを受信するためにソケットを待機する選択。dbg で同じコードを実行してステップ実行すると、機能します。同じクライアントに再度接続すると、突然、より多くのバイトが受信されます。localhost を使用して cygwin で同じコードを使用すると、正常に動作します。エラーコード EAGAIN が処理され、次に受信が開始されると、さらにバイトが取得されます。ネットワークが 1 バイトしか送信しなかったとしても、残りのメッセージは次に到着するはずですが、そうではありません。あたかもそこに何もないかのようにデータ ブロックを受信するためにソケットを待機する選択。dbg で同じコードを実行してステップ実行すると、機能します。同じクライアントに再度接続すると、突然、より多くのバイトが受信されます。localhost を使用して cygwin で同じコードを使用すると、正常に動作します。エラーコード EAGAIN が処理され、次に受信が開始されると、さらにバイトが取得されます。ネットワークが 1 バイトしか送信しなかったとしても、残りのメッセージは次に到着するはずですが、そうではありません。あたかもそこに何もないかのようにデータ ブロックを受信するためにソケットを待機する選択。dbg で同じコードを実行してステップ実行すると、機能します。同じクライアントに再度接続すると、突然、より多くのバイトが受信されます。localhost を使用して cygwin で同じコードを使用すると、正常に動作します。同じクライアントに再度接続すると、突然、より多くのバイトが受信されます。localhost を使用して cygwin で同じコードを使用すると、正常に動作します。同じクライアントに再度接続すると、突然、より多くのバイトが受信されます。localhost を使用して cygwin で同じコードを使用すると、正常に動作します。
アップデート
これが完全なコードです。
Main.cpp
mServerSocket = new TCPSocket(getListeningPort());
mServerSocket->bindSocket();
mServerSocket->setSocketBlocking(false);
mServerSocket->listenToClient(0);
setupSignals();
while(isSIGTERM() == false)
{
try
{
max = prepareListening();
//memset(&ts, 0, sizeof(struct timespec));
pret = pselect(max+1, &mReaders, &mWriters, &mExceptions, NULL, &mSignalMask);
error_code = errno;
if(pret == 0)
{
// Timeout occured, but we are not interested in that for now.
// Currently this shouldn't happen anyway.
continue;
}
processRequest(pret, error_code);
}
catch (SocketException &excp)
{
removeClientConnection(findClientConnection(excp.getTCPSocket()));
}
} // while sigTERM
BaseSocket.cpp:
#ifdef UNIX
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/ioctl.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#endif
#include "support/exceptions/socket_exception.h"
#include "support/logging/simple_log.h"
#include "support/network/base_socket.h"
using namespace std;
BaseSocket::BaseSocket(void)
{
mSocketId = -1;
mSendBufferSize = MAX_SEND_LEN;
}
BaseSocket::BaseSocket(int pNumber)
{
mSocketId = -1;
mPortNumber = pNumber;
mBlocking = 1;
mSendBufferSize = MAX_SEND_LEN;
try
{
if ((mSocketId = ::socket(AF_INET, SOCK_STREAM, 0)) == -1)
{
#ifdef UNIX
throw SocketException(this, SocketException::CONSTRUCTOR, errno, "Socket", "unix: error in socket constructor", __FILE__, __LINE__);
#endif
}
}
catch (SocketException& excp)
{
excp.response();
}
/*
set the initial address of client that shall be communicated with to
any address as long as they are using the same port number.
The clientAddr structure is used in the future for storing the actual
address of client applications with which communication is going
to start
*/
mClientAddr.sin_family = AF_INET;
mClientAddr.sin_addr.s_addr = htonl(INADDR_ANY);
mClientAddr.sin_port = htons(mPortNumber);
updateSendBufferSize(MAX_SEND_LEN);
}
void BaseSocket::updateSendBufferSize(int nNewSize)
{
mSendBufferSize = getSendBufferSize();
if(mSendBufferSize > nNewSize)
mSendBufferSize = nNewSize;
}
BaseSocket::~BaseSocket(void)
{
close();
}
void BaseSocket::setSocketId(int socketFd)
{
mSocketId = socketFd;
}
int BaseSocket::getSocketId()
{
return mSocketId;
}
// returns the port number
int BaseSocket::getPortNumber()
{
return mPortNumber;
}
void BaseSocket::setDebug(int debugToggle)
{
try
{
if (setsockopt(mSocketId, SOL_SOCKET, SO_DEBUG, (char *) &debugToggle, sizeof(debugToggle)) == -1)
{
#ifdef UNIX
throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error set debug", __FILE__, __LINE__);
#endif
}
}
catch (SocketException& excp)
{
excp.response();
}
}
void BaseSocket::setReuseAddr(int reuseToggle)
{
try
{
if (setsockopt(mSocketId, SOL_SOCKET, SO_REUSEADDR, (char *) &reuseToggle,
sizeof(reuseToggle)) == -1)
{
#ifdef UNIX
throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error set reuse address", __FILE__, __LINE__);
#endif
}
}
catch (SocketException& excp)
{
excp.response();
}
}
void BaseSocket::setKeepAlive(int aliveToggle)
{
try
{
if (setsockopt(mSocketId, SOL_SOCKET, SO_KEEPALIVE, (char *) &aliveToggle,
sizeof(aliveToggle)) == -1)
{
#ifdef UNIX
throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error set keep alive", __FILE__, __LINE__);
#endif
}
}
catch (SocketException& excp)
{
excp.response();
}
}
void BaseSocket::setLingerSeconds(int seconds)
{
struct linger lingerOption;
if (seconds > 0)
{
lingerOption.l_linger = seconds;
lingerOption.l_onoff = 1;
}
else
lingerOption.l_onoff = 0;
try
{
if (setsockopt(mSocketId, SOL_SOCKET, SO_LINGER, (char *) &lingerOption,
sizeof(struct linger)) == -1)
{
#ifdef UNIX
throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error set linger seconds", __FILE__, __LINE__);
#endif
}
}
catch (SocketException& excp)
{
excp.response();
}
}
void BaseSocket::setLingerOnOff(bool lingerOn)
{
struct linger lingerOption;
if (lingerOn)
lingerOption.l_onoff = 1;
else
lingerOption.l_onoff = 0;
try
{
if (setsockopt(mSocketId, SOL_SOCKET, SO_LINGER, (char *) &lingerOption,
sizeof(struct linger)) == -1)
{
#ifdef UNIX
throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error set linger on/off", __FILE__, __LINE__);
#endif
}
}
catch (SocketException& excp)
{
excp.response();
}
}
void BaseSocket::setSendBufferSize(int sendBufSize)
{
if (setsockopt(mSocketId, SOL_SOCKET, SO_SNDBUF, (char *) &sendBufSize, sizeof(sendBufSize)) == -1)
{
#ifdef UNIX
throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error send buffer size", __FILE__, __LINE__);
#endif
}
updateSendBufferSize(sendBufSize);
}
void BaseSocket::setReceiveBufferSize(int receiveBufSize)
{
if (setsockopt(mSocketId, SOL_SOCKET, SO_RCVBUF, (char *) &receiveBufSize, sizeof(receiveBufSize)) == -1)
{
#ifdef UNIX
throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error set receive buffer size", __FILE__, __LINE__);
#endif
}
}
int BaseSocket::isSocketBlocking()
{
return mBlocking;
}
void BaseSocket::setSocketBlocking(int blockingToggle)
{
if (blockingToggle)
{
if (isSocketBlocking())
return;
else
mBlocking = 1;
}
else
{
if (!isSocketBlocking())
return;
else
mBlocking = 0;
}
try
{
#ifdef UNIX
int flags;
if (-1 == (flags = fcntl(mSocketId, F_GETFL, 0)))
flags = 0;
if(mBlocking)
fcntl(mSocketId, F_SETFL, flags & (~O_NONBLOCK));
else
fcntl(mSocketId, F_SETFL, flags | O_NONBLOCK);
/*if (ioctl(socketId, FIONBIO, (char *) &blocking) == -1)
{
throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error set socke blocking");
}*/
#endif
}
catch (SocketException& excp)
{
excp.response();
}
}
int BaseSocket::getDebug()
{
int myOption;
int myOptionLen = sizeof(myOption);
try
{
if (getsockopt(mSocketId, SOL_SOCKET, SO_DEBUG, (void *) &myOption, &myOptionLen) == -1)
{
#ifdef UNIX
throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error get debug", __FILE__, __LINE__);
#endif
}
}
catch (SocketException& excp)
{
excp.response();
return -1;
}
return myOption;
}
int BaseSocket::getReuseAddr()
{
int myOption;
int myOptionLen = sizeof(myOption);
try
{
if (getsockopt(mSocketId, SOL_SOCKET, SO_REUSEADDR, (void *) &myOption, &myOptionLen) == -1)
{
#ifdef UNIX
throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error get reuse address", __FILE__, __LINE__);
#endif
}
}
catch (SocketException& excp)
{
excp.response();
return -1;
}
return myOption;
}
int BaseSocket::getKeepAlive()
{
int myOption;
int myOptionLen = sizeof(myOption);
try
{
if (getsockopt(mSocketId, SOL_SOCKET, SO_KEEPALIVE, (void *) &myOption, &myOptionLen) == -1)
{
#ifdef UNIX
throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error get keep alive", __FILE__, __LINE__);
#endif
}
}
catch (SocketException& excp)
{
excp.response();
return -1;
}
return myOption;
}
int BaseSocket::getLingerSeconds()
{
struct linger lingerOption;
int myOptionLen = sizeof(struct linger);
try
{
if (getsockopt(mSocketId, SOL_SOCKET, SO_LINGER, (void *) &lingerOption, &myOptionLen) == -1)
{
#ifdef UNIX
throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error get linger seconds", __FILE__, __LINE__);
#endif
}
}
catch (SocketException& excp)
{
excp.response();
return -1;
}
return lingerOption.l_linger;
}
bool BaseSocket::getLingerOnOff()
{
struct linger lingerOption;
int myOptionLen = sizeof(struct linger);
try
{
if (getsockopt(mSocketId, SOL_SOCKET, SO_LINGER, (void *) &lingerOption, &myOptionLen) == -1)
{
#ifdef UNIX
throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error get linger on/off", __FILE__, __LINE__);
#endif
}
}
catch (SocketException& excp)
{
excp.response();
}
if (lingerOption.l_onoff == 1)
return true;
else
return false;
}
int BaseSocket::getSendBufferSize()
{
int sendBuf;
int myOptionLen = sizeof(sendBuf);
try
{
if (getsockopt(mSocketId, SOL_SOCKET, SO_SNDBUF, (void *)&sendBuf, &myOptionLen) == -1)
{
#ifdef UNIX
throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error get send buffer size", __FILE__, __LINE__);
#endif
}
}
catch (SocketException& excp)
{
excp.response();
return -1;
}
return sendBuf;
}
int BaseSocket::getReceiveBufferSize()
{
int rcvBuf;
int myOptionLen = sizeof(rcvBuf);
try
{
if (getsockopt(mSocketId, SOL_SOCKET, SO_RCVBUF, (void *) &rcvBuf, &myOptionLen) == -1)
{
#ifdef UNIX
throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error get receive buffer size", __FILE__, __LINE__);
#endif
}
}
catch (SocketException& excp)
{
excp.response();
return -1;
}
return rcvBuf;
}
ostream &operator<<(ostream& io, BaseSocket& s)
{
string flagStr = "";
io << endl;
io << "Summary of socket settings:" << endl;
io << " Socket Id: " << s.getSocketId() << endl;
io << " port #: " << s.getPortNumber() << endl;
io << " debug: " << (flagStr = s.getDebug() ? "true" : "false")
<< endl;
io << " reuse addr: " << (flagStr = s.getReuseAddr() ? "true" : "false")
<< endl;
io << " keep alive: " << (flagStr = s.getKeepAlive() ? "true" : "false")
<< endl;
io << " send buf size: " << s.getSendBufferSize() << endl;
io << " recv bug size: " << s.getReceiveBufferSize() << endl;
io << " blocking: "
<< (flagStr = s.isSocketBlocking() ? "true" : "false") << endl;
io << " linger on: "
<< (flagStr = s.getLingerOnOff() ? "true" : "false") << endl;
io << " linger seconds: " << s.getLingerSeconds() << endl;
io << endl;
return io;
}
void BaseSocket::close(void)
{
::close(mSocketId);
}
TCPSocket.cpp:
#ifdef UNIX
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/ioctl.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#endif
#include <sstream>
#include "support/logging/log.h"
#include "support/exceptions/socket_exception.h"
#include "support/logging/simple_log.h"
#include "support/network/tcp_socket.h"
using namespace std;
const int MSG_HEADER_LEN = 6;
TCPSocket::TCPSocket()
: BaseSocket()
{
}
TCPSocket::TCPSocket(int portId)
: BaseSocket(portId)
{
}
TCPSocket::~TCPSocket()
{
}
void TCPSocket::initialize()
{
}
void TCPSocket::bindSocket()
{
try
{
if (bind(mSocketId, (struct sockaddr *) &mClientAddr, sizeof(struct sockaddr_in)) == -1)
{
#ifdef UNIX
throw SocketException(this, SocketException::BIND, 0, "Socket", "unix: error calling bind()", __FILE__, __LINE__);
#endif
}
}
catch (SocketException& excp)
{
excp.response();
}
}
void TCPSocket::connectToServer(string& serverNameOrAddr, hostType hType)
{
/*
when this method is called, a client socket has been built already,
so we have the socketId and portNumber ready.
a HostInfo instance is created, no matter how the server's name is
given (such as www.yuchen.net) or the server's address is given (such
as 169.56.32.35), we can use this HostInfo instance to get the
IP address of the server
*/
HostInfo serverInfo(serverNameOrAddr, hType);
// Store the IP address and socket port number
struct sockaddr_in serverAddress;
serverAddress.sin_family = AF_INET;
serverAddress.sin_addr.s_addr = inet_addr(
serverInfo.getHostIPAddress().c_str());
serverAddress.sin_port = htons(mPortNumber);
// Connect to the given address
try
{
if (connect(mSocketId, (struct sockaddr *) &serverAddress, sizeof(serverAddress)) == -1)
{
#ifdef UNIX
throw SocketException(this, SocketException::CONNECT, 0, "Socket", "unix: error calling connect()", __FILE__, __LINE__);
#endif
}
}
catch (SocketException& excp)
{
excp.response();
}
}
TCPSocket *TCPSocket::acceptClient(string& clientHost)
{
int newSocket; // the new socket file descriptor returned by the accept system call
// the length of the client's address
int clientAddressLen = sizeof(struct sockaddr_in);
struct sockaddr_in clientAddress; // Address of the client that sent data
// Accepts a new client connection and stores its socket file descriptor
try
{
if ((newSocket = accept(mSocketId, (struct sockaddr *) &clientAddress, &clientAddressLen)) == -1)
{
#ifdef UNIX
throw SocketException(this, SocketException::ACCEPT, 0, "Socket", "unix: error calling accept()", __FILE__, __LINE__);
#endif
}
}
catch (SocketException& excp)
{
excp.response();
return NULL;
}
// Get the host name given the address
char *sAddress = inet_ntoa((struct in_addr) clientAddress.sin_addr);
HostInfo clientInfo(sAddress, ADDRESS);
clientHost += clientInfo.getHostName();
// Create and return the new TCPSocket object
TCPSocket* retSocket = new TCPSocket();
retSocket->setSocketId(newSocket);
return retSocket;
}
void TCPSocket::listenToClient(int totalNumPorts)
{
try
{
if (listen(mSocketId, totalNumPorts) == -1)
{
#ifdef UNIX
throw SocketException(this, SocketException::LISTEN, 0, "Socket", "unix: error calling listen()", __FILE__, __LINE__);
#endif
}
}
catch (SocketException& excp)
{
excp.response();
}
}
ostream &operator<<(ostream &oStream, const TCPSocket &oSocket)
{
oStream << oSocket.mSocketId;
return oStream;
}
int TCPSocket::send(SocketMessage const &oBuffer, int nSize)
{
int numBytes; // the number of bytes sent
int error = errno;
if(nSize == -1)
nSize = oBuffer.size();
if((unsigned int)nSize > oBuffer.size())
{
std::stringstream ss;
ss << "Invalid Buffersize! Requested: " << (unsigned int)nSize << " Provided: " << oBuffer.size();
std::string s;
ss >> s;
FILE_LOG(logERROR) << s;
throw SocketException(this, SocketException::SEND, 0, "Socket", s, __FILE__, __LINE__);
}
// Sends the message to the connected host
try
{
FILE_LOG(logDEBUG4) << "Sending on socket: "<< mSocketId << " bytes:" << nSize;
numBytes = ::send(mSocketId, &oBuffer[0], nSize, 0);
error = errno;
FILE_LOG(logDEBUG4) << "Sent on socket: "<< mSocketId << " bytes:" << nSize << " errno: " << error;
if(numBytes == -1)
{
#ifdef UNIX
if(error == EAGAIN || error == EWOULDBLOCK)
{
return -1;
}
else
throw SocketException(this, SocketException::SEND, error, "Socket", "unix: error calling send()", __FILE__, __LINE__);
#endif
}
}
catch (SocketException& excp)
{
excp.response();
}
return numBytes;
}
int TCPSocket::receive(SocketMessage &oMessage, int nBytes)
{
int max = getSendBufferSize();
if(nBytes != -1 && nBytes < max)
max = nBytes;
SocketMessage sb(max);
int rcv_bytes = 0;
int total = 0;
int error = 0;
while(1)
{
rcv_bytes = ::recv(getSocketId(), &sb[0], sb.size(), 0);
error = errno;
FILE_LOG(logDEBUG4) << "Received on socket: " << getSocketId() << " bytes: " << rcv_bytes << " expected:" << sb.size() << " total: " << total << " errno: " << error;
if(rcv_bytes == -1)
{
if(error == EAGAIN || error == EWOULDBLOCK)
return total;
throw SocketException(this, SocketException::RECEIVE, error, "Socket", "Client connection error!", __FILE__, __LINE__);
}
// Socket has been closed.
if(rcv_bytes == 0)
return total;
total += rcv_bytes;
oMessage.insert(oMessage.end(), sb.begin(), sb.begin()+rcv_bytes);
}
return total;
}
void TCPSocket::close(void)
{
BaseSocket::close();
}