Deadlock in ThreadPool


OK, so the main problem with the implementation is: how do you make sure no signal is lost and avoid deadlocks?

In my experience, this is REALLY hard to achieve with condition variables and mutexes, but easy with semaphores. It so happens that Ruby implements an object called Queue (or SizedQueue) that should solve the problem. Here is my suggested implementation:

    require 'thread'

    begin
      require 'fastthread'
    rescue LoadError
      $stderr.puts "Using the ruby-core thread implementation"
    end

    class ThreadPool
      class Worker
        def initialize(thread_queue)
          @mutex = Mutex.new
          @cv = ConditionVariable.new
          @queue = thread_queue
          @running = true
          @thread = Thread.new do
            @mutex.synchronize do
              while @running
                # Wait only while there is nothing to do, so that a signal sent
                # before this thread started waiting is not lost.
                @cv.wait(@mutex) while @running && @block.nil?
                block = get_block
                if block
                  # Run the job without holding the lock
                  @mutex.unlock
                  block.call
                  @mutex.lock
                  reset_block
                end
                # Tell the pool that this worker is available again
                @queue << self
              end
            end
          end
        end

        def name
          @thread.inspect
        end

        def get_block
          @block
        end

        def set_block(block)
          @mutex.synchronize do
            raise RuntimeError, "Thread already busy." if @block
            @block = block
            # Signal the thread in this class, that there's a job to be done
            @cv.signal
          end
        end

        def reset_block
          @block = nil
        end

        def busy?
          @mutex.synchronize { !@block.nil? }
        end

        def stop
          @mutex.synchronize do
            @running = false
            @cv.signal
          end
          @thread.join
        end
      end

      attr_accessor :max_size

      def initialize(max_size = 10)
        @max_size = max_size
        @queue = Queue.new
        @workers = []
      end

      def size
        @workers.size
      end

      def busy?
        @queue.size < @workers.size
      end

      def shutdown
        @workers.each { |w| w.stop }
        @workers = []
      end

      alias :join :shutdown

      def process(block = nil, &blk)
        block = blk if block_given?
        worker = get_worker
        worker.set_block(block)
      end

      private

      # Reuse an idle worker if there is one; block on the queue once the
      # pool has reached max_size; otherwise create a new worker.
      def get_worker
        if !@queue.empty? or @workers.size == @max_size
          return @queue.pop
        else
          worker = Worker.new(@queue)
          @workers << worker
          worker
        end
      end
    end

And here is some simple test code:

    tp = ThreadPool.new 500
    (1..1000).each do |i|
      tp.process do
        (2..10).inject(1) { |memo, val| sleep(0.1); memo * val }
        print "Computation #{i} done. Nb of tasks: #{tp.size}\n"
      end
    end
    tp.shutdown
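
For comparison, here is a minimal sketch of a pool in which a Queue is the only synchronisation primitive, which is the reason I find this approach easier: Queue#pop blocks until something is pushed, so a job handed over before a worker is ready simply stays in the queue. The class name QueuePool and the STOP sentinel are made up for this illustration and are not part of the implementation above.

```ruby
require 'thread'

# Hypothetical sketch: a fixed-size pool built on Queue only.
class QueuePool
  STOP = Object.new # sentinel object telling a worker to exit

  def initialize(size)
    @jobs = Queue.new
    @workers = Array.new(size) do
      Thread.new do
        # Queue#pop blocks until a job (or the sentinel) is available.
        until (job = @jobs.pop).equal?(STOP)
          job.call
        end
      end
    end
  end

  # Enqueue a block; any idle worker will pick it up.
  def process(&block)
    @jobs << block
  end

  # Push one sentinel per worker, then wait for all of them to finish.
  def shutdown
    @workers.size.times { @jobs << STOP }
    @workers.each(&:join)
  end
end

pool = QueuePool.new(4)
results = Queue.new # Queue is thread-safe, so workers can push into it
(1..20).each { |i| pool.process { results << i * i } }
pool.shutdown
puts "Collected #{results.size} results"
```

Because the sentinels are queued after all the jobs, every job runs before the workers exit. Unlike the implementation above, this sketch creates all of its threads up front instead of growing on demand.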


You can try the work_queue gem, designed to coordinate work between a producer and a pool of worker threads.


I'm slightly biased here, but I would suggest modelling this in some process language and model checking it. Freely available tools are, for example, the mCRL2 toolset (using an ACP-based language), the Mobility Workbench (pi-calculus) and Spin (PROMELA).

Otherwise I would suggest removing every bit of code that is not essential to the problem and finding a minimal case where the deadlock occurs. I doubt that the 100 threads and 1300 tasks are essential to get a deadlock. With a smaller case you can probably just add some debug prints which provide enough information to solve the problem.
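
To give an idea of how small such a case can get, here is a sketch that assumes the deadlock comes from a lost condition-variable signal (that is only a guess on my part; your actual cause may differ). It needs no worker threads at all: a signal sent while nobody is waiting is simply dropped, and one debug print makes the difference visible. The 0.2 s timeout is there only so the demo does not hang, and the helper name elapsed is made up for this example.

```ruby
require 'thread'

mutex = Mutex.new
cv    = ConditionVariable.new
ready = false

# Hypothetical helper: time a block using a monotonic clock.
def elapsed
  t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  yield
  Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0
end

# The signal is sent while nobody is waiting, so it is dropped.
mutex.synchronize do
  ready = true
  cv.signal
end

# Unconditional wait: the earlier signal is gone, only the timeout ends it.
naive = elapsed { mutex.synchronize { cv.wait(mutex, 0.2) } }

# Predicate-guarded wait: the flag is already set, so we never block.
guarded = elapsed do
  mutex.synchronize { cv.wait(mutex, 0.2) until ready }
end

puts format("naive wait: %.3fs, guarded wait: %.3fs", naive, guarded)
```

Without the timeout, the first wait would block forever, which is the kind of hang a stripped-down test case should be able to reproduce.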