1 つのプロデューサー スレッドと複数のコンシューマー スレッドがある、プロデューサーとコンシューマーのシナリオを実装したいと考えています。具体的に言えば、プロデューサ スレッドは一定の間隔 (たとえば 5 秒) で一連のオブジェクトを作成する必要があり、コンシューマ スレッドはオブジェクトを消費する必要があります。
オブジェクトのセットを定期的に作成する方法と、複数のコンシューマーを同期する方法がわかりません。
前もって感謝します
1 つのプロデューサー スレッドと複数のコンシューマー スレッドがある、プロデューサーとコンシューマーのシナリオを実装したいと考えています。具体的に言えば、プロデューサ スレッドは一定の間隔 (たとえば 5 秒) で一連のオブジェクトを作成する必要があり、コンシューマ スレッドはオブジェクトを消費する必要があります。
オブジェクトのセットを定期的に作成する方法と、複数のコンシューマーを同期する方法がわかりません。
前もって感謝します
あなたが抱えている問題とは、具体的にどのようなことでしょうか?従来、作業キューは条件ロックによって保護されていました。複数の「消費者」が同じロックで待機している場合、実際に取得に成功するのはそのうちの 1 つだけです。ただし、キュー全体を取得するのではなく、オブジェクトを 1 つだけ取得してキューから削除し、ジョブを処理する前にロックを解除する必要があります。
定期的にジョブを作成することに関しては、使用しているライブラリのタイマー クラスのジョブです。入力を待っている場合、select() および poll() 呼び出しにはタイムアウト値があります。何もしていない場合は、[u]sleep() を呼び出すだけです。
すべてのコードを詳しく説明することはしませんが、これにはミューテックスと条件変数 (およびリンクされたリスト) を使用できます。基本的なパターンは、プロデューサーが行うことです。
loop_forever
wait_until_interval_has_elapsed
lock_the_mutex
append_an_item_to_the_list
signal_the_condvar // can be outside the mutex
unlock_the_mutex
各消費者は次のことを行います。
loop_forever
lock_the_mutex
while(list_is_empty)
wait_the_condvar
remove_an_item_from_the_list
unlock_the_mutex
process_the_item
ミューテックスとセマフォを使用して行うこともできます。
loop_forever
wait_until_interval_has_elapsed
lock_the_mutex
append_an_item_to_the_list
unlock_the_mutex
post_the_semaphore
loop_forever
wait_the_semaphore
lock_the_mutex
remove_an_item_from_the_list
unlock_the_mutex
process_the_item
これはそのままでは少し単純ですが、消費者に作業を終了して終了するように伝えるメカニズムを追加することに決めたら、それを「ブロードキャスト」できるため、おそらく条件変数を使用したほうがよいでしょう。セマフォは誤用しやすいという評判がややあります。そのため、C++11 にはセマフォがありません。ただし、Posix はそうなので、Linux では選択の余地があります。公平を期すために、他のほぼすべてのマルチスレッド OS も同様です。
スレッドの優先度が異なる場合、mutex/condvar ペアはセマフォよりも優れた動作を提供する場合もあります。ミューテックスとは異なり、セマフォには「所有者」がいないため、優先度の継承などの手法は問題外です。
タイマーまたはスリープ機能のいずれかを使用する場合wait_until_interval_has_elapsed
-- リアルタイム システムを使用していない限り、特定の時間に実行できるかどうかは確実ではなく、特定の時間または特定の時間後に目が覚めるだけであることに注意してください。 .
利用可能な新しいオブジェクトがあることを消費者スレッドに伝える方法が必要になります。以下に例を示します ( #include
s は含めませんでした)。
#define NUM_CONSUMERS 2
static int objects[4]; //The place for produced objects
static pthread_mutex_t cond_mut = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
void* producer_func(void* arg);
void* consumer_func(void* arg);
int main(int argc, char** argv) {
pthread_t producer;
pthread_t consumers[NUM_CONSUMERS];
int res;
void* zero = 0;
res = pthread_create(&producer, NULL, producer_func, zero);
for(int i = 0; i < NUM_CONSUMERS; i++) {
res = pthread_create(&consumers[i], NULL, consumer_func, zero);
}
}
void* producer(void* arg) {
while(1) {
objects[0] = 0;
objects[1] = 1;
objects[2] = 2;
objects[3] = 3;
pthread_mutex_lock(&cond_mut);
pthread_cond_signal(&cond);
pthread_mutex_unlock(&cond_mut);
sleep(5);
}
}
void* consumer(void* arg) {
while(1) {
pthread_mutex_lock(&cond_mut);
pthread_cond_wait(&cond);
pthread_mutex_unlock(&cond_mut);
//Process objects here
}
}