2

ブーストアップグレードミューテックスを使用してブーストリストコンテナに読み取り/書き込みミューテックスロックを実装する機能を検証するために、いくつかのサンプルテストコードを作成しました。私には10のスレッドがあり、5つはリーダー、5つはライターです。
スマートポインタを使用してメモリ管理を簡素化し、同じオブジェクトを複数のリストに含めることができるようにしました。リーダーが定期的にリストを反復処理している間、ライターは常にオブジェクトを削除してそれぞれのリストに再挿入しています。すべて期待どおりに機能しているように見えますが、リスト消去メンバー関数を呼び出すと、既に持っているときに削除するエントリを見つける必要があります。
消去方法は、エントリを再度検索しなくても消去するエントリを知るのに十分スマートですか、それともリスト要素がわかっているときに検索を削除するように最適化されていますか?検索を行う場合、それを拡張して、リスト要素の検索中ではなく、リストからの実際の削除の周囲にのみ一意のロックを適用できるようにする簡単な方法はありますか?これが私がboost1.51ライブラリとリンクし、vs2008でテストしたコードです。

  //******************************************************************************
  // INCLUDE FILES
  //******************************************************************************
  #include <boost/thread.hpp>
  #include <boost/date_time.hpp>
  #include <boost/thread/locks.hpp>  
  #include <boost/thread/shared_mutex.hpp>                  
  #include <boost/container/list.hpp>      
  #include <boost/shared_ptr.hpp>
  #include <boost/make_shared.hpp>
  #include <iostream>

  using namespace std;

  //******************************************************************************
  // LOCAL DEFINES
  //******************************************************************************
  #define NUM_THREADS  10
  #define NUM_WIDTH    5

  #ifdef UNIQUE_MUTEX
  #define MAIN_LIST_MUTEX           g_listMutex
  #define INT_LIST_MUTEX            g_intListMutex
  #define FLOAT_LIST_MUTEX          g_floatListMutex
  #else
  #define MAIN_LIST_MUTEX           g_listMutex
  #define INT_LIST_MUTEX            g_listMutex
  #define FLOAT_LIST_MUTEX          g_listMutex
  #endif

  //******************************************************************************
  // LOCAL TYPEDEFS
  //******************************************************************************
  typedef boost::upgrade_mutex                   myMutex;
  typedef boost::shared_lock<myMutex>            SharedLock;
  typedef boost::upgrade_to_unique_lock<myMutex> UniqueLock;
  typedef boost::upgrade_lock<myMutex>           UpgradeLock;
  class myDataIntf;
  typedef boost::shared_ptr<myDataIntf>          myDataIntfPtr;
  typedef boost::container::list<myDataIntfPtr>  myList;

  //******************************************************************************
  // LOCAL CLASS DECLARATIONS
  //******************************************************************************
  class myDataIntf
  {
  public:
     virtual char* getDataType(void) = 0;
  };

  class intData : public myDataIntf
  {
  private:
     int  data;

  public:
     intData(int new_data) : data(new_data){};
     ~intData(void)
     {
        extern int instIntDeletes;

        instIntDeletes++;
     };
     char* getDataType(void)
     {
        return "Int";
     }
     int getData(void)
     {
        return data;
     }
     void setData(int new_data)
     {
        data = new_data;
     }
  };

  class floatData : public myDataIntf
  {
  private:
     float  data;

  public:
     floatData(float new_data) : data(new_data){};
     ~floatData(void)
     {
        extern int instFloatDeletes;

        instFloatDeletes++;
     };
     char* getDataType(void)
     {
        return "Float";
     }
     float getData(void)
     {
        return data;
     }
     void setData(float new_data)
     {
        data = new_data;
     }
  };

  //******************************************************************************
  // LOCALLY DEFINED GLOBAL DATA
  //******************************************************************************

  // Define one mutex per linked list
  myMutex  g_listMutex;
  myMutex  g_intListMutex;
  myMutex  g_floatListMutex;

  int instReadFloatCount[NUM_THREADS];
  int instWriteFloatCount[NUM_THREADS];
  int instReadIntCount[NUM_THREADS];
  int instWriteIntCount[NUM_THREADS];
  int instFloatDeletes = 0;
  int instIntDeletes = 0;

  //******************************************************************************
  // Worker Thread function
  //******************************************************************************
  void workerFunc(int inst, myList* assigned_list, myMutex*  mutex)
  {
     boost::posix_time::millisec workTime(1*inst);
     myList::iterator            i;
     int                         add_delay = 0;
     int                         add_f_count = 0;
     int                         add_i_count = 0;

     instReadFloatCount[inst] = 0;
     instReadIntCount[inst] = 0;
     instWriteIntCount[inst] = 0;
     instWriteFloatCount[inst] = 0;

     mutex->lock();
     cout << "Worker " << inst << ": ";
     for (i =  assigned_list->begin(); i != assigned_list->end(); ++i)
     {
        cout << (*i)->getDataType();
        if ( 0 == strcmp("Float", (*i)->getDataType() ) )
        {
           floatData*  f = (floatData*)i->get();
           cout << " " << f->getData() << " ";
        }
        if ( 0 == strcmp("Int", (*i)->getDataType() ) )
        {
           intData*  f = (intData*)i->get();
           cout << " " << f->getData() << " ";
        }
     }
     cout << endl;
     mutex->unlock();

     // Do some work for 10 seconds.
     for ( int tick = 0; tick < 10000/(1*inst+1); tick++)
     {
        add_delay++;
        boost::this_thread::sleep(workTime);
        if ( inst < (NUM_THREADS/2) )
        {
           // reader - Get a shared lock that allows multiple readers to
           // access the linked list. Upgrade locks act as shared locks
           // until converted to unique locks, at which point the 
           // thread converting to the unique lock will block until
           // all existing readers are done.  New readers will wait
           // after the unique lock is released.
           SharedLock shared_lock(*mutex);

           for (i =  assigned_list->begin(); i != assigned_list->end(); ++i)
           {
              if ( 0 == strcmp("Float", (*i)->getDataType() ) )
              {
                 floatData*  f = (floatData*)i->get();
                 instReadFloatCount[inst]++;
              }
              if ( 0 == strcmp("Int", (*i)->getDataType() ) )
              {
                 intData*  f = (intData*)i->get();
                 instReadIntCount[inst]++;
              }
           }
        }
        else
        {
           // writer - get the upgrade lock that will allow us
           // to make multiple modifications to the linked list
           // without being interrupted by other writers (other writers attempting
           // to get an upgrade lock will block until the writer that
           // has it releases it.)
           UpgradeLock  upgrade_lock(*mutex);

           for (i =  assigned_list->begin(); i != assigned_list->end(); )
           {
              if ( 0 == strcmp("Float", (*i)->getDataType() ) )
              {
                 floatData*   f = (floatData*)i->get();
                 UniqueLock   unique_lock(upgrade_lock); // Convert an existing upgrade lock to unique lock

                 f->setData(f->getData() + 0.123f);
                 assigned_list->push_front(*i); 
                 assigned_list->erase(i++);
                 instWriteFloatCount[inst]++;

                 // While the unique lock is in scope let's do some additional
                 // adds & deletes
                 if ( (add_delay > 100) && (add_f_count < 2) )
                 {
                    if ( add_f_count < 1)
                    {
                       // Delete the first record
                    }
                    else if ( add_f_count < 2)
                    {
                       // Add new item using separate allocations for smart pointer & data
                       assigned_list->insert(assigned_list->end(), new floatData(-(float)(inst*10000+add_f_count)));
                    }
                    else
                    {
                       // Add new item using make_shared function template.  Both objects are created using one allocation.
                       assigned_list->insert(assigned_list->end(), boost::make_shared<floatData>(-(float)(inst*10000+add_f_count)));
                    }
                    add_f_count++;
                 }
              }
              else if ( 0 == strcmp("Int", (*i)->getDataType() ) )
              {
                 intData*     f = (intData*)i->get();
                 UniqueLock   unique_lock(upgrade_lock); // Convert an existing upgrade lock to unique lock

                 f->setData(f->getData() + 123);
                 assigned_list->push_front(*i);
                 assigned_list->erase(i++);
                 instWriteIntCount[inst]++;

                 // While the unique lock is in scope let's do some additional
                 // adds & deletes
                 if ( (add_delay > 100) && (add_i_count < 3) )
                 {
                    if ( add_i_count < 1)
                    {
                       // Delete the first record
                    }
                    else if ( add_i_count < 2)
                    {
                       // Add new item using separate allocations for smart pointer & data
                       assigned_list->insert(assigned_list->end(), new intData(-(int)(inst*10000+add_i_count)));
                    }
                    else
                    {
                       // Add new item using make_shared function template.  Both objects are created using one allocation.
                       assigned_list->insert(assigned_list->end(), boost::make_shared<intData>(-(int)(inst*10000+add_i_count)));
                    }
                    add_i_count++;
                 }
              }
              else
              {
                 ++i;
              }
           }
        }
     }

     cout << "Worker: finished" << " " << inst << endl;
  }

  //******************************************************************************
  // Main Function
  //******************************************************************************
  int main(int argc, char* argv[])
  {
     {
        myList              test_list;
        myList              test_list_ints;
        myList              test_list_floats;
        myList::iterator    i;

        // Fill the main list with some values
        test_list.insert(test_list.end(), new intData(1));
        test_list.insert(test_list.end(), new intData(2));
        test_list.insert(test_list.end(), new intData(3));
        test_list.insert(test_list.end(), new floatData(333.333f));
        test_list.insert(test_list.end(), new floatData(555.555f));
        test_list.insert(test_list.end(), new floatData(777.777f));

        // Display the main list elements and add the specific values
        // for each specialized list containing specific types of elements.
        // The end result is that each object in the main list will also
        // be in the specialized list.
        cout << "test:";
        for (i =  test_list.begin(); i != test_list.end(); ++i)
        {
           cout << " " << (*i)->getDataType();
           if ( 0 == strcmp("Float", (*i)->getDataType() ) )
           {
              floatData*  f = (floatData*)i->get();
              cout << " " << f->getData();
              test_list_floats.insert(test_list_floats.end(), *i);
           }
           if ( 0 == strcmp("Int", (*i)->getDataType() ) )
           {
              intData*  f = (intData*)i->get();
              cout << " " << f->getData();
              test_list_ints.insert(test_list_ints.end(), *i);
           }
        }
        cout << endl;

        // Display the list with float type elements
        cout << "float test:";
        for (i =  test_list_floats.begin(); i != test_list_floats.end(); ++i)
        {
           cout << " " << (*i)->getDataType();
           floatData*  f = (floatData*)i->get();
           cout << " " << f->getData();
        }
        cout << endl;

        // Display the list with integer type elements
        cout << "int test:";
        for (i =  test_list_ints.begin(); i != test_list_ints.end(); ++i)
        {
           cout << " " << (*i)->getDataType();
           intData*  f = (intData*)i->get();
           cout << " " << f->getData();
        }
        cout << endl;

        // NOTE: To reduce mutex bottleneck coupling in a real application it is recommended that
        // each linked list have it's own shareable mutex.
        // I used the same mutex here for all three lists to have the output from each thread
        // appear in a single line. If I use one mutex per thread then it would appear
        // jumbled up and almost unreadable.
        // To use a unique mutex per list enable UNIQUE_MUTEX macro.
        // For this test I did not notice any performance differences, but that will
        // depend largely on how long the unique lock is held.
        boost::thread workerThread0(workerFunc, 0, &test_list,        &MAIN_LIST_MUTEX);
        boost::thread workerThread1(workerFunc, 1, &test_list_ints,   &INT_LIST_MUTEX);
        boost::thread workerThread2(workerFunc, 2, &test_list_floats, &FLOAT_LIST_MUTEX);
        boost::thread workerThread3(workerFunc, 3, &test_list,        &MAIN_LIST_MUTEX);
        boost::thread workerThread4(workerFunc, 4, &test_list_floats, &FLOAT_LIST_MUTEX);
        boost::thread workerThread5(workerFunc, 5, &test_list_ints,   &INT_LIST_MUTEX);
        boost::thread workerThread6(workerFunc, 6, &test_list,        &MAIN_LIST_MUTEX);
        boost::thread workerThread7(workerFunc, 7, &test_list_floats, &FLOAT_LIST_MUTEX);
        boost::thread workerThread8(workerFunc, 8, &test_list,        &MAIN_LIST_MUTEX);
        boost::thread workerThread9(workerFunc, 9, &test_list_ints,   &INT_LIST_MUTEX);
        workerThread0.join();
        workerThread1.join();
        workerThread2.join();
        workerThread3.join();
        workerThread4.join();
        workerThread5.join();
        workerThread6.join();
        workerThread7.join();
        workerThread8.join();
        workerThread9.join();

        cout << "*** Test End ***:";
        for (i =  test_list.begin(); i != test_list.end(); ++i)
        {
           cout << " " << (*i)->getDataType();
           if ( 0 == strcmp("Float", (*i)->getDataType() ) )
           {
              floatData*  f = (floatData*)i->get();
              cout << " " << f->getData();
           }
           if ( 0 == strcmp("Int", (*i)->getDataType() ) )
           {
              intData*  f = (intData*)i->get();
              cout << " " << f->getData();
           }
        }
        cout << endl;
        cout << "float test end:";
        for (i =  test_list_floats.begin(); i != test_list_floats.end(); ++i)
        {
           cout << " " << (*i)->getDataType();
           floatData*  f = (floatData*)i->get();
           cout << " " << f->getData();
        }
        cout << endl;
        cout << "int test end:";
        for (i =  test_list_ints.begin(); i != test_list_ints.end(); ++i)
        {
           cout << " " << (*i)->getDataType();
           intData*  f = (intData*)i->get();
           cout << " " << f->getData();
        }
        cout << endl;
        cout << "*** thread counts***" << endl;
        for ( int idx = 0; idx < NUM_THREADS; idx++)
        {
           cout << "    thread " << idx;
           cout << ": int rd(" << setw(NUM_WIDTH) << instReadIntCount[idx];
           cout << ") int wr(" << setw(NUM_WIDTH) << instWriteIntCount[idx];
           cout << ") flt rd(" << setw(NUM_WIDTH) << instReadFloatCount[idx];
           cout << ") flt wr(" << setw(NUM_WIDTH) << instWriteFloatCount[idx];
           cout << ")" <<  endl;
        }
     }

     // All records in the linked list have now been deallocated automatically(due to smart pointer)
     // as the linked list objects have been destroyed due to going out of scope.  
     cout << "*** Object Deletion counts***" << endl;
     cout << "  int deletes: " << instIntDeletes << endl;
     cout << "float deletes: " << instFloatDeletes << endl;

     return 0;
  }
