0

私は効率的なソケットサーバーを書いています。意図は、全体的なスループットが良好であることです。メインスレッドをリスナーとして使用します。クライアントでasync_acceptあり、ソケットをキューに追加します。ディスパッチャスレッドは、キューから読み取る準備ができているソケットを取得し、ワーカースレッドのキューの1つに追加します。ワーカースレッドのプールを保持しています。ワーカースレッドは実際の読み取り/書き込みを行います。

async_accept私はリスナーで使用します。どのソケットが読み取りの準備ができているかを確認するために、ディスパッチャでasync_read_someを使用します。このアイデアは機能しますが、問題があります。Myio_service.run()はlistenerで呼び出されるため、indispatcherのハンドラーasync_read_someは実際にはリスナースレッドで実行されます。

これが私のコードです:

using boost::asio::ip::tcp;
using namespace std;

std::queue<std::shared_ptr<tcp::socket>> q_sock;
boost::mutex m_log1;
boost::condition_variable m_cond1;
boost::mutex::scoped_lock m_lock1 = boost::mutex::scoped_lock(m_log1);
sem_t _sem_sock;

enum { max_length1 = 1024 };
char data_1[max_length1];

void handle_read1(std::shared_ptr<tcp::socket> sock, const boost::system::error_code& error,
  size_t bytes_transferred)
{
    printf("handle_read1 : error : %s : %d, thread id is: %ld, pid : %d \n", error.category().name(), error.value(), (long int)syscall(SYS_gettid), getpid());

    boost::asio::write(*(sock.get()), boost::asio::buffer(data_1, bytes_transferred));
}


void sock_dispatch() {
    int v_size = 0;
    std::shared_ptr<tcp::socket> curr_sock;

    printf("sock_dispatch started. The ID of this of this thread is: %ld, pid : %d \n", (long int)syscall(SYS_gettid), getpid());

    while(1) {

        while(1) {
            sem_wait(&_sem_sock);
            v_size = q_sock.size();
            sem_post(&_sem_sock);

            if(v_size <= 0)
                m_cond1.timed_wait(m_lock1,boost::posix_time::milliseconds(5000));
            else
                break;
        }

        sem_wait(&_sem_sock);
        curr_sock = q_sock.front();
        q_sock.pop();
        sem_post(&_sem_sock);

        curr_sock->async_read_some(boost::asio::buffer(data_1, max_length1),
        boost::bind(handle_read1, curr_sock,
          boost::asio::placeholders::error,
          boost::asio::placeholders::bytes_transferred));
    }

}

class session
{
    public:
      session(boost::asio::io_service& io_service)
        : sockptr(new tcp::socket(io_service)) {}

      void start()
      {
            printf("START NEW SESSION   The ID of this of this thread is: %ld, pid : %d \n", (long int)syscall(SYS_gettid), getpid());

            sem_wait(&_sem_sock);

            q_sock.push(sockptr);

            sem_post(&_sem_sock);

            m_cond1.notify_all();
      }

      std::shared_ptr<tcp::socket> sockptr;
};

class server
{
    public:
      server(boost::asio::io_service& io_service, short port)
        : io_service_(io_service),
          acceptor_(io_service, tcp::endpoint(tcp::v4(), port))
      {
        session* new_session = new session(io_service_);
        acceptor_.async_accept(*(new_session->sockptr.get()),
            boost::bind(&server::handle_accept, this, new_session,
              boost::asio::placeholders::error));

        printf("WAITING TO ACCEPT: The ID of this of this thread is: %ld, pid : %d \n", (long int)syscall(SYS_gettid), getpid());

      }

      void handle_accept(session* new_session,
          const boost::system::error_code& error)
      {
          new_session->start();
          new_session = new session(io_service_);
          acceptor_.async_accept(*(new_session->sockptr.get()),
              boost::bind(&server::handle_accept, this, new_session,
                boost::asio::placeholders::error));
      }

    private:
      boost::asio::io_service& io_service_;
      tcp::acceptor acceptor_;
};

int main(int argc, char* argv[])
{
    sem_init(&_sem_sock, 0, 1);

    boost::asio::io_service io_service;

    using namespace std;
    server s(io_service, atoi(argv[1]));

    boost::thread t(boost::bind(sock_dispatch));

    io_service.run();

    return 0;
}

このコードは、boost :: asioの例、http ://www.boost.org/doc/libs/1_39_0/doc/html/boost_asio/example/echo/async_tcp_echo_server.cppから変更されています。また、クライアントコードはhttp://www.boost.org/doc/libs/1_39_0/doc/html/boost_asio/example/echo/blocking_tcp_echo_client.cppです。

クライアントが接続すると、サーバーの出力は次のようになります。

WAITING TO ACCEPT: The ID of this of this thread is: 3843, pid : 3843 
sock_dispatch started. The ID of this of this thread is: 3844, pid : 3843 
START NEW SESSION   The ID of this of this thread is: 3843, pid : 3843 
handle_read1 : error : system : 0, thread id is: 3843, pid : 3843

