timeout-capable を使用した C++ のブロッキング キューが必要でしたoffer()
。キューは、複数のプロデューサー、1 つのコンシューマーを対象としています。私が実装していたとき、このニーズに合った適切な既存のキューが見つからなかったので、自分でコーディングしました。
キューのメソッドからセグメンテーション違反が発生してtake()
いますが、それらは断続的です。問題がないかコードを調べてきましたが、問題があるように見えるものは何も見当たりません。
私は疑問に思っています:
- これを確実に行う既存のライブラリがあり、これを使用する必要があります (ブーストまたはヘッダーのみを推奨)。
- 私のコードに修正が必要な明らかな欠陥があれば、誰でも見つけます。
ヘッダーは次のとおりです。
class BlockingQueue
{
public:
BlockingQueue(unsigned int capacity) : capacity(capacity) { };
bool offer(const MyType & myType, unsigned int timeoutMillis);
MyType take();
void put(const MyType & myType);
unsigned int getCapacity();
unsigned int getCount();
private:
std::deque<MyType> queue;
unsigned int capacity;
};
そして関連する実装:
boost::condition_variable cond;
boost::mutex mut;
bool BlockingQueue::offer(const MyType & myType, unsigned int timeoutMillis)
{
Timer timer;
// boost::unique_lock is a scoped lock - its destructor will call unlock().
// So no need for us to make that call here.
boost::unique_lock<boost::mutex> lock(mut);
// We use a while loop here because the monitor may have woken up because
// another producer did a PulseAll. In that case, the queue may not have
// room, so we need to re-check and re-wait if that is the case.
// We use an external stopwatch to stop the madness if we have taken too long.
while (queue.size() >= this->capacity)
{
int monitorTimeout = timeoutMillis - ((unsigned int) timer.getElapsedMilliSeconds());
if (monitorTimeout <= 0)
{
return false;
}
if (!cond.timed_wait(lock, boost::posix_time::milliseconds(timeoutMillis)))
{
return false;
}
}
cond.notify_all();
queue.push_back(myType);
return true;
}
void BlockingQueue::put(const MyType & myType)
{
// boost::unique_lock is a scoped lock - its destructor will call unlock().
// So no need for us to make that call here.
boost::unique_lock<boost::mutex> lock(mut);
// We use a while loop here because the monitor may have woken up because
// another producer did a PulseAll. In that case, the queue may not have
// room, so we need to re-check and re-wait if that is the case.
// We use an external stopwatch to stop the madness if we have taken too long.
while (queue.size() >= this->capacity)
{
cond.wait(lock);
}
cond.notify_all();
queue.push_back(myType);
}
MyType BlockingQueue::take()
{
// boost::unique_lock is a scoped lock - its destructor will call unlock().
// So no need for us to make that call here.
boost::unique_lock<boost::mutex> lock(mut);
while (queue.size() == 0)
{
cond.wait(lock);
}
cond.notify_one();
MyType myType = this->queue.front();
this->queue.pop_front();
return myType;
}
unsigned int BlockingQueue::getCapacity()
{
return this->capacity;
}
unsigned int BlockingQueue::getCount()
{
return this->queue.size();
}
はい、私はテンプレートを使用してクラスを実装しませんでした-それはリストの次です:)
どんな助けでも大歓迎です。スレッド化の問題は、特定するのが非常に難しい場合があります。
-ベン