36

First of all: I am completely a newbie in mutex/multithread programming, so sorry for any error in advance...

I have a program that runs multiple threads. The threads (usually one per cpu core) do a lot of calculation and "thinking" and then sometimes they decide to call a particular (shared) method that updates some statistics. The concurrency on statistics updates is managed through the use of a mutex:

stats_mutex.lock();
common_area->update_thread_stats( ... );
stats_mutex.unlock();

Now to the problem. Of all those threads there is one particular thread that need almost
realtime priority, because it's the only thread that actually operates.

With "almost realtime priority" I mean:

Let's suppose thread t0 is the "privileged one" and t1....t15 are the normal ones.What happens now is:

  • Thread t1 acquires lock.
  • Thread t2, t3, t0 call the lock() method and wait for it to succeed.
  • Thread t1 calls unlock()
  • One (at random, as far as i know) of the threads t2, t3, t0 succeeds in acquiring the lock, and the other ones continue to wait.

What I need is:

  • Thread t1 acquire lock.
  • Thread t2, t3, t0 call the lock() method and wait for it to succeed.
  • Thread t1 calls unlock()
  • Thread t0 acquires lock since it's privileged

So, what's the best (possibly simplest) method to do this thing?

What I was thinking is to have a bool variable called "privileged_needs_lock".

But I think I need another mutex to manage access to this variable... I dont know if this is the right way...

Additional info:

  • my threads use C++11 (as of gcc 4.6.3)
  • code needs to run on both Linux and Windows (but tested only on Linux at the moment).
  • performance on locking mechanism is not an issue (my performance problem are in internal thread calculations, and thread number will always be low, one or two per cpu core at maximum)

Any idea is appreciated. Thanks


The below solution works (three mutex way):

#include <thread>
#include <iostream>
#include <mutex>
#include "unistd.h"

std::mutex M;
std::mutex N;
std::mutex L;

void lowpriolock(){
  L.lock();
  N.lock();
  M.lock();
  N.unlock();
}

void lowpriounlock(){
  M.unlock();
  L.unlock();
}

void highpriolock(){
  N.lock();
  M.lock();
  N.unlock();
}

void highpriounlock(){
  M.unlock();
}

void hpt(const char* s){
  using namespace std;
  //cout << "hpt trying to get lock here" << endl;
  highpriolock();
  cout << s << endl;
  sleep(2);
  highpriounlock();
}

void lpt(const char* s){
  using namespace std;
  //cout << "lpt trying to get lock here" << endl;
  lowpriolock();
  cout << s << endl;
  sleep(2);
  lowpriounlock();
}

int main(){
std::thread t0(lpt,"low prio t0 working here");
std::thread t1(lpt,"low prio t1 working here");
std::thread t2(hpt,"high prio t2 working here");
std::thread t3(lpt,"low prio t3 working here");
std::thread t4(lpt,"low prio t4 working here");
std::thread t5(lpt,"low prio t5 working here");
std::thread t6(lpt,"low prio t6 working here");
std::thread t7(lpt,"low prio t7 working here");
//std::cout << "All threads created" << std::endl;
t0.join();
t1.join();
t2.join();
t3.join();
t4.join();
t5.join();
t6.join();
t7.join();
return 0;
}

Tried the below solution as suggested but it does not work (compile with " g++ -std=c++0x -o test test.cpp -lpthread"):

#include <thread>
#include <mutex>

#include "time.h"
#include "pthread.h"

std::mutex l;

void waiter(){
  l.lock();
  printf("Here i am, waiter starts\n");
  sleep(2);
  printf("Here i am, waiter ends\n");
  l.unlock();
}

void privileged(int id){
  usleep(200000);
  l.lock();
  usleep(200000);
  printf("Here i am, privileged (%d)\n",id);
  l.unlock();  
}

void normal(int id){
  usleep(200000);
  l.lock();
  usleep(200000);
  printf("Here i am, normal (%d)\n",id);
  l.unlock();    
}

int main(){
  std::thread tw(waiter);
  std::thread t1(normal,1);
  std::thread t0(privileged,0);
  std::thread t2(normal,2);

  sched_param sch;
  int policy; 

  pthread_getschedparam(t0.native_handle(), &policy, &sch);
  sch.sched_priority = -19;
  pthread_setschedparam(t0.native_handle(), SCHED_FIFO, &sch);

  pthread_getschedparam(t1.native_handle(), &policy, &sch);
  sch.sched_priority = 18;
  pthread_setschedparam(t1.native_handle(), SCHED_FIFO, &sch);

  pthread_getschedparam(t2.native_handle(), &policy, &sch);
  sch.sched_priority = 18;
  pthread_setschedparam(t2.native_handle(), SCHED_FIFO, &sch);
  
  tw.join();
  t1.join();
  t0.join();
  t2.join();

  return 0;  
}
4

8 に答える 8

52

スレッドプリミティブのみを使用する3つの方法を考えることができます。

トリプルミューテックス

