
Thread and Queue


The standard library provides a Queue class via require 'thread' (newer Rubies have it built in as Thread::Queue). Using it, you can do something like this:

```ruby
require 'thread'

queue = Queue.new
threads = []

# add work to the queue (the integers 1 to 10 stand in for real work units)
(1..10).each { |work_unit| queue << work_unit }

4.times do
  threads << Thread.new do
    # loop until there are no more things to do
    until queue.empty?
      # pop with the non-blocking flag set; this raises
      # ThreadError if the queue is empty, in which case
      # work_unit will be set to nil
      work_unit = queue.pop(true) rescue nil
      if work_unit
        # do work
      end
    end
    # when there is no more work, the thread will stop
  end
end

# wait until all threads have completed processing
threads.each { |t| t.join }
```

The reason I pop with the non-blocking flag is that between the until queue.empty? check and the pop, another thread may have popped the last item. Without the non-blocking flag, the thread would then block on that line forever.
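The same idea can be written without the separate empty? check, which removes the window for that race entirely: each worker just tries a non-blocking pop and stops when it fails. A minimal sketch, assuming numeric work items and using squaring as a stand-in for real work; the drain method name is hypothetical. It rescues only ThreadError, the exception pop(true) raises on an empty queue, rather than every StandardError.

```ruby
require 'thread'

# Hypothetical helper: process every item in +queue+ using +n_threads+ workers.
# Each worker pops without blocking and exits as soon as the pop raises
# ThreadError, so there is no separate empty? check for another thread to race.
def drain(queue, n_threads)
  results = Queue.new # Queue is thread-safe, so workers can push concurrently

  workers = Array.new(n_threads) do
    Thread.new do
      loop do
        begin
          item = queue.pop(true) # non-blocking: raises ThreadError if empty
        rescue ThreadError
          break # nothing left to do; this worker stops
        end
        results << item * item # stand-in for real work
      end
    end
  end

  workers.each(&:join)
  Array.new(results.size) { results.pop }
end

work = Queue.new
(1..10).each { |i| work << i }
squares = drain(work, 4)
p squares.sort # => [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
```

Results arrive in whatever order the threads finish, which is why the output is sorted before printing.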

If you're using MRI, the default Ruby interpreter, bear in mind that its global VM lock (GVL) lets only one thread execute Ruby code at a time, so threads will not run truly in parallel. If your work is CPU-bound, you may as well run single-threaded. If your threads spend time blocked on IO, MRI releases the lock while they wait, so you may get some parallelism, but YMMV. Alternatively, you can use an interpreter that allows true parallelism, such as JRuby or Rubinius.
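To see the IO case in action, here is a rough timing sketch. It assumes sleep is a fair stand-in for a blocking IO call (real IO behaves similarly only when the call actually releases the lock), and fake_io is a hypothetical name. On MRI, four threads that each sleep 0.2 seconds should finish in roughly 0.2 seconds in total, versus about 0.8 seconds when run one after another; exact timings will vary.

```ruby
require 'benchmark'

# Stand-in for a blocking IO call. MRI releases the GVL while a thread sleeps
# (as it does for most blocking IO), so other threads can run meanwhile.
def fake_io
  sleep 0.2
end

sequential = Benchmark.realtime { 4.times { fake_io } }
threaded = Benchmark.realtime do
  Array.new(4) { Thread.new { fake_io } }.each(&:join)
end

puts format("sequential: %.2fs, threaded: %.2fs", sequential, threaded)
```

Replacing fake_io with a CPU-bound loop would show little or no speedup on MRI, for the reason given above.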


There are a few gems that implement this pattern for you: parallel, peach, and mine, which is called threach (or jruby_threach under JRuby). It's a drop-in replacement for #each but lets you specify how many threads to run with, using a SizedQueue underneath to keep things from spiraling out of control.
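To make the SizedQueue idea concrete, here is a minimal sketch of a bounded, multi-threaded each. It is not threach's actual implementation; the bounded_each name, the buffer parameter, and the sentinel-object shutdown are assumptions made for illustration. The producer blocks on push whenever the SizedQueue is full, so at most buffer items are waiting at any time.

```ruby
require 'thread'

# Hypothetical helper (not the threach gem's code): call +block+ for each
# element of +enum+ using +n_threads+ worker threads. A SizedQueue caps the
# number of pending items, so a large enumerable is not buffered all at once.
# Error handling is omitted: an exception in the block kills that worker and
# is re-raised by join, and if every worker dies the producer can block forever.
def bounded_each(enum, n_threads, buffer: n_threads * 2, &block)
  queue = SizedQueue.new(buffer)
  stop = Object.new # unique sentinel that tells a worker to exit

  workers = Array.new(n_threads) do
    Thread.new do
      loop do
        item = queue.pop # blocks until an item (or the sentinel) arrives
        break if item.equal?(stop)
        block.call(item)
      end
    end
  end

  enum.each { |item| queue << item } # blocks while the buffer is full
  n_threads.times { queue << stop }   # one sentinel per worker
  workers.each(&:join)
  enum
end

seen = Queue.new
bounded_each(1..10, 4) { |i| seen << i * 10 }
collected = Array.new(seen.size) { seen.pop }.sort
p collected # => [10, 20, 30, 40, 50, 60, 70, 80, 90, 100]
```

Using a unique Object as the sentinel, compared with equal?, means no real element of the enumerable can be mistaken for the stop signal.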

So...

```ruby
(1..10).threach(4) {|i| do_my_work(i) }
```

Not pushing my own stuff; there are plenty of good implementations out there to make things easier.

If you're using JRuby, jruby_threach is a much better implementation: Java just offers a much richer set of threading primitives and data structures to use.


Executable descriptive example:

```ruby
require 'thread'

p tasks = [
  {:file => 'task1'},
  {:file => 'task2'},
  {:file => 'task3'},
  {:file => 'task4'},
  {:file => 'task5'}
]

tasks_queue = Queue.new
tasks.each {|task| tasks_queue << task}

# run workers
workers_count = 3
workers = []
workers_count.times do |n|
  workers << Thread.new(n+1) do |my_n|
    while (task = tasks_queue.shift(true) rescue nil) do
      delay = rand(0)
      sleep delay
      task[:result] = "done by worker ##{my_n} (in #{delay})"
      p task
    end
  end
end

# wait for all threads
workers.each(&:join)

# output results
puts "all done"
p tasks
```
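On Ruby 2.3 and later, Queue#close offers another way to shut the workers down: once a queue is closed, a blocking pop returns nil as soon as the queue is empty, so workers can use a plain blocking pop with no rescue. A minimal variant of the example above, assuming Ruby 2.3+ and dropping the random delays and per-task printing:

```ruby
# Requires Ruby 2.3+ for Queue#close.
tasks = (1..5).map { |i| { file: "task#{i}" } }

tasks_queue = Queue.new
tasks.each { |task| tasks_queue << task }
tasks_queue.close # no more tasks will be added

workers = (1..3).map do |n|
  Thread.new do
    # Blocking pop returns nil only once the queue is closed and empty.
    while (task = tasks_queue.pop)
      task[:result] = "done by worker ##{n}"
    end
  end
end
workers.each(&:join)

puts "all done"
p tasks
```

Pushing to a closed queue raises ClosedQueueError, so this style only fits when all the work is known before the workers finish.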