12

この質問は、前の質問と非常によく似ています。pthread_once()の競合状態?

これは本質的に同じ問題です-std::promise呼び出し中の終了 の存続期間promise::set_value(つまり、関連する未来にフラグが立てられた後、pthread_once実行される前)

ですから、私の使用法にはこの問題があることを知っているので、このように使用することはできません。しかし、これは自明ではないと思います。(Scott Meyerの賢明な言葉で:インターフェースを正しく使いやすく、正しく使いにくいようにする

以下に例を示します。

  • dispatcherキューでスピンし、「ジョブ」(a)をポップして実行するスレッド( )がありますstd::function
  • synchronous_jobディスパッチャスレッドで「ジョブ」が実行されるまで呼び出しスレッドをブロックするユーティリティクラスがあります
  • std::promiseおよびは-のstd::futureメンバーです。が設定されると、ブロックされた呼び出しスレッドが続行され、スタックがポップオフされて破棄されます。synchronous_jobfuturesynchronous_job
  • 残念ながら、この時点では、内部dispatcherでコンテキストが切り替えられました。にフラグが立てられていますが、への呼び出しが実行されておらず、pthreadスタックが何らかの理由で破損しています。つまり、次回はデッドロックになります。 promise::set_valuefuturepthread_once

呼び出しはpromise::set_valueアトミックであると思います。フラグを立てた後にさらに作業を行う必要があるという事実は、futureこれらのクラスをこのように使用する場合、必然的にこの種の問題につながります。

だから私の質問は:この同期メカニズムを提供するクラスに関連付けられたライフタイムを維持しながら、std::promiseとを使用してこの種の同期を実現するにはどうすればよいですか?std::future

@Jonathan Wakely、おそらく内部でRAIIスタイルのクラスを使用してcondition_variable、フラグを立てた後にデストラクタにを設定できますfutureか?これはpromise、への呼び出しの途中でが破棄された場合でもset_value、条件変数を設定する追加の作業が正しく完了することを意味します。ただのアイデア、それを使用できるかどうかわからない...

以下の完全に機能する例と、その後のデッドロックされたアプリのスタックトレース:

#include <iostream>
#include <thread>
#include <future>
#include <queue>

struct dispatcher
{
    dispatcher()
    {
        _thread = std::move(std::thread(&dispatcher::loop, this));
    }
    void post(std::function<void()> job)
    {
        std::unique_lock<std::mutex> l(_mtx);
        _jobs.push(job);
        _cnd.notify_one();
    }
private:
    void loop()
    {
        for (;;)
        {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> l(_mtx);
                while (_jobs.empty())
                    _cnd.wait(l);
                job.swap(_jobs.front());
                _jobs.pop();
            }
            job();
        }
    }
    std::thread                       _thread;
    std::mutex                        _mtx;
    std::condition_variable           _cnd;
    std::queue<std::function<void()>> _jobs;
};
//-------------------------------------------------------------

struct synchronous_job
{
    synchronous_job(std::function<void()> job, dispatcher& d)
        : _job(job)
        , _d(d)
        , _f(_p.get_future())
    {
    }
    void run()
    {
        _d.post(std::bind(&synchronous_job::cb, this));
        _f.wait();
    }
private:
    void cb()
    {
        _job();
        _p.set_value();
    }
    std::function<void()> _job;
    dispatcher&           _d;
    std::promise<void>    _p;
    std::future<void>     _f;
};
//-------------------------------------------------------------

struct test
{
    test()
        : _count(0)
    {
    }
    void run()
    {
        synchronous_job job(std::bind(&test::cb, this), _d);
        job.run();
    }
private:
    void cb()
    {
        std::cout << ++_count << std::endl;
    }
    int _count;
    dispatcher _d;
};
//-------------------------------------------------------------

int main()
{
    test t;
    for (;;)
    {
        t.run();
    }
}

デッドロックされたアプリのスタックトレース:

スレッド1(メインスレッド)

