6

消費者/生産者の問題で、マルチスレッド コードを boost::asio とスレッド プールに置き換えることができるようにするという考えです。現在、各コンシューマー スレッドはboost::condition_variable、プロデューサーがキューに何かを追加すると、notify_one/notify_allを呼び出してすべてのコンシューマーに通知します。では、(潜在的に) 1,000 人以上の消費者がいる場合はどうなるでしょうか? スレッドはスケーリングしません!

を使用することにboost::asioしましたが、条件変数がないことに気付きました。そしてasync_condition_variable生まれました:

class async_condition_variable
{
private:
    boost::asio::io_service& service_;
    typedef boost::function<void ()> async_handler;
    std::queue<async_handler> waiters_;

public:
    async_condition_variable(boost::asio::io_service& service) : service_(service)
    {
    }

    void async_wait(async_handler handler)
    {
        waiters_.push(handler);
    }

    void notify_one()
    {
        service_.post(waiters_.front());
        waiters_.pop();
    }

    void notify_all()
    {
        while (!waiters_.empty()) {
            notify_one();
        }
    }
};

基本的に、各消費者は を呼び出しますasync_condition_variable::wait(...)。その後、プロデューサーは最終的にasync_condition_variable::notify_one()orを呼び出しますasync_condition_variable::notify_all()。各コンシューマーのハンドルが呼び出され、条件に基づいて動作するか、async_condition_variable::wait(...)再度呼び出します。これは実現可能ですか、それとも私はここで夢中ですか? これがスレッドプールで実行されるという事実を考えると、どのような種類のロック (ミューテックス) を実行する必要がありますか?

PS: はい、これは質問というより RFC (Request for Comments) です :)。

4

3 に答える 3

3

イベントが発生したときに実行する必要があることのリストを作成します。そのリストに何かを追加する関数と、そのリストから何かを削除する関数を用意してください。次に、イベントが発生したら、現在実行する必要があるジョブのリストでスレッドのプールを動作させます。イベントを特に待機しているスレッドは必要ありません。

于 2012-07-10T00:26:17.200 に答える
1

Boost::asio は、頭を包み込むのがちょっと難しいかもしれません。少なくとも、私はそれをするのに苦労しています。

スレッドを待機させる必要はありません。やるべき仕事がないときは、彼らは自分でそれをします。あなたがやりたいことのように見えた例は、アイテムごとに io_service に投稿された作業でした。

次のコードは、このリンクから着想を得たものです。それは実際に、あなたがそれをどのように使用して多くのことを行うことができるかについて私の目を開きます.

これは完璧ではないと確信していますが、一般的なアイデアは得られると思います。これが役立つことを願っています。

コード

#include <iostream>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <boost/asio.hpp>
class ServerProcessor
{
protected:
    void handleWork1(WorkObject1* work)
    {
        //The code to do task 1 goes in here
    }
    void handleWork2(WorkObject2* work)
    {
        //The code to do task 2 goes in here
    }

    boost::thread_group worker_threads_;

    boost::asio::io_service io_service_;
    //This is used to keep io_service from running out of work and exiting to soon.
    boost::shared_ptr<boost::asio::io_service::work> work_;


public:
    void start(int numberOfThreads)
    {
        boost::shared_ptr<boost::asio::io_service::work> myWork(new boost::asio::io_service::work(io_service_));
        work_=myWork;

        for (int x=0; x < numberOfThreads; ++x)
            worker_threads_.create_thread( boost::bind( &ServerProcessor::threadAction, this ) );

    }

    void doWork1(WorkObject1* work)
    {
        io_service_.post(boost::bind(&ServerProcessor::handleWork1, this, work));
    }

    void doWork2(WorkObject2* work)
    {
        io_service_.post(boost::bind(&ServerProcessor::handleWork2, this, work));
    }


    void threadAction()
    {
        io_service_.run();
    }

    void stop()
    {
        work_.reset();
        io_service_.stop();
        worker_threads_.join_all();
    }

};

int main()
{
    ServerProcessor s;

    std::string input;
    std::cout<<"Press f to stop"<<std::endl;

    s.start(8);

    std::cin>>input;

    s.stop();

    return 0;
}
于 2012-11-19T22:56:13.370 に答える
0

boost::signals2 を使用するのはどうですか?

これは、クライアントが送信されるシグナルへのコールバックをサブスクライブできるようにする、boost::signals のスレッドセーフなスピンオフです。

次に、io_service ディスパッチされたジョブでシグナルが非同期的に発行されると、登録されているすべてのコールバックが (シグナルを発行した同じスレッドで) 実行されます。

于 2016-10-13T04:04:02.187 に答える