1

現在、次のように機能するステージングジョブシステムに取り組んでいます(疑似コード):

cyclicbarrier.init(numthreads)
on each thread:
  for each stage s:
    loop forever:
      pop job from joblist[s]
      if no job:
        break
      execute job
    wait at cyclicbarrier

これにより、ステージ内のすべてのジョブが次のステージに移る前に実行を終了します。

cyclicbarrier2 つのセマフォを使用して実装されます。

sem1(0)
sem2(0)
n = 0

function wait:
  if atomic_incr(n) == maxthreads:
    sem1.signal(maxthreads)
  sem1.wait()

  if atomic_decr(n) == 0:
    sem2.signal(maxthreads)
  sem2.wait()

現在実行中のステージに 1 つ以上のジョブを追加できるジョブのサポートを追加し、ステージが続行する前にそれらのジョブも実行できるようにしたいと考えています。上記のコードはこれを行いますが、スレッドがバリアに到達した後にこのジョブが追加された場合、待機中のスレッドがジョブを実行している可能性があるのに実行していないため、最適に実行されません。

問題を要約すると、一種のハイブリッドsemaphore/が必要になりました。cyclicbarrier便宜上、これを と呼びますsembarrier。これがどのように機能するかは次のとおりです。

function wait:
  n--
  if n < 0 and n > -maxthreads:
    suspend until signaled
  else if n <= -maxthreads:
    signal(maxthreads)

基本的に、これsembarrierは と同じように機能しますsemaphoreが、十分な数のスレッドが待機している場合、 のように動作し、cyclicbarrierそれらをすべて解放します。

を使用した新しい実装を次に示しsembarrierます。

sembarrierコンストラクターは次のようになります。

sembarrier(initial_value, num_threads)

sembarrierステージごとに を作成します。

n = num_theads
for each stage s:
  sembarrier[s].init(0, n)

sembarrierこれは、配列を使用して変更されたループになります。

on each thread:
  for each stage s:
    loop forever:
      sembarrier[s].wait()
      pop job from joblist[s]
      if no job:
        break;
      execute job

さらに、 に追加する場合joblist:

function AddJob(stage s, job j):
  joblist[s].push(j)
  sembarrier[s].signal()

質問:

1) どうすれば効率的に実装できsembarrierますか? ミューテックスやセマフォなどの基本的な同時実行構造にアクセスできます。また、標準のアトミック操作もあります。

2) 元の問題に対する別の解決策はありますか?

ありがとう!

4

0 に答える 0