1

次のように、プロデューサー/コンシューマー パターン コードを作成するために packaged_task を試みています。 タスクをキューtest_thread9_producer1test_thread9_producer2プッシュし、キューtest_thread9_consumer1からタスクを取得して実行します。

ただし、実行test_thread9すると、タスクは正しく実行されますが、デバッグ エラーで終了します: 中止が呼び出されました。なぜだかわかりません。をもっと理解するのを手伝ってくれる人はいpackaged_taskますか?

2 番目の問題: コンシューマーがループで実行されているため、2 つのプロデューサーがすべてのタスクをキューにプッシュし、キュー内のすべてのタスクを実行し終わったときに終了while(1)させる適切な方法が思いつきません 。誰か私に提案をしてもらえますか?test_thread9_consumer1test_thread9_consumer1

void test_thread9()
{
    std::thread t1(test_thread9_producer1);
    std::thread t2(test_thread9_producer2);
    std::thread t3(test_thread9_consumer1);

    t1.join();
    t2.join();
    t3.join();
} 

std::deque<std::packaged_task<int()>>task_q;
std::mutex lock9;

int factial_calc2(int in)
{
    int ret = 1;
    for (int i = in; i > 1; i--)
    {
        ret = ret*i;
    }
    std::lock_guard<std::mutex> locker(lock9);
    std::cout << "input is " << in << "result is " << ret << std::endl;
    return ret;
}

void test_thread9_producer1()
{
    for (int i = 0; i < 10; i = i + 2)
    {
        std::lock_guard<std::mutex> locker(lock9);
        std::packaged_task<int()> t1(std::bind(factial_calc2, i));
        task_q.push_back(std::move(t1));
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}

void test_thread9_producer2()
{
    for (int i = 1; i < 10; i = i + 2)
    {
        std::lock_guard<std::mutex> locker(lock9);
        std::packaged_task<int()> t1(std::bind(factial_calc2, i));
        task_q.push_back(std::move(t1));
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}


void test_thread9_consumer1()
{
    std::packaged_task<int()>t;
    while (1)
    {
        {
            std::lock_guard<std::mutex> locker(lock9);
            if (!task_q.empty())
            {
                t = std::move(task_q.front());
                task_q.pop_front();
            }
        }
        t();
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}
4

2 に答える 2

0

Why does it crash ?

If your consumer thread finds an empty queue, it will nevertheless try to execute the packaged task despite it was not moved. This is UB and hence the runtime error !

You can improve this by checking if the packaged_task is valid:

while (1)
{
    std::packaged_task<int()>t;  // to make sure that valid() checks this iteration
    {
       ...
    }
    if (t.valid())
        t();  // execute only if it's a valid task
    ...
}

How to avoid endless looping ?

You have somehow to keep track of what's running. A simple technique is to used an atomic variable to manage a shared state information (which can be accessed concurrently without locking).

For example you could count the nimber of finished producers

std::atomic<int>finished{0};  // count the producers that are finished
...

void test_thread9_producerN() { cout <<"start producer"<

Then you can adapt your consumer to take inte account this information:

void test_thread9_consumer1()
{
    bool nothing_to_do{false}; 
    while (!nothing_to_do && finished<2)
    {
    ...   
        nothing_to_do=task_q.empty();  // in the lock protected section 
        if (!nothing_to_do)     
    ...
    }
}

Online demo

于 2016-07-17T18:07:55.380 に答える