#0  0x00007fa112ed750c in pthread_cond_wait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#1  0x00007fa112a308ec in __gthread_cond_wait (__mutex=<optimized out>, __cond=<optimized out>) at /hostname/tmp/syddev/Build/gcc-4.6.2/gcc-build/x86_64-unknown-linux-gnu/libstdc++-v3/include/x86_64-unknown-linux-gnu/bits/gthr-default.h:846
#2  std::condition_variable::wait (this=<optimized out>, __lock=...) at ../../../../libstdc++-v3/src/condition_variable.cc:56
#3  0x00000000004291d9 in std::condition_variable::wait<std::__future_base::_State_base::wait()::{lambda()#1}>(std::unique_lock<std::mutex>&, std::__future_base::_State_base::wait()::{lambda()#1}) (this=0x78e050, __lock=..., __p=...) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/condition_variable:93
#4  0x00000000004281a8 in std::__future_base::_State_base::wait (this=0x78e018) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/future:331
#5  0x000000000042a2d6 in std::__basic_future<void>::wait (this=0x7fff0ae515c0) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/future:576
#6  0x0000000000428dd8 in synchronous_job::run (this=0x7fff0ae51580) at /home/lorimer/p4/Main/Source/Trading/Confucius/Test/Scratch/Test1/main.cpp:60
#7  0x0000000000428f97 in test::run (this=0x7fff0ae51660) at /home/lorimer/p4/Main/Source/Trading/Confucius/Test/Scratch/Test1/main.cpp:83
#8  0x0000000000427ad6 in main () at /home/lorimer/p4/Main/Source/Trading/Confucius/Test/Scratch/Test1/main.cpp:99

スレッド2(ディスパッチャースレッド)

#0  0x00007fa112ed8b5b in pthread_once () from /lib64/libpthread.so.0
#1  0x0000000000427946 in __gthread_once (__once=0x78e084, __func=0x4272d0 <__once_proxy@plt>) at /hostname/sdk/gcc470/suse11/x86_64/bin/../lib/gcc/x86_64-unknown-linux-gnu/4.7.0/../../../../include/c++/4.7.0/x86_64-unknown-linux-gnu/bits/gthr-default.h:718
#2  0x000000000042948b in std::call_once<void (std::__future_base::_State_base::*)(std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter> ()>&, bool&), std::__future_base::_State_base* const, std::reference_wrapper<std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter> ()> >, std::reference_wrapper<bool> >(std::once_flag&, void (std::__future_base::_State_base::*&&)(std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter> ()>&, bool&), std::__future_base::_State_base* const&&, std::reference_wrapper<std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter> ()> >&&, std::reference_wrapper<bool>&&) (__once=..., __f=
    @0x7fa111ff6be0: (void (std::__future_base::_State_base::*)(std::__future_base::_State_base * const, std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter>()> &, bool &)) 0x42848a <std::__future_base::_State_base::_M_do_set(std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter> ()>&, bool&)>) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/mutex:819
#3  0x000000000042827d in std::__future_base::_State_base::_M_set_result(std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter> ()>, bool) (this=0x78e018, __res=..., __ignore_failure=false) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/future:362
#4  0x00000000004288d5 in std::promise<void>::set_value (this=0x7fff0ae515a8) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/future:1206
#5  0x0000000000428e2a in synchronous_job::cb (this=0x7fff0ae51580) at /home/lorimer/p4/Main/Source/Trading/Confucius/Test/Scratch/Test1/main.cpp:66
#6  0x000000000042df53 in std::_Mem_fn<void (synchronous_job::*)()>::operator() (this=0x78c6e0, __object=0x7fff0ae51580) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/functional:554
#7  0x000000000042d77c in std::_Bind<std::_Mem_fn<void (synchronous_job::*)()> (synchronous_job*)>::__call<void, , 0ul>(std::tuple<>&&, std::_Index_tuple<0ul>) (this=0x78c6e0, __args=...) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/functional:1156
#8  0x000000000042cb28 in std::_Bind<std::_Mem_fn<void (synchronous_job::*)()> (synchronous_job*)>::operator()<, void>() (this=0x78c6e0) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/functional:1215
#9  0x000000000042b772 in std::_Function_handler<void (), std::_Bind<std::_Mem_fn<void (synchronous_job::*)()> (synchronous_job*)> >::_M_invoke(std::_Any_data const&) (__functor=...) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/functional:1926
#10 0x0000000000429f2c in std::function<void ()>::operator()() const (this=0x7fa111ff6da0) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/functional:2311
#11 0x0000000000428c3c in dispatcher::loop (this=0x7fff0ae51668) at /home/lorimer/p4/Main/Source/Trading/Confucius/Test/Scratch/Test1/main.cpp:39
4

