Cormen の Introduction to Algorithmsstd::thread
で説明されているように、並列マージを実装する際にコンパイラとして gcc を使用しています。
コードが動作するようになったと思います。大きすぎないランダムにシードされたすべての配列を渡します。ただし、大きい 2 つの配列 (それぞれ 1e6 要素) をマージしようとすると、次のような終了が発生します。
terminate called without an active exception
terminate called recursively
terminate called recursively
gdb を使用しても役に立ちません。実行中に破損します。
生成されたスレッドが多すぎるために実行が失敗したことは確かです。
このエラーが生成された std::threads が多すぎるためであることを確認するにはどうすればよいですか?
ノート
- コードは n=1e4 までは機能し、n=1e5 までは失敗します
出力を見たい場合は DEBUG を定義しますが、10 や 50 のような小さな n を除いて、これはお勧めしません。
- STRBUF_SIZE/fprintf の使用は見苦しいですが、iostream はスレッドでうまくフラッシュされません。これはハックですが、機能します (ここで焦点を当てる必要はありません)。
- スレッドの周りに try/catch ブロックを使用して、Barnes53 の提案に従ってみましたが、どうやらうまくいきませんでした。
- 無数のスレッドを生成することが悪いことであることはわかっています。現時点では、本の内容を実装して、それが機能するかどうかを確認し、おそらくその制限が何であるかを発見しようとしています。
アップデート
- 以下の GManNickG の回答は役に立ちました: すべての実行ではありませんが、1e5 のいくつかの実行中に、実際にリソースがなくなっていることがわかります。
- このアルゴリズムがサルベージ可能でない場合は、生成されるスレッドの数を制御できる、ある種の k-way 並列ソートを検討する予定です。
コード
#include <vector>
#include <iostream>
#include <algorithm>
#include <vector>
#include <thread>
#include <cmath>
#include <cstring>
#include <cassert>
#define STRBUF_SIZE 1024
class Random
{
public:
Random( unsigned int seed=::time(nullptr))
: m_seed( seed )
{ }
// between [ 0 .. n-1 ]
unsigned int rand_uint( unsigned int n )
{
return static_cast<unsigned int>
(static_cast<float>(n) * rand_r( &m_seed ) / RAND_MAX);
}
unsigned int getSeed() const { return m_seed; }
private:
unsigned int m_seed;
};
template<typename T>
char* dump( char* line, T it1, T it2 )
{
char buf[80];
line[0] = '\0';
for( T it=it1; it!=it2; ++it )
{
sprintf( buf, "%u ", *it );
strcat( line, buf );
}
return line;
}
template< typename T, class It >
It binary_search_it( It beg, It end, const T& value )
{
auto low = beg;
auto high = std::max( beg, end ); // end+1
while( low < high )
{
auto mid = low + std::distance( low, high ) / 2;
if ( value <= *mid )
high = mid;
else
low = mid + 1;
}
return high;
}
template< class InputIt, class OutputIt >
void p_merge(
char const* msg,
unsigned depth,
unsigned parent_lvl_id,
unsigned lr,
InputIt p1, InputIt r1,
InputIt p2, InputIt r2,
OutputIt p3, OutputIt r3
)
{
#ifdef DEBUG
char buff[STRBUF_SIZE];
#endif
unsigned sum_prev = pow( 2, depth ) - 1;
unsigned lvl_id = 2*parent_lvl_id + lr;
unsigned thread_no = sum_prev + lvl_id + 1;
unsigned limit0 = sum_prev + 1;
unsigned limit1 = pow( 2, depth+1 ) - 1;
#ifdef DEBUG
char msg_dep[256];
sprintf( msg_dep, "%s [%2d] %-10d [%d,%d]", msg, depth, thread_no, limit0, limit1 );
fprintf( stderr, "%s\n", msg_dep );
#endif
if ( thread_no<limit0 || thread_no>limit1 )
{
fprintf( stderr, "OUT OF BOUNDS\n" );
exit( 1 );
}
auto n1 = std::distance( p1, r1 );
auto n2 = std::distance( p2, r2 );
#ifdef DEBUG
fprintf( stderr, "%s dist[v1]=%2ld : %s\n", msg_dep, n1, dump( buff, p1, r1 ) );
fprintf( stderr, "%s dist[v2]=%2ld : %s\n", msg_dep, n2, dump( buff, p2, r2 ) );
#endif
if ( n1<n2 )
{
std::swap( p1, p2 );
std::swap( r1, r2 );
std::swap( n1, n2 );
#ifdef DEBUG
fprintf( stderr, "%s swapped[v1] : %s\n", msg_dep, dump( buff, p1, r1 ));
fprintf( stderr, "%s swapped[v2] : %s\n", msg_dep, dump( buff, p2, r2 ));
#endif
}
if ( n1==0 )
{
#ifdef DEBUG
fprintf( stderr, "%s done \n", msg_dep );
#endif
return;
}
auto q1 = p1 + n1 / 2; // midpoint
auto q2 = binary_search_it( p2, r2, *q1 ); // <q1 q2[q1] >=q1
auto q3 = p3 + std::distance( p1, q1 ) + std::distance( p2, q2 );
*q3 = *q1;
#ifdef DEBUG
fprintf( stderr, "%s q1[median]=%u : %s\n", msg_dep, *q1, dump( buff, p1, r1 ));
fprintf( stderr, "%s q2[fulcrum]=%u : %s\n", msg_dep, *q2, dump( buff, p2, r2 ));
fprintf( stderr, "%s q3(copied)=%u : %s\n", msg_dep, *q3, dump( buff, p3, r3 ));
#endif
#ifdef DEBUG
auto d1 = std::distance( p1, q1-1 );
auto d2 = std::distance( q1+1, r1 );
fprintf( stderr, "%s q1[dist_L]=%ld : %s\n", msg_dep, d1, dump( buff, p1, r1 ));
fprintf( stderr, "%s q1[dist_M]=%ld : %s\n", msg_dep, d2, dump( buff, p1, r1 ));
#endif
try {
std::thread t1{
[&](){ p_merge( "LESS", depth+1, lvl_id, 0, p1, q1, p2, q2, p3, r3 ); }
};
std::thread t2{
[&](){ p_merge( "MORE", depth+1, lvl_id, 1, q1+1, r1, q2, r2, q3+1, r3 ); }
};
t1.join();
t2.join();
}
catch( ... )
{
fprintf( stderr, "OK - I am dying during a std::thread spawn\n" );
exit( 1 );
}
#ifdef DEBUG
fprintf( stderr, "%s synchronized\n", msg_dep );
#endif
}
int
main( int argv, char* argc[] )
{
// ok up to 1e4, fails by 1e5
unsigned n = 1e5;
Random r;
std::vector<unsigned> v1( n ), v2( n ), v3( 2 * n );
#ifdef DEBUG
fprintf( stderr, "SEED = %u\n", r.getSeed() );
#endif
std::generate( v1.begin(), v1.end(), [&]() { return r.rand_uint(n); } );
std::generate( v2.begin(), v2.end(), [&]() { return r.rand_uint(n); } );
#ifdef DEBUG
char buff[STRBUF_SIZE];
fprintf( stderr, "%s\n", dump( buff, v1.begin(), v1.end() ));
fprintf( stderr, "%s\n", dump( buff, v2.begin(), v2.end() ));
#endif
std::sort( v1.begin(), v1.end() );
std::sort( v2.begin(), v2.end() );
p_merge( "TOP ", 0, 0, 0,
v1.begin(), v1.end(), v2.begin(), v2.end(), v3.begin(), v3.end() );
assert( std::is_sorted( v3.begin(), v3.end() ));
#ifdef DEBUG
fprintf( stderr, "FINAL : %s\n", dump( buff, v3.begin(), v3.end() ));
#endif
}