1

を使用してマルチスレッドサーバーを作成していboost::asioます。非同期の読み取りと書き込みを使用する X スレッドのプールがあります (この例に基づく)。

サーバー構造は次のようになります。

サーバ

プログラムのスレッドを管理し、async_accept新しいクライアントごとに新しいセッションの作成を開始します。

セッション

クライアント自体を表します。refを取得し、彼女(クライアント)からs とssocketを管理します。タイムアウト管理もあります。async_readasync_writesocket


クライアント (ハードウェア デバイス) がフリーズし、サーバーに応答がないことがあります。これを解決するために、 (この例async_waitのように) withの使用について読み、それを自分のソフトウェアに適用しましたが、奇妙なことが起こりました:deadline_timer

通常の切断が発生すると、async操作はキャンセルされ (operation_aborted エラーに到達)、Sessionオブジェクトは破棄されます。しかし、デバイスがフリーズすると、ソケットは閉じられますが、Session オブジェクトは破棄されず、インスタンスはsocket.close()既に呼び出されていてもメモリ内に残ります。

コードを簡略化して以下に示します。

server.h

    class Server
    {
    private:
        boost::asio::io_service& _io_service;
        boost::asio::ip::tcp::acceptor* _acceptor;
        boost::asio::ip::tcp::endpoint* _endpoint;

        boost::asio::signal_set _signals;

        Session_SP _session;
    public:
        Server(boost::asio::io_service& io_service);
        ~Server();


        /**
         * Queues async accept action.
         */
        virtual void queueAccept();

        /**
         * Async accept handler.
         */
        virtual void handleAccept(const boost::system::error_code& error);

        /**
         * Start the server
         */
        virtual void run();

        boost::asio::io_service& getIOService();

        /**
         * Shutdown the service
         */
        virtual void shutdown();
    };

サーバー.cpp

#include "server.hpp"

Server::Server(boost::asio::io_service& io_service):
    _io_service(io_service), _signals(io_service)
{
    this->_endpoint = new boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), config.getServer().port);

    this->_acceptor = new boost::asio::ip::tcp::acceptor(io_service);

    this->_acceptor->open(boost::asio::ip::tcp::v4());
    this->_acceptor->bind(*this->_endpoint);
    this->_acceptor->listen();

    this->_signals.add(SIGINT);
    this->_signals.add(SIGTERM);
#if defined(SIGQUIT)
    this->_signals_.add(SIGQUIT);
#endif // defined(SIGQUIT)

    this->_signals.async_wait(boost::bind(&Server::shutdown, this));

    this->queueAccept();
}

Server::~Server()
{
    delete this->_acceptor;
    delete this->_endpoint;
}

void Server::queueAccept()
{
    this->_session.reset(new Session(*this));

    _acceptor->async_accept(
        this->_session->getSocket(),
        boost::bind(
            &Server::handleAccept,
            this,
            boost::asio::placeholders::error
        )
    );
}

void Server::handleAccept(const boost::system::error_code& error)
{
    if (!error)
        this->_session->start();

    this->queueAccept();
}

boost::asio::io_service& Server::getIOService()
{
    return this->_io_service;
}

void Server::shutdown()
{
    this->_io_service.stop();
}

session.h

    class Session:
        public boost::enable_shared_from_this<Session>
    {
    public:
        Session(Server& server);
        ~Session();

        bool stopped() const;

        virtual void start();

        virtual boost::asio::ip::tcp::socket& getSocket();

        virtual void disconnect();

        /**
         * Async read handler
         */
        void handleRead(const boost::system::error_code& error, size_t bytesTransfered);

        /**
         * Async write handler
         */
        void handleWrite(const boost::system::error_code& error);

        /**
         * Queues write action.
         */
        void queueWrite();

        /**
         * Push a packet to be sent on queue end
         */
        void pushPacket(protocols::SendPacket &packet);

        void handleDeadlineAsyncWait(boost::asio::deadline_timer* deadline);

        void handleDeadlineAsyncWaitKillConnection(boost::asio::deadline_timer* deadline);

    private:
        Server& _server;

        boost::asio::ip::tcp::socket _socket;

        boost::asio::io_service* _ioService;

        boost::asio::io_service::strand _strand;

        boost::asio::deadline_timer _input_deadline;

        boost::asio::deadline_timer _non_empty_output_queue;

        /**
         * Queue that stores the packet to be sent.
         */
        protocols::SendPacketQueue _writeQueue;

        /**
         * Referência do pacote que será atualizado.
         */
        Protocol* _protocol;

        /**
         * Queues the async_read acction.
         */
        virtual void queueRead();

        virtual void _pushPacket(protocols::SendPacket &packet);
    };

    typedef boost::shared_ptr<Session> Session_SP;

セッション.cpp

#include "session.hpp"

Session::Session(Server& server):
    _server(server), _socket(server.getIOService()), _protocol(NULL),
    _ioService(&server.getIOService()), _strand(server.getIOService()),
    _input_deadline(server.getIOService()),
    _non_empty_output_queue(server.getIOService())
{

    this->_input_deadline.expires_at(boost::posix_time::pos_infin);
    this->_non_empty_output_queue.expires_at(boost::posix_time::pos_infin);
}

Session::~Session()
{
}

bool Session::stopped() const
{
    return !_socket.is_open();
}

boost::asio::ip::tcp::socket& Session::getSocket()
{
    return this->_socket;
}

