8

複数のスレッドからメソッドを呼び出せるクラスを作成したいと考えています。ただし、メソッドが呼び出されたスレッドでメソッドを実行するのではなく、独自のスレッドですべてを実行する必要があります。結果を返す必要はなく、呼び出し元のスレッドをブロックするべきではありません。

私が以下に含めた最初の試みの実装。パブリック メソッドは、関数ポインターとデータをジョブ キューに挿入し、ワーカー スレッドがそれを取得します。ただし、これは特に優れたコードではなく、新しいメソッドを追加するのは面倒です。

理想的には、これを基本クラスとして使用して、手間とコードの重複を最小限に抑えて (可変数の引数を使用して) メソッドを簡単に追加できるようにしたいと考えています。

これを行うためのより良い方法は何ですか? 同様のことを行う既存のコードはありますか? ありがとう

#include <queue>

using namespace std;

class GThreadObject
{
    class event
    {
        public:
        void (GThreadObject::*funcPtr)(void *);
        void * data;
    };

public:
    void functionOne(char * argOne, int argTwo);

private:
    void workerThread();
    queue<GThreadObject::event*> jobQueue;
    void functionOneProxy(void * buffer);
    void functionOneInternal(char * argOne, int argTwo);

};



#include <iostream>
#include "GThreadObject.h"

using namespace std;

/* On a continuous loop, reading tasks from queue
 * When a new event is received it executes the attached function pointer
 * It should block on a condition, but Thread code removed to decrease clutter
 */
void GThreadObject::workerThread()
{
    //New Event added, process it
    GThreadObject::event * receivedEvent = jobQueue.front();

    //Execute the function pointer with the attached data
    (*this.*receivedEvent->funcPtr)(receivedEvent->data);
}

/*
 * This is the public interface, Can be called from child threads
 * Instead of executing the event directly it adds it to a job queue
 * Then the workerThread picks it up and executes all tasks on the same thread
 */
void GThreadObject::functionOne(char * argOne, int argTwo)
{

    //Malloc an object the size of the function arguments
    int argumentSize = sizeof(char*)+sizeof(int);
    void * myData = malloc(argumentSize);
    //Copy the data passed to this function into the buffer
    memcpy(myData, &argOne, argumentSize);

    //Create the event and push it on to the queue
    GThreadObject::event * myEvent = new event;
    myEvent->data = myData;
    myEvent->funcPtr = &GThreadObject::functionOneProxy;
    jobQueue.push(myEvent);

    //This would be send a thread condition signal, replaced with a simple call here
    this->workerThread();
}

/*
 * This handles the actual event
 */
void GThreadObject::functionOneInternal(char * argOne, int argTwo)
{
    cout << "We've made it to functionTwo char*:" << argOne << " int:" << argTwo << endl;

    //Now do the work
}

/*
 * This is the function I would like to remove if possible
 * Split the void * buffer into arguments for the internal Function
 */
void GThreadObject::functionOneProxy(void * buffer)
{
    char * cBuff = (char*)buffer;
    functionOneInternal((char*)*((unsigned int*)cBuff), (int)*(cBuff+sizeof(char*)));
};

int main()
{
    GThreadObject myObj;

    myObj.functionOne("My Message", 23);

    return 0;
}
4

8 に答える 8

6

Boostと C++ 標準ライブラリに組み込まれているFuturesライブラリがあります。ACEにも同じ種類のものがありますが、誰にもお勧めしたくありません(@lotharがすでに指摘しているように、それはアクティブオブジェクトです)。

于 2009-05-29T01:05:34.647 に答える
3

以下は、「functionProxy」メソッドを必要としない実装です。新しいメソッドを追加するのは簡単ですが、それでも面倒です。

Boost::Bind と "Futures" は、この多くを整理するように見えます。ブーストコードを見て、それがどのように機能するかを確認すると思います。皆さんの提案に感謝します。

GThreadObject.h

#include <queue>

using namespace std;

class GThreadObject
{

    template <int size>
    class VariableSizeContainter
    {
        char data[size];
    };

    class event
    {
        public:
        void (GThreadObject::*funcPtr)(void *);
        int dataSize;
        char * data;
    };

public:
    void functionOne(char * argOne, int argTwo);
    void functionTwo(int argTwo, int arg2);


private:
    void newEvent(void (GThreadObject::*)(void*), unsigned int argStart, int argSize);
    void workerThread();
    queue<GThreadObject::event*> jobQueue;
    void functionTwoInternal(int argTwo, int arg2);
    void functionOneInternal(char * argOne, int argTwo);

};

GThreadObject.cpp

#include <iostream>
#include "GThreadObject.h"

using namespace std;

/* On a continuous loop, reading tasks from queue
 * When a new event is received it executes the attached function pointer
 * Thread code removed to decrease clutter
 */
