5

Boostを使い始めたばかりです。非同期ソケットを使用してTCPクライアントサーバーを作成しています。

タスクは次のとおりです。

  1. クライアントはサーバーに番号を送信します
  2. クライアントは、サーバーの応答を受信する前に別のナブマーを送信できます。
  3. サーバーは数値を受信し、それを使用して計算を行い、結果をクライアントに送り返します。
  4. 複数のクライアントをサーバーに接続できます。

現在、次のように動作します

  • クライアントからサーバーに番号を送信します
  • サーバーは現在のスレッドで数値を受け取り、OnReceiveハンドラーで正しく計算します(これは悪いことですが、並列計算を行うために新しいスレッドを開始する方法)
  • サーバーは応答を送り返しますが、クライアントはすでに切断されています

クライアントがキーボードから数字を入力し、同時にサーバーからの応答を待つことができるようにするにはどうすればよいですか?

そして、なぜ私のクライアントはサーバーからの応答を待たないのですか?

クライアントコード:

using boost::asio::ip::tcp;

class TCPClient
{
    public:
        TCPClient(boost::asio::io_service& IO_Service, tcp::resolver::iterator EndPointIter);
        void Close();

    private:
        boost::asio::io_service& m_IOService;
        tcp::socket m_Socket;

        string m_SendBuffer;
        static const size_t m_BufLen = 100;
        char m_RecieveBuffer[m_BufLen*2];

        void OnConnect(const boost::system::error_code& ErrorCode, tcp::resolver::iterator EndPointIter);
        void OnReceive(const boost::system::error_code& ErrorCode);
        void OnSend(const boost::system::error_code& ErrorCode);
        void DoClose();
};

TCPClient::TCPClient(boost::asio::io_service& IO_Service, tcp::resolver::iterator EndPointIter)
: m_IOService(IO_Service), m_Socket(IO_Service), m_SendBuffer("")
{
    tcp::endpoint EndPoint = *EndPointIter;

    m_Socket.async_connect(EndPoint,
        boost::bind(&TCPClient::OnConnect, this, boost::asio::placeholders::error, ++EndPointIter));
}

void TCPClient::Close()
{
    m_IOService.post(
        boost::bind(&TCPClient::DoClose, this));
}
void TCPClient::OnConnect(const boost::system::error_code& ErrorCode, tcp::resolver::iterator EndPointIter)
{
    cout << "OnConnect..." << endl;
    if (ErrorCode == 0)
    {
        cin >> m_SendBuffer;
        cout << "Entered: " << m_SendBuffer << endl;
        m_SendBuffer += "\0";

        m_Socket.async_send(boost::asio::buffer(m_SendBuffer.c_str(),m_SendBuffer.length()+1),
            boost::bind(&TCPClient::OnSend, this,
            boost::asio::placeholders::error));
    } 
    else if (EndPointIter != tcp::resolver::iterator())
    {
        m_Socket.close();
        tcp::endpoint EndPoint = *EndPointIter;

        m_Socket.async_connect(EndPoint, 
            boost::bind(&TCPClient::OnConnect, this, boost::asio::placeholders::error, ++EndPointIter));
    }
}

void TCPClient::OnReceive(const boost::system::error_code& ErrorCode)
{
    cout << "receiving..." << endl;
    if (ErrorCode == 0)
    {
        cout << m_RecieveBuffer << endl;

        m_Socket.async_receive(boost::asio::buffer(m_RecieveBuffer, m_BufLen),
            boost::bind(&TCPClient::OnReceive, this, boost::asio::placeholders::error));
    } 
    else 
    {
        cout << "ERROR! OnReceive..." << endl;
        DoClose();
    }
}

void TCPClient::OnSend(const boost::system::error_code& ErrorCode)
{
    cout << "sending..." << endl;
    if (!ErrorCode)
    {
        cout << "\""<< m_SendBuffer <<"\" has been sent" << endl;
        m_SendBuffer = "";

        m_Socket.async_receive(boost::asio::buffer(m_RecieveBuffer, m_BufLen),
            boost::bind(&TCPClient::OnReceive, this, boost::asio::placeholders::error));
    }
    else
    {
        cout << "OnSend closing" << endl;
        DoClose();
    }

}

void TCPClient::DoClose()
{
    m_Socket.close();
}

int main()
{
    try 
    {
        cout << "Client is starting..." << endl;
        boost::asio::io_service IO_Service;

        tcp::resolver Resolver(IO_Service);

        string port = "13";
        tcp::resolver::query Query("127.0.0.1", port);

        tcp::resolver::iterator EndPointIterator = Resolver.resolve(Query);

        TCPClient Client(IO_Service, EndPointIterator);

        cout << "Client is started!" << endl;

        cout << "Enter a query string " << endl;

        boost::thread ClientThread(boost::bind(&boost::asio::io_service::run, &IO_Service));

        Client.Close();
        ClientThread.join();
    } 
    catch (exception& e)
    {
        cerr << e.what() << endl;
    }

    cout << "\nClosing";
    getch();
}

