4

Cormen の Introduction to Algorithmsstd::threadで説明されているように、並列マージを実装する際にコンパイラとして gcc を使用しています。

コードが動作するようになったと思います。大きすぎないランダムにシードされたすべての配列を渡します。ただし、大きい 2 つの配列 (それぞれ 1e6 要素) をマージしようとすると、次のような終了が発生します。

terminate called without an active exception
terminate called recursively
terminate called recursively

gdb を使用しても役に立ちません。実行中に破損します。

生成されたスレッドが多すぎるために実行が失敗したことは確かです。

このエラーが生成された std::threads が多すぎるためであることを確認するにはどうすればよいですか?

ノート

  1. コードは n=1e4 までは機能し、n=1e5 までは失敗します
  2. 出力を見たい場合は DEBUG を定義しますが、10 や 50 のような小さな n を除いて、これはお勧めしません。

  3. STRBUF_SIZE/fprintf の使用は見苦しいですが、iostream はスレッドでうまくフラッシュされません。これはハックですが、機能します (ここで焦点を当てる必要はありません)。
  4. スレッドの周りに try/catch ブロックを使用して、Barnes53 の提案に従ってみましたが、どうやらうまくいきませんでした。
  5. 無数のスレッドを生成することが悪いことであることはわかっています。現時点では、本の内容を実装して、それが機能するかどうかを確認し、おそらくその制限が何であるかを発見しようとしています。

アップデート

  1. 以下の GManNickG の回答は役に立ちました: すべての実行ではありませんが、1e5 のいくつかの実行中に、実際にリソースがなくなっていることがわかります。
  2. このアルゴリズムがサルベージ可能でない場合は、生成されるスレッドの数を制御できる、ある種の k-way 並列ソートを検討する予定です。

コード

#include <vector>
#include <iostream>
#include <algorithm>
#include <vector>
#include <thread>
#include <cmath>
#include <cstring>
#include <cassert>

#define STRBUF_SIZE 1024

class Random
{
public:
    Random( unsigned int seed=::time(nullptr))
        : m_seed( seed )
    { }
    // between [ 0 .. n-1 ]
    unsigned int rand_uint( unsigned int n )
    {
        return static_cast<unsigned int>
                     (static_cast<float>(n) * rand_r( &m_seed ) / RAND_MAX);
    }
    unsigned int getSeed() const { return m_seed; }
private:
    unsigned int m_seed;
};

template<typename T>
char* dump( char* line, T it1, T it2 )
{
    char buf[80];
    line[0] = '\0';
    for( T it=it1; it!=it2; ++it )
    {
        sprintf( buf, "%u ", *it );
        strcat(  line, buf );
    }
    return line;
}

template< typename T, class It >
It binary_search_it( It beg, It end, const T& value )
{
    auto low  = beg;
    auto high = std::max( beg, end );   // end+1
    while( low < high )
    {
        auto mid = low + std::distance( low, high ) / 2;
        if ( value <= *mid )
            high = mid;
        else
            low = mid + 1;
    }
    return high;
}

