0

ruby アプリケーションには、状態を共有しないタスクがたくさんあり、一度に多くのタスクを起動したいと考えています。重要なのは、それらが開始された順序や戻り値を気にしないことです (完了する前にそれぞれデータベース トランザクションが発生するため)。Ruby の実装によっては、GIL によってこれらのタスクが実際に同時に実行されない可能性があることは承知していますが、実際には真の同時実行性には関心がないため、問題ありません。とにかく、これらのワーカー スレッドは、ネットワーク リクエストを介して IO バインドされます。

私がこれまでに持っているのはこれです:

def asyncDispatcher(numConcurrent, stateQueue, &workerBlock)
  workerThreads = []

  while not stateQueue.empty?
    while workerThreads.length < numConcurrent
      nextState = stateQueue.pop

      nextWorker =
        Thread.new(nextState) do |st|
          workerBlock.call(st)
        end

      workerThreads.push(nextWorker)
    end # inner while

    workerThreads.delete_if{|th| not th.alive?} # clean up dead threads
  end # outer while

  workerThreads.each{|th| th.join} # join any remaining workers
end # asyncDispatcher

そして、私はそれを次のように呼び出します:

asyncDispatcher(2, (1..10).to_a ) {|x| x + 1}

ここに潜んでいるバグや並行性の落とし穴はありますか? それとも、このタスクを簡素化するランタイムの何かでしょうか?

4

1 に答える 1

2

キューを使用する:

require 'thread'

def asyncDispatcher(numWorkers, stateArray, &processor)
  q = Queue.new
  threads = []

  (1..numWorkers).each do |worker_id|
    threads << Thread.new(processor, worker_id) do |processor, worker_id|
      while true
        next_state = q.shift      #shift() blocks if q is empty, which is the case now
        break if next_state == q  #Some sentinel that won't appear in your data
        processor.call(next_state, worker_id)
      end
    end
  end

  stateArray.each {|state| q.push state}
  stateArray.each {q.push q}     #Some sentinel that won't appear in your data

  threads.each(&:join)
end


asyncDispatcher(2, (1..10).to_a) do |state, worker_id|
  time = sleep(Random.rand 10)  #How long it took to process state
  puts "#{state} is finished being processed: worker ##{worker_id} took #{time} secs."
end

--output:--
2 is finished being processed: worker #1 took 4 secs.
3 is finished being processed: worker #1 took 1 secs.
1 is finished being processed: worker #2 took 7 secs.
5 is finished being processed: worker #2 took 1 secs.
6 is finished being processed: worker #2 took 4 secs.
7 is finished being processed: worker #2 took 1 secs.
4 is finished being processed: worker #1 took 8 secs.
8 is finished being processed: worker #2 took 1 secs.
10 is finished being processed: worker #2 took 3 secs.
9 is finished being processed: worker #1 took 9 secs.

わかりました、わかりました、誰かがその出力を見て叫びます。

ねえ、#2 は連続して 4 つのジョブを実行するのに合計 13 秒かかりましたが、#1 はわずか 8 秒しかかかりませんでした。ジョブの場合、#1 の 8 秒間の出力です。仕事はもっと早く来るべきだった。Ruby にはスレッド切り替えがありません。ルビーが壊れた!」

#1 は最初の 2 つのジョブで合計 5 秒間スリープしていましたが、#2 は同時にスリープしていたので、#1 が最初の 2 つのジョブを終了したとき、#2 がスリープする時間はあと 2 秒しかありませんでした。したがって、#2 の 7 秒を 2 秒に置き換えると、#1 が最初の 2 つのジョブを完了した後、#2 が連続して 4 つのジョブを実行するのに合計 8 秒かかったことがわかります。 8秒ジョブ。

于 2013-07-06T12:09:25.113 に答える