配列は一般にスレッドセーフではありません。Queue などのスレッドセーフなデータ構造に切り替えます。
new_look_queue = Queue.new
threads = looks.map do |look|
Thread.new do
new_look_queue.enq Look.new(ref: look["look_ref"])
end
end
puts 'waiting threads to finish...'
threads.each(&:join)
puts 'saving...'
new_looks = []
while !new_look_queue.empty?
new_look_queue << queue.deq
end
new_looks.sort_by(&:ref).each(&:save)
Queue#enqは新しいエントリをキューに入れます。Queue#deqは 1 つ取得し、ない場合はブロックします。
順番に保存する必要がない場合new_looks
、コードはより単純になります。
puts 'saving...'
while !new_look_queue.empty?
new_look_queue.deq.save
end
または、さらに単純に、スレッド内で保存するだけです。
ルックが非常に多い場合、上記のコードは必要以上に多くのスレッドを作成します。スレッドが多すぎると、リクエストの処理に時間がかかり、余分なメモリが消費されます。その場合は、いくつかのプロデューサー スレッドを作成することを検討してください。
NUM_THREADS = 8
以前と同様に、完了した作業のキューがあります。
new_look_queue = Queue.new
しかし、今やるべき作業の待ち行列もあります:
look_queue = Queue.new
looks.each do |look|
look_queue.enq look
end
各スレッドは機能しなくなるまで存続するため、「機能していない」シンボルをスレッドごとに 1 つずつキューに追加しましょう。
NUM_THREADS.times do {look_queue.enq :done}
そして今、スレッド:
threads = NUM_THREADS.times.map do
Thread.new do
while (look = look_queue.deq) != :done
new_look_queue.enq Look.new(ref: look["look_ref"])
end
end
end
new_look_queue の処理は上記と同じです。