7

シングルを使用してスレッドプールを備えたboost::asioサーバーを実装しますio_serviceHTTPサーバー3の例)。io_serviceunix ドメイン ソケットにバインドされ、このソケット上の接続からの要求を別のスレッドに渡します。リソースの消費を減らすために、スレッド プールを動的にしたいと考えています。

ここにコンセプトがあります。最初に単一のスレッドが作成されます。リクエストが到着し、サーバーがプールにアイドル状態のスレッドがないことを確認すると、新しいスレッドを作成してリクエストを渡します。サーバーは、最大数のスレッドまで作成できます。理想的には、しばらくアイドル状態のスレッドを一時停止する機能が必要です。

誰かが似たようなものを作ったのですか?それとも、誰かが関連する例を持っていますか?

私としては、それio_service.dispatchを達成するにはどうにかしてオーバーライドする必要があると思います。

4

1 に答える 1

8

最初のアプローチにはいくつかの課題があるかもしれません。

  • boost::asio::io_serviceから派生したり、再実装したりすることは意図されていません。仮想機能がないことに注意してください。
  • スレッド ライブラリがスレッドの状態を照会する機能を提供しない場合は、状態情報を個別に管理する必要があります。

別の解決策は、ジョブを に投稿してからio_service、 にどれくらいの時間留まっていたかを確認することio_serviceです。実行準備が整った時点から実際に実行された時点までの時間差が特定のしきい値を超えている場合は、キューにサービスを提供しているスレッドよりも多くのジョブがキューにあることを示しています。これの主な利点は、動的スレッド プールの成長ロジックが他のロジックから分離されることです。

を使用してこれを実現する例を次に示しdeadline_timerます。

  • 今から数秒後deadline_timerに期限切れになるように設定します。3
  • で非同期に待機しdeadline_timerます。3ハンドラーは、設定されてから数秒で実行可能になりますdeadline_timer
  • 非同期ハンドラーで、タイマーが期限切れになるように設定された時点を基準とした現在の時間を確認します。2秒よりも大きい場合は、io_serviceキューがバックアップされているため、スレッド プールにスレッドを追加します。

例:

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <iostream>

class thread_pool_checker
  : private boost::noncopyable
{
public:

  thread_pool_checker( boost::asio::io_service& io_service,
                       boost::thread_group& threads,
                       unsigned int max_threads,
                       long threshold_seconds,
                       long periodic_seconds )
    : io_service_( io_service ),
      timer_( io_service ),
      threads_( threads ),
      max_threads_( max_threads ),
      threshold_seconds_( threshold_seconds ),
      periodic_seconds_( periodic_seconds )
    {
      schedule_check();
    }

private:

  void schedule_check();
  void on_check( const boost::system::error_code& error );

private:

  boost::asio::io_service&    io_service_;
  boost::asio::deadline_timer timer_;
  boost::thread_group&        threads_;
  unsigned int                max_threads_;
  long                        threshold_seconds_;
  long                        periodic_seconds_;
};

void thread_pool_checker::schedule_check()
{
  // Thread pool is already at max size.
  if ( max_threads_ <= threads_.size() )
  {
    std::cout << "Thread pool has reached its max.  Example will shutdown."
              << std::endl;
    io_service_.stop();
    return;
  }

  // Schedule check to see if pool needs to increase.
  std::cout << "Will check if pool needs to increase in " 
            << periodic_seconds_ << " seconds." << std::endl;
  timer_.expires_from_now( boost::posix_time::seconds( periodic_seconds_ ) );
  timer_.async_wait( 
    boost::bind( &thread_pool_checker::on_check, this,
                 boost::asio::placeholders::error ) );
}

void thread_pool_checker::on_check( const boost::system::error_code& error )
{
  // On error, return early.
  if ( error ) return;

  // Check how long this job was waiting in the service queue.  This
  // returns the expiration time relative to now.  Thus, if it expired
  // 7 seconds ago, then the delta time is -7 seconds.
  boost::posix_time::time_duration delta = timer_.expires_from_now();
  long wait_in_seconds = -delta.seconds();

  // If the time delta is greater than the threshold, then the job
  // remained in the service queue for too long, so increase the
  // thread pool.
  std::cout << "Job job sat in queue for " 
            << wait_in_seconds << " seconds." << std::endl;
  if ( threshold_seconds_ < wait_in_seconds )
  {
    std::cout << "Increasing thread pool." << std::endl;
    threads_.create_thread(
      boost::bind( &boost::asio::io_service::run,
                   &io_service_ ) );
  }

  // Otherwise, schedule another pool check.
  schedule_check();
}

// Busy work functions.
void busy_work( boost::asio::io_service&,
                unsigned int );

void add_busy_work( boost::asio::io_service& io_service,
                    unsigned int count )
{
  io_service.post(
    boost::bind( busy_work,
                 boost::ref( io_service ),
                 count ) );
}

void busy_work( boost::asio::io_service& io_service,
                unsigned int count )
{
  boost::this_thread::sleep( boost::posix_time::seconds( 5 ) );

  count += 1;

  // When the count is 3, spawn additional busy work.
  if ( 3 == count )
  {
    add_busy_work( io_service, 0 );
  }
  add_busy_work( io_service, count );
}

int main()
{
  using boost::asio::ip::tcp;

  // Create io service.
  boost::asio::io_service io_service;

  // Add some busy work to the service.
  add_busy_work( io_service, 0 );

  // Create thread group and thread_pool_checker.
  boost::thread_group threads;
  thread_pool_checker checker( io_service, threads,
                               3,   // Max pool size.
                               2,   // Create thread if job waits for 2 sec.
                               3 ); // Check if pool needs to grow every 3 sec.

  // Start running the io service.
  io_service.run();

  threads.join_all();

  return 0;
}

出力:

プールを 3 秒で増やす必要があるかどうかを確認します。
ジョブ job は 7 秒間キューに留まりました。
スレッドプールを増やしています。
プールを 3 秒で増やす必要があるかどうかを確認します。
ジョブ job はキューに 0 秒間留まりました。
プールを 3 秒で増やす必要があるかどうかを確認します。
ジョブ job はキューに 4 秒間留まりました。
スレッドプールを増やしています。
プールを 3 秒で増やす必要があるかどうかを確認します。
ジョブ job はキューに 0 秒間留まりました。
プールを 3 秒で増やす必要があるかどうかを確認します。
ジョブ job はキューに 0 秒間留まりました。
プールを 3 秒で増やす必要があるかどうかを確認します。
ジョブ job はキューに 0 秒間留まりました。
プールを 3 秒で増やす必要があるかどうかを確認します。
ジョブ job はキューに 3 秒間留まりました。
スレッドプールを増やしています。
スレッド プールが最大に達しました。例はシャットダウンします。
于 2012-06-26T16:44:13.157 に答える