2

timeout-capable を使用した C++ のブロッキング キューが必要でしたoffer()キューは、複数のプロデューサー、1 つのコンシューマーを対象としています。私が実装していたとき、このニーズに合った適切な既存のキューが見つからなかったので、自分でコーディングしました。

キューのメソッドからセグメンテーション違反が発生してtake()いますが、それらは断続的です。問題がないかコードを調べてきましたが、問題があるように見えるものは何も見当たりません。

私は疑問に思っています:

  • これを確実に行う既存のライブラリがあり、これを使用する必要があります (ブーストまたはヘッダーのみを推奨)。
  • 私のコードに修正が必要な明らかな欠陥があれば、誰でも見つけます。

ヘッダーは次のとおりです。

class BlockingQueue
{
    public:
        BlockingQueue(unsigned int capacity) : capacity(capacity) { };
        bool offer(const MyType & myType, unsigned int timeoutMillis);
        MyType take();
        void put(const MyType & myType);
        unsigned int getCapacity();
        unsigned int getCount();

    private:
         std::deque<MyType> queue;
         unsigned int capacity;
};

そして関連する実装:

boost::condition_variable cond;
boost::mutex mut;

bool BlockingQueue::offer(const MyType & myType, unsigned int timeoutMillis)
{
    Timer timer;

    // boost::unique_lock is a scoped lock - its destructor will call unlock().
    // So no need for us to make that call here.
    boost::unique_lock<boost::mutex> lock(mut);

    // We use a while loop here because the monitor may have woken up because
    // another producer did a PulseAll. In that case, the queue may not have
    // room, so we need to re-check and re-wait if that is the case.
    // We use an external stopwatch to stop the madness if we have taken too long.
    while (queue.size() >= this->capacity)
    {
        int monitorTimeout = timeoutMillis - ((unsigned int) timer.getElapsedMilliSeconds());

        if (monitorTimeout <= 0)
        {
            return false;
        }

        if (!cond.timed_wait(lock, boost::posix_time::milliseconds(timeoutMillis)))
        {
            return false;
        }
    }

    cond.notify_all();

    queue.push_back(myType);

    return true;
}

void BlockingQueue::put(const MyType & myType)
{
    // boost::unique_lock is a scoped lock - its destructor will call unlock().
    // So no need for us to make that call here.
    boost::unique_lock<boost::mutex> lock(mut);

    // We use a while loop here because the monitor may have woken up because
    // another producer did a PulseAll. In that case, the queue may not have
    // room, so we need to re-check and re-wait if that is the case.
    // We use an external stopwatch to stop the madness if we have taken too long.
    while (queue.size() >= this->capacity)
    {
        cond.wait(lock);
    }

    cond.notify_all();

    queue.push_back(myType);
}

MyType BlockingQueue::take()
{
    // boost::unique_lock is a scoped lock - its destructor will call unlock().
    // So no need for us to make that call here.
    boost::unique_lock<boost::mutex> lock(mut);

    while (queue.size() == 0)
    {
        cond.wait(lock);
    }

    cond.notify_one();

    MyType myType = this->queue.front();

    this->queue.pop_front();

    return myType;
}

unsigned int BlockingQueue::getCapacity()
{
    return this->capacity;
}

unsigned int BlockingQueue::getCount()
{
    return this->queue.size();
}

はい、私はテンプレートを使用してクラスを実装しませんでした-それはリストの次です:)

どんな助けでも大歓迎です。スレッド化の問題は、特定するのが非常に難しい場合があります。

-ベン

4

2 に答える 2

0

あなたのコードの問題は、いくつかのスレッドで両端キューを変更していると思います。見て:

  1. 別のスレッドからの調整を待っています。
  2. 次に、変更する直前に deque がロック解除されていることを示すシグナルを他のスレッドにすぐに送信します。
  3. 次に、他のスレッドが両端キューがすでにロック解除されていると考えている間に両端キューを変更し、同じことを開始します。

したがって、cond.notify_*()両端キューを変更した後にすべてを配置してみてください。すなわち:

void BlockingQueue::put(const MyType & myType)
{
    boost::unique_lock<boost::mutex> lock(mut);
    while (queue.size() >= this->capacity)
    {
        cond.wait(lock);
    }

    queue.push_back(myType);  // <- modify first

    cond.notify_all();        // <- then say to others that deque is free
}

理解を深めるために、について読むことをお勧めしますpthread_cond_wait()

于 2015-02-04T09:10:51.017 に答える
0

cond と mut がグローバルなのはなぜですか? それらが BlockingQueue オブジェクトのメンバーであることを期待しています。他に何がそれらに触れているのかはわかりませんが、そこに問題がある可能性があります。

私も、より大きなプロジェクトの一部として ThreadSafeQueue を実装しました。

https://github.com/cdesjardins/QueuePtr/blob/master/include/ThreadSafeQueue.h

基本的に最大容量がないため、エンキュー(別名オファー)機能が非ブロックであることを除いて、それはあなたのものと同様の概念です。容量を強制するために、通常、システムの初期化時に N 個のバッファーが追加されたプールと、実行時にメッセージを渡すためのキューを用意します。これにより、実行時にメモリを割り当てる必要がなくなります。組み込みアプリケーションで動作します)。

プールとキューの唯一の違いは、プールはシステムの初期化時にキューに入れられた一連のバッファーを取得することです。したがって、次のようなものがあります。

ThreadSafeQueue<BufferDataType*> pool;
ThreadSafeQueue<BufferDataType*> queue;

void init()
{
    for (int i = 0; i < NUM_BUFS; i++)
    {
        pool.enqueue(new BufferDataType);
    }
}

次に、メッセージを送信する場合は、次のようにします。

void producerA()
{
    BufferDataType *buf;
    if (pool.waitDequeue(buf, timeout) == true)
    {
        initBufWithMyData(buf);
        queue.enqueue(buf);
    }
}

このように、エンキュー機能は迅速かつ簡単ですが、プールが空の場合は、誰かがバッファーをプールに戻すまでブロックされます。アイデアは、他のスレッドがキューでブロックされ、次のように処理されたときにバッファーをプールに返すというものです。

void consumer()
{
    BufferDataType *buf;
    if (queue.waitDequeue(buf, timeout) == true)
    {
        processBufferData(buf);
        pool.enqueue(buf);
    }
}

とにかくそれを見てください、多分それは助けになるでしょう。

于 2013-10-29T22:21:55.860 に答える