1

リソースの幅優先探索を実行するアルゴリズムがあります。

def crawl(starting_node)
  items=[starting_node]
  until items.empty?
    item = items.shift
    kids = item.slow_network_action # takes seconds
    kids.each{ |kid| items << kid }
  end
end

いくつかの同時スレッドを使用してslow_network_actionを並列化したいと思います。
これを行うための合理的な方法は何ですか?

これが機能するテクニックですが、私は確かに正しいアプローチではないと感じています:

def crawl(starting_node)
  mutex = Mutex.new
  items = [starting_node]
  4.times.map{
    loop do
      unless item=mutex.synchronize{ items.shift }
        sleep LONGER_THAN_LONGEST_NETWORK_ACTION
        break unless item=mutex.synchronize{ items.shift }
      end
      kids = item.slow_network_action
      mutex.synchronize{
        kids.each{ |kid| items << kid }
      }
    end
  }.each(&:join)
end

アイテムがキューに追加されるのを待っている間、スレッドを実際にスリープさせ、アイテムが追加されたときにウェイクアップし、誰もが追加されていないときにすべてのスレッドを終了させるようなことをしたいと思います。


この代替コードはほとんど機能しますが、発生する可能性のある(そして発生する)デッドロック、および適切な出口戦略の完全な欠如に対して:

require 'thread'
def crawl(starting_node)
  items = Queue.new
  items << starting_node
  4.times.map{
    while item=items.shift
      kids = item.slow_network_action
      kids.each{ |kid| items << kid }
    end
  }.each(&:join)
end
4

1 に答える 1

2

これはあなたを正しい方向に向けるはずです:

require 'monitor'

NUM_THREADS = 4

def crawl(starting_node)
  items = [starting_node]
  items.extend MonitorMixin
  item_cond = items.new_cond

  threads = []
  working_threads = 0
  finished = false

  NUM_THREADS.times do
    items.synchronize do
      working_threads += 1
    end
    threads << Thread.new do
      item = nil
      kids = []
      loop do
        items.synchronize do

          #add any new items to array
          items.concat kids

          if (items.empty? && working_threads == 1)
            #all other threads are waiting, and there's no more items
            #to process, so we must be done
            finished = true
          end

          #wake up all waiting threads, either to finish or do more work
          #watch out for thundering herds
          item_cond.broadcast unless (items.empty? && !finished)

          #wait, but first decrement count of working threads
          #so we can determine when to finish
          working_threads -= 1
          item_cond.wait_while { items.empty? && !finished}
          Thread.exit if finished
          working_threads += 1

          #get next item
          item = items.shift
        end

        kids = item.slow_network_action
      end

    end
  end

  threads.each(&:join)
end

これにより、itemsアレイがモニターになり、モニターから作成された関連付けとともに、それを介して同期が実行ConditionVariableされます。

これはQueue、すべての作業がいつ終了したかをチェックすることを除いて、内部での作業方法と似ています(実際には少し複雑になります)。

スレッドのメインループは、ループ内に2つの別個の同期ブロックが必要になるのを避けるためにkids追加される空の配列と、それに伴う競合状態から始まります。items

これを使用broadcastすると、待機中のすべてのスレッドがウェイクアップし、雷鳴の群れが発生する可能性があることに注意してください。ここで問題が発生することはないと思います。別の方法は、一度に1つの要素を追加し、それぞれkidsを呼び出すsignalことです。ただし、これにより、すべての作業が終了した場合のケースの処理がさらに複雑になります。

于 2012-04-27T21:35:07.630 に答える