Handle exceptions in concurrent-ruby thread pool
The following answer is from jdantonio from here https://github.com/ruby-concurrency/concurrent-ruby/issues/616
"Most applications should not use thread pools directly. Thread pools are a low-level abstraction meant for internal use. All of the high-level abstractions in this library (Promise, Actor, etc.) all post jobs to the global thread pool and all provide exception handling. Simply pick the abstraction that best fits your use case and use it.
If you feel the need to configure your own thread pool rather than use the global thread pool, you can still use the high-level abstractions. They all support an :executor option which allows you to inject your custom thread pool. You can then use the exception handling provided by the high-level abstraction.
If you absolutely insist on posting jobs directly to a thread pool rather than using our high-level abstractions (which I strongly discourage) then just create a job wrapper. You can find examples of job wrappers in all our high-level abstractions, Rails ActiveJob, Sucker Punch, and other libraries which use our thread pools."
So how about an implementation with Promises ? http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Promise.htmlIn your case it would look something like this:
promises = []products.each do |product| new_product = generate_new_prodcut promises << Concurrent::Promise.execute do store_in_db(new_product) endend# .value will wait for the Thread to finish.# The ! means, that all exceptions will be propagated to the main thread# .zip will make one Promise which contains all other promises.Concurrent::Promise.zip(*promises).value!
There may be a better way, but this does work. You will want to change the error handling within wait_for_pool_to_finish
.
def process pool = Concurrent::FixedThreadPool.new(10) errors = Concurrent::Array.new 10_000.times do pool.post do begin # do the work rescue StandardError => e errors << e end end end wait_for_pool_to_finish(pool, errors)endprivatedef wait_for_pool_to_finish(pool, errors) pool.shutdown until pool.shutdown? if errors.any? pool.kill fail errors.first end sleep 1 end pool.wait_for_terminationend
I've created an issue #634. Concurrent thread pool can support abortable worker without any problems.
require "concurrent"Concurrent::RubyThreadPoolExecutor.class_eval do # Inspired by "ns_kill_execution". def ns_abort_execution aborted_worker @pool.each do |worker| next if worker == aborted_worker worker.kill end @pool = [aborted_worker] @ready.clear stopped_event.set nil end def abort_worker worker synchronize do ns_abort_execution worker end nil end def join shutdown # We should wait for stopped event. # We couldn't use timeout. stopped_event.wait nil @pool.each do |aborted_worker| # Rubinius could receive an error from aborted thread's "join" only. # MRI Ruby doesn't care about "join". # It will receive error anyway. # We can "raise" error in aborted thread and than "join" it from this thread. # We can "join" aborted thread from this thread and than "raise" error in aborted thread. # The order of "raise" and "join" is not important. We will receive target error anyway. aborted_worker.join end @pool.clear nil end class AbortableWorker < self.const_get :Worker def initialize pool super @thread.abort_on_exception = true end def run_task pool, task, args begin task.call *args rescue StandardError => error pool.abort_worker self raise error end pool.worker_task_completed nil end def join @thread.join nil end end self.send :remove_const, :Worker self.const_set :Worker, AbortableWorkerendclass MyError < StandardError; endpool = Concurrent::FixedThreadPool.new 5begin pool.post do sleep 1 puts "we shouldn't receive this message" end pool.post do puts "raising my error" raise MyError end pool.joinrescue MyError => error puts "received my error, trace: \n#{error.backtrace.join("\n")}"endsleep 2
Output:
raising my errorreceived my error, trace:...
This patch works fine for any version of MRI Ruby and Rubinius. JRuby is not working and I don't care. Please patch JRuby executor if you want to support it. It should be easy.