この場合、ディスパッチャーのスレッドIDは3944ですが、handle_read1はスレッド3843で実行されます。理想的には、handle_read1はディスパッチャーで実行されるため、リスナーでの受け入れがブロックされません。

これを達成するために私が何をすべきか考えていますか?または、全体に対してより良いデザインがあります:)?

4

1 に答える 1

2

特定のスレッドで特定のハンドラーを呼び出す必要がある場合は、別io_serviceのオブジェクトを使用してください。たとえば、はで構築acceptorできio_service1、ソケットはで構築できますio_service2。その後、メインスレッドはを実行できますがio_service1.run()、スレッドプール内のスレッドはを実行できますio_service2.run()

そうは言っても、非同期機能と同期機能を組み合わせるのはかなり難しい場合があります。私が取り組んだほとんどの非同期プログラムでは、スレッドを特定の非同期チェーン専用にする必要はめったにありません。


全体として、概念設計は問題ないと思いますが、実装に関するいくつかの提案があります。

  • コンシューマーコードとプロデューサーコードは、q_sock高レベルと低レベルの構成が混在しています。条件変数の使用は少し非特異的であり、なぜ、、およびロックsem_tの代わりに使用されているのかという疑問が生じます。boost::mutexたとえば、次のコンシューマーコードとプロデューサーコードです。

    // Consumer
    while(1)
    {
      sem_wait(&_sem_sock);
      v_size = q_sock.size();
      sem_post(&_sem_sock);
    
      if (v_size <= 0)
        m_cond1.timed_wait(m_lock1, boost::posix_time::milliseconds(5000));
      else
        break;
    }
    sem_wait(&_sem_sock);
    curr_sock = q_sock.front();
    q_sock.pop();
    sem_post(&_sem_sock);
    
    // Producer    
    sem_wait(&_sem_sock);
    q_sock.push(sockptr);
    sem_post(&_sem_sock);
    m_cond1.notify_all();
    

    を使用せずに書き直すことができ、Boost.Threadのドキュメントsem_tに基づいてもう少し慣用的になります。condition_variable代替案を検討してください。

    // Consumer
    boost::unique_lock<boost::mutex> lock(m_log1);
    while (q_sock.empty())
    {
      m_cond1.wait(lock);
    }
    curr_sock = q_sock.front();
    q_sock.pop();
    lock.unlock();
    
    // Producer
    {
      boost::lock_guard<boost::mutex> lock(m_log1);
      q_sock.push(sockptr);
    }
    m_cond1.notify_all();
    
  • sessionどの機能が提供するかは不明です。

    • ソケットを割り当ててキューに入れる手段としてのみ機能しているようです。ソケットを直接割り当てて、呼び出し元にそれをキューに入れてもらうのはなぜですか?
    • session::sockptrスマートポインタを介して管理されますが、そうでsessionはありません。スマートポインタを介して管理されていない場合、再割り当てでへのハンドルが失われるためsession、でメモリリークが発生します。server::handle_acceptsession

    提供する機能を特定しsession、その周りのインターフェイスを設計します。

    • カプセル化を提供することを目的としている場合は、などの非メンバー関数をメンバー関数にhandle_read1する必要があります。
    • session独自の非同期チェーンがあり、それ自体をハンドラーに提供している場合は、の使用を検討してenable_shared_from_thisください。Boost.Asioチュートリアルは、いくつかのと同様に、使用例を提供します。
  • 現時点でasync_read_someは、どのソケットを読み取る準備ができているかを示していません。ReadHandlerが呼び出されるまでに、データが読み取られています。

    これがProactorとReactorの根本的な違いです。Reactorスタイルの操作が必要な場合は、を使用してboost::asio::null_buffersください。詳細については、このドキュメントを参照してください。ただし、それぞれのアプローチには結果があります。したがって、最良の決定を下せるように、これらの結果を理解することが重要です。

  • Boost.Asioが高レベルの構造を介してイベントの逆多重化を提供しているため、sock_dispatchスレッドは実用的ではないように見える場合があります。メンバー関数はsession::start、ソケットで非同期読み取りを開始できます。q_sockこの小さな変更により、サンプルコードのすべての同期構造が不要になります。

  • 同期書き込みを使用する必要がある理由を調べます。エコークライアントの場合、例に示すように、非同期チェーン自体のフローを制御してリソースの競合を取り除くことにより、非同期書き込みを使用できることがよくあります。これにより、各接続に独自のバッファを設定して、読み取りと書き込みの両方に使用できます。

  • 事前に最適化しないでください。非同期プログラミングは、制御の逆の流れの結果として、本質的にデバッグが困難です。スループットを事前に最適化しようとすると、複雑さの問題が悪化するだけです。プログラムが動作したら、スループットテストを実行します。結果が要件を満たしていない場合は、プロファイルを作成してボトルネックを特定します。私の経験では、スループットの高いほとんどのサーバーは、CPUバウンドになるずっと前にI/Oバウンドになります。
于 2013-01-27T01:55:25.400 に答える