45

私はいくつかのJavaコードをC++に移植している最中であり、ある特定のセクションではBlockingQueueを使用して、多くのプロデューサーから単一のコンシューマーにメッセージを渡します。

Java BlockingQueueが何であるかをよく知らない場合、それはハードキャパシティを持つ単なるキューであり、キューからput()およびtake()へのスレッドセーフメソッドを公開します。put()はキューがいっぱいの場合はブロックし、take()はキューが空の場合はブロックします。また、これらのメソッドのタイムアウトセンシティブバージョンが提供されています。

タイムアウトは私のユースケースに関連しているので、タイムアウトを提供する推奨事項が理想的です。そうでない場合は、自分でコーディングできます。

私はグーグルでBoostライブラリをすばやく閲覧しましたが、このようなものは見つかりませんでした。多分私はここで盲目です...しかし誰かが良い推薦を知っていますか?

ありがとう!

4

4 に答える 4

58

これは固定サイズではなく、タイムアウトをサポートしていませんが、C++2011構造を使用して最近投稿したキューの簡単な実装を次に示します。

#include <mutex>
#include <condition_variable>
#include <deque>

template <typename T>
class queue
{
private:
    std::mutex              d_mutex;
    std::condition_variable d_condition;
    std::deque<T>           d_queue;
public:
    void push(T const& value) {
        {
            std::unique_lock<std::mutex> lock(this->d_mutex);
            d_queue.push_front(value);
        }
        this->d_condition.notify_one();
    }
    T pop() {
        std::unique_lock<std::mutex> lock(this->d_mutex);
        this->d_condition.wait(lock, [=]{ return !this->d_queue.empty(); });
        T rc(std::move(this->d_queue.back()));
        this->d_queue.pop_back();
        return rc;
    }
};

ポップするまでの時間指定待機を延長して使用するのは簡単なはずです。私がそれをしなかった主な理由は、私がこれまで考えてきたインターフェースの選択に満足していないということです。

于 2012-10-09T17:56:40.910 に答える
6

シャットダウン要求機能を備えたブロッキングキューの例を次に示します。

template <typename T> class BlockingQueue {
  std::condition_variable _cvCanPop;
  std::mutex _sync;
  std::queue<T> _qu;
  bool _bShutdown = false;

public:
  void Push(const T& item)
  {
    {
      std::unique_lock<std::mutex> lock(_sync);
      _qu.push(item);
    }
    _cvCanPop.notify_one();
  }

  void RequestShutdown() {
    {
      std::unique_lock<std::mutex> lock(_sync);
      _bShutdown = true;
    }
    _cvCanPop.notify_all();
  }

  bool Pop(T &item) {
    std::unique_lock<std::mutex> lock(_sync);
    for (;;) {
      if (_qu.empty()) {
        if (_bShutdown) {
          return false;
        }
      }
      else {
        break;
      }
      _cvCanPop.wait(lock);
    }
    item = std::move(_qu.front());
    _qu.pop();
    return true;
  }
};
于 2018-09-01T12:27:01.353 に答える
0

BlockingQueueOK私はパーティーに少し遅れていますが、これはJavaの実装により適していると思います。ここでも、1つのミューテックスと2つの条件を使用して、満杯でも空でもないことを管理しています。IMO aBlockingQueueは、他の回答では見られなかった限られた容量でより理にかなっています。簡単なテストシナリオも含めます。

#include <iostream>
#include <algorithm>
#include <queue>
#include <mutex>
#include <thread>
#include <condition_variable>

template<typename T>
class blocking_queue {
private:
    size_t _capacity;
    std::queue<T> _queue;
    std::mutex _mutex;
    std::condition_variable _not_full;
    std::condition_variable _not_empty;

public:
    inline blocking_queue(size_t capacity) : _capacity(capacity) {
        // empty
    }

    inline size_t size() const {
        std::unique_lock<std::mutex> lock(_mutex);
        return _queue.size();
    }

    inline bool empty() const {
        std::unique_lock<std::mutex> lock(_mutex);
        return _queue.empty();
    }

    inline void push(const T& elem) {
        {
            std::unique_lock<std::mutex> lock(_mutex);

            // wait while the queue is full
            while (_queue.size() >= _capacity) {
                _not_full.wait(lock);
            }
            std::cout << "pushing element " << elem << std::endl;
            _queue.push(elem);
        }
        _not_empty.notify_all();
    }

    inline void pop() {
        {
            std::unique_lock<std::mutex> lock(_mutex);

            // wait while the queue is empty
            while (_queue.size() == 0) {
                _not_empty.wait(lock);
            }
            std::cout << "popping element " << _queue.front() << std::endl;
            _queue.pop();
        }
        _not_full.notify_one();
    }

    inline const T& front() {
        std::unique_lock<std::mutex> lock(_mutex);

        // wait while the queue is empty
        while (_queue.size() == 0) {
            _not_empty.wait(lock);
        }
        return _queue.front();
    }
};

int main() {
    blocking_queue<int> queue(5);

    // create producers
    std::vector<std::thread> producers;
    for (int i = 0; i < 10; i++) {
        producers.push_back(std::thread([&queue, i]() {
            queue.push(i);
            // produces too fast
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }));
    }

    // create consumers
    std::vector<std::thread> consumers;
    for (int i = 0; i < 10; i++) {
        producers.push_back(std::thread([&queue, i]() {
            queue.pop();
            // consumes too slowly
            std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        }));
    }

    std::for_each(producers.begin(), producers.end(), [](std::thread &thread) {
        thread.join();
    });

    std::for_each(consumers.begin(), consumers.end(), [](std::thread &thread) {
        thread.join();
    });

    return EXIT_SUCCESS;
}
于 2019-08-27T21:39:39.887 に答える
0

Uは最初にセマフォアのクラスを書く必要があります

#ifndef SEMEPHORE_H
#define SEMEPHORE_H
#include <mutex>
#include <condition_variable>

class semephore {
public:
    semephore(int count = 0)
        : count(count),
          m(),
          cv()
    {

    }

    void await() {
        std::unique_lock<std::mutex> lk(m);
        --count;
        if (count < 0) {
            cv.wait(lk);
        }
    }

    void post() {
        std::unique_lock<std::mutex> lk(m);
        ++count;
        if (count <= 0) {
            cv.notify_all();
        }
    }
    
private:
    int count;
    std::mutex m;
    std::condition_variable cv;
};

#endif // SEMEPHORE_H

これで、blocked_queueはセマフォアを使用して処理できます

#ifndef BLOCKED_QUEUE_H
#define BLOCKED_QUEUE_H
#include <list>
#include "semephore.h"

template <typename T>
class blocked_queue {
public:
    blocked_queue(int count) 
        : s_products(),
          s_free_space(count),
          li()
    {

    }

    void put(const T &t) {
        s_free_space.await();
        li.push_back(t);
        s_products.post();
    }

    T take() {
        s_products.await();
        T res = li.front();
        li.pop_front();
        s_free_space.post();
        return res;
    }
private:
    semephore s_products;
    semephore s_free_space;
    std::list<T> li;
};

#endif // BLOCKED_QUEUE_H

于 2021-11-28T07:15:18.207 に答える