2

以下をC++で実装したいのですが、並行性を管理するためのより簡単な方法がどれであるかわかりません。

Producerベクトルの後ろに要素を追加しているスレッドがありますV。要素が追加されると、読み取り専用と見なされます。簡単にするために、成長時にイテレータを無効にしないベクトルを使用できる、または読み取り/書き込みミューテックスでロックを処理すると仮定します。しかし、ベクターの読者には、の複数の連続した要素にアクセスしたい場合がありV、その一部はまだ生成されていない可能性があります。

いつでもV、「o」で表すいくつかの要素がありProducer、「w」で表す要素をさらに追加する可能性があります。したがって、のデータは V 概念的に次のようになります。

o o o o w w w w

Vまだ生成されていない要素/ダミーに物理的に入れたくないので、「概念的に」強調します。今、読者の1人は、まだ完全に生成されていないRセグメントに興味を持っています。V

o o o o w w w w
    | | | |
    ---R---

Vしたがって、Rは、Rが必要とするすべての要素が含まれるまで、成長するのを待つ必要があります。ある時点でj生成された要素の中で最も高いインデックスを使用して、インデックスを増やすことができます。V問題は、Rそのインデックスの特定の値を待機させる簡単な方法はありますか?

4

2 に答える 2

1

コンシューマーが 1 人いると仮定すると、次のようなことができます。

    ...
    pthread_mutex_lock(&mtx);
    while (array.size() < targetSize) {
        pthread_cond_wait(&cv, &mtx);
    }
    // read data from array
    // remove data from array consumed
    pthread_mutex_unlock(&mtx);
    ...
于 2013-01-07T20:05:09.157 に答える
1

OK、ベースチャット、ここでの問題は、これを正しく同期する方法ではなく、まだ進行できないスレッドの誤ったウェイクアップを最小限に抑える方法だと思います。

(参考までに、これは元の質問から得た印象とはまったく異なります)。

したがって、どのリーダーがスケジュールされるかを明示的に制御する単純な実装を作成できます...

#include <queue>
#include <thread>

// associate a blocked reader's desired index with the CV it waits on
struct BlockedReadToken {
    int index_;
    std::condition_variable cv_;
    explicit BlockedReadToken(int index) : index_(index) {}
};
struct TokenOrder {
    bool operator() (BlockedReadToken const *a,
                     BlockedReadToken const *b)
    {
        return a->index_ < b->index_;
    }
};

class BlockedReaderManager
{
    std::priority_queue<BlockedReadToken*,
                        std::vector<BlockedReadToken*>, TokenOrder> queue_;
public:
    // wait for the actual index to reach the required value
    void waitfor(std::unique_lock<std::mutex> &lock,
                 int required, int const &actual)
    {
        // NOTE: a good pooled allocator might be useful here
        // (note we only allocate while holding the lock anyway,
        // so no further synchronization is required)
        std::unique_ptr<BlockedReadToken> brt(new BlockedReadToken(required));
        queue_.push(brt.get());
        while (actual < required)
            brt->cv_.wait(lock);
    }
    // release every reader blocked waiting for the new actual index
    // (don't wake any whose condition isn't satisfied yet)
    void release(std::unique_lock<std::mutex> &lock, int actual)
    {
        while (!(queue_.empty() || queue_.top()->index_ > actual)) {
            queue_.top()->cv_.notify_one();
            queue_.pop();
        }
    }
};

そして、リーダーに対してこのブロックメカニズムを使用するコンテナのラッパー:

template <typename RandomAccessContainer>
class ProgressiveContainer
{
    int size_;
    std::mutex mutex_;
    BlockedReaderManager blocked_;
    RandomAccessContainer container_;
public:
    typedef typename RandomAccessContainer::size_type size_type;
    typedef typename RandomAccessContainer::value_type value_type;

    void push_back(value_type const &val) {
        std::unique_lock<std::mutex> guard(mutex_);
        container_.push_back(val);
        ++size_;
        blocked_.release(guard, size_);
    }
    void check_readable(int index) {
        // could optimistically avoid locking with atomic size here?
        std::unique_lock<std::mutex> guard(mutex_);
        if (size_ < index)
            blocked_.waitfor(guard, index, size_);
    }
    // allow un-locked [] access and require reader to call check_readable?
    value_type& operator[](int index) {
        return container_[index];
    }
    value_type& at(int index) {
        check_readable(index);
        return container_[index];
    }
};
于 2013-01-07T21:05:36.613 に答える