0

メイン スレッドからのワーカー スレッドの終了に問題があります。これまでに試した各方法は、競合状態またはデッドロックにつながります。

ワーカー スレッドは、ThreadPool というクラス内の内部クラスに格納されます。ThreadPool は、unique_ptr を使用してこれらの WorkerThreads のベクトルを維持します。

私の ThreadPool のヘッダーは次のとおりです。

class ThreadPool
{
public:
typedef void (*pFunc)(const wpath&, const Args&, Global::mFile_t&, std::mutex&, std::mutex&);       // function to point to
private:

    class WorkerThread
    {
    private:
        ThreadPool* const _thisPool;        // reference enclosing class

        // pointers to arguments
        wpath _pPath;               // member argument that will be modifyable to running thread
        Args * _pArgs;
        Global::mFile_t * _pMap;

        // flags for thread management
        bool _terminate;                    // terminate thread
        bool _busy;                         // is thread busy?
        bool _isRunning;

        // thread management members

        std::mutex              _threadMtx;
        std::condition_variable _threadCond;
        std::thread             _thisThread;

        // exception ptr
        std::exception_ptr _ex;

        // private copy constructor
        WorkerThread(const WorkerThread&): _thisPool(nullptr) {}
    public:
        WorkerThread(ThreadPool&, Args&, Global::mFile_t&);
        ~WorkerThread();

        void setPath(const wpath);          // sets a new task
        void terminate();                   // calls terminate on thread
        bool busy() const;                  // returns whether thread is busy doing task
        bool isRunning() const;             // returns whether thread is still running
        void join();                        // thread join wrapper
        std::exception_ptr exception() const;

        // actual worker thread running tasks
        void thisWorkerThread();
    };

    // thread specific information
    DWORD _numProcs;                        // number of processors on system
    unsigned _numThreads;                   // number of viable threads
    std::vector<std::unique_ptr<WorkerThread>> _vThreads;   // stores thread pointers - workaround for no move constructor in WorkerThread
    pFunc _task;                            // the task threads will call

    // synchronization members
    unsigned _barrierLimit;                 // limit before barrier goes down
    std::mutex _barrierMtx;                 // mutex for barrier
    std::condition_variable _barrierCond;   // condition for barrier
    std::mutex _coutMtx;

public:
    // argument mutex
    std::mutex matchesMap_mtx;
    std::mutex coutMatch_mtx;

    ThreadPool(pFunc f);

    // wake a thread and pass it a new parameter to work on
    void callThread(const wpath&);

    // barrier synchronization
    void synchronizeStartingThreads();

    // starts and synchronizes all threads in a sleep state
    void startThreads(Args&, Global::mFile_t&);

    // terminate threads
    void terminateThreads();

private:
};

これまでのところ、私が抱えている本当の問題は、メインスレッドから terminateThreads() を呼び出すと、デッドロックまたは競合状態が発生することです。

_terminate フラグを true に設定すると、スレッドが起動して終了する前に、メインが既にスコープを終了し、すべてのミューテックスを破棄する可能性があります。実際、私はこのクラッシュを何度も経験しています (コンソール ウィンドウの表示: ビジー状態でミューテックスが破棄されました)。

スレッドを notify_all() した後に thread.join() を追加すると、終了したスレッドに参加するとプログラムが無期限に中断されるため、結合が発生する前にスレッドが終了し、無限のデッド ロックが発生する可能性があります。

デタッチした場合 - 上記と同じ問題ですが、プログラムがクラッシュします

代わりに while(WorkerThread.isRunning()) Sleep(0); を使用すると WorkerThread が最後の右中括弧に到達する前にメイン スレッドが終了する可能性があるため、プログラムがクラッシュする可能性があります。

すべてのワーカー スレッドが安全に終了するまで、メインの停止を停止するために他に何をすべきかわかりません。また、スレッドとメインで try-catch を使用しても、例外はキャッチされません。(私が試したすべてがプログラムのクラッシュにつながります)

ワーカー スレッドが終了するまでメイン スレッドを停止するにはどうすればよいですか?

主な機能の実装は次のとおりです。

個々のワーカー スレッドを終了する

void ThreadPool::WorkerThread::terminate()
{
    _terminate = true;
    _threadCond.notify_all();
    _thisThread.join();
}

実際のスレッドループ