template< class InputIt, class OutputIt >
void p_merge( 
    char const*  msg, 
    unsigned     depth,
    unsigned     parent_lvl_id,
    unsigned     lr,
    InputIt  p1, InputIt  r1, 
    InputIt  p2, InputIt  r2, 
    OutputIt p3, OutputIt r3
    )
{
#ifdef DEBUG
    char buff[STRBUF_SIZE];
#endif
    unsigned sum_prev  = pow( 2, depth ) - 1;
    unsigned lvl_id    = 2*parent_lvl_id + lr;
    unsigned thread_no = sum_prev + lvl_id + 1;

    unsigned limit0    = sum_prev + 1;
    unsigned limit1    = pow( 2, depth+1 ) - 1;

#ifdef DEBUG
    char msg_dep[256];
    sprintf( msg_dep, "%s [%2d] %-10d [%d,%d]", msg, depth, thread_no, limit0, limit1 );
    fprintf( stderr, "%s\n", msg_dep );
#endif

    if ( thread_no<limit0 || thread_no>limit1 )
    {
        fprintf( stderr, "OUT OF BOUNDS\n" );
        exit( 1 );
    }

    auto n1 = std::distance( p1, r1 );
    auto n2 = std::distance( p2, r2 );
#ifdef DEBUG
    fprintf( stderr, "%s dist[v1]=%2ld   : %s\n", msg_dep, n1, dump( buff, p1, r1 ) );
    fprintf( stderr, "%s dist[v2]=%2ld   : %s\n", msg_dep, n2, dump( buff, p2, r2 ) );
#endif
    if ( n1<n2 )
    {
        std::swap( p1, p2 );
        std::swap( r1, r2 );
        std::swap( n1, n2 );
#ifdef DEBUG
      fprintf( stderr, "%s swapped[v1]   : %s\n", msg_dep, dump( buff, p1, r1 ));
      fprintf( stderr, "%s swapped[v2]   : %s\n", msg_dep, dump( buff, p2, r2 ));
#endif
    }
    if ( n1==0 )
    {
#ifdef DEBUG
      fprintf( stderr, "%s done              \n", msg_dep );
#endif
        return;
    }
    auto q1 = p1 + n1 / 2;   // midpoint
    auto q2 = binary_search_it( p2, r2, *q1 );  // <q1   q2[q1]   >=q1
    auto q3 = p3 + std::distance( p1, q1 ) + std::distance( p2, q2 );
    *q3 = *q1;

#ifdef DEBUG
    fprintf( stderr, "%s q1[median]=%u  : %s\n", msg_dep, *q1, dump( buff, p1, r1 ));
    fprintf( stderr, "%s q2[fulcrum]=%u : %s\n", msg_dep, *q2, dump( buff, p2, r2 ));
    fprintf( stderr, "%s q3(copied)=%u  : %s\n", msg_dep, *q3, dump( buff, p3, r3 ));
#endif

#ifdef DEBUG
    auto d1 = std::distance( p1,   q1-1 );
    auto d2 = std::distance( q1+1, r1   );
    fprintf( stderr, "%s q1[dist_L]=%ld  : %s\n", msg_dep, d1, dump( buff, p1, r1 ));
    fprintf( stderr, "%s q1[dist_M]=%ld  : %s\n", msg_dep, d2, dump( buff, p1, r1 ));
#endif


    try {
        std::thread t1{ 
            [&](){ p_merge( "LESS", depth+1, lvl_id, 0, p1, q1,   p2, q2,   p3, r3 ); } 
        };
        std::thread t2{ 
            [&](){ p_merge( "MORE", depth+1, lvl_id, 1, q1+1, r1, q2, r2, q3+1, r3 ); } 
        };
        t1.join();
        t2.join();
    }
    catch( ... )
    {
        fprintf( stderr, "OK - I am dying during a std::thread spawn\n" );
        exit( 1 );
    }

#ifdef DEBUG
    fprintf( stderr, "%s synchronized\n", msg_dep );
#endif
}

int
main( int argv, char* argc[] )
{
    // ok up to 1e4, fails by 1e5
    unsigned n = 1e5; 
    Random   r;
    std::vector<unsigned> v1( n ), v2( n ), v3( 2 * n );

#ifdef DEBUG
    fprintf( stderr, "SEED = %u\n", r.getSeed() );
#endif

    std::generate( v1.begin(), v1.end(), [&]() { return r.rand_uint(n); } );
    std::generate( v2.begin(), v2.end(), [&]() { return r.rand_uint(n); } );

#ifdef DEBUG
    char buff[STRBUF_SIZE];
    fprintf( stderr, "%s\n", dump( buff, v1.begin(), v1.end() ));
    fprintf( stderr, "%s\n", dump( buff, v2.begin(), v2.end() ));
#endif

    std::sort( v1.begin(), v1.end() );
    std::sort( v2.begin(), v2.end() );

    p_merge( "TOP ", 0, 0, 0,
        v1.begin(), v1.end(), v2.begin(), v2.end(), v3.begin(), v3.end() );

    assert( std::is_sorted( v3.begin(), v3.end() ));

#ifdef DEBUG
    fprintf( stderr, "FINAL : %s\n", dump( buff, v3.begin(), v3.end() ));
#endif
}
4

2 に答える 2

5

std::system_errorコードがresource_unavailable_try_again次のとおりであるかどうかをキャッチして確認できます。

#include <atomic>
#include <iostream>
#include <system_error>
#include <thread>
#include <vector>