void GThreadObject::workerThread()
{
    //New Event added, process it
    GThreadObject::event * receivedEvent = jobQueue.front();

    /* Create an object the size of the stack the function is expecting, then cast the function to accept this object as an argument.
     * This is the bit i would like to remove
     * Only supports 8 byte argument size e.g 2 int's OR pointer + int OR myObject8bytesSize
     * Subsequent data sizes would need to be added with an else if
     * */
    if (receivedEvent->dataSize == 8)
    {
        const int size = 8;

        void (GThreadObject::*newFuncPtr)(VariableSizeContainter<size>);
        newFuncPtr = (void (GThreadObject::*)(VariableSizeContainter<size>))receivedEvent->funcPtr;

        //Execute the function
        (*this.*newFuncPtr)(*((VariableSizeContainter<size>*)receivedEvent->data));
    }

    //Clean up
    free(receivedEvent->data);
    delete receivedEvent;

}

void GThreadObject::newEvent(void (GThreadObject::*funcPtr)(void*), unsigned int argStart, int argSize)
{

    //Malloc an object the size of the function arguments
    void * myData = malloc(argSize);
    //Copy the data passed to this function into the buffer
    memcpy(myData, (char*)argStart, argSize);

    //Create the event and push it on to the queue
    GThreadObject::event * myEvent = new event;
    myEvent->data = (char*)myData;
    myEvent->dataSize = argSize;
    myEvent->funcPtr = funcPtr;
    jobQueue.push(myEvent);

    //This would be send a thread condition signal, replaced with a simple call here
    this->workerThread();

}

/*
 * This is the public interface, Can be called from child threads
 * Instead of executing the event directly it adds it to a job queue
 * Then the workerThread picks it up and executes all tasks on the same thread
 */
void GThreadObject::functionOne(char * argOne, int argTwo)
{
    newEvent((void (GThreadObject::*)(void*))&GThreadObject::functionOneInternal, (unsigned int)&argOne, sizeof(char*)+sizeof(int));
}

/*
 * This handles the actual event
 */
void GThreadObject::functionOneInternal(char * argOne, int argTwo)
{
    cout << "We've made it to functionOne Internal char*:" << argOne << " int:" << argTwo << endl;

    //Now do the work
}

void GThreadObject::functionTwo(int argOne, int argTwo)
{
    newEvent((void (GThreadObject::*)(void*))&GThreadObject::functionTwoInternal, (unsigned int)&argOne, sizeof(int)+sizeof(int));
}

/*
 * This handles the actual event
 */
void GThreadObject::functionTwoInternal(int argOne, int argTwo)
{
    cout << "We've made it to functionTwo Internal arg1:" << argOne << " int:" << argTwo << endl;
}

main.cpp

#include <iostream>
#include "GThreadObject.h"

int main()
{

    GThreadObject myObj;

    myObj.functionOne("My Message", 23);
    myObj.functionTwo(456, 23);


    return 0;
}

編集: 完全を期すために、Boost::bind を使用して実装しました。主な違い:

queue<boost::function<void ()> > jobQueue;

void GThreadObjectBoost::functionOne(char * argOne, int argTwo)
{
    jobQueue.push(boost::bind(&GThreadObjectBoost::functionOneInternal, this, argOne, argTwo));

    workerThread();
}

void GThreadObjectBoost::workerThread()
{
    boost::function<void ()> func = jobQueue.front();
    func();
}

functionOne() の 10,000,000 回の反復に対してブースト実装を使用すると、約 19 秒かかりました。ただし、非ブーストの実装にはわずか 6.5 秒しかかかりませんでした。つまり、約 3 倍遅くなります。適切な非ロック キューを見つけることが、ここでの最大のパフォーマンスのボトルネックになると思います。しかし、それでもかなり大きな違いです。

于 2009-05-29T14:10:18.653 に答える
2

The POCO library has something along the same lines called ActiveMethod (along with some related functionality e.g. ActiveResult) in the threading section. The source code is readily available and easily understood.

于 2009-05-29T01:20:12.060 に答える
1

これは、Boost の Thread -library を使用して解決できます。このようなもの(半疑似):


class GThreadObject
{
        ...

        public:
                GThreadObject()
                : _done(false)
                , _newJob(false)
                , _thread(boost::bind(>hreadObject::workerThread, this))
                {
                }

                ~GThreadObject()
                {
                        _done = true;

                        _thread.join();
                }

                void functionOne(char *argOne, int argTwo)
                {
                        ...

                        _jobQueue.push(myEvent);

                        {
                                boost::lock_guard l(_mutex);

                                _newJob = true;
                        }

                        _cond.notify_one();
                }

        private:
                void workerThread()
                {
                        while (!_done) {
                                boost::unique_lock l(_mutex);

                                while (!_newJob) {
                                        cond.wait(l);
                                }

                                Event *receivedEvent = _jobQueue.front();

                                ...
                        }
                }