void Session::disconnect()
{
    this->_input_deadline.cancel();
    this->_non_empty_output_queue.cancel();
    try
    {
        this->getSocket().close();
        LOG("Session::disconnect : close successful!");
    }
    catch (void* e)
    {
        // Never reached here!!
    }
}

void Session::queueRead()
{
    this->_input_deadline.expires_from_now(boost::posix_time::seconds(30));

    boost::asio::async_read_until(
        _socket,
        _buffer,
        "\x004", // Just a test
        this->_strand.wrap(boost::bind(
            &Session::handleRead,
            this->shared_from_this(),
            boost::asio::placeholders::error,
            boost::asio::placeholders::bytes_transferred
        ))
    );
}

void Session::start()
{
    this->queueRead();

    this->_input_deadline.async_wait(
        this->_strand.wrap(boost::bind(
            &Session::handleDeadlineAsyncWait,
            shared_from_this(),
            &this->_input_deadline
        ))
    );

    this->queueWrite();
}

void Session::handleRead(const boost::system::error_code& error, size_t bytesTransfered)
{
    if (this->stopped())
        return;

    if (!error)
    {
        // ... a lot of code here, but isn't important
    }
    else if (error != boost::asio::error::operation_aborted)
        this->disconnect();
}

void Session::handleWrite(const boost::system::error_code& error)
{
    if (this->stopped())
        return;

    if (!error)
    {
        this->_writeQueue.pop_front(); // Dequeue
        this->queueWrite();
    }
    else
    {
        if (error != boost::asio::error::operation_aborted)
            this->disconnect();
    }
}

void Session::queueWrite()
{
    if (this->stopped())
        return;

    if (this->_writeQueue.empty())
    {
        this->_non_empty_output_queue.expires_at(boost::posix_time::pos_infin);
        this->_non_empty_output_queue.async_wait(
            boost::bind(&Session::queueWrite, shared_from_this())
        );
    }
    else
    {
        this->_input_deadline.expires_from_now(boost::posix_time::seconds(this->_server.getConfig().getServer().timeout));

        boost::asio::async_write(
            this->getSocket(),
            boost::asio::buffer(
                this->_writeQueue.front().getData(),
                this->_writeQueue.front().getDataSize()
            ),
            this->_strand.wrap(boost::bind(
                &Session::handleWrite,
                this,
                boost::asio::placeholders::error
            ))
        );
    }
}

void Session::handleDeadlineAsyncWait(boost::asio::deadline_timer* deadline)
{
    if (this->stopped())
        return;

    if (deadline->expires_at() <= boost::asio::deadline_timer::traits_type::now())
    {
        boost::system::error_code sdEc;
        this->getSocket().shutdown(boost::asio::ip::tcp::socket::shutdown_send, sdEc);

        deadline->expires_from_now(boost::posix_time::seconds(15));

        deadline->async_wait(
            this->_strand.wrap(boost::bind(
                &Session::handleDeadlineAsyncWaitKillConnection,
                shared_from_this(),
                deadline
            ))
        );

    }
    else
    {
        deadline->async_wait(
            this->_strand.wrap(boost::bind(
                &Session::handleDeadlineAsyncWait,
                shared_from_this(),
                deadline
            ))
        );
    }
}

void Session::handleDeadlineAsyncWaitKillConnection(boost::asio::deadline_timer* deadline)
{
    this->disconnect();
}
4

2 に答える 2

0

タイムアウトasync_waitハンドラはasync_read()、ソケットをシャットダウンするだけでなく、未解決のものをキャンセルする必要があります。そうしないと、ソケットが開いたままになります。

void Session::handleDeadlineAsyncWait(boost::asio::deadline_timer* deadline)
{
    if (this->stopped())
        return;

    if (deadline->expires_at() <= boost::asio::deadline_timer::traits_type::now())
    {
        boost::system::error_code sdEc;
        this->getSocket().shutdown(boost::asio::ip::tcp::socket::shutdown_send, sdEc);
        this->getSocket().cancel(); // <-- add this
    }
    else
    {
        deadline->async_wait(
            this->_strand.wrap(boost::bind(
                &Session::handleDeadlineAsyncWait,
                shared_from_this(),
                deadline
            ))
        );
    }
}

さらに、Session::handleRead()ハンドラーでboost::asio::error::operation_abortedエラーを検出する必要があります。これは、読み取りがキャンセルされたことを意味するためです。

于 2013-02-14T19:53:47.720 に答える
0

問題が見つかりました。

エラーは関数にありますSession::quereWrite

セッション.cpp

void Session::queueWrite()
{
    if (this->stopped())
        return;

    if (!this->_writeQueue.empty())
    {
        boost::asio::async_write(
            this->getSocket(),
            boost::asio::buffer(
                this->_writeQueue.front().getData(),
                this->_writeQueue.front().getDataSize()
            ),
            this->_strand.wrap(boost::bind(
                &Session::handleWrite,
                this,
                boost::asio::placeholders::error
            ))
        );
    }
}

メソッド_non_empty_output_queueを使用して、同じことを行う別の方法を使用しpushPackます。

問題は、メソッドをasync_wait呼び出し、別のメソッドを自分自身に呼び出したことですが、期限切れのタイマーがプロセッサにオーバーヘッドを引き起こし、自己破壊を防いでいたことです。queueWriteasync_waitSession

@Sam Miller の多大な貢献に感謝します。

于 2013-02-16T21:17:27.230 に答える