Thread and Queue
There is a `Queue` class in `thread` in the standard library. Using that, you can do something like this:
```ruby
require 'thread'

queue = Queue.new
threads = []

# add work to the queue
queue << work_unit

4.times do
  threads << Thread.new do
    # loop until there are no more things to do
    until queue.empty?
      # pop with the non-blocking flag set, this raises
      # an exception if the queue is empty, in which case
      # work_unit will be set to nil
      work_unit = queue.pop(true) rescue nil
      if work_unit
        # do work
      end
    end
    # when there is no more work, the thread will stop
  end
end

# wait until all threads have completed processing
threads.each { |t| t.join }
```
The reason I pop with the non-blocking flag is that between the `until queue.empty?` check and the `pop`, another thread may have popped the last item off the queue. Unless the non-blocking flag is set, the thread would then block on that line forever, waiting for work that never arrives.
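To make that behaviour concrete, here is a minimal sketch of what a non-blocking pop does once the queue has been drained. The variable names are made up for illustration. It rescues `ThreadError` explicitly, which is the exception `Queue#pop(true)` raises on an empty queue; the `rescue nil` modifier used above is shorter but swallows any `StandardError`.

```ruby
require 'thread'

queue = Queue.new
queue << :only_item

# The first non-blocking pop succeeds because the queue holds one item.
first = queue.pop(true)

# The queue is now empty, so a second non-blocking pop raises ThreadError
# instead of blocking; we translate that into nil ("no work left").
second = begin
  queue.pop(true)
rescue ThreadError
  nil
end

p first
p second
```

Whether you rescue the specific error or use the modifier form is a matter of taste; the explicit form just avoids hiding unrelated errors.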
If you're using MRI, the default Ruby interpreter, bear in mind that its threads do not run Ruby code in parallel, because of the global interpreter lock. If your work is CPU bound you may just as well run single threaded. If some of your operations block on IO, the threads can overlap while they wait, so you may get some parallelism, but YMMV. Alternatively, you can use an interpreter that allows full parallelism, such as JRuby or Rubinius.
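As a rough illustration of the IO-bound case, the sketch below uses `sleep` as a stand-in for a blocking network or disk call (`fake_io` is a hypothetical helper, not a real API). On MRI the four waits should overlap, so the total time ends up close to one wait instead of four, though exact timings will vary by machine.

```ruby
require 'thread'

# Hypothetical stand-in for a blocking IO call; sleep releases the
# interpreter lock on MRI, much like a real network or disk wait would.
def fake_io
  sleep 0.2
end

started = Time.now
threads = Array.new(4) { Thread.new { fake_io } }
threads.each(&:join)
elapsed = Time.now - started

# Run one after another, the four calls would take roughly 0.8 seconds.
puts "elapsed: #{elapsed.round(2)}s"
```

Replacing the `sleep` with a tight arithmetic loop is an easy way to see the CPU-bound case, where MRI threads give little or no speedup.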
There are a few gems that implement this pattern for you: `parallel`, `peach`, and mine, which is called `threach` (or `jruby_threach` under JRuby). It's a drop-in replacement for `#each` that lets you specify how many threads to run with, using a `SizedQueue` underneath to keep things from spiraling out of control.
So...
```ruby
(1..10).threach(4) { |i| do_my_work(i) }
```
Not pushing my own stuff; there are plenty of good implementations out there to make things easier.
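If you're curious how a threaded `#each` on top of a `SizedQueue` might look, here is a minimal sketch. It is not the actual code of `threach` or any other gem; `each_in_threads` and the stop sentinel are names invented for this example, and it has no error handling (a worker that raises will simply die).

```ruby
require 'thread'

# Hypothetical helper, not taken from threach or any other gem.
def each_in_threads(items, thread_count, &block)
  # Bounded queue: the producer blocks when the workers fall behind,
  # so the whole collection is never buffered at once.
  queue = SizedQueue.new(thread_count * 2)
  stop = Object.new # unique sentinel telling a worker to exit

  workers = Array.new(thread_count) do
    Thread.new do
      until (item = queue.pop).equal?(stop)
        block.call(item)
      end
    end
  end

  items.each { |item| queue << item }
  thread_count.times { queue << stop } # one sentinel per worker
  workers.each(&:join)
  items
end

# Usage: collect results in a thread-safe Queue.
results = Queue.new
each_in_threads(1..10, 4) { |i| results << i * i }
squares = Array.new(results.size) { results.pop }.sort
p squares
```

Pushing one sentinel per worker avoids the `empty?`/`pop` race entirely, since every worker uses a plain blocking pop and is guaranteed to receive a stop signal eventually.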
If you're using JRuby, `jruby_threach` is a much better implementation; Java simply offers a much richer set of threading primitives and data structures to use.
Executable descriptive example:
```ruby
require 'thread'

p tasks = [
  {:file => 'task1'},
  {:file => 'task2'},
  {:file => 'task3'},
  {:file => 'task4'},
  {:file => 'task5'}
]

tasks_queue = Queue.new
tasks.each { |task| tasks_queue << task }

# run workers
workers_count = 3
workers = []

workers_count.times do |n|
  workers << Thread.new(n + 1) do |my_n|
    while (task = tasks_queue.shift(true) rescue nil) do
      delay = rand(0)
      sleep delay
      task[:result] = "done by worker ##{my_n} (in #{delay})"
      p task
    end
  end
end

# wait for all threads
workers.each(&:join)

# output results
puts "all done"
p tasks
```