14

Ruby 用の適切な ThreadPool 実装が見つからなかったので、自分で作成しました (部分的にここのコードに基づいています: http://web.archive.org/web/20081204101031/http://snippets.dzone.com:80/ posts/show/3276ですが、スレッドプールのシャットダウンのために wait/signal およびその他の実装に変更されました. ただし、しばらく実行した後 (100 のスレッドがあり、約 1300 のタスクを処理)、25 行目でデッドロックが発生し、新しいジョブを待機します。アイデアはありますか?なぜそれが起こるのでしょうか?

require 'thread'
begin
  require 'fastthread'
rescue LoadError
  $stderr.puts "Using the ruby-core thread implementation"
end 

class ThreadPool
  class Worker
    def initialize(callback)
      @mutex = Mutex.new
      @cv = ConditionVariable.new
      @callback = callback
      @mutex.synchronize {@running = true}
      @thread = Thread.new do
        while @mutex.synchronize {@running}
          block = get_block
          if block
            block.call
            reset_block
            # Signal the ThreadPool that this worker is ready for another job
            @callback.signal
          else
            # Wait for a new job
            @mutex.synchronize {@cv.wait(@mutex)} # <=== Is this line 25?
          end
        end
      end
    end

    def name
      @thread.inspect
    end

    def get_block
      @mutex.synchronize {@block}
    end

    def set_block(block)
      @mutex.synchronize do
        raise RuntimeError, "Thread already busy." if @block
        @block = block
        # Signal the thread in this class, that there's a job to be done
        @cv.signal
      end
    end

    def reset_block
      @mutex.synchronize {@block = nil}
    end

    def busy?
      @mutex.synchronize {!@block.nil?}
    end

    def stop
      @mutex.synchronize {@running = false}
      # Signal the thread not to wait for a new job
      @cv.signal
      @thread.join
    end
  end

  attr_accessor :max_size

  def initialize(max_size = 10)
    @max_size = max_size
    @workers = []
    @mutex = Mutex.new
    @cv = ConditionVariable.new
  end

  def size
    @mutex.synchronize {@workers.size}
  end

  def busy?
    @mutex.synchronize {@workers.any? {|w| w.busy?}}
  end

  def shutdown
    @mutex.synchronize {@workers.each {|w| w.stop}}
  end
  alias :join :shutdown

  def process(block=nil,&blk)
    block = blk if block_given?
    while true
      @mutex.synchronize do
         worker = get_worker 
         if worker
           return worker.set_block(block)
         else
           # Wait for a free worker
           @cv.wait(@mutex)
         end
      end
    end
  end

  # Used by workers to report ready status
  def signal
    @cv.signal
  end

  private
  def get_worker
    free_worker || create_worker
  end

  def free_worker
    @workers.each {|w| return w unless w.busy?}; nil
  end

  def create_worker
    return nil if @workers.size >= @max_size
    worker = Worker.new(self)
    @workers << worker
    worker
  end
end
4

5 に答える 5

12

さて、実装の主な問題は次のとおりです。信号が失われないようにし、デッドロックを回避する方法は?

私の経験では、これを条件変数とミューテックスで実現するのは非常に困難ですが、セマフォでは簡単です。たまたま ruby​​ が Queue (または SizedQueue) と呼ばれるオブジェクトを実装して、問題を解決するはずです。これが私の提案する実装​​です:

require 'thread'
begin
  require 'fasttread'
rescue LoadError
  $stderr.puts "Using the ruby-core thread implementation"
end

class ThreadPool
  class Worker
    def initialize(thread_queue)
      @mutex = Mutex.new
      @cv = ConditionVariable.new
      @queue = thread_queue
      @running = true
      @thread = Thread.new do
        @mutex.synchronize do
          while @running
            @cv.wait(@mutex)
            block = get_block
            if block
              @mutex.unlock
              block.call
              @mutex.lock
              reset_block
            end
            @queue << self
          end
        end
      end
    end

    def name
      @thread.inspect
    end

    def get_block
      @block
    end

    def set_block(block)
      @mutex.synchronize do
        raise RuntimeError, "Thread already busy." if @block
        @block = block
        # Signal the thread in this class, that there's a job to be done
        @cv.signal
      end
    end

    def reset_block
      @block = nil
    end

    def busy?
      @mutex.synchronize { !@block.nil? }
    end

    def stop
      @mutex.synchronize do
        @running = false
        @cv.signal
      end
      @thread.join
    end
  end

  attr_accessor :max_size

  def initialize(max_size = 10)
    @max_size = max_size
    @queue = Queue.new
    @workers = []
  end

  def size
    @workers.size
  end

  def busy?
    @queue.size < @workers.size
  end

  def shutdown
    @workers.each { |w| w.stop }
    @workers = []
  end

  alias :join :shutdown

  def process(block=nil,&blk)
    block = blk if block_given?
    worker = get_worker
    worker.set_block(block)
  end

  private

  def get_worker
    if !@queue.empty? or @workers.size == @max_size
      return @queue.pop
    else
      worker = Worker.new(@queue)
      @workers << worker
      worker
    end
  end

end

そして、ここに簡単なテストコードがあります:

tp = ThreadPool.new 500
(1..1000).each { |i| tp.process { (2..10).inject(1) { |memo,val| sleep(0.1); memo*val }; print "Computation #{i} done. Nb of tasks: #{tp.size}\n" } }
tp.shutdown
于 2008-09-17T12:49:26.060 に答える
8

プロデューサーとワーカー スレッドのプールの間で作業を調整するように設計されたwork_queue gemを試すことができます。

于 2010-01-14T09:25:10.403 に答える
1

ここでは少し偏見がありますが、これをいくつかのプロセス言語でモデル化し、モデルチェックすることをお勧めします。無料で利用できるツールには、たとえば、mCRL2ツールセット(ACPベースの言語を使用)、Mobility Workbench(パイ計算)、Spin(PROMELA)があります。

それ以外の場合は、問題に不可欠ではないコードをすべて削除し、デッドロックが発生する最小限のケースを見つけることをお勧めします。100スレッドと1300タスクがデッドロックを取得するために不可欠であるとは思えません。小さなケースでは、問題を解決するのに十分な情報を提供するデバッグプリントを追加するだけで済みます。

于 2008-09-17T11:06:14.700 に答える
1

わかりました。問題はThreadPool#signalメソッドにあるようです。何が起こる可能性があります:

1-すべての労働者が忙しく、あなたは新しい仕事を処理しようとしています

2-行90はnilワーカーを取得します

3-ワーカーは解放されてシグナルを送信しますが、ThreadPoolが待機していないため、シグナルは失われます

4-あなたは95行目に落ち、無料の労働者がいるのに待っています。

ここでのエラーは、誰も聞いていないときでも、フリーワーカーに信号を送ることができるということです。このThreadPool#signalメソッドは次のようになります。

def signal
     @mutex.synchronize { @cv.signal }
end

そして、問題はワーカーオブジェクトでも同じです。何が起こる可能性があります:

1-労働者はちょうど仕事を完了しました

2-待機中のジョブがあるかどうかをチェックします(17行目):ありません

3-スレッドプールは新しいジョブを送信してシグナルを送信します...しかしシグナルは失われます

4-ビジーとしてマークされていても、ワーカーは信号を待ちます

initializeメソッドを次のように配置する必要があります。

def initialize(callback)
  @mutex = Mutex.new
  @cv = ConditionVariable.new
  @callback = callback
  @mutex.synchronize {@running = true}
  @thread = Thread.new do
    @mutex.synchronize do
      while @running
        block = get_block
        if block
          @mutex.unlock
          block.call
          @mutex.lock
          reset_block
          # Signal the ThreadPool that this worker is ready for another job
          @callback.signal
        else
          # Wait for a new job
          @cv.wait(@mutex)
        end
      end
    end
  end
end

次に、Worker#get_blockメソッドとWorker#reset_blockメソッドを同期しないようにする必要があります。そうすれば、ブロックのテストと信号の待機の間に、ワーカーにブロックを割り当てることはできません。

于 2008-09-17T11:09:48.390 に答える