1

私はマルチスレッド プログラミングにかなり慣れていないので、不正確な質問かもしれませんがご容赦ください。これが私の問題です:

データを処理し、同じタイプのオブジェクトを多数生成する関数があります。これは、いくつかのネストされたループで繰り返し実行されるため、すべての繰り返しを実行し、これらのオブジェクトを何らかのコンテナーに保存してから、次のステップを実行するインターフェイス コードでそのコンテナーで作業することが実用的です。ただし、これらのオブジェクトを何百万も作成する必要があり、メモリ使用量が大幅に増加します。これらの制約は、主に私が制御できない外部要因によるものです。

一定量のデータのみを生成するのが理想的ですが、ループから抜け出し、同じ時点で後で再開することも非現実的です。私の考えは、n回の反復後に一時停止し、n個のオブジェクトがすべて完全に処理されたら再開し、再開して、すべての反復が完了するまでn次の反復を実行するという別のスレッドで処理を行うことでした。スレッドが n 回の反復をすべて完了するまで待機することが重要です。そのため、両方のスレッドが実際には並行して実行されることはありません。

これが私の問題の始まりです:ここでミューテックスのロックを適切に行うにはどうすればよいですか? 私のアプローチでは、boost::lock_errors が生成されます。私がやりたいことを示すコードは次のとおりです。

boost::recursive_mutex bla;
boost::condition_variable_any v1;
boost::condition_variable_any v2;
boost::recursive_mutex::scoped_lock lock(bla);
int got_processed = 0;
const int n = 10;

void ProcessNIterations() {
  got_processed = 0;
  // have some mutex or whatever unlocked here so that the worker thread can 
  // start or resume. 
  // my idea: have some sort of mutex lock that unlocks here and a condition
  // variable v1 that is notified while the thread is waiting for that.
  lock.unlock();
  v1.notify_one();

  // while the thread is working to do the iterations this function should wait
  // because there is no use to proceed until the n iterations are done
  // my idea: have another condition v2 variable that we wait for here and lock
  // afterwards so the thread is blocked/paused
  while (got_processed < n) {
    v2.wait(lock);
  }
}

void WorkerThread() {
  int counter = 0;
  // wait for something to start
  // my idea: acquire a mutex lock here that was locked elsewhere before and 
  // wait for ProcessNIterations() to unlock it so this can start
  boost::recursive_mutex::scoped_lock internal_lock(bla);

  for (;;) {
    for (;;) {
      // here do the iterations
      counter++;
      std::cout << "iteration #" << counter << std::endl;
      got_processed++;

      if (counter >= n) {
        // we've done n iterations; pause here
        // my idea: unlock the mutex, notify v2
        internal_lock.unlock();
        v2.notify_one();

        while (got_processed > 0) {
          // when ProcessNIterations() is called again, resume here
          // my idea: wait for v1 reacquiring the mutex again
          v1.wait(internal_lock);
        }
        counter = 0;
      }
    }
  }
}

int main(int argc, char *argv[]) {
  boost::thread mythread(WorkerThread);

  ProcessNIterations();
  ProcessNIterations();

  while (true) {}
}

上記のコードはv2.wait(lock);、次のメッセージを含む行で10 回の反復を実行した後に失敗します。

terminate called after throwing an instance of 'boost::exception_detail::clone_impl<boost::exception_detail::error_info_injector<boost::lock_error> >'
  what():  boost::lock_error

これを正しく行うにはどうすればよいですか?これが正しい方法である場合、lock_errors を回避するにはどうすればよいですか?

編集:ここで説明したような同時キューを使用して解決しました。このキューには最大サイズもあり、その後pushは少なくとも 1 つの要素がpop編集されるまで待機します。したがって、プロデューサー ワーカーは単にこのキューに入力し続けることができ、残りのコードpopは適切なエントリを作成できます。キューの外でミューテックス ロックを行う必要はありません。キューは次のとおりです。

template<typename Data>
class concurrent_queue
{
private:
  std::queue<Data> the_queue;
  mutable boost::mutex the_mutex;
  boost::condition_variable the_condition_variable;
  boost::condition_variable the_condition_variable_popped;
  int max_size_;
public:
  concurrent_queue(int max_size=-1) : max_size_(max_size) {}

  void push(const Data& data) {
    boost::mutex::scoped_lock lock(the_mutex);

    while (max_size_ > 0 && the_queue.size() >= max_size_) {
      the_condition_variable_popped.wait(lock);
    }

    the_queue.push(data);
    lock.unlock();
    the_condition_variable.notify_one();
  }

  bool empty() const {
    boost::mutex::scoped_lock lock(the_mutex);
    return the_queue.empty();
  }

  bool wait_and_pop(Data& popped_value) {
    boost::mutex::scoped_lock lock(the_mutex);
    bool locked = true;
    if (the_queue.empty()) {
      locked = the_condition_variable.timed_wait(lock, boost::posix_time::seconds(1));
    }

    if (locked && !the_queue.empty()) {
      popped_value=the_queue.front();
      the_queue.pop();
      the_condition_variable_popped.notify_one();
      return true;
    } else {
      return false;
    }
  }

  int size() {
    boost::mutex::scoped_lock lock(the_mutex);
    return the_queue.size();
  }
};
4

2 に答える 2

1

これは、条件変数を使用して実装できます。N 回の反復を実行したら、条件変数で wait() を呼び出し、オブジェクトが別のスレッドで処理されたら、条件変数で signal() を呼び出して、条件変数でブロックされている他のスレッドのブロックを解除します。

于 2012-06-14T20:02:10.827 に答える
0

おそらく、条件変数と組み合わせて、ある種の有限容量のキュー リストまたはスタックが必要になるでしょう。キューがいっぱいになると、プロデューサー スレッドは条件変数を待機し、コンシューマー スレッドがキューから要素を削除するたびに、条件変数にシグナルを送ります。これにより、プロデューサーがウェイクアップしてキューを再び満たすことができます。一度に N 個の要素を処理したい場合は、ワーカーがアイテムをキューから取り出すたびにではなく、N 個の要素のキューに容量がある場合にのみワーカーに信号を送らせます。

于 2012-06-14T20:06:24.843 に答える