        private:
                volatile bool             _done;
                volatile bool             _newJob;
                boost::thread             _thread;
                boost::mutex              _mutex;
                boost::condition_variable _cond;
                std::queue<Event*>        _jobQueue;
};

また、 RAIIにより、このコードを小さくして管理しやすくする方法にも注目してください。

于 2009-05-29T08:42:34.337 に答える
1

ACE フレームワークのACE パターンの1 つであるActive Objectに興味があるかもしれません。

Nikolaiが指摘したように、将来的には標準 C++ の将来が計画されています (しゃれが意図されています)

于 2009-05-29T00:57:29.360 に答える
1

拡張性と保守性 (およびその他の機能) のために、スレッドが実行する「ジョブ」の抽象クラス (またはインターフェイス) を定義できます。次に、スレッド プールのユーザーがこのインターフェイスを実装し、オブジェクトへの参照をスレッド プールに渡します。これは、Symbian Active Object の設計に非常に似ています。すべての AO は CActive をサブクラス化し、Run() や Cancel() などのメソッドを実装する必要があります。

簡単にするために、インターフェイス (抽象クラス) は次のように単純にすることができます。

class IJob
{
    virtual Run()=0;
};

次に、スレッドプール、またはリクエストを受け入れる単一のスレッドは次のようになります。

class CThread
{
   <...>
public:
   void AddJob(IJob* iTask);
   <...>
};

当然のことながら、あらゆる種類の追加のセッター/ゲッター/属性、およびあらゆる人生で必要なものをすべて持つことができる複数のタスクがあります。ただし、必要なのは、時間のかかる計算を実行するメソッド Run() を実装することだけです。

class CDumbLoop : public IJob
{
public:
    CDumbJob(int iCount) : m_Count(iCount) {};
    ~CDumbJob() {};
    void Run()
    {
        // Do anything you want here
    }
private:
    int m_Count;
};
于 2009-05-29T01:26:47.413 に答える
0

Boost ASIO ライブラリを見てください。イベントを非同期にディスパッチするように設計されています。Boost Thread ライブラリと組み合わせて、説明したシステムを構築できます。

boost::asio::io_service単一のオブジェクトをインスタンス化し、一連の非同期イベント (boost::asio::io_service::postまたは) をスケジュールする必要がありますboost::asio::io_service::dispatch。次に、n 個のスレッドrunからメンバー関数を呼び出します。オブジェクトはスレッドセーフであり、非同期ハンドラーが呼び出し元のスレッドでのみディスパッチされることが保証されます。io_serviceio_service::run

このboost::asio::strandオブジェクトは、単純なスレッド同期にも役立ちます。

ASIO ライブラリは、この問題に対する非常に洗練されたソリューションだと思います。

于 2009-05-29T12:22:34.097 に答える
0

同様の目的で作成したクラスを次に示します (イベント処理に使用しますが、もちろん名前を ActionQueue に変更し、そのメソッドの名前を変更することもできます)。

次のように使用します。

呼び出したい関数で:void foo (const int x, const int y) { /*...*/ }

と:EventQueue q;

q.AddEvent (boost::bind (foo, 10, 20));

ワーカースレッドで

q.PlayOutEvents ();

注: CPU サイクルを使い果たすのを避けるために、条件付きでブロックにコードを追加するのはかなり簡単です。

コード (ブースト 1.34.1 を使用した Visual Studio 2003):

#pragma once

#include <boost/thread/recursive_mutex.hpp>
#include <boost/function.hpp>
#include <boost/signals.hpp>
#include <boost/bind.hpp>
#include <boost/foreach.hpp>
#include <string>
using std::string;


// Records & plays out actions (closures) in a safe-thread manner.

class EventQueue
{
    typedef boost::function <void ()> Event;

public:

    const bool PlayOutEvents ()
    {
        // The copy is there to ensure there are no deadlocks.
        const std::vector<Event> eventsCopy = PopEvents ();

        BOOST_FOREACH (const Event& e, eventsCopy)
        {
            e ();
            Sleep (0);
        }

        return eventsCopy.size () > 0;
    }

    void AddEvent (const Event& event)
    {
        Mutex::scoped_lock lock (myMutex);

        myEvents.push_back (event);
    }

protected:

    const std::vector<Event> PopEvents ()
    {
        Mutex::scoped_lock lock (myMutex);

        const std::vector<Event> eventsCopy = myEvents;
        myEvents.clear ();

        return eventsCopy;
    }

private:

    typedef boost::recursive_mutex Mutex;
    Mutex myMutex;

    std::vector <Event> myEvents;

};

これが役立つことを願っています。:)

マーティン・ビルスキー

于 2009-05-29T11:33:12.777 に答える