1

別のスレッド (生産者) が何かを作るときに、スレッド (消費者) が興味を示すことができるようにしたいと考えています。しかし、いつもではありません。

基本的にはワンショットコンシューマを作りたいと思っています。理想的には、プロデューサーは、1 人 (または多数) のコンシューマーが何かを欲しがっているというシグナルを出すまで、そのビジネスについて陽気に進みます。コンシューマーは、変数が満たされるまで待機します。

また、ワンショット コンシューマーが待機時間が長すぎると判断し、待機を放棄できるようにする必要もあります (a la pthread_cond_timedwait) 。

スレッドを同期するさまざまな方法について、多くの記事と SO の質問を読んできました。現在、私は条件変数アプローチに傾倒しています。

これが良い方法なのか (スレッド プログラミングの初心者なので、おそらくかなりの数のバグが含まれている可能性があります)、またはこの状況でセマフォを (乱用) 使用する方がよいかどうかを知りたいですか? それともまったく別のものですか?可能であれば、ポインター変数へのアトミック代入だけですか? おそらく私は安全な側に留まろうとしているからでしょう.このアプリケーションはロックアップすることなく何ヶ月も実行されるはずです. プロデューサーのミューテックスなしでできますか? つまり、条件変数を通知するだけですか?

私の現在のコードは次のようになります。

consumer {
   pthread_mutex_lock(m);

   pred = true; /* signal interest */

   while (pred) {
       /* wait a bit and hopefully get an answer before timing out */
       pthread_cond_timedwait(c, m, t);

       /* it is possible that the producer never produces anything, in which
          case the pred will stay true, we must "designal" interest here,
          unfortunately the also means that a spurious wake could make us miss
          a good answer, no? How to combat this? */
       pred = false;
   }

   /* if we got here that means either an answer is available or we timed out */
   //... (do things with answer if not timed out, otherwise assign default answer)

   pthread_mutex_unlock(m);
}

/* this thread is always producing, but it doesn't always have listeners */
producer {
   pthread_mutex_lock(m);

   /* if we have a listener */
   if (pred) {
      buffer = "work!";

      pred = false;

      pthread_cond_signal(c);
   }

   pthread_mutex_unlock(m);
}

注: 私は最新の Linux を使用しており、必要に応じてプラットフォーム固有の機能を利用できます。 注 2: 一見グローバル変数 m、c、および t を使用しました。しかし、これらは消費者ごとに異なります。

概要の要約

スレッドがイベントに登録し、指定された時間待機してから続行できるようにしたい。理想的には、複数のスレッドが同時に登録でき、すべてのスレッドが同じイベント (タイムスパンに発生したすべてのイベント) を取得できる必要があります。

4

1 に答える 1

1

あなたが望むのはstd::future、C ++(doc)の a に似たものです。コンシューマーは、特定の関数を使用してプロデューサーが実行するタスクを要求します。その関数は、ミューテックス、タスクに関連付けられた条件変数、および結果の void ポインターを保持するfuture (またはpromise ) と呼ばれる構造体を作成し、それを呼び出し元に返します。また、その構造体、タスク ID、およびパラメーター (存在する場合) を、プロデューサーによって処理されるワーク キューに入れます。

struct future_s {
    pthread_mutex_t m;
    pthread_cond_t c;
    int flag;
    void *result;
};

// basic task outline
struct task_s {
    struct future_s result;
    int taskid;
};

// specific "mytask" task
struct mytask_s {
    struct future_s result;
    int taskid;
    int p1;
    float p2;
};

future_s *do_mytask(int p1, float p2){
     // allocate task data
     struct  mytask_s * t = alloc_task(sizeof(struct mytask_s));
     t->p1 = p1;
     t->p2 = p2;
     t->taskid = MYTASK_ID;
     task_queue_add(t);
    return (struct future_s *)t;
}

次に、プロデューサーはタスクをキューから引き出して処理し、終了したら、結果を未来に入れ、変数をトリガーします。

消費者は未来を待つか、何か他のことをするかもしれません。

キャンセル可能な先物については、構造体にフラグを含めて、タスクがキャンセルされたことを示します。将来は次のいずれかになります。

  • 配信された場合、消費者は所有者であり、割り当てを解除する必要があります
  • キャンセルされた場合、プロデューサーは所有者のままで、それを処分します。

したがって、プロデューサーは、条件変数をトリガーする前に、未来がキャンセルされていないことを確認する必要があります。

「共有」の未来の場合、フラグは購読者数に変わります。数値がゼロより大きい場合、注文は配達されなければなりません。結果を所有する消費者は、すべての消費者の間で決定されます (先着順ですか?結果はすべての消費者に渡されますか?)。

future 構造体へのアクセスはすべてミューテックスにする必要があります (これは条件変数でうまく機能します)。

キューに関しては、リンクされたリストまたは配列を使用して実装できます (容量が制限されているバージョンの場合)。フューチャを作成する関数は同時に呼び出される可能性があるため、通常はミューテックスで実装されるロックで保護する必要があります。

于 2013-04-19T19:35:39.403 に答える