11

並行 Ruby スレッド プール ( http://ruby-concurrency.github.io/concurrent-ruby/file.thread_pools.html ) で例外を処理するには?

例:

pool = Concurrent::FixedThreadPool.new(5) 
pool.post do
  raise 'something goes wrong'
end

# how to rescue this exception here

アップデート:

これが私のコードの簡略版です:

def process
  pool = Concurrent::FixedThreadPool.new(5)

  products.each do |product|
    new_product = generate_new_product

    pool.post do
      store_in_db(new_product) # here exception is raised, e.g. connection to db failed
    end
  end

  pool.shutdown
  pool.wait_for_terminaton
end

したがって、私が達成したいのは、例外が発生した場合に処理を停止する (ループを中断する) ことです。

この例外はアプリケーションの上位レベルでもレスキューされ、いくつかのクリーニング ジョブが実行されます (モデルの状態を失敗に設定し、通知を送信するなど)。

4

3 に答える 3

1

問題#634を作成しました。同時スレッド プールは、中止可能なワーカーを問題なくサポートできます。

require "concurrent"

Concurrent::RubyThreadPoolExecutor.class_eval do
  # Inspired by "ns_kill_execution".
  def ns_abort_execution aborted_worker
    @pool.each do |worker|
      next if worker == aborted_worker
      worker.kill
    end

    @pool = [aborted_worker]
    @ready.clear

    stopped_event.set
    nil
  end

  def abort_worker worker
    synchronize do
      ns_abort_execution worker
    end
    nil
  end

  def join
    shutdown

    # We should wait for stopped event.
    # We couldn't use timeout.
    stopped_event.wait nil

    @pool.each do |aborted_worker|
      # Rubinius could receive an error from aborted thread's "join" only.
      # MRI Ruby doesn't care about "join".
      # It will receive error anyway.

      # We can "raise" error in aborted thread and than "join" it from this thread.
      # We can "join" aborted thread from this thread and than "raise" error in aborted thread.
      # The order of "raise" and "join" is not important. We will receive target error anyway.

      aborted_worker.join
    end

    @pool.clear
    nil
  end

  class AbortableWorker < self.const_get :Worker
    def initialize pool
      super
      @thread.abort_on_exception = true
    end

    def run_task pool, task, args
      begin
        task.call *args
      rescue StandardError => error
        pool.abort_worker self
        raise error
      end

      pool.worker_task_completed
      nil
    end

    def join
      @thread.join
      nil
    end
  end

  self.send :remove_const, :Worker
  self.const_set :Worker, AbortableWorker
end

class MyError < StandardError; end

pool = Concurrent::FixedThreadPool.new 5

begin
  pool.post do
    sleep 1
    puts "we shouldn't receive this message"
  end

  pool.post do
    puts "raising my error"
    raise MyError
  end

  pool.join

rescue MyError => error
  puts "received my error, trace: \n#{error.backtrace.join("\n")}"
end

sleep 2

出力:

raising my error
received my error, trace:
...

このパッチは、MRI Ruby および Rubinius のどのバージョンでも正常に機能します。JRuby は動作していませんが、私は気にしません。サポートする場合は、JRuby executor にパッチを適用してください。簡単なはずです。

于 2017-03-03T11:54:40.690 に答える