現在、次のように機能するステージングジョブシステムに取り組んでいます(疑似コード):
cyclicbarrier.init(numthreads)
on each thread:
for each stage s:
loop forever:
pop job from joblist[s]
if no job:
break
execute job
wait at cyclicbarrier
これにより、ステージ内のすべてのジョブが次のステージに移る前に実行を終了します。
cyclicbarrier
2 つのセマフォを使用して実装されます。
sem1(0)
sem2(0)
n = 0
function wait:
if atomic_incr(n) == maxthreads:
sem1.signal(maxthreads)
sem1.wait()
if atomic_decr(n) == 0:
sem2.signal(maxthreads)
sem2.wait()
現在実行中のステージに 1 つ以上のジョブを追加できるジョブのサポートを追加し、ステージが続行する前にそれらのジョブも実行できるようにしたいと考えています。上記のコードはこれを行いますが、スレッドがバリアに到達した後にこのジョブが追加された場合、待機中のスレッドがジョブを実行している可能性があるのに実行していないため、最適に実行されません。
問題を要約すると、一種のハイブリッドsemaphore
/が必要になりました。cyclicbarrier
便宜上、これを と呼びますsembarrier
。これがどのように機能するかは次のとおりです。
function wait:
n--
if n < 0 and n > -maxthreads:
suspend until signaled
else if n <= -maxthreads:
signal(maxthreads)
基本的に、これsembarrier
は と同じように機能しますsemaphore
が、十分な数のスレッドが待機している場合、 のように動作し、cyclicbarrier
それらをすべて解放します。
を使用した新しい実装を次に示しsembarrier
ます。
sembarrier
コンストラクターは次のようになります。
sembarrier(initial_value, num_threads)
sembarrier
ステージごとに を作成します。
n = num_theads
for each stage s:
sembarrier[s].init(0, n)
sembarrier
これは、配列を使用して変更されたループになります。
on each thread:
for each stage s:
loop forever:
sembarrier[s].wait()
pop job from joblist[s]
if no job:
break;
execute job
さらに、 に追加する場合joblist
:
function AddJob(stage s, job j):
joblist[s].push(j)
sembarrier[s].signal()
質問:
1) どうすれば効率的に実装できsembarrier
ますか? ミューテックスやセマフォなどの基本的な同時実行構造にアクセスできます。また、標準のアトミック操作もあります。
2) 元の問題に対する別の解決策はありますか?
ありがとう!