Issue #634 を作成しました。concurrent-ruby のスレッドプールは、中止可能なワーカーを問題なくサポートできます。
require "concurrent"
# Patches the Ruby thread pool executor so that a StandardError raised by a
# posted task aborts the whole pool and can be observed by the thread that
# calls "join" on the pool.
Concurrent::RubyThreadPoolExecutor.class_eval do
  # Modelled on the executor's own "ns_kill_execution".
  #
  # Calls "kill" on every pooled worker except +aborted_worker+, leaves that
  # worker as the only pool entry, empties the ready list and sets the
  # stopped event. "abort_worker" calls this from inside "synchronize".
  #
  # Always returns nil.
  def ns_abort_execution(aborted_worker)
    @pool.each { |candidate| candidate.kill unless candidate == aborted_worker }

    @pool = [aborted_worker]
    @ready.clear
    stopped_event.set
    nil
  end

  # Runs "ns_abort_execution" for +worker+ inside the executor's
  # "synchronize" block. Always returns nil.
  def abort_worker(worker)
    synchronize { ns_abort_execution(worker) }
    nil
  end

  # Shuts the pool down, waits for the stopped event and then joins whatever
  # workers remain in the pool (after an abort, that is the aborted worker).
  # Always returns nil.
  def join
    shutdown

    # The stopped event has to be awaited, and no timeout can be used here,
    # hence the explicit nil.
    stopped_event.wait(nil)

    # Rubinius can receive an error from an aborted thread only through "join".
    # MRI Ruby doesn't care about "join": it will receive the error anyway.
    # Raising in the aborted thread and then joining it from this thread, or
    # joining first and then raising, both work: the order of "raise" and
    # "join" is not important, the target error is received either way.
    @pool.each(&:join)

    @pool.clear
    nil
  end

  # Worker subclass that aborts the pool and re-raises when a task fails with
  # a StandardError.
  class AbortableWorker < const_get(:Worker)
    def initialize(pool)
      super
      # Let an unhandled exception in this worker's thread (presumably created
      # by the parent initializer) be re-raised in the main thread.
      @thread.abort_on_exception = true
    end

    # Calls +task+ with +args+. On a StandardError the pool is asked to abort
    # this worker and the same error is raised again; otherwise the pool is
    # told that the task completed. Returns nil on success.
    def run_task(pool, task, args)
      begin
        task.call(*args)
      rescue StandardError => failure
        pool.abort_worker(self)
        raise failure
      end

      # Deliberately outside the rescue above: only task errors trigger an abort.
      pool.worker_task_completed
      nil
    end

    # Joins the worker's thread. Always returns nil.
    def join
      @thread.join
      nil
    end
  end

  # Swap the executor's Worker constant so that workers it creates from now on
  # are abortable. "remove_const" is private, which is fine with the implicit
  # receiver used here.
  remove_const(:Worker)
  const_set(:Worker, AbortableWorker)
end
# Error raised by the demo task to show that a task failure reaches the
# thread that joins the pool.
class MyError < StandardError
end
# Demo: post one slow task and one failing task, then join the pool and
# rescue the failing task's error in this thread.
pool = Concurrent::FixedThreadPool.new(5)

begin
  # Slow task: its message is not expected to be printed.
  pool.post do
    sleep 1
    puts "we shouldn't receive this message"
  end

  # Failing task.
  pool.post do
    puts "raising my error"
    raise MyError
  end

  pool.join
rescue MyError => my_error
  trace = my_error.backtrace.join("\n")
  puts "received my error, trace: \n#{trace}"
end

# Stay alive longer than the slow task's sleep, so its message would have
# had time to appear if the task were still running.
sleep 2
出力:
raising my error
received my error, trace:
...
このパッチは、MRI Ruby および Rubinius のどのバージョンでも正常に動作します。JRuby では動作しませんが、私は気にしていません。JRuby をサポートしたい場合は、JRuby 用の executor に同様のパッチを適用してください。簡単なはずです。