11

私には、タイムステップごとに1回ずつ、多くの小さな独立したタスクを実行する「メイン」機能があります。ただし、各タイムステップの後、すべてのタスクが完了するのを待ってから先に進む必要があります。

プログラムをマルチスレッドにしたい。ブーストオフシュートスレッドプールを使用した実装を試し、スレッドのベクトル(への共有ポインター)を使用してみました。また、asioスレッドプールのアイデアを試しました(io_serviceを使用し、作業を確立してから、実行をスレッドとio_serviceへの投稿ハンドラー)。

これらはすべて、私の「多くの小さなタスク」のスレッドの作成と破棄に多くのオーバーヘッドがあるようです。できればasioツールを使用して、1つのio_service、1つのthread_groupをインスタンス化し、ハンドラーをio_serviceに投稿して待機する方法が必要です。さらにタスクを投稿する前に、単一のタイムステップの作業を終了するため。これを行う良い方法はありますか?これが私が今働いているものの(取り除いた)コードです:

boost::asio::io_service io_service;
for(int theTime = 0; theTime != totalTime; ++theTime)
{
    io_service.reset();
    boost::thread_group threads;
    // scoping to destroy the work object after work is finished being assigned
    {
        boost::asio::io_service::work work(io_service);
        for (int i = 0; i < maxNumThreads; ++i)
        {
            threads.create_thread(boost::bind(&boost::asio::io_service::run,
                &io_service));
        }

        for(int i = 0; i < numSmallTasks; ++i)
        {
            io_service.post(boost::bind(&process_data, i, theTime));
        }
    }
    threads.join_all(); 
}

これが私がむしろ持っていたものです(しかし、実装する方法がわかりません):

boost::asio::io_service io_service;
boost::thread_group threads;
boost::asio::io_service::work work(io_service);
for (int i = 0; i < maxNumThreads; ++i)
{
    threads.create_thread(boost::bind(&boost::asio::io_service::run,
         &io_service));
}

for(int theTime = 0; theTime != totalTime; ++theTime)
{
    for(int i = 0; i < numSmallTasks; ++i)
    {
        io_service.post(boost::bind(&process_data, i, theTime));
    }
    // wait here until all of these tasks are finished before looping 
    // **** how do I do this? *****
}
// destroy work later and join all threads later...
4

3 に答える 3

11

データ処理に先物を使用し、を使用して先物と同期することができますboost::wait_for_all()。これにより、スレッドではなく、実行された作業の一部に関して操作できるようになります。

int process_data() {...}

// Pending futures
std::vector<boost::unique_future<int>> pending_data;

for(int i = 0; i < numSmallTasks; ++i)
{
   // Create task and corresponding future
   // Using shared ptr and binding operator() trick because
   // packaged_task is non-copyable, but asio::io_service::post requires argument to be copyable

   // Boost 1.51 syntax
   // For Boost 1.53+ or C++11 std::packaged_task shall be boost::packaged_task<int()>
   typedef boost::packaged_task<int> task_t;

   boost::shared_ptr<task_t> task = boost::make_shared<task_t>(
      boost::bind(&process_data, i, theTime));

   boost::unique_future<int> fut = task->get_future();

   pending_data.push_back(std::move(fut));
   io_service.post(boost::bind(&task_t::operator(), task));    
}

// After loop - wait until all futures are evaluated
boost::wait_for_all(pending_data.begin(), pending_data.end()); 
于 2012-10-30T21:14:27.183 に答える
0

次のようにboost::barrierを使用できる可能性があります。

void thread_proc( boost::barrier& b ) {
    while( true ) {
        if( !ioservice.run_one() ) break; // io_service stopped
        b.wait();
    }
}
于 2012-10-30T19:46:36.117 に答える
0

Rostの方法は基本的に機能しますが、boost::make_sharedはそのままではコンパイルできません。以下は、動作中のバージョン(vs2012)です。

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/make_shared.hpp>
#include <boost/function_types/result_type.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/function.hpp>
#include <boost/thread.hpp>

std::vector<boost::unique_future<void>> pending_data;
typedef boost::packaged_task<void> task_t;

boost::shared_ptr< boost::packaged_task<void> > pt(new boost::packaged_task<void> ([&,i](){...}));
boost::unique_future<void> result = pt->get_future();
pending_data.push_back(boost::move(result));
io_service.post(boost::bind(&task_t::operator(), pt));

boost::wait_for_all(pending_data.begin(), pending_data.end()); 
pending_data.clear();

packaged_task typedefで引数を使用すると、コンパイルされません。asioおよびfutureメソッドによるこのスレッドプールは、新しいスレッドメソッドを作成する各ループと比較して8%の時間を節約しました。

于 2014-10-10T21:41:59.487 に答える