ここでは、3つのミューテックスが機能します。

  • データミューテックス('M')
  • 次にアクセスするミューテックス(「N」)、および
  • 低優先度アクセスミューテックス('L')

アクセスパターンは次のとおりです。

  • 優先度の低いスレッド:Lをロック、Nをロック、Mをロック、Nをロック解除、{何かを行う}、Mをロック解除、Lをロック解除
  • 優先度の高いスレッド:Nをロックし、Mをロックし、Nをロック解除し、{何かを行う}、Mをロック解除します

これにより、データへのアクセスが保護され、優先度の高いスレッドが優先度の低いスレッドよりも先にデータにアクセスできるようになります。

ミューテックス、条件変数、アトミックフラグ

これを行うための基本的な方法は、条件変数とアトミックを使用することです。

  • Mutex M;
  • Condvar C;
  • アトミックブールhpt_waiting;

データアクセスパターン:

  • 優先度の低いスレッド:Mをロックし、(hpt_waiting)CをMで待機し、{do stuff}、Cをブロードキャストし、Mのロックを解除します
  • 優先度の高いスレッド:hpt_waiting:= true、ロックM、hpt_waiting:= false、{do stuff}、ブロードキャストC、ロック解除M

Mutex、条件変数、2つの非アトミックフラグ

または、condvarで2つの非アトミックboolを使用することもできます。この手法では、mutex / condvarがフラグを保護し、データはミューテックスではなくフラグによって保護されます。

  • Mutex M;

  • Condvar C;

  • bool data_held、hpt_waiting;

  • 優先度の低いスレッド:Mをロックし、(hpt_waitingまたはdata_held)がCをMで待機している間、data_held:= true、Mのロックを解除、{do stuff}、Mをロック、data_held:= false、Cをブロードキャスト、Mのロックを解除

  • 優先度の高いスレッド:ロックM、hpt_waiting:= true、(data_held)待機C on M、data_held:= true、ロック解除M、{do stuff}、ロックM、data_held:= false、hpt_waiting:= false、ブロードキャストC 、Mのロックを解除

于 2012-07-26T16:15:54.163 に答える
8

Put requesting threads on a 'priority queue'. The privileged thread can get first go at the data when it's free.

One way to do this would be withan array of ConcurrentQueues[privilegeLevel], a lock and some events.

Any thread that wants at the data enters the lock. If the data is free, (boolean), it gets the data object and exits the lock. If the data is in use by another thread, the requesting thread pushes an event onto one of the concurrent queues, depending on its privilege level, exits the lock and waits on the event.

When a thread wants to release its ownership of the data object, it gets the lock and iterates the array of ConcurrentQueues from the highest-privilege end down, looking for an event, (ie queue count>0). If it finds one, it signals it and exits the lock, if not, it sets the 'dataFree' boolean and and exits the lock.

When a thread waiting on an event for access to the data is made ready, it may access the data object.

I thnk that should work. Please, other developers, check this design and see if you can think of any races etc? I'm still suffering somewhat from 'hospitality overload' after a trip to CZ..

Edit - probably don't even need concurrent queues because of the explicit lock across them all. Any old queue would do.

于 2012-07-26T11:04:30.677 に答える
3
#include <thread>
#include <mutex>
#include <condition_variable>
#include <cassert>

class priority_mutex {
  std::condition_variable cv_;
  std::mutex gate_;
  bool locked_;
  std::thread::id pr_tid_; // priority thread
public:
  priority_mutex() : locked_(false) {}
  ~priority_mutex() { assert(!locked_); }
  priority_mutex(priority_mutex&) = delete;
  priority_mutex operator=(priority_mutex&) = delete;

  void lock(bool privileged = false) {
    const std::thread::id tid = std::this_thread::get_id();
    std::unique_lock<decltype(gate_)> lk(gate_);
    if (privileged)
      pr_tid_ = tid;
    cv_.wait(lk, [&]{
      return !locked_ && (pr_tid_ == std::thread::id() || pr_tid_ == tid);
    });
    locked_ = true;
  }

  void unlock() {
    std::lock_guard<decltype(gate_)> lk(gate_);
    if (pr_tid_ == std::this_thread::get_id())
      pr_tid_ = std::thread::id();
    locked_ = false;
    cv_.notify_all();
  }
};

注意:これpriority_mutexは不公平なスレッドスケジューリングを提供します。特権スレッドが頻繁にロックを取得する場合、他の非特権スレッドはほとんどスケジュールされていない可能性があります。

使用例:

#include <mutex>
priority_mutex mtx;

void privileged_thread()
{
  //...
  {
    mtx.lock(true);  // acquire 'priority lock'
    std::unique_lock<decltype(mtx)> lk(mtx, std::adopt_lock);
    // update shared state, etc.
  }
  //...
}

void normal_thread()
{
  //...
  {
    std::unique_lock<decltype(mtx)> lk(mtx);  // acquire 'normal lock'
    // do something
  }
  //...
}
于 2012-08-03T02:00:06.523 に答える
2

