6

共通の属性(stl :: map)にアクセスする必要がある48個のスレッドを作成するマルチスレッドアプリケーションがあります。マップはスレッドの開始時にのみ書き込まれ、残りの時間はマップが読み取られます。これはpthread_rw_lockの完璧なユースケースのようであり、すべてがうまく機能しているように見えます。

私は完全に無関係なセグメンテーション違反に遭遇し、コアの分析を開始しました。gdbを使用してコマンドを実行したinfo threadsところ、結果に非常に驚いていました。いくつかのスレッドが実際にマップから期待どおりに読み取られていることを確認しましたが、奇妙な部分は、rw_lockを待機しているpthread_rwlock_rdlock()でいくつかのスレッドがブロックされたことです。

ロックを待機しているスレッドのスタックトレースは次のとおりです。

#0  0xffffe430 in __kernel_vsyscall ()
#1  0xf76fe159 in __lll_lock_wait () from /lib/libpthread.so.0
#2  0xf76fab5d in pthread_rwlock_rdlock () from /lib/libpthread.so.0
#3  0x0804a81a in DiameterServiceSingleton::getDiameterService(void*) ()

スレッドが非常に多いため、読み取り中のスレッドの数とブロックされたスレッドの数を判断するのは困難ですが、他のスレッドがすでに読み取りを行っていることを考えると、読み取りを待機しているスレッドがブロックされる理由がわかりません。

だからここに私の質問があります:他のスレッドがすでにrw_lockから読み取っているのに、なぜいくつかのスレッドがrw_lockの読み取りを待ってブロックされているのですか?同時に読み取ることができるスレッドの数には制限があるように見えます。

pthread_rwlock_attr_t関数を調べましたが、関連するものは何もありませんでした。

OSはLinux、SUSE11です。

関連するコードは次のとおりです。

{
  pthread_rwlock_init(&serviceMapRwLock_, NULL);
}

// This method is called for each request processed by the threads
Service *ServiceSingleton::getService(void *serviceId)
{
  pthread_rwlock_rdlock(&serviceMapRwLock_);
    ServiceMapType::const_iterator iter = serviceMap_.find(serviceId);
    bool notFound(iter == serviceMap_.end());
  pthread_rwlock_unlock(&serviceMapRwLock_);

  if(notFound)
  {
    return NULL;
  }

  return iter->second;
}

// This method is only called when the app is starting
void ServiceSingleton::addService(void *serviceId, Service *service)
{
  pthread_rwlock_wrlock(&serviceMapRwLock_);
    serviceMap_[serviceId] = service;
  pthread_rwlock_unlock(&serviceMapRwLock_);
}

アップデート:

MarkBのコメントで述べたように、ライターを優先するようにpthread_rwlockattr_getkind_np()を設定し、待機中のライターがブロックされている場合、観察された動作は理にかなっています。しかし、私が信じているデフォルト値を使用しているのは、読者を優先することです。書き込みを待機しているスレッドがブロックされていないことを確認しました。また、コメントで@Shahbazによって提案されたようにコードを更新し、同じ結果を取得します。

4

2 に答える 2

6

ロックの取得に関連する固有のパフォーマンスの問題を観察しただけです。少し時間がかかりますが、たまたまその途中でそれらのスレッドをキャッチしました。これは、ロックによって保護されている操作の期間が非常に短い場合に特に当てはまります。

編集:ソースを読み取り、独自のpthreadライブラリデータ構造内のクリティカルセクションを保護するためにglibc使用します。lll_lockチェックはいくつかのpthread_rwlock_rdlockフラグをチェックし、カウンターをインクリメントするので、ロックを保持しながらそれらのことを行います。それらが完了すると、ロックは。で解放されlll_unlockます。

実例を示すために、を取得した後にスリープする短いルーチンを実装しましたrwlock。メインスレッドはそれらが終了するのを待ちます。しかし、待つ前に、スレッドによって達成された同時実行性を出力します。