4

1 に答える 1

2

の複雑さboost::container::list::erase(const_iterator)一定時間で償却されます( boost / container / list.hppiterator erase(const_iterator p)で検索してください)。したがって、この関数を呼び出すときに繰り返し検索が行われることはありません。

ただし、いくつかのポイントがあります。

ごく最近、並行性の専門家から、UpgradeLockable Conceptを使用するのは、明確な必要性を特定した後でのみ使用するのが賢明であるとアドバイスされました。つまり、プロファイリング後。esに関連付けられたロックは、単純なsまたはsupgrade_mutexよりも必然的に複雑であるため、パフォーマンスが低下します。boost::mutex::scoped_lockstd::lock_guard

upgrade_mutexあなたの例では、現在の(より複雑な)セットアップと、を置き換えてmutex常に排他的にロックすることとの間に、パフォーマンスに大きな違いはないことがわかるでしょう。

もう1つのポイントは、コードコメントはboost::upgrade_lock、特定のaの複数のインスタンスupgrade_mutexが共存できると考えていることを示しているように見えることです。これはそうではありません。一度に保持できるスレッドは1つだけupgrade_lockです。

shared_lockが保持されている間、他のいくつかのスレッドがを保持する場合がありますが、を一意にアップグレードする前に、upgrade_lockこれらshared_lockのスレッドを解放する必要があります。upgrade_lock

