5

おおよそのインターフェースに従う生産者/消費者シナリオを実装したいと思います。

class Consumer {
private:
    vector<char> read(size_t n) {
        // If the internal buffer has `n` elements, then dequeue them
        // Otherwise wait for more data and try again
    }
public:
    void run() {
        read(10);
        read(4839);
        // etc
    }
    void feed(const vector<char> &more) {
        // Safely queue the data
        // Notify `read` that there is now more data
    }
};

この場合、feedrunは別々のスレッドで実行されread、ブロッキング読み取り (recvと などfread) である必要があります。明らかに、deque である種の相互排除が必要にreadなります。また、再試行を通知する何らかの通知システムが必要です。

条件変数が最適だと聞きましたが、マルチスレッドの経験はすべて Windows にあり、それらについて頭を悩ませています。

助けてくれてありがとう!

(はい、ベクトルを返すのは非効率的であることはわかっています。それには立ち入りません。)

4

6 に答える 6

8

このコードは本番環境に対応していません。ライブラリ呼び出しの結果に対してエラー チェックは行われません。

ミューテックスのロック/ロック解除を LockThread でラップしたので、例外セーフです。しかし、それはそれについてです。

さらに、これを真剣に行っている場合は、ミューテックスと条件変数をオブジェクト内にラップして、Consumer の他のメソッド内で悪用できるようにします。ただし、条件変数を使用する前に (何らかの方法で) ロックを取得する必要があることに注意する限り、この単純な状況はそのまま維持できます。

興味があるので、ブースト スレッド ライブラリをチェックしましたか?

#include <iostream>
#include <vector>
#include <pthread.h>

class LockThread
{
    public:
    LockThread(pthread_mutex_t& m)
        :mutex(m)
    {
        pthread_mutex_lock(&mutex);
    }
    ~LockThread()
    {
        pthread_mutex_unlock(&mutex);
    }
    private:
        pthread_mutex_t& mutex;
};
class Consumer
{
    pthread_mutex_t     lock;
    pthread_cond_t      cond;
    std::vector<char>   unreadData;
    public:
    Consumer()
    {
        pthread_mutex_init(&lock,NULL);
        pthread_cond_init(&cond,NULL);
    }
    ~Consumer()
    {
        pthread_cond_destroy(&cond);
        pthread_mutex_destroy(&lock);
    }

    private:
        std::vector<char> read(size_t n)
        {
            LockThread  locker(lock);
            while (unreadData.size() < n)
            {
                // Must wait until we have n char.
                // This is a while loop because feed may not put enough in.

                // pthread_cond() releases the lock.
                // Thread will not be allowed to continue until
                // signal is called and this thread reacquires the lock.

                pthread_cond_wait(&cond,&lock);

                // Once released from the condition you will have re-aquired the lock.
                // Thus feed() must have exited and released the lock first.
            }

            /*
             * Not sure if this is exactly what you wanted.
             * But the data is copied out of the thread safe buffer
             * into something that can be returned.
             */
            std::vector<char>   result(n); // init result with size n
            std::copy(&unreadData[0],
                      &unreadData[n],
                      &result[0]);

            unreadData.erase(unreadData.begin(),
                             unreadData.begin() + n);
            return (result);
        }
public:
    void run()
    {
        read(10);
        read(4839);
        // etc
    }
    void feed(const std::vector<char> &more)
    {
        LockThread  locker(lock);

        // Once we acquire the lock we can safely modify the buffer.
        std::copy(more.begin(),more.end(),std::back_inserter(unreadData));

        // Only signal the thread if you have the lock
        // Otherwise race conditions happen.
        pthread_cond_signal(&cond);

        // destructor releases the lock and thus allows read thread to continue.
    }
};


int main()
{
    Consumer    c;
}
于 2008-10-15T23:42:48.093 に答える
2

私は「同期化されたキュー」と呼ぶものを使用する傾向があります。通常のキューをラップし、必要に応じてロックと読み取りブロックの両方に Semaphore クラスを使用します。

#ifndef SYNCQUEUE_20061005_H_
#define SYNCQUEUE_20061005_H_

#include <queue>
#include "Semaphore.h"

// similar, but slightly simpler interface to std::queue
// this queue implementation will serialize pushes and pops
// and block on a pop while empty (as apposed to throwing an exception)
// it also locks as neccessary on insertion and removal to avoid race 
// conditions

template <class T, class C = std::deque<T> > class SyncQueue {
protected:
    std::queue<T, C>    m_Queue;
    Semaphore           m_Semaphore;
    Mutex               m_Mutex;

public:
    typedef typename std::queue<T, C>::value_type value_type;
    typedef typename std::queue<T, C>::size_type size_type;

    explicit SyncQueue(const C& a = C()) : m_Queue(a), m_Semaphore(0) {}

    bool empty() const              { return m_Queue.empty(); }
    size_type size() const          { return m_Queue.size(); }

    void push(const value_type& x);
    value_type pop();
};