これがコンソールからの出力です

Client is starting...
Client is started!
OnConnect...
12
Entered: 12
sending...
"12" has been sent
receiving...
ERROR! OnReceive...

Closing

サーバー部分

class Session
{
    public:
        Session(boost::asio::io_service& io_service)
            : socket_(io_service)
        {
            dataRx[0] = '\0';
            dataTx[0] = '\0';
        }

        tcp::socket& socket()
        {
            return socket_;
        }

        void start()
        {
            socket_.async_read_some(boost::asio::buffer(dataRx, max_length),
                boost::bind(&Session::handle_read, this,
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred));
        }

        void handle_read(const boost::system::error_code& error, size_t bytes_transferred)
        {
            cout << "reading..." << endl;
            cout << "Data: " << dataRx << endl;

            if (!error)
            {
                if (!isValidData())
                {
                    cout << "Bad data!" << endl;
                    sprintf(dataTx, "Bad data!\0");
                    dataRx[0] = '\0';
                }
                else
                {
                    sprintf(dataTx, getFactorization().c_str());
                    dataRx[0] = '\0';
                }

                boost::asio::async_write(socket_,
                    boost::asio::buffer(dataTx, max_length*2),
                    boost::bind(&Session::handle_write, this,
                    boost::asio::placeholders::error));
            }
            else
            {
                delete this;
            }
        }

        void handle_write(const boost::system::error_code& error)
        {
            cout << "writing..." << endl;
            if (!error)
            {
                cout << "dataTx sent: " << dataTx << endl;
                dataTx[0] = '\0';

                socket_.async_read_some(boost::asio::buffer(dataRx, max_length),
                    boost::bind(&Session::handle_read, this,
                    boost::asio::placeholders::error,
                    boost::asio::placeholders::bytes_transferred));
            }
            else
            {
                delete this;
            }
        }

        string getFactorization() const
        {
            //Do something
        }

        bool isValidData()
        {
            locale loc; 
            for (int i = 0; i < strlen(dataRx); i++)
                if (!isdigit(dataRx[i],loc))
                    return false;

            return true;
        }

    private:
        tcp::socket socket_;
        static const size_t max_length = 100;
        char dataRx[max_length];
        char dataTx[max_length*2];
};

class Server
{
    public:
        Server(boost::asio::io_service& io_service, short port)
            : io_service_(io_service),
            acceptor_(io_service, tcp::endpoint(tcp::v4(), port))
        {
            Session* new_session = new Session(io_service_);
            acceptor_.async_accept(new_session->socket(),
                boost::bind(&Server::handle_accept, this, new_session,
                boost::asio::placeholders::error));
        }

        void handle_accept(Session* new_session, const boost::system::error_code& error)
        {
            if (!error)
            {
                new_session->start();
                new_session = new Session(io_service_);
                acceptor_.async_accept(new_session->socket(),
                    boost::bind(&Server::handle_accept, this, new_session,
                    boost::asio::placeholders::error));
            }
            else
            {
                delete new_session;
            }
        }

    private:
        boost::asio::io_service& io_service_;
        tcp::acceptor acceptor_;
};

int main(int argc, char* argv[])
{
    cout << "Server is runing..." << endl;
    try
    {
        boost::asio::io_service io_service;

        int port = 13;
        Server s(io_service, port);
        cout << "Server is run!" << endl;
        io_service.run();
    }
    catch (boost::system::error_code& e)
    {
        std::cerr << e << "\n";
    }
    catch (std::exception& e)
    {
        std::cerr << "Exception: " << e.what() << "\n";
    }

    return 0;
}

サーバーの出力

Server is runing...
Server is run!
reading...
Data: 12
writing...
dataTx sent: 13    //just send back received ++number
reading...
Data:

あなたの助けは非常に高く評価されます

========

追加した

わかりました。しかし、ErrorCode == boost :: asio :: error :: eofが機能しないことを確認してください...私は何を間違えましたか?

else if (ErrorCode == boost::asio::error::eof)
{
    cout << "boost::asio::error::eof in OnReceive!" << endl;
}
else 
{
    cout << "ERROR! OnReceive..." << ErrorCode << endl;
    DoClose();
}

プリントアウトはERROR! OnReceive...system:10009私の比較が間違っているようです

========

追加した