詳細については、UpgradeLockableConceptのブーストドキュメントを参照してください。


編集

以下のコメントで指摘されている点を確認するために、次の例は、存在している間は新しいshared_locksを取得できますが、upgrade_lock存在している間は取得できないことを示していupgrade_to_unique_lockます(ブースト1.51でテスト済み)。

#include <iostream>
#include <vector>

#include <boost/thread.hpp>
#include <boost/date_time.hpp>
#include <boost/thread/locks.hpp>
#include <boost/thread/shared_mutex.hpp>

typedef boost::shared_lock<boost::upgrade_mutex>            SharedLock;
typedef boost::upgrade_to_unique_lock<boost::upgrade_mutex> UniqueLock;
typedef boost::upgrade_lock<boost::upgrade_mutex>           UpgradeLock;

boost::upgrade_mutex the_mutex;

void Write() {
  UpgradeLock upgrade_lock(the_mutex);
  std::cout << "\tPreparing to write\n";
  boost::this_thread::sleep(boost::posix_time::seconds(1));
  UniqueLock unique_lock(upgrade_lock);
  std::cout << "\tStarting to write\n";
  boost::this_thread::sleep(boost::posix_time::seconds(5));
  std::cout << "\tDone writing.\n";
}

void Read() {
  SharedLock lock(the_mutex);
  std::cout << "Starting to read.\n";
  boost::this_thread::sleep(boost::posix_time::seconds(1));
  std::cout << "Done reading.\n";
}

int main() {
  // Start a read operation
  std::vector<boost::thread> reader_threads;
  reader_threads.push_back(std::move(boost::thread(Read)));
  boost::this_thread::sleep(boost::posix_time::milliseconds(250));

  // Start a write operation.  This will block trying to upgrade
  // the UpgradeLock to UniqueLock since a SharedLock currently exists.
  boost::thread writer_thread(Write);

  // Start several other read operations.  These won't be blocked
  // since only an UpgradeLock and SharedLocks currently exist.
  for (int i = 0; i < 25; ++i) {
    boost::this_thread::sleep(boost::posix_time::milliseconds(100));
    reader_threads.push_back(std::move(boost::thread(Read)));
  }

  // Join the readers.  This allows the writer to upgrade to UniqueLock
  // since it's currently the only lock.
  for (auto& reader_thread : reader_threads)
    reader_thread.join();

  // Start a new read operation.  This will be blocked since a UniqueLock
  // currently exists.
  boost::this_thread::sleep(boost::posix_time::milliseconds(100));
  boost::thread reader_thread(Read);

  writer_thread.join();
  reader_thread.join();

  return 0;
}
于 2012-11-07T01:43:48.913 に答える