0

私はboost::asioの初心者なので、助けてください。

シングルスレッドの TCP サーバーを作成する必要があります。サーバーはクライアント接続を受け入れ、クライアントソケットから入力データを継続的に読み取る必要があります。サーバーは定期的にデータをクライアントに送信する必要があります。だから私はある種の問題を抱えています-すべての例は、常にループがある場合を説明しています

  1. async_receive()
  2. on_receive() -> async_write()
  3. on_write() -> goto 1 :)

したがって、私の決定は、ソケットに送信されるデータをチェックするためにタイマーを使用することでした。

私はテストサーバーを書きましたが、非常に奇妙な動作をしています-クライアントが接続し、何かを行い、いくつかの時間 delta で次々と切断されても問題ありません。しかし、すべてのクライアントが同時に切断された場合、タイマー ハンドラーが既に破壊されたオブジェクトのメンバー クラスを使用しようとする状況があります (クリティカル セクションをロックしています)。

理由は説明できません!助けてください !

[このビデオはどのように再現されているかを示しています] ( http://www.youtube.com/watch?v=NMWkD7rqf7Y&feature=youtu.be "1080p" )

ありがとうございました !

#include <boost/none.hpp>
#include <boost/bind.hpp>
#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>

#include <iostream>




using namespace boost::asio;
using namespace boost::posix_time;



class CIncommingConnection ;
typedef boost::shared_ptr<CIncommingConnection> CIncommingConnectionPtr;


struct IIncomingServer
{
    virtual  void OnData(CIncommingConnectionPtr pConn, const char *pData, size_t bytes) = 0;
    virtual  void OnConnected(CIncommingConnectionPtr pConn) = 0;
    virtual  void OnDisconnected(const boost::system::error_code& err, CIncommingConnectionPtr pConn) = 0;
};



class CAutoLock
{
public:
    CAutoLock(CRITICAL_SECTION &cs)  :
      m_cs(cs)
    {
        ::EnterCriticalSection(&m_cs);
    }

   ~CAutoLock()
   {
       ::LeaveCriticalSection(&m_cs);
   }

private:
    CRITICAL_SECTION &m_cs;
};

class CIncommingConnection :  public boost::enable_shared_from_this<CIncommingConnection>                      
                             ,boost::noncopyable 
{
public:

    CIncommingConnection(const std::string sPeerName, boost::asio::io_service  &service, IIncomingServer *pServer) :
     m_service(service)
    ,sock_(service)
    ,m_sPeerName(sPeerName) 
    ,m_pServer(pServer)
    ,m_timer(service)
    {
        ::InitializeCriticalSection(&m_cs);

        std::cout << "CIncommingConnection()"  << std::endl ;
    }


    ~CIncommingConnection()
    {
        std::cout << "CIncommingConnection()~"  << std::endl ;
        ::DeleteCriticalSection(&m_cs);
    }


    ip::tcp::socket & sock()
    {
        return sock_;
    }



    void start()
    {
        m_pServer->OnConnected(shared_from_this());
        do_read();
        wait_for_outgoingdata();
    }


private:

    void stop()
    {      
        sock_.close();
        m_timer.cancel();
    }



    void do_read()
    {
        sock_.async_receive(buffer(read_buffer_), boost::bind(&CIncommingConnection::handler_read, this, _1, _2) );
    }



    void do_error(const boost::system::error_code& error)
    {
        CIncommingConnectionPtr pConn = shared_from_this();

        stop() ;

        m_pServer->OnDisconnected(error, pConn);
    }



    void handler_read(const boost::system::error_code& error, std::size_t bytes)
    {
        if (error)
        {
            do_error(error);
            return ;
        }

        CIncommingConnectionPtr pConn = shared_from_this() ;

        m_pServer->OnData(pConn, read_buffer_, bytes);

        do_read();
    }



    void wait_for_outgoingdata()
    {
        m_timer.expires_from_now( boost::posix_time::millisec( 100 ) );
        m_timer.async_wait( boost::bind( &CIncommingConnection::on_output_queue_timer, this, _1 ) );
    }



    void on_output_queue_timer(const boost::system::error_code& error)
    {
        if (error == boost::asio::error::operation_aborted)
        {
            return ;
        }

        CAutoLock oLock(m_cs);

        if (!m_sOutBuf.empty())
            sock_.async_send(buffer(m_sOutBuf), boost::bind(&CIncommingConnection::handler_write, this, _1, _2) );
        else
            wait_for_outgoingdata();
    }


    void handler_write(const boost::system::error_code& error, std::size_t bytes)
    {    
        if (error)
            return ;


        if (bytes)
        {
            m_sOutBuf = m_sOutBuf.substr(bytes, m_sOutBuf.length()-bytes);
        }

        wait_for_outgoingdata();
    }



private:
    ip::tcp::socket sock_;

    enum { max_msg = 1024 };
    char read_buffer_[max_msg];
    char write_buffer_[max_msg];


    boost::asio::io_service        &m_service ;   
    std::string                     m_sPeerName ;
    std::string                     m_sOutBuf;
    CRITICAL_SECTION                m_cs ;
    IIncomingServer                *m_pServer;
    boost::asio::deadline_timer     m_timer;
};






class CIncomingServer :   public boost::enable_shared_from_this<CIncomingServer>                      
                         , public IIncomingServer
                         , boost::noncopyable 
{

public:

    CIncomingServer(boost::asio::io_service  &service,
        unsigned int port,
        bool bAllowManyConnections,
        const std::string sPeerName) :

      m_acceptor (service, ip::tcp::endpoint(ip::tcp::v4(), port), false)
     ,m_sPeerName(sPeerName)
     ,m_port(port) 
     ,m_service(service)
     ,m_timer(service)
     ,m_bAllowManyConnections(bAllowManyConnections)
    {
    }



    ~CIncomingServer()
    {
    }



    void run()
    {
        CIncommingConnectionPtr pConn (new CIncommingConnection(m_sPeerName, m_service, this));
        m_clients.push_back( pConn );


        m_acceptor.async_accept(pConn->sock(), boost::bind(&CIncomingServer::handle_accept, this, _1));

        m_timer.expires_from_now( boost::posix_time::millisec( 500 ) );
        m_timer.async_wait( boost::bind( &CIncomingServer::on_timer, this ) );
    }




private:

    void handle_accept(const boost::system::error_code & err)
    {
        m_clients.back()->start();

        CIncommingConnectionPtr pConnNew (new CIncommingConnection(m_sPeerName, m_service, this));
        m_clients.push_back( pConnNew );

        m_acceptor.async_accept(pConnNew->sock(), boost::bind(&CIncomingServer::handle_accept, this,  _1));
    }


    //IIncomingServer
    virtual  void OnData(CIncommingConnectionPtr pConn, const char *pData, size_t bytes)
    {
        std::cout << "Data received" << std::endl ;
    }


    virtual  void OnConnected(CIncommingConnectionPtr pConn)
    {
        std::cout << "Client connected" << std::endl ;
    }


    virtual  void OnDisconnected(const boost::system::error_code& err, CIncommingConnectionPtr pConn)
    {
        std::cout << "Client disconnected" << std::endl ;

        auto it = std::find(m_clients.begin(), m_clients.end(), pConn) ;
        if (it != m_clients.end())
        {
            m_clients.erase(it);
        }

    }



    void on_timer()
    {
        //if (NeedTerminate())
        //{
        //    m_service.stop();
        //    return ;
        //}

        m_timer.expires_from_now( boost::posix_time::millisec( 500 ) );
        m_timer.async_wait( boost::bind( &CIncomingServer::on_timer, this ) );
    }



private:
    ip::tcp::acceptor  m_acceptor ;

    std::vector<CIncommingConnectionPtr> m_clients;
    std::string m_sPeerName ;
    unsigned int m_port ;
    boost::asio::io_service       &m_service ;
    boost::asio::deadline_timer    m_timer;
    bool                           m_bAllowManyConnections; 
};


int _tmain(int argc, _TCHAR* argv[])
{

    boost::asio::io_service  service ;


    boost::shared_ptr<CIncomingServer> pServer;

    try
    {
        pServer.reset( new CIncomingServer(service, 8000,  false, "BS Server"));        
        pServer->run();
    }
    catch (const boost::system::system_error &err)
    {
        std::cout << "Error : " << err.what() << std::endl ;
        return 0 ;
    }

    service.run();

    return 0 ;


}
4

1 に答える 1

2

shared_from_this()簡単に言うと、完了ハンドラーをプレーンthis(いわゆるshared_from_thisイディオム)ではなく、から返された shared_ptr にバインドする必要があります。このようにして、接続オブジェクトの有効期間を正しく自動的に管理できます。

技術的には、次のことが起こります: do_error2 つのアクションが発生します:

  1. タイマーのキャンセル (非同期操作) の削除
  2. CIncommingConnectionPtrコンテナーから (同期操作)。

shared_ptrポイント (2) で、接続を保持しているが他にないため、接続が破棄されます。タイマー完了ハンドラーが来る... クラッシュ!

于 2013-10-02T12:40:14.900 に答える