編集:以下
バッファ内のデバイスからのデータのストリーミングを担当するスレッドが 1 つあります。さらに、そのデータに対して何らかの処理を行う N 個のスレッドがあります。私のセットアップでは、ストリーマー スレッドがデバイスからデータをフェッチし、新しいデータをフェッチするかタイムアウトに達する前に、N 個のスレッドが処理を完了するまで待機したいと考えています。N スレッドは、処理を続行する前に、新しいデータがフェッチされるまで待機する必要があります。このフレームワークは、バッファで N スレッドの処理を繰り返したくない場合、およびすべてのバッファをスキップせずに処理したい場合に機能するはずです。
注意深く読んだ後、条件変数が必要であることがわかりました。私はチュートリアルやその他のスタック オーバーフローの質問に従いました。これが私が持っているものです。
グローバル変数:
boost::condition_variable cond;
boost::mutex mut;
メンバー変数:
std::vector<double> buffer
std::vector<bool> data_ready // Size equal to number of threads
データ受信ループ (1 つのスレッドがこれを実行):
while (!gotExitSignal())
{
{
boost::unique_lock<boost::mutex> ll(mut);
while(any(data_ready))
cond.wait(ll);
}
receive_data(buffer);
{
boost::lock_guard<boost::mutex> ll(mut);
set_true(data_ready);
}
cond.notify_all();
}
データ処理ループ (N 個のスレッドがこれを実行)
while (!gotExitSignal())
{
{
boost::unique_lock<boost::mutex> ll(mut);
while(!data_ready[thread_id])
cond.wait(ll);
}
process_data(buffer);
{
boost::lock_guard<boost::mutex> ll(mut);
data_ready[thread_id] = false;
}
cond.notify_all();
}
これら 2 つのループは、同じクラスの独自のメンバー関数にあります。変数 buffer はメンバー変数であるため、スレッド間で共有できます。
受信スレッドが最初に起動されます。data_ready 変数は、サイズ N の bool のベクトルです。data_ready[i] は、データを処理する準備ができている場合は true であり、スレッドがデータを既に処理している場合は false です。関数 any(data_ready) は、data_ready の要素のいずれかが true の場合に true を出力し、それ以外の場合は false を出力します。set_true(data_ready) 関数は、data_ready のすべての要素を true に設定します。受信側スレッドは、処理中のスレッドがまだ処理中かどうかを確認します。そうでない場合は、データをフェッチし、data_ready フラグを設定し、スレッドに通知し、処理が完了するまで最初に停止するループを続行します。処理スレッドは、それぞれの data_ready フラグが true であることを確認します。true になると、処理スレッドはいくつかの計算を行い、それぞれの data_ready フラグを 0 に設定して、ループを続行します。
処理スレッドが 1 つしかない場合、プログラムは正常に実行されます。スレッドを追加すると、処理の出力がガベージになるという問題が発生します。さらに、何らかの理由で処理スレッドの順序が重要になります。言い換えれば、私が起動した最後のスレッドは正しいデータを出力しますが、前のスレッドは、処理のための入力パラメーターが何であれ (有効なパラメーターを想定して) ガベージを出力します。問題の原因がスレッド コードにあるのか、デバイスまたはデータ処理の設定に問題があるのかはわかりません。処理ステップと受信ステップで couts を使用してみます。N 個の処理スレッドを使用すると、次のように出力が表示されます。
receive data
process 1
process 2
...
process N
receive data
process 1
process 2
...
条件変数の使い方は正しいですか? 何が問題なのですか?
編集:フォークの提案に従い、コードを次のように変更しました。
データ受信ループ (1 つのスレッドがこれを実行):
while (!gotExitSignal())
{
if(!any(data_ready))
{
receive_data(buffer);
boost::lock_guard<boost::mutex> ll(mut);
set_true(data_ready);
cond.notify_all();
}
}
データ処理ループ (N 個のスレッドがこれを実行)
while (!gotExitSignal())
{
// boost::unique_lock<boost::mutex> ll(mut);
boost::mutex::scoped_lock ll(mut);
cond.wait(ll);
process_data(buffer);
data_ready[thread_id] = false;
}
それはいくぶんうまく機能します。正しいロックを使用していますか?