enum { CONC = 50 };

pthread_rwlock_t rwlock;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
unsigned count;

void *routine(void *arg)
{
    int *fds = static_cast<int *>(arg);
    pthread_rwlock_rdlock(&rwlock);
    pthread_mutex_lock(&mutex);
    ++count;
    if (count == CONC) pthread_cond_signal(&cond);
    pthread_mutex_unlock(&mutex);
    sleep(5);
    pthread_rwlock_unlock(&rwlock);
    pthread_t self = pthread_self();
    write(fds[1], &self, sizeof(self));
    return 0;
}

そして、メインスレッドはカウンターが50に達するのを待ちます。

int main()
{
    int fds[2];
    pipe(fds);
    pthread_rwlock_init(&rwlock, 0);
    pthread_mutex_lock(&mutex);
    for (int i = 0; i < CONC; i++) {
        pthread_t tid;
        pthread_create(&tid, NULL, routine, fds);
    }
    while (count < CONC) pthread_cond_wait(&cond, &mutex);
    pthread_mutex_unlock(&mutex);
    std::cout << "count: " << count << std::endl;
    for (int i = 0; i < CONC; i++) {
        pthread_t tid;
        read(fds[0], &tid, sizeof(tid));
        pthread_join(tid, 0);
    }
    pthread_rwlock_destroy(&rwlock);
    pthread_exit(0);
}

編集: C ++ 11スレッドサポートを使用して例を簡略化しました:

enum { CONC = 1000 };
std::vector<std::thread> threads;

pthread_rwlock_t rwlock;
std::mutex mutex;
std::condition_variable cond;
unsigned count;

void *routine(int self)
{
    pthread_rwlock_rdlock(&rwlock);
    { std::unique_lock<std::mutex> lk(mutex);
      if (++count == CONC) cond.notify_one(); }
    sleep(5);
    pthread_rwlock_unlock(&rwlock);
    return 0;
}

int main()
{
    pthread_rwlock_init(&rwlock, 0);
    { std::unique_lock<std::mutex> lk(mutex);
      for (int i = 0; i < CONC; i++) {
          threads.push_back(std::thread(routine, i));
      }
      cond.wait(lk, [](){return count == CONC;}); }
    std::cout << "count: " << count << std::endl;
    for (int i = 0; i < CONC; i++) {
        threads[i].join();
    }
    pthread_rwlock_destroy(&rwlock);
    pthread_exit(0);
}
于 2012-08-08T15:23:42.567 に答える
3

ちなみに、上記のコードは壊れています。rw_lockのロックを解除するとすぐにライターがマップ内の要素を削除して、その上のイテレーターを無効にすることができるため、rw_lock'dセクションからiter->secondにアクセスすることはできません。

プログラムの実行開始後は何も書かないので、あなたがこれを行っていないことは知っていますが、それでも言及する価値があります。

また、補足として、記述した動作はシリアル化されているように見えるため(ライターは最初にマップに書き込み、次にリーダーは「読み取り専用」マップを読み取ります)、おそらく次のように記述する必要があります。

int writerDoneWithMap = 0;
// pthread_cond & mutex init here

// The writer write to the map here 

// Then they signal the reader that they are done with it
while (!__sync_bool_compare_and_swap(&writerDoneWithMap, 1, writerDoneWithMap));
pthread_cond_broadcast here


// The readers will then simply do this:
while (!writerDoneWithMap) 
{
     // pthread_cond_wait here
}
// Read the data without locks.

上記のコードは、ライターがマップの入力を完了した場合にリーダーのロックを回避し、そうでない場合は、通常のpthread_cond/mutexテクニックを使用します。上記のコードは、ライターTHENリーダーがある場合にのみ正しいです(ただし、それはあなたが言ったことです)。そうでない場合は失敗します。

于 2012-08-10T21:11:09.797 に答える