void ThreadPool::WorkerThread::thisWorkerThread()
{
    _thisPool->synchronizeStartingThreads();

    try
    {
        while (!_terminate)
        {
            {
                _thisPool->_coutMtx.lock();
                std::cout << std::this_thread::get_id() << " Sleeping..." << std::endl;
                _thisPool->_coutMtx.unlock();
                _busy = false;
                std::unique_lock<std::mutex> lock(_threadMtx);
                _threadCond.wait(lock);
            }
            _thisPool->_coutMtx.lock();
            std::cout << std::this_thread::get_id() << " Awake..." << std::endl;
            _thisPool->_coutMtx.unlock();
            if(_terminate)
                break;

            _thisPool->_task(_pPath, *_pArgs, *_pMap, _thisPool->coutMatch_mtx, _thisPool->matchesMap_mtx);

            _thisPool->_coutMtx.lock();
            std::cout << std::this_thread::get_id() << " Finished Task..." << std::endl;
            _thisPool->_coutMtx.unlock();

        }
        _thisPool->_coutMtx.lock();
        std::cout << std::this_thread::get_id() << " Terminating" << std::endl;
        _thisPool->_coutMtx.unlock();   
    }
    catch (const std::exception&)
    {
        _ex = std::current_exception();
    }
    _isRunning = false;
}

すべてのワーカー スレッドを終了する

void ThreadPool::terminateThreads()
{
    for (std::vector<std::unique_ptr<WorkerThread>>::iterator it = _vThreads.begin(); it != _vThreads.end(); ++it)
    {
        it->get()->terminate();
        //it->get()->_thisThread.detach();

        // if thread threw an exception, rethrow it in main
        if (it->get()->exception() != nullptr)
            std::rethrow_exception(it->get()->exception());
    }
}

最後に、スレッド プールを呼び出している関数 (スキャン関数はメインで実行されています)

// scans a path recursively for all files of selected extension type, calls thread to parse file
unsigned int Functions::Scan(wpath path, const Args& args, ThreadPool& pool)
{
    wrecursive_directory_iterator d(path), e;
    unsigned int filesFound = 0;
    while ( d != e )
    {
        if (args.verbose())
            std::wcout << L"Grepping: " << d->path().string() << std::endl;

        for (Args::ext_T::const_iterator it = args.extension().cbegin(); it != args.extension().cend(); ++it)
        {
            if (extension(d->path()) == *it)
            {
                ++filesFound;
                pool.callThread(d->path());
            }
        }
        ++d;
    }

    std::cout << "Scan Function: Calling TerminateThreads() " << std::endl;
    pool.terminateThreads();
    std::cout << "Scan Function: Called TerminateThreads() " << std::endl;
    return filesFound;
}

もう一度質問を繰り返します: ワーカー スレッドが終了するまでメイン スレッドを停止するにはどうすればよいですか?

4

2 に答える 2

1

スレッドの終了と参加に関する問題はありません。

スレッドへの参加は、指定されたスレッドが終了するまで待機することなので、まさにあなたがやりたいことです。スレッドの実行がすでに終了している場合は、joinすぐに戻ります。

terminateしたがって、コードで既に行っているように、呼び出し中に各スレッドに参加したいだけです。

注:現在、終了したばかりのスレッドにアクティブなexception_ptr. これにより、結合されていないスレッドが発生する可能性があります。これらの例外を処理するときは、そのことに留意する必要があります

更新:コードを確認したところ、潜在的なバグが見つかりました:std::condition_variable::wait()スプリアス ウェイクアップが発生したときに戻る可能性があります。その場合、前回作業したパスで再度作業することになり、間違った結果につながります。新しい作業が追加された場合に設定される新しい作業のフラグが必要であり、その_threadCond.wait(lock)行はフラグとをチェックするループ内にある必要があり_terminateます。ただし、それで問題が解決するかどうかはわかりません。

于 2013-06-17T06:24:21.947 に答える
0

問題は次の 2 つでした。

synchronizeStartingThreads() は、1 つまたは 2 つのスレッドをブロックし、OK が先に進むのを待つことがあります (while (some_condition) barrierCond.wait(lock の問題)。条件が true に評価されないことがあります。while ループを削除すると、これが修正されました。ブロッキングの問題。

2 つ目の問題は、ワーカー スレッドが _threadMtx に入る可能性であり、notify_all が _threadCond.wait() に入る直前に呼び出されました。

すなわち。

{
    // terminate() is called
    std::unique_lock<std::mutex> lock(_threadMtx);
    // _threadCond.notify_all() is called here
    _busy = false;
    _threadCond.wait(lock);
    // thread is blocked forever
}

驚くべきことに、terminate() でこのミューテックスをロックしても、これは発生しませんでした。

これは、_threadCond.wait() に 30 ミリ秒のタイムアウトを追加することで解決されました。

また、同じタスクが再度処理されていないことを確認するために、タスクの開始前にチェックが追加されました。

新しいコードは次のようになります。

このWorkerThread

_threadCond.wait_for(lock, std::chrono::milliseconds(30));  // hold the lock a max of 30ms

// after the lock, and the termination check

if(_busy)
        {
            Global::mFile_t rMap = _thisPool->_task(_pPath, *_pArgs, _thisPool->coutMatch_mtx);
            _workerMap.element.insert(rMap.element.begin(), rMap.element.end());
        }
于 2013-06-17T10:18:01.157 に答える