メイン スレッドからのワーカー スレッドの終了に問題があります。これまでに試した各方法は、競合状態またはデッドロックにつながります。
ワーカー スレッドは、ThreadPool というクラス内の内部クラスに格納されます。ThreadPool は、unique_ptr を使用してこれらの WorkerThreads のベクトルを維持します。
私の ThreadPool のヘッダーは次のとおりです。
class ThreadPool
{
public:
typedef void (*pFunc)(const wpath&, const Args&, Global::mFile_t&, std::mutex&, std::mutex&); // function to point to
private:
class WorkerThread
{
private:
ThreadPool* const _thisPool; // reference enclosing class
// pointers to arguments
wpath _pPath; // member argument that will be modifyable to running thread
Args * _pArgs;
Global::mFile_t * _pMap;
// flags for thread management
bool _terminate; // terminate thread
bool _busy; // is thread busy?
bool _isRunning;
// thread management members
std::mutex _threadMtx;
std::condition_variable _threadCond;
std::thread _thisThread;
// exception ptr
std::exception_ptr _ex;
// private copy constructor
WorkerThread(const WorkerThread&): _thisPool(nullptr) {}
public:
WorkerThread(ThreadPool&, Args&, Global::mFile_t&);
~WorkerThread();
void setPath(const wpath); // sets a new task
void terminate(); // calls terminate on thread
bool busy() const; // returns whether thread is busy doing task
bool isRunning() const; // returns whether thread is still running
void join(); // thread join wrapper
std::exception_ptr exception() const;
// actual worker thread running tasks
void thisWorkerThread();
};
// thread specific information
DWORD _numProcs; // number of processors on system
unsigned _numThreads; // number of viable threads
std::vector<std::unique_ptr<WorkerThread>> _vThreads; // stores thread pointers - workaround for no move constructor in WorkerThread
pFunc _task; // the task threads will call
// synchronization members
unsigned _barrierLimit; // limit before barrier goes down
std::mutex _barrierMtx; // mutex for barrier
std::condition_variable _barrierCond; // condition for barrier
std::mutex _coutMtx;
public:
// argument mutex
std::mutex matchesMap_mtx;
std::mutex coutMatch_mtx;
ThreadPool(pFunc f);
// wake a thread and pass it a new parameter to work on
void callThread(const wpath&);
// barrier synchronization
void synchronizeStartingThreads();
// starts and synchronizes all threads in a sleep state
void startThreads(Args&, Global::mFile_t&);
// terminate threads
void terminateThreads();
private:
};
これまでのところ、私が抱えている本当の問題は、メインスレッドから terminateThreads() を呼び出すと、デッドロックまたは競合状態が発生することです。
_terminate フラグを true に設定すると、スレッドが起動して終了する前に、メインが既にスコープを終了し、すべてのミューテックスを破棄する可能性があります。実際、私はこのクラッシュを何度も経験しています (コンソール ウィンドウの表示: ビジー状態でミューテックスが破棄されました)。
スレッドを notify_all() した後に thread.join() を追加すると、終了したスレッドに参加するとプログラムが無期限に中断されるため、結合が発生する前にスレッドが終了し、無限のデッド ロックが発生する可能性があります。
デタッチした場合 - 上記と同じ問題ですが、プログラムがクラッシュします
代わりに while(WorkerThread.isRunning()) Sleep(0); を使用すると WorkerThread が最後の右中括弧に到達する前にメイン スレッドが終了する可能性があるため、プログラムがクラッシュする可能性があります。
すべてのワーカー スレッドが安全に終了するまで、メインの停止を停止するために他に何をすべきかわかりません。また、スレッドとメインで try-catch を使用しても、例外はキャッチされません。(私が試したすべてがプログラムのクラッシュにつながります)
ワーカー スレッドが終了するまでメイン スレッドを停止するにはどうすればよいですか?
主な機能の実装は次のとおりです。
個々のワーカー スレッドを終了する
void ThreadPool::WorkerThread::terminate()
{
_terminate = true;
_threadCond.notify_all();
_thisThread.join();
}
実際のスレッドループ
void ThreadPool::WorkerThread::thisWorkerThread()
{
_thisPool->synchronizeStartingThreads();
try
{
while (!_terminate)
{
{
_thisPool->_coutMtx.lock();
std::cout << std::this_thread::get_id() << " Sleeping..." << std::endl;
_thisPool->_coutMtx.unlock();
_busy = false;
std::unique_lock<std::mutex> lock(_threadMtx);
_threadCond.wait(lock);
}
_thisPool->_coutMtx.lock();
std::cout << std::this_thread::get_id() << " Awake..." << std::endl;
_thisPool->_coutMtx.unlock();
if(_terminate)
break;
_thisPool->_task(_pPath, *_pArgs, *_pMap, _thisPool->coutMatch_mtx, _thisPool->matchesMap_mtx);
_thisPool->_coutMtx.lock();
std::cout << std::this_thread::get_id() << " Finished Task..." << std::endl;
_thisPool->_coutMtx.unlock();
}
_thisPool->_coutMtx.lock();
std::cout << std::this_thread::get_id() << " Terminating" << std::endl;
_thisPool->_coutMtx.unlock();
}
catch (const std::exception&)
{
_ex = std::current_exception();
}
_isRunning = false;
}
すべてのワーカー スレッドを終了する
void ThreadPool::terminateThreads()
{
for (std::vector<std::unique_ptr<WorkerThread>>::iterator it = _vThreads.begin(); it != _vThreads.end(); ++it)
{
it->get()->terminate();
//it->get()->_thisThread.detach();
// if thread threw an exception, rethrow it in main
if (it->get()->exception() != nullptr)
std::rethrow_exception(it->get()->exception());
}
}
最後に、スレッド プールを呼び出している関数 (スキャン関数はメインで実行されています)
// scans a path recursively for all files of selected extension type, calls thread to parse file
unsigned int Functions::Scan(wpath path, const Args& args, ThreadPool& pool)
{
wrecursive_directory_iterator d(path), e;
unsigned int filesFound = 0;
while ( d != e )
{
if (args.verbose())
std::wcout << L"Grepping: " << d->path().string() << std::endl;
for (Args::ext_T::const_iterator it = args.extension().cbegin(); it != args.extension().cend(); ++it)
{
if (extension(d->path()) == *it)
{
++filesFound;
pool.callThread(d->path());
}
}
++d;
}
std::cout << "Scan Function: Calling TerminateThreads() " << std::endl;
pool.terminateThreads();
std::cout << "Scan Function: Called TerminateThreads() " << std::endl;
return filesFound;
}
もう一度質問を繰り返します: ワーカー スレッドが終了するまでメイン スレッドを停止するにはどうすればよいですか?