10

BrBを使用して、フォークした Ruby 1.9 のさまざまなワーカー プロセスのデータソースを共有しますProcess#fork

Thread.abort_on_exception = true

fork do
  puts "Initializing data source process... (PID: #{Process.pid})"
  data = DataSource.new(files)

  BrB::Service.start_service(:object => data, :verbose => false, :host => host, :port => port)
  EM.reactor_thread.join
end

ワーカーは次のようにフォークされます。

8.times do |t|  
  fork do
    data = BrB::Tunnel.create(nil, "brb://#{host}:#{port}", :verbose => false)

    puts "Launching #{threads_num} worker threads... (PID: #{Process.pid})"    

    threads = []
    threads_num.times { |i|
      threads << Thread.new {
        while true
          begin
            worker = Worker.new(data, config)

          rescue OutOfTargetsError
            break

          rescue Exception => e
            puts "An unexpected exception was caught: #{e.class} => #{e}"
            sleep 5

          end
        end
      }
    }
    threads.each { |t| t.join }

    data.stop_service
    EM.stop
  end
end

これはほぼ完全に機能しますが、約 10 分間実行した後、次のエラーが発生します。

bootstrap.rb:47:in `join': deadlock detected (fatal)
    from bootstrap.rb:47:in `block in <main>'
    from bootstrap.rb:39:in `fork'
    from bootstrap.rb:39:in `<main>'</pre>

このエラーは、デッドロックが実際に発生している場所についてはあまり教えてくれませんjoin。EventMachine スレッドのことを示しているだけです。

プログラムが停止した時点を追跡するにはどうすればよいですか?

4

2 に答える 2

5

親スレッドでロックアップしjoinています。その情報は正確です。子スレッドでロックされている場所を追跡するには、スレッドの作業をtimeoutblockでラップしてみてください。rescueタイムアウト例外を発生させるには、キャッチオールを一時的に削除する必要があります。

現在、親スレッドはすべてのスレッドに順番に参加しようとし、完了するまでそれぞれをブロックします。ただし、各スレッドはOutOfTargetsError. 有効期間が短いスレッドを使用し、whileループを親に移動することで、デッドロックを回避できる場合があります。保証はありませんが、おそらくこのようなものは機能しますか?

8.times do |t|  
  fork do
    running = true
    Signal.trap("INT") do
      puts "Interrupt signal received, waiting for threads to finish..."
      running = false
    end

    data = BrB::Tunnel.create(nil, "brb://#{host}:#{port}", :verbose => false)

    puts "Launching max #{threads_num} worker threads... (PID: #{Process.pid})"    

    threads = []
    while running
      # Start new threads until we have threads_num running
      until threads.length >= threads_num do
        threads << Thread.new {
          begin
            worker = Worker.new(data, config)
          rescue OutOfTargetsError
          rescue Exception => e
            puts "An unexpected exception was caught: #{e.class} => #{e}"
            sleep 5
          end
        }
      end

      # Make sure the parent process doesn't spin too much
      sleep 1

      # Join finished threads
      finished_threads = threads.reject &:status
      threads -= finished_threads
      finished_threads.each &:join
    end

    data.stop_service
    EM.stop
  end
end
于 2011-04-30T02:28:05.297 に答える
3

私は同じ問題を抱えていましたが、次のコード スニペットを使用して解決しました。

# Wait for all threads (other than the current thread and
# main thread) to stop running.
# Assumes that no new threads are started while waiting
def join_all
  main     = Thread.main       # The main thread
  current  = Thread.current    # The current thread
  all      = Thread.list       # All threads still running
  # Now call join on each thread
  all.each{|t| t.join unless t == current or t == main }
end

出典: Ruby プログラミング言語、O'Reilly (2008)

于 2012-07-13T13:47:50.600 に答える