4

Boostスレッドと条件を使用して、基本的なスレッド化されたプロデューサー-コンシューマー(スレッド1 =プロデューサー、スレッド2 =コンシューマー)を実装しました。私はwait()で無期限に頻繁に立ち往生しています。ここで何が悪いのか本当にわかりません。以下はいくつかの擬似コードです:

// main class
class Main {
public:
  void AddToQueue(...someData...)
  {
    boost::mutex::scoped_lock lock(m_mutex);
    m_queue.push_back(new QueueItem(...someData...));
    m_cond.notify_one(); 
  }

  void RemoveQueuedItem(...someCond...)
  {
    // i'm wondering if this could cause the trouble?
    boost::mutex::scoped_lock lock(m_mutex);
    // erase a item matching condition (some code not shown,
    // but should be fairly self-explanatory -- IsMatch()
    // simply looks at a flag of QueueItem
    m_queue.erase(std::remove_if(m_queue.being(), m_queue.end(),
      boost::bind(&Main::IsMatch, this, _1, someCond), m_queue.end());
  }

  friend void WorkerThread(Main* m);
private:      
  boost::ptr_deque<QueueItem> m_queue;
  boost::mutex m_mutex;
  boost::condition m_cond;
};

// worker thread
void WorkerThread(Main* m)
{
  typedef boost::ptr_deque<QueueItem>::auto_type RelType;
  RelType queueItem;

  while(!shutDown) {
    { // begin mutex scope
      boost::mutex::scoped_lock lock(m->m_mutex);
      while(m->m_queue.empty()) {
        m->m_cond.wait(lock); // <- stuck here forever quite often!
      }
      queueItem = m->m_queue->pop_front(); // pop & take ptr ownership
    } // end mutex scope

    // ... do stuff with queueItem
    // ...
    // ... queueItem is deleted when it leaves scope & we loop around
  }
}

いくつかの追加情報:

  • Boostv1.44の使用
  • LinuxとAndroidで問題が発生しています。それがWindowsで起こるかどうかはまだわかりません

何か案は?

更新:私問題を切り分けたと思います。確認したらさらに更新します。明日になることを願っています。

更新2:上記のコードに問題はないことがわかりました。私はAddToQueue()の基盤となるAPIに依存していました-ワーカースレッドでデータを処理してAPIに戻すときに、AddToQueue()を再度呼び出すという循環的なバグがありました...これは修正されました;-)

4

2 に答える 2

2

私はSTLキューを使用していますが、最近同様のことをしました。私の実装から選択できるかどうかを確認してください。wilxが言うように、あなたは状態を待つ必要があります。私の実装では、キュー内の要素に最大制限があり、それを使用してミューテックス/ガードが解放されるのを待ちます。

私はもともと、ミューテックスまたはクリティカルセクションを使用する機能を念頭に置いてWindowsでこれを実行しました。したがって、テンプレートパラメータを削除しboost::mutexて、簡略化した場合に直接使用できます。

#include <queue>
#include "Message.h"
#include <boost/thread/locks.hpp>
#include <boost/thread/condition.hpp>

template <typename T> class Queue :  private boost::noncopyable
{
public:
  // constructor binds the condition object to the Q mutex
  Queue(T & mutex, size_t max_size) :  m_max_size(max_size), m_mutex(mutex){}

  // writes messages to end of Q 
  void put(const Message & msg)
  {
    // Lock mutex to ensure exclusive access to Q
    boost::unique_lock<T> guard(m_mutex);

    // while Q is full, sleep waiting until something is taken off of it
    while (m_queue.size() == m_max_size)
    {
      cond.wait(guard);
    }

    // ok, room on the queue. 
    // Add the message to the queue
    m_queue.push(msg);

    // Indicate so data can be ready from Q
    cond.notify_one();
  }

  // Read message from front of Q. Message is removed from the Q
  Message get(void)
  {
    // Lock mutex to ensure exclusive access to Q
    boost::unique_lock<T> guard(m_mutex);

    // If Q is empty, sleep waiting for something to be put onto it
    while (m_queue.empty())
    {
      cond.wait(guard);
    }

    // Q not empty anymore, read the value
    Message msg = m_queue.front();

    // Remove it from the queue
    m_queue.pop();

    // Signal so more data can be added to Q
    cond.notify_one();

    return msg;
  }

  size_t max_size(void) const
  {
    return m_max_size;
  }


private:
  const size_t m_max_size;
  T & m_mutex;
  std::queue<Message> m_queue;
  boost::condition_variable_any cond;
};

このようにして、プロデューサー/コンシューマー間でキューを共有できます。使用例

boost::mutex mutex;

Queue<boost::mutex> q(mutex, 100);

boost::thread_group threads;

threads.create_thread(Producer<boost::mutex>(q));
threads.create_thread(Consumer<boost::mutex>(q));

threads.join_all();

生産者/消費者が以下のように定義されている場合

template <typename T> class Producer
{
public:
   // Queue passed in
   explicit Producer(Queue<T> &q) :  m_queue(q) {}

   void operator()()
   {
   }
}
于 2010-10-14T08:41:58.963 に答える
0
m->m_cond.wait(); // <- stuck here forever quite often!

する必要があります:

m->m_cond.wait( lock ); 

まだミューテックスを取得していたのに、待っていたので、クラスを完全にロックしました。他のすべてのメソッドは、同じミューテックスを取得し、ミューテックスを解放しないワーカーを待ちます。

于 2010-10-14T08:54:13.047 に答える