template <class T, class C>
void SyncQueue<T, C>::push(const SyncQueue<T, C>::value_type &x) {
    // atomically push item
    m_Mutex.lock(); 
    m_Queue.push(x); 
    m_Mutex.unlock(); 

    // let blocking semaphore know another item has arrived
    m_Semaphore.v();
}

template <class T, class C>
typename SyncQueue<T, C>::value_type SyncQueue<T, C>::pop() {
    // block until we have at least one item
    m_Semaphore.p();

    // atomically read and pop front item
    m_Mutex.lock();
    value_type ret = m_Queue.front();
    m_Queue.pop();
    m_Mutex.unlock();

    return ret;
}

#endif

スレッド実装で適切なプリミティブを使用して、セマフォとミューテックスを実装できます。

注: この実装は、キュー内の単一要素の例ですが、N が提供されるまで結果をバッファリングする関数で簡単にラップできます。文字のキューの場合は次のようになります。

std::vector<char> func(int size) {
    std::vector<char> result;
    while(result.size() != size) {
        result.push_back(my_sync_queue.pop());
    }
    return result;
}
于 2008-10-15T23:57:04.117 に答える
1

いくつかのセミ擬似コードをスローします。これが私のコメントです:

1)ここに非常に大きなロックの粒子があります。より高速なアクセスが必要な場合は、データ構造を再考する必要があります。STLはスレッドセーフではありません。

2)ミューテックスが通過するまでロックはブロックされます。ミューテックス構造では、ロック/ロック解除メカニズムを使用して、一度に1つのスレッドを通過させます。ポーリングや、ある種の例外的な構造は必要ありません。

3)これは問題のかなり構文的にハッキーなカットです。私はAPIやC++構文について正確ではありませんが、意味的に正しい解決策を提供すると思います。

4)コメントに応じて編集。

class piper
{
pthread_mutex queuemutex;
pthread_mutex readymutex;
bool isReady; //init to false by constructor

//whatever else
};

piper::read()
{//whatever
pthread_mutex_lock(&queuemutex)
if(myqueue.size() >= n)
{ 
   return_queue_vector.push_back(/* you know what to do here */)

    pthread_mutex_lock(&readymutex)
    isReady = false;
    pthread_mutex_unlock(&readymutex)
}
pthread_mutex_unlock(&queuemutex)
}

piper::push_em_in()
{
//more whatever
pthread_mutex_lock(&queuemutex)
//push push push
if(myqueue.size() >= n)
{
    pthread_mutex_lock(&readymutex)
    isReady = true;
    pthread_mutex_unlock(&readymutex)
}
pthread_mutex_unlock(&queuemutex)
}
于 2008-10-15T23:13:29.743 に答える
1

さらに楽しむために、これが私の最終バージョンです。正当な理由もなくSTL化。:-)

#include <algorithm>
#include <deque>
#include <pthread.h>

template<typename T>
class MultithreadedReader {
    std::deque<T>   buffer;
    pthread_mutex_t moreDataMutex;
    pthread_cond_t  moreDataCond;

protected:
    template<typename OutputIterator>
    void read(size_t count, OutputIterator result) {
        pthread_mutex_lock(&moreDataMutex);

        while (buffer.size() < count) {
            pthread_cond_wait(&moreDataCond, &moreDataMutex);
        }
        std::copy(buffer.begin(), buffer.begin() + count, result);
        buffer.erase(buffer.begin(), buffer.begin() + count);

        pthread_mutex_unlock(&moreDataMutex);
    }

public:
    MultithreadedReader() {
        pthread_mutex_init(&moreDataMutex, 0);
        pthread_cond_init(&moreDataCond, 0);
    }

    ~MultithreadedReader() {
        pthread_cond_destroy(&moreDataCond);
        pthread_mutex_destroy(&moreDataMutex);
    }

    template<typename InputIterator>
    void feed(InputIterator first, InputIterator last) {
        pthread_mutex_lock(&moreDataMutex);

        buffer.insert(buffer.end(), first, last);
        pthread_cond_signal(&moreDataCond);

        pthread_mutex_unlock(&moreDataMutex);
    }
};
于 2008-10-16T00:36:49.483 に答える
1

楽しみのために、Boost を使用した簡単で汚い実装を次に示します。それをサポートするプラットフォームでは内部で pthread を使用し、Windows では Windows 操作を使用します。

boost::mutex access;
boost::condition cond;

// consumer
data read()
{
  boost::mutex::scoped_lock lock(access);
  // this blocks until the data is ready
  cond.wait(lock);

  // queue is ready
  return data_from_queue();
}

// producer
void push(data)
{
  boost::mutex::scoped_lock lock(access);
  // add data to queue

  if (queue_has_enough_data())
    cond.notify_one();  
}
于 2008-10-16T00:04:31.873 に答える
0

Glib 非同期キューは、探している空のキューの読み取り時にロックとスリープを提供します。http://library.gnome.org/devel/glib/2.20/glib-Asynchronous-Queues.htmlを参照してください。これらを gthreads または gthread プールと組み合わせることができます。

于 2009-04-30T16:44:36.993 に答える