On linux you can check this man: pthread_setschedparam and also man sched_setscheduler

pthread_setschedparam(pthread_t thread, int policy, const struct sched_param *param);

Check this also for c++2011: http://msdn.microsoft.com/en-us/library/system.threading.thread.priority.aspx#Y78

于 2012-07-26T09:48:04.153 に答える
2

pthreads has thread priorities:

pthread_setschedprio( (pthread_t*)(&mThreadId), wpri );

If multiple threads are sleeping waiting in a lock, the scheduler will wake the highest priority thread first.

于 2012-07-26T14:35:39.063 に答える
1

Try something like the following. You could make the class a thread-safe singleton and you could even make it a functor.

#include <pthread.h>
#include <semaphore.h>
#include <map>

class ThreadPrioFun
{
    typedef std::multimap<int, sem_t*> priomap_t;
public:
    ThreadPrioFun()
    {
        pthread_mutex_init(&mtx, NULL);
    }
    ~ThreadPrioFun()
    {
        pthread_mutex_destroy(&mtx);
    }
    void fun(int prio, sem_t* pSem)
    {
        pthread_mutex_lock(&mtx);
        bool bWait = !(pm.empty());
        priomap_t::iterator it = pm.insert(std::pair<int, sem_t*>(prio, pSem) );
        pthread_mutex_unlock(&mtx);

        if( bWait ) sem_wait(pSem);

        // do the actual job
        // ....
        //

        pthread_mutex_lock(&mtx);
        // done, remove yourself
        pm.erase(it);
        if( ! pm.empty() )
        {
             // let next guy run:
            sem_post((pm.begin()->second));
        }
        pthread_mutex_unlock(&mtx);
    }
private:
    pthread_mutex_t mtx;
    priomap_t pm;
};
于 2012-07-26T14:21:23.717 に答える
0

Since thread priorities isn't working for you:

Create 2 mutexes, a regular lock and a priority lock.

Regular threads must first lock the normal lock, and then the priority lock. The priority thread only has to lock the priority lock:

Mutex mLock;
Mutex mPriLock;


doNormal()
{
   mLock.lock();
   pthread_yield();
   doPriority();
   mLock.unlock();
}

doPriority()
{
   mPriLock.lock();
   doStuff();
   mPriLock.unlock();
}
于 2012-07-26T15:11:33.350 に答える
0

Modified slightly ecatmur answer, adding a 4th mutex to handle multiple high priority threads contemporaneously (note that this was not required in my original question):

#include <thread>
#include <iostream>
#include "unistd.h"

std::mutex M; //data access mutex
std::mutex N; // 'next to access' mutex
std::mutex L; //low priority access mutex
std::mutex H; //hptwaiting int access mutex

int hptwaiting=0;

void lowpriolock(){
  L.lock();
  while(hptwaiting>0){
    N.lock();
    N.unlock();
  }
  N.lock();
  M.lock();
  N.unlock();
}

void lowpriounlock(){
  M.unlock();
  L.unlock();
}

void highpriolock(){
  H.lock();
  hptwaiting++;
  H.unlock();
  N.lock();
  M.lock();
  N.unlock();
}

void highpriounlock(){
  M.unlock();
  H.lock();
  hptwaiting--;
  H.unlock();
}

void hpt(const char* s){
  using namespace std;
  //cout << "hpt trying to get lock here" << endl;
  highpriolock();
  cout << s << endl;
  usleep(30000);
  highpriounlock();
}

void lpt(const char* s){
  using namespace std;
  //cout << "lpt trying to get lock here" << endl;
  lowpriolock();
  cout << s << endl;
  usleep(30000);
  lowpriounlock();
}

int main(){
std::thread t0(lpt,"low  prio t0  working here");
std::thread t1(lpt,"low  prio t1  working here");
std::thread t2(hpt,"high prio t2  working here");
std::thread t3(lpt,"low  prio t3  working here");
std::thread t4(lpt,"low  prio t4  working here");
std::thread t5(lpt,"low  prio t5  working here");
std::thread t6(hpt,"high prio t6  working here");
std::thread t7(lpt,"low  prio t7  working here");
std::thread t8(hpt,"high prio t8  working here");
std::thread t9(lpt,"low  prio t9  working here");
std::thread t10(lpt,"low  prio t10 working here");
std::thread t11(lpt,"low  prio t11 working here");
std::thread t12(hpt,"high prio t12 working here");
std::thread t13(lpt,"low  prio t13 working here");
//std::cout << "All threads created" << std::endl;
t0.join();
t1.join();
t2.join();
t3.join();
t4.join();
t5.join();
t6.join();
t7.join();
t8.join();
t9.join();
t10.join();
t11.join();
t12.join();
t13.join();
return 0;
}

What do you think? Is it ok? It's true that a semaphore could handle better this kind of thing, but mutexes are much more easy to manage to me.

于 2012-07-27T00:59:21.680 に答える