class thread_collection
{
public:
    thread_collection() :
    mStop(false)
    {}

    ~thread_collection()
    {
        clear();
    }

    template <typename Func, typename... Args>
    bool add(Func&& func, Args&&... args)
    {
        try
        {
            mThreads.emplace_back(std::forward<Func>(func),
                                  std::cref(mStop),
                                  std::forward<Args>(args)...);
        }
        catch (const std::system_error& e)
        {
            if (e.code().value() == std::errc::resource_unavailable_try_again)
                return false; // not possible to make more threads right now
            else
                throw; // something else
        }

        return true; // can keep going
    }

    void clear()
    {
        mStop = true;
        for (auto& thread : mThreads)
        {
            if (thread.joinable())
                thread.join();
        }

        mThreads.clear();
        mStop = true;
    }

    std::size_t size() const
    {
        return mThreads.size();
    }

private:
    thread_collection(const thread_collection&);
    thread_collection& operator=(const thread_collection&);

    std::atomic<bool> mStop;
    std::vector<std::thread> mThreads;
};

void worker(const std::atomic<bool>& stop)
{
    while (!stop)
        std::this_thread::yield();
}

int main()
{
    thread_collection threads;

    try
    {
        while (threads.add(worker))
            continue;

        std::cout << "Exhausted thread resources!" << std::endl;
    }
    catch (const std::exception& e)
    {
        std::cout << "Stopped for some other reason: " << e.what() << std::endl;
    }

    std::cout << "Made: " << threads.size() << " threads." << std::endl;
    threads.clear();
}

(これは自己責任で実行してください!)

§30.3.1.2/4によると、これはスレッド作成の失敗を示すために使用されるエラーコードです。

エラー状態:
resource_unavailable_try_again —システムに別のスレッドを作成するために必要なリソースが不足しているか、プロセス内のスレッド数にシステムが課した制限を超えています。

これは、結果のスレッドにコピーされる独自の引数によってスローされる可能性があることに注意してください。これを保証するには、引数を事前に作成してから、引数をスレッド関数に移動しないようにする必要があります。

とはいえ、とにかくスレッドの作成に制限を設ける方がはるかに良いでしょう。コアが実行できるよりも多くのスレッドを実行しても意味がありません。std::thread::hardware_concurrencyその番号を取得するために使用します。

于 2013-01-18T23:06:57.277 に答える
2
try {
    std::thread t1{ 
        [&](){ p_merge( "LESS", depth+1, lvl_id, 0, p1, q1,   p2, q2,   p3, r3 ); } 
    };
    std::thread t2{ 
        [&](){ p_merge( "MORE", depth+1, lvl_id, 1, q1+1, r1, q2, r2, q3+1, r3 ); } 
    };
    t1.join();
    t2.join();
}
catch( ... )
{
    fprintf( stderr, "OK - I am dying during a std::thread spawn\n" );
    exit( 1 );
}

このコードは期待どおりに動作しない場合があります。構築中t2に例外がスローされた場合t1は破棄されますが、そのスレッドは結合可能であるため呼び出さstd::terminate()れるためcatch、例外は処理されません。

表示されている理由の 1 つはterminate called recursively、多数のスレッドが同時に同じ問題を抱えているため、多数のスレッドが同時に呼び出しを行っている可能性がありますterminate()

これは代わりに機能します:

std::thread t1;
std::thread t2;

try {
    t1 = std::thread{ 
        [&](){ p_merge( "LESS", depth+1, lvl_id, 0, p1, q1,   p2, q2,   p3, r3 ); } 
    };
    t2 = std::thread{ 
        [&](){ p_merge( "MORE", depth+1, lvl_id, 1, q1+1, r1, q2, r2, q3+1, r3 ); } 
    };
}
catch( ... )
{
    fprintf( stderr, "OK - I am dying during a std::thread spawn\n" );
    exit( 1 );
}

t1.join();
t2.join();

p_mergeスローできる唯一の場所はブロック内であるため、ここでは問題ではないと思いますtryが、例外が実行された関数を離れると、それstd::threadも呼び出さstd::terminate()れるため、それはあなたが望むものではないことに注意する必要がありますnoexcept関数 (または非スロー関数) を に渡す必要がありますstd::thread

于 2013-01-19T16:33:28.633 に答える