8

boost::thread を使用して、C++ のスレッドに Actor 計算モデルを実装しようとしています。しかし、プログラムは実行中に奇妙な例外をスローします。例外は安定しておらず、プログラムが正しく動作する場合もあります。

そこに私のコード:

アクター.hpp

class Actor {

  public:
    typedef boost::function<int()> Job;

  private:
    std::queue<Job>             d_jobQueue;
    boost::mutex                d_jobQueueMutex;
    boost::condition_variable   d_hasJob;
    boost::atomic<bool>         d_keepWorkerRunning;
    boost::thread               d_worker;

    void workerThread();

  public:
    Actor();
    virtual ~Actor();

    void execJobAsync(const Job& job);

    int execJobSync(const Job& job);
};

アクター.cpp

namespace {

int executeJobSync(std::string          *error,
                   boost::promise<int> *promise,
                   const Actor::Job     *job)
{
    int rc = (*job)();

    promise->set_value(rc);
    return 0;
}

}

void Actor::workerThread()
{
    while (d_keepWorkerRunning) try {
        Job job;
        {
            boost::unique_lock<boost::mutex> g(d_jobQueueMutex);

            while (d_jobQueue.empty()) {
                d_hasJob.wait(g);
            }

            job = d_jobQueue.front();
            d_jobQueue.pop();
        }

        job();
    }
    catch (...) {
        // Log error
    }
}

void Actor::execJobAsync(const Job& job)
{
    boost::mutex::scoped_lock g(d_jobQueueMutex);
    d_jobQueue.push(job);
    d_hasJob.notify_one();
}

int Actor::execJobSync(const Job& job)
{
    std::string error;
    boost::promise<int> promise;
    boost::unique_future<int> future = promise.get_future();

    {
        boost::mutex::scoped_lock g(d_jobQueueMutex);
        d_jobQueue.push(boost::bind(executeJobSync, &error, &promise, &job));
        d_hasJob.notify_one();
    }

    int rc = future.get();

    if (rc) {
        ErrorUtil::setLastError(rc, error.c_str());
    }

    return rc;
}

Actor::Actor()
: d_keepWorkerRunning(true)
, d_worker(&Actor::workerThread, this)
{
}

Actor::~Actor()
{
    d_keepWorkerRunning = false;
    {
        boost::mutex::scoped_lock g(d_jobQueueMutex);
        d_hasJob.notify_one();
    }
    d_worker.join();
}

実際にスローされる例外は、boost::thread_interruptedint rc = future.get();行です。しかし、フォーム ブースト ドキュメントでは、この例外の理由を説明できません。ドキュメントは言う

スロー: - *this に関連付けられた結果が呼び出しの時点で準備ができておらず、現在のスレッドが中断された場合、boost::thread_interrupted。

しかし、ワーカー スレッドを中断状態にすることはできません。

gdb を使用して「catch throw」を設定すると、バック トレースが次のようになることがわかります

スロー thread_interrupted

boost::detail::interruption_checker::check_for_interruption

boost::detail::interruption_checker::interruption_checker

boost::condition_variable::wait

boost::detail::future_object_base::wait_internal

boost::detail::future_object_base::wait

boost::detail::future_object::get

boost::unique_future::get

ブースト ソースを調べましたが、interruption_checker がワーカー スレッドが中断されたと判断した理由がわかりません。

だから誰かC ++の第一人者、助けてください。正しいコードを取得するにはどうすればよいですか? 私は使用しています:

ブースト 1_53

Linux バージョン 2.6.18-194.32.1.el5 Red Hat 4.1.2-48

gcc 4.7

編集

修正しました!Evgeny Panasyuk と Lazin に感謝します。問題は TLS 管理にありました。boost::thread と boost::thread_specific_ptr は、その目的のために同じ TLS ストレージを使用しています。私の場合、両方が作成時にこのストレージを変更しようとしたときに問題が発生しました(残念ながら、なぜそれが起こるのか詳細はわかりませんでした)。そのため、TLS が破損しました。

