私はマルチスレッド プログラミングにかなり慣れていないので、不正確な質問かもしれませんがご容赦ください。これが私の問題です:
データを処理し、同じタイプのオブジェクトを多数生成する関数があります。これは、いくつかのネストされたループで繰り返し実行されるため、すべての繰り返しを実行し、これらのオブジェクトを何らかのコンテナーに保存してから、次のステップを実行するインターフェイス コードでそのコンテナーで作業することが実用的です。ただし、これらのオブジェクトを何百万も作成する必要があり、メモリ使用量が大幅に増加します。これらの制約は、主に私が制御できない外部要因によるものです。
一定量のデータのみを生成するのが理想的ですが、ループから抜け出し、同じ時点で後で再開することも非現実的です。私の考えは、n回の反復後に一時停止し、n個のオブジェクトがすべて完全に処理されたら再開し、再開して、すべての反復が完了するまでn次の反復を実行するという別のスレッドで処理を行うことでした。スレッドが n 回の反復をすべて完了するまで待機することが重要です。そのため、両方のスレッドが実際には並行して実行されることはありません。
これが私の問題の始まりです:ここでミューテックスのロックを適切に行うにはどうすればよいですか? 私のアプローチでは、boost::lock_errors が生成されます。私がやりたいことを示すコードは次のとおりです。
boost::recursive_mutex bla;
boost::condition_variable_any v1;
boost::condition_variable_any v2;
boost::recursive_mutex::scoped_lock lock(bla);
int got_processed = 0;
const int n = 10;
void ProcessNIterations() {
got_processed = 0;
// have some mutex or whatever unlocked here so that the worker thread can
// start or resume.
// my idea: have some sort of mutex lock that unlocks here and a condition
// variable v1 that is notified while the thread is waiting for that.
lock.unlock();
v1.notify_one();
// while the thread is working to do the iterations this function should wait
// because there is no use to proceed until the n iterations are done
// my idea: have another condition v2 variable that we wait for here and lock
// afterwards so the thread is blocked/paused
while (got_processed < n) {
v2.wait(lock);
}
}
void WorkerThread() {
int counter = 0;
// wait for something to start
// my idea: acquire a mutex lock here that was locked elsewhere before and
// wait for ProcessNIterations() to unlock it so this can start
boost::recursive_mutex::scoped_lock internal_lock(bla);
for (;;) {
for (;;) {
// here do the iterations
counter++;
std::cout << "iteration #" << counter << std::endl;
got_processed++;
if (counter >= n) {
// we've done n iterations; pause here
// my idea: unlock the mutex, notify v2
internal_lock.unlock();
v2.notify_one();
while (got_processed > 0) {
// when ProcessNIterations() is called again, resume here
// my idea: wait for v1 reacquiring the mutex again
v1.wait(internal_lock);
}
counter = 0;
}
}
}
}
int main(int argc, char *argv[]) {
boost::thread mythread(WorkerThread);
ProcessNIterations();
ProcessNIterations();
while (true) {}
}
上記のコードはv2.wait(lock);
、次のメッセージを含む行で10 回の反復を実行した後に失敗します。
terminate called after throwing an instance of 'boost::exception_detail::clone_impl<boost::exception_detail::error_info_injector<boost::lock_error> >'
what(): boost::lock_error
これを正しく行うにはどうすればよいですか?これが正しい方法である場合、lock_errors を回避するにはどうすればよいですか?
編集:ここで説明したような同時キューを使用して解決しました。このキューには最大サイズもあり、その後push
は少なくとも 1 つの要素がpop
編集されるまで待機します。したがって、プロデューサー ワーカーは単にこのキューに入力し続けることができ、残りのコードpop
は適切なエントリを作成できます。キューの外でミューテックス ロックを行う必要はありません。キューは次のとおりです。
template<typename Data>
class concurrent_queue
{
private:
std::queue<Data> the_queue;
mutable boost::mutex the_mutex;
boost::condition_variable the_condition_variable;
boost::condition_variable the_condition_variable_popped;
int max_size_;
public:
concurrent_queue(int max_size=-1) : max_size_(max_size) {}
void push(const Data& data) {
boost::mutex::scoped_lock lock(the_mutex);
while (max_size_ > 0 && the_queue.size() >= max_size_) {
the_condition_variable_popped.wait(lock);
}
the_queue.push(data);
lock.unlock();
the_condition_variable.notify_one();
}
bool empty() const {
boost::mutex::scoped_lock lock(the_mutex);
return the_queue.empty();
}
bool wait_and_pop(Data& popped_value) {
boost::mutex::scoped_lock lock(the_mutex);
bool locked = true;
if (the_queue.empty()) {
locked = the_condition_variable.timed_wait(lock, boost::posix_time::seconds(1));
}
if (locked && !the_queue.empty()) {
popped_value=the_queue.front();
the_queue.pop();
the_condition_variable_popped.notify_one();
return true;
} else {
return false;
}
}
int size() {
boost::mutex::scoped_lock lock(the_mutex);
return the_queue.size();
}
};