4 に答える 4

16

std::promise他のオブジェクトとまったく同じです。一度にアクセスできるのは1つのスレッドからのみです。この場合、set_value()十分な同期なしに別々のスレッドからオブジェクトを呼び出して破棄しています。仕様のどこにも、準備ができた後にオブジェクトにset_value触れないということはありません。promisefuture

ただし、このfutureはワンショット同期に使用されるため、とにかくそれを行う必要はありません。promise/ futureペアをで作成し、run()promiseをスレッドに渡します。

struct synchronous_job
{
    synchronous_job(std::function<void()> job, dispatcher& d)
        : _job(job)
        , _d(d)
    {
    }
    void run(){
        std::promise<void> p;
        std::future<void> f=p.get_future();

        _d.post(
            [&]{
                cb(std::move(p));
            });

        f.wait();
    }
private:
    void cb(std::promise<void> p)
    {
        _job();
        p.set_value();
    }
    std::function<void()> _job;
    dispatcher&           _d;
};
于 2012-09-21T09:33:18.860 に答える
3

あなたの質問への直接の答えでは、正しい答えはstd::promiseスレッドに与えることです。そうすれば、スレッドが必要とする限り存在することが保証されます。

内部的には、std::futurestd::promiseは両方が指す共有状態を持ち、両側が破壊されるまで利用可能なままであることが保証されています。概念的には、これは、同じオブジェクトへのshared_ptrの個別のコピーを持つpromiseとfutureの両方に似ています。このオブジェクトには、状態、ブロック、およびその他の操作を渡すために必要な基本的なメカニズムが含まれています。

破壊の合図を試みることに関して、問題はこの条件変数がどこに存在するかということです。関連するすべてのフューチャーとプロミスが破棄されると、共有エリアは破棄されます。デッドロックが発生しているのは、その領域がまだ使用されている間にその領域が破棄されているためです(コンパイラーは、破棄されているときに別のスレッドがまだPromiseにアクセスしていることに気付いていないため)。共有状態に条件変数を追加しても、それらも破棄されるため、役に立ちません。

于 2012-09-21T01:30:02.200 に答える
1

私自身の質問に答えて、実行可能な解決策を提供します。std::promiseまたはを使用しませんstd::futureが、私が探している同期を実現します。

synchronous_jobを使用するように更新しstd::condition_variablestd::mutex代わりに:

編集:DaveSによって提案されたブールフラグを含むように更新されました

struct synchronous_job
{
    synchronous_job(std::function<void()> job, dispatcher& d)
        : _job(job)
        , _d(d)
        , _done(false)
    {
    }
    void run()
    {
        _d.post(std::bind(&synchronous_job::cb, this));
        std::unique_lock<std::mutex> l(_mtx);
        if (!_done)
            _cnd.wait(l);
    }
private:
    void cb()
    {
        _job();
        std::unique_lock<std::mutex> l(_mtx);
        _done = true;
        _cnd.notify_all();
    }
    std::function<void()>   _job;
    dispatcher&             _d;
    std::condition_variable _cnd;
    std::mutex              _mtx;
    bool                    _done;
};
于 2012-09-21T01:29:12.227 に答える
1

正規の答えは、std :: bindをこれにバインドするのではなく、std::weak_ptrにバインドすることです。コールバックを取得したら、それをlock()して、コールバックを呼び出す前にNULLをチェックします。

または、言い換えると、shared_ptrをオブジェクトに保持していないスコープから(外部から)メンバー関数を呼び出さないでください。

于 2014-06-22T02:47:50.540 に答える