Handle exceptions in concurrent-ruby thread pool


The following answer is from jdantonio, posted at https://github.com/ruby-concurrency/concurrent-ruby/issues/616:

"Most applications should not use thread pools directly. Thread pools are a low-level abstraction meant for internal use. All of the high-level abstractions in this library (Promise, Actor, etc.) all post jobs to the global thread pool and all provide exception handling. Simply pick the abstraction that best fits your use case and use it.

If you feel the need to configure your own thread pool rather than use the global thread pool, you can still use the high-level abstractions. They all support an :executor option which allows you to inject your custom thread pool. You can then use the exception handling provided by the high-level abstraction.

If you absolutely insist on posting jobs directly to a thread pool rather than using our high-level abstractions (which I strongly discourage) then just create a job wrapper. You can find examples of job wrappers in all our high-level abstractions, Rails ActiveJob, Sucker Punch, and other libraries which use our thread pools."

So how about an implementation with Promises? See http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Promise.html. In your case it would look something like this:

    require "concurrent"

    promises = []
    products.each do |product|
      new_product = generate_new_product
      promises << Concurrent::Promise.execute do
        store_in_db(new_product)
      end
    end

    # .value will wait for the Thread to finish.
    # The ! means that all exceptions will be propagated to the main thread.
    # .zip will make one Promise which contains all other promises.
    Concurrent::Promise.zip(*promises).value!


There may be a better way, but this does work. You will want to change the error handling within wait_for_pool_to_finish.

    def process
      pool = Concurrent::FixedThreadPool.new(10)
      errors = Concurrent::Array.new

      10_000.times do
        pool.post do
          begin
            # do the work
          rescue StandardError => e
            errors << e
          end
        end
      end

      wait_for_pool_to_finish(pool, errors)
    end

    private

    def wait_for_pool_to_finish(pool, errors)
      pool.shutdown

      until pool.shutdown?
        if errors.any?
          pool.kill
          fail errors.first
        end

        sleep 1
      end

      pool.wait_for_termination
    end


I've created issue #634. The concurrent-ruby thread pool can support an abortable worker without any problems.

    require "concurrent"

    Concurrent::RubyThreadPoolExecutor.class_eval do
      # Inspired by "ns_kill_execution".
      def ns_abort_execution aborted_worker
        @pool.each do |worker|
          next if worker == aborted_worker
          worker.kill
        end

        @pool = [aborted_worker]
        @ready.clear

        stopped_event.set
        nil
      end

      def abort_worker worker
        synchronize do
          ns_abort_execution worker
        end
        nil
      end

      def join
        shutdown

        # We should wait for stopped event.
        # We couldn't use timeout.
        stopped_event.wait nil

        @pool.each do |aborted_worker|
          # Rubinius could receive an error from aborted thread's "join" only.
          # MRI Ruby doesn't care about "join".
          # It will receive error anyway.

          # We can "raise" error in aborted thread and then "join" it from this thread.
          # We can "join" aborted thread from this thread and then "raise" error in aborted thread.
          # The order of "raise" and "join" is not important. We will receive target error anyway.

          aborted_worker.join
        end

        @pool.clear
        nil
      end

      class AbortableWorker < self.const_get :Worker
        def initialize pool
          super
          @thread.abort_on_exception = true
        end

        def run_task pool, task, args
          begin
            task.call *args
          rescue StandardError => error
            pool.abort_worker self
            raise error
          end

          pool.worker_task_completed
          nil
        end

        def join
          @thread.join
          nil
        end
      end

      self.send :remove_const, :Worker
      self.const_set :Worker, AbortableWorker
    end

    class MyError < StandardError; end

    pool = Concurrent::FixedThreadPool.new 5

    begin
      pool.post do
        sleep 1
        puts "we shouldn't receive this message"
      end

      pool.post do
        puts "raising my error"
        raise MyError
      end

      pool.join
    rescue MyError => error
      puts "received my error, trace: \n#{error.backtrace.join("\n")}"
    end

    sleep 2

Output:

    raising my error
    received my error, trace:
    ...

This patch works fine for any version of MRI Ruby and Rubinius. JRuby is not working and I don't care. Please patch JRuby executor if you want to support it. It should be easy.