根本的な原因を見つけました。私はasync_receive(の代わりにasync_read_some)使用法を述べ、行をに交換しましmain

ClientThread.join();
Client.Close();

今では正常に動作します!

現在、ソケットとの間で同時にデータの読み取りと書き込みを行おうとしています(サーバーからの応答を受信する前に、クライアントが追加の要求を送信できる必要があるためです。

OnConnect関数では、ブーストスレッドを作成します。

boost::thread addMsgThread(boost::bind(&TCPClient::addMsgLoop, this));
boost::thread receivingThread(boost::bind(&TCPClient::startReceiving, this));
boost::thread sendingThread(boost::bind(&TCPClient::startSending, this));

補充あり

void TCPClient::startReceiving()
{
    cout << "receiving..." << endl;
    m_RecieveBuffer[0] = '\0';
    m_Socket.async_receive(boost::asio::buffer(m_RecieveBuffer, m_BufLen),
        boost::bind(&TCPClient::receivingLoop, this, boost::asio::placeholders::error)); //runtime error here
    cout << "m_RecieveBuffer = " << m_RecieveBuffer << endl;
}

void TCPClient::receivingLoop(const boost::system::error_code& ErrorCode)
{
    cout << "receiving..." << endl;
    if (ErrorCode == 0)
    {
        cout << "m_RecieveBuffer = " << m_RecieveBuffer << endl;

        m_Socket.async_receive(boost::asio::buffer(m_RecieveBuffer, m_BufLen),
            boost::bind(&TCPClient::receivingLoop, this, boost::asio::placeholders::error));
    }
    else 
    {
        cout << "ERROR! receivingLoop..." << ErrorCode << endl;
        DoClose();
    }
}

void TCPClient::addMsgLoop()
{
    while (true)
    {
        string tmp;
        cin >> tmp;

        cout << "Entered: " << tmp << endl;
        tmp += "\0";

        try
        {
            msgQueue.push(tmp);
        }
        catch(exception &e)
        {
            cerr << "Canno add msg to send queue... " << e.what() << endl;
        }
    }
}

receive問題はスレッドとスレッドの両方で同じsendです:ランタイムエラー(ブーストライブラリのどこかにアクセス違反を書き込んでいます)。

void TCPClient::startReceiving()
{
     ...
     m_Socket.async_receive(); //runtime error here
}

以降のバージョンでは、すべて正常に機能します(ただし、回答前に複数の送信を実装する方法がわかりません)。誰かが問題を修正する方法や別の方法でこれを実装する方法を教えてもらえますか?プーリングが役立つかもしれませんが、今ではそれが良い方法であると確信しています。

4

1 に答える 1

3

boost :: asio :: ip :: tcp :: socket :: async_read_someは、その名前が示すように、完全なデータを読み取ることが保証されているわけではありません。クライアントが書き込みを終了したときにerrorオブジェクトを設定します。boost::asio::error::eof

あなたが得ているエラーはこれが原因です:

サーバー部分

        if (!error)
        {
            ...
        }
        else
        {
            delete this;
        }

ブロックでは、これelseがエラーケースであると想定し、接続を閉じています。これは常に当てはまるわけではありません。elseを確認する必要がある前にerror == boost::asio::error::eof

読み取りハンドラーでのこれとは別に、ヒットするまでバッファーで読み取られたものをすべて収集し続ける必要がありますerror == boost::asio::error::eof。その場合にのみ、読み取りデータを検証してクライアントに書き戻す必要があります。

例のセクションでHTTPサーバー1、2、3実装を見てください

更新:更新された質問への回答

更新されたコードでスレッド同期の問題が発生しました。

  1. msgQueueロックなしで2つ以上のスレッドから同時にアクセスされます。
  2. 同じソケットでの読み取りと書き込みを同時に呼び出すことができます。

私があなたの問題を正しく理解した場合、あなたは次のことをしたいと思います:

  1. ユーザー入力を受け取り、それをサーバーに送信します。
  2. サーバーの応答を同時に受信し続けます。

2つのタスクに2つのboost::asio :: io_service::strandsを使用できます。Asioを使用する場合、ストランドはタスクを同期する方法です。Asioは、ストランドに投稿されたタスクが同期的に実行されるようにします。

  1. 次のようなタスクをstrand1投稿します。sendread_user_input -> send_to_server -> handle_send -> read_user_input

  2. 次のようなタスクをstrand2投稿します。readread_some -> handle_read -> read_some

これによりmsgQueue、2つのスレッドから同時にアクセスされないようになります。サーバーへの読み取りと書き込みに2つのソケットを使用して、同じソケットで読み取りと書き込みが同時に呼び出されないようにします。

于 2012-10-21T07:52:06.007 に答える