つまり、ユーザーが提供したタスクを次のような別の関数でラップする必要があります。
- ユーザー関数または呼び出し可能オブジェクトを呼び出します。
- ミューテックスをロックし、カウンターをデクリメントします。
このスレッド プールのすべての要件を理解していない可能性があります。したがって、明確にするために、要件であると私が信じるものの明示的なリストを次に示します。
- プールは、スレッドの存続期間を管理します。ユーザーは、プール内に存在するスレッドを削除できないようにする必要があります。
- ユーザーは、邪魔にならない方法でタスクをプールに割り当てることができます。
- タスクが割り当てられているときに、プール内のすべてのスレッドが現在他のタスクを実行している場合、そのタスクは破棄されます。
実装を提供する前に、強調したい重要な点がいくつかあります。
- スレッドが起動されると、完了、キャンセル、または終了まで実行されます。スレッドが実行している関数は再割り当てできません。1 つのスレッドがその存続期間中に複数の関数を実行できるようにするために、スレッドは、 などのキューから読み取る関数を使用して起動する必要が
io_service::run()
あり、呼び出し可能な型は from などのイベント キューにポストされますio_service::post()
。
io_service::run()
に保留中の作業がない場合io_service
、io_service
が停止した場合、またはスレッドが実行していたハンドラから例外がスローされた場合に戻ります。io_serivce::run()
未完成の作業がないときに戻るのを防ぐために、io_service::work
クラスを使用できます。
object()
タイプを要求する (つまり、タスクは から継承する必要がある) 代わりに、タスクのタイプ要件 (つまり、タスクのタイプは構文によって呼び出し可能でなければならない) を定義するprocess
と、ユーザーにより多くの柔軟性が提供されます。これにより、ユーザーはタスクを関数ポインターまたは nullary を提供する型として提供できoperator()
ます。
を使用した実装boost::asio
:
#include <boost/asio.hpp>
#include <boost/thread.hpp>
class thread_pool
{
private:
boost::asio::io_service io_service_;
boost::asio::io_service::work work_;
boost::thread_group threads_;
std::size_t available_;
boost::mutex mutex_;
public:
/// @brief Constructor.
thread_pool( std::size_t pool_size )
: work_( io_service_ ),
available_( pool_size )
{
for ( std::size_t i = 0; i < pool_size; ++i )
{
threads_.create_thread( boost::bind( &boost::asio::io_service::run,
&io_service_ ) );
}
}
/// @brief Destructor.
~thread_pool()
{
// Force all threads to return from io_service::run().
io_service_.stop();
// Suppress all exceptions.
try
{
threads_.join_all();
}
catch ( const std::exception& ) {}
}
/// @brief Adds a task to the thread pool if a thread is currently available.
template < typename Task >
void run_task( Task task )
{
boost::unique_lock< boost::mutex > lock( mutex_ );
// If no threads are available, then return.
if ( 0 == available_ ) return;
// Decrement count, indicating thread is no longer available.
--available_;
// Post a wrapped task into the queue.
io_service_.post( boost::bind( &thread_pool::wrap_task, this,
boost::function< void() >( task ) ) );
}
private:
/// @brief Wrap a task so that the available count can be increased once
/// the user provided task has completed.
void wrap_task( boost::function< void() > task )
{
// Run the user supplied task.
try
{
task();
}
// Suppress all exceptions.
catch ( const std::exception& ) {}
// Task has finished, so increment count of available threads.
boost::unique_lock< boost::mutex > lock( mutex_ );
++available_;
}
};
実装に関するいくつかのコメント:
- 例外処理は、ユーザーのタスクの周りで発生する必要があります。ユーザーの関数または呼び出し可能なオブジェクトが型ではない例外をスローした場合
boost::thread_interrupted
、std::terminate()
が呼び出されます。これは、スレッド関数の動作におけるBoost.Thread の例外の結果です。ハンドラーからスローされた例外の Boost.Asio の影響も読む価値があります。
- ユーザーが
task
via を指定するとboost::bind
、ネストされたboost::bind
ものはコンパイルに失敗します。次のいずれかのオプションが必要です。
task
によって作成されたサポートではありませんboost::bind
。
- 特定の関数オブジェクトでのみ適切に機能する
boost::bind
ため、その結果boost::protect
が使用できるかどうかに基づいて、ユーザーの型に基づいてコンパイル時の分岐を実行するメタプログラミング。boost::protect
- 別の型を使用して
task
オブジェクトを間接的に渡します。boost::function
正確な型を失うことを犠牲にして、読みやすさのため に使用することにしました。Boost.Asio のシリアライゼーションboost::tuple
の例に見られるように、わずかに読みにくくなりますが、正確な型を保持するために使用することもできます。
アプリケーション コードは、thread_pool
型を非侵入的に使用できるようになりました。
void work() {};
struct worker
{
void operator()() {};
};
void more_work( int ) {};
int main()
{
thread_pool pool( 2 );
pool.run_task( work ); // Function pointer.
pool.run_task( worker() ); // Callable object.
pool.run_task( boost::bind( more_work, 5 ) ); // Callable object.
}
はBoost.Asio なしで作成できます。メンテナーにとっては、いつ返されるか、オブジェクトが何であるかなどの動作thread_pool
について知る必要がなくなるため、少し簡単になる可能性があります。Boost.Asio
io_service::run()
io_service::work
#include <queue>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
class thread_pool
{
private:
std::queue< boost::function< void() > > tasks_;
boost::thread_group threads_;
std::size_t available_;
boost::mutex mutex_;
boost::condition_variable condition_;
bool running_;
public:
/// @brief Constructor.
thread_pool( std::size_t pool_size )
: available_( pool_size ),
running_( true )
{
for ( std::size_t i = 0; i < pool_size; ++i )
{
threads_.create_thread( boost::bind( &thread_pool::pool_main, this ) ) ;
}
}
/// @brief Destructor.
~thread_pool()
{
// Set running flag to false then notify all threads.
{
boost::unique_lock< boost::mutex > lock( mutex_ );
running_ = false;
condition_.notify_all();
}
try
{
threads_.join_all();
}
// Suppress all exceptions.
catch ( const std::exception& ) {}
}
/// @brief Add task to the thread pool if a thread is currently available.
template < typename Task >
void run_task( Task task )
{
boost::unique_lock< boost::mutex > lock( mutex_ );
// If no threads are available, then return.
if ( 0 == available_ ) return;
// Decrement count, indicating thread is no longer available.
--available_;
// Set task and signal condition variable so that a worker thread will
// wake up andl use the task.
tasks_.push( boost::function< void() >( task ) );
condition_.notify_one();
}
private:
/// @brief Entry point for pool threads.
void pool_main()
{
while( running_ )
{
// Wait on condition variable while the task is empty and the pool is
// still running.
boost::unique_lock< boost::mutex > lock( mutex_ );
while ( tasks_.empty() && running_ )
{
condition_.wait( lock );
}
// If pool is no longer running, break out.
if ( !running_ ) break;
// Copy task locally and remove from the queue. This is done within
// its own scope so that the task object is destructed immediately
// after running the task. This is useful in the event that the
// function contains shared_ptr arguments bound via bind.
{
boost::function< void() > task = tasks_.front();
tasks_.pop();
lock.unlock();
// Run the task.
try
{
task();
}
// Suppress all exceptions.
catch ( const std::exception& ) {}
}
// Task has finished, so increment count of available threads.
lock.lock();
++available_;
} // while running_
}
};