コードの boost::thread_specific_ptr を __thread 指定の変数に置き換えました。

Offtop: デバッグ中に外部ライブラリでメモリ破損を発見し、修正しました =)

.

編集 2 正確な問題が発生しました...これは GCC のバグです =) _GLIBCXX_DEBUG コンパイル フラグは ABI を壊します。ブースト バグトラッカーに関する議論を見ることができます: https://svn.boost.org/trac/boost/ticket/7666

4

2 に答える 2

5

私はいくつかのバグを発見しました:


Actor::workerThread二重ロック解除機能をオンにしd_jobQueueMutexます。最初のロック解除は手動d_jobQueueMutex.unlock();で、2 番目は のデストラクタにありboost::unique_lock<boost::mutex>ます。

ロック解除のいずれかを防止する必要があります。たとえば、との間の関連付けを解除unique_lockmutexます。

g.release(); // <------------ PATCH
d_jobQueueMutex.unlock();

または、追加のコード ブロック + default-constructed を追加しますJob


workerThread次のループから抜け出せない可能性があります。

while (d_jobQueue.empty()) {
    d_hasJob.wait(g);
}

次のケースを想像してください:d_jobQueue空で、Actor::~Actor()呼び出され、フラグを設定し、ワーカー スレッドに通知します。

d_keepWorkerRunning = false;
d_hasJob.notify_one();

workerThreadwhile ループでウェイクアップし、キューが空であることを確認して、再びスリープします。

ワーカー スレッドを停止するために特別な最終ジョブを送信するのが一般的な方法です。

~Actor()
{
    execJobSync([this]()->int
    {
        d_keepWorkerRunning = false;
        return 0;
    });
    d_worker.join();
}

この場合、d_keepWorkerRunningはアトミックである必要はありません。


Coliru のライブデモ


編集

サンプルにイベント キュー コードを追加しました。

と の両方に同時キューがありますEventQueueImplActor、タイプが異なります。共通部分を抽出しconcurrent_queue<T>て、任意のタイプで機能する個別のエンティティにすることができます。さまざまなクラスに散在するバグをキャッチするよりも、キューを 1 か所でデバッグおよびテストする方がはるかに簡単です。

したがって、これを使用してみることができますconcurrent_queue<T>(Coliruで)

于 2013-10-28T07:31:14.750 に答える
2

これは単なる推測です。一部のコードは実際にboost::tread::interrupt()を呼び出すことができると思います。この関数にブレークポイントを設定して、これを担当するコードを確認できます。次の場所で中断をテストできますexecJobSync

int Actor::execJobSync(const Job& job)
{
    if (boost::this_thread::interruption_requested())
        std::cout << "Interruption requested!" << std::endl;
    std::string error;
    boost::promise<int> promise;
    boost::unique_future<int> future = promise.get_future();

この場合、最も疑わしいコードは、スレッド オブジェクトへの参照を含むコードです。

いずれにしても、boost::thread コードの割り込みを認識させることをお勧めします。一部のスコープの割り込みを無効にすることもできます。

そうでない場合は、スレッド割り込みフラグが TLS に格納されているため、スレッド ローカル ストレージで動作するコードを確認する必要があります。多分あなたのコードはそれを書き直します。このようなコード片の前後で中断を確認できます。

別の可能性は、メモリが壊れていることです。boost::thread::interrupt() を呼び出すコードがなく、TLS を使用していない場合。これは最も難しいケースです。動的アナライザー (valgrind または clang メモリ サニタイザー) を使用してみてください。

オフトピック: おそらく、いくつかの並行キューを使用する必要があります。std::queue は、メモリの競合が激しいため非常に遅くなり、キャッシュのパフォーマンスが低下します。優れた同時キューにより、コードは要素を並行してエンキューおよびデキューできます。

また、アクターは任意のコードを実行することを想定したものではありません。アクター キューは、関数ではなく単純なメッセージを受け取る必要があります。ジョブ キューを作成しています :) Akkalibcpaなどのアクター システムを調べる必要があります。

于 2013-10-29T11:58:55.323 に答える