
Pure-Ruby concurrent Hash


Okay, now that you've specified the actual meaning of 'threadsafe', here are two potential implementations. The following code will run forever in MRI and JRuby (the test loop at the bottom only stops if a read comes back nil). The lockless implementation follows an eventual-consistency model in which each thread uses its own view of the hash if the master is in flux. A little trickery is required to make sure that storing all the information in the thread doesn't leak memory, but that is handled and tested: process size does not grow while running this code. Both implementations would need more work to be 'complete' (delete, update, etc. would need some thinking), but either of the two concepts below will meet your requirements.

It's very important for people reading this thread to realize that the whole issue is exclusive to JRuby; in MRI the built-in Hash is sufficient.

    module Cash
      def Cash.new(*args, &block)
        env = ENV['CASH_IMPL']
        impl = env ? Cash.const_get(env) : LocklessImpl
        klass = defined?(JRUBY_VERSION) ? impl : ::Hash
        klass.new(*args)
      end

      class LocklessImpl
        def initialize
          @hash = {}
        end

        def thread_hash
          thread = Thread.current
          thread[:cash] ||= {}
          hash = thread[:cash][thread_key]
          if hash
            hash
          else
            hash = thread[:cash][thread_key] = {}
            ObjectSpace.define_finalizer(self){ thread[:cash].delete(thread_key) }
            hash
          end
        end

        def thread_key
          [Thread.current.object_id, object_id]
        end

        def []=(key, val)
          time = Time.now.to_f
          tuple = [time, val]
          @hash[key] = tuple
          thread_hash[key] = tuple
          val
        end

        def [](key)
          # check the master value
          #
          val = @hash[key]

          # someone else is either writing the key or it has never been set.  we
          # need to invalidate our own copy in either case
          #
          if val.nil?
            thread_val = thread_hash.delete(key)
            return(thread_val ? thread_val.last : nil)
          end

          # check our own thread local value
          #
          thread_val = thread_hash[key]

          # in this case someone else has written a value that we have never seen so
          # simply return it
          #
          if thread_val.nil?
            return(val.last)
          end

          # in this case there is a master *and* a thread local value, if the master
          # is newer juke our own cached copy
          #
          if val.first > thread_val.first
            thread_hash.delete(key)
            return val.last
          else
            return thread_val.last
          end
        end
      end

      class LockingImpl < ::Hash
        require 'sync'

        def initialize(*args, &block)
          super
        ensure
          extend Sync_m
        end

        def sync(*args, &block)
          sync_synchronize(*args, &block)
        end

        def [](key)
          sync(:SH){ super }
        end

        def []=(key, val)
          sync(:EX){ super }
        end
      end
    end

    if $0 == __FILE__
      iteration = 0

      loop do
        n = 42
        hash = Cash.new

        threads =
          Array.new(10) {
            Thread.new do
              Thread.current.abort_on_exception = true
              n.times do |key|
                hash[key] = key
                raise "#{ key }=nil" if hash[key].nil?
              end
            end
          }

        threads.map{|thread| thread.join}

        puts "THREADSAFE: #{ iteration += 1 }"
      end
    end
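
To illustrate the MRI point, here is a minimal sketch (separate from the Cash code) that hammers a plain Hash from several threads. It assumes MRI, where in practice the global interpreter lock keeps an individual `Hash#[]=` call with simple keys from interleaving with another; the thread and key counts are arbitrary. It says nothing about compound read-modify-write sequences, which still need a lock on any implementation.

```ruby
# Assumption: running on MRI. The built-in Hash on JRuby makes no such promise.
shared = {}
thread_count = 10
keys_per_thread = 1_000

threads = Array.new(thread_count) do |t|
  Thread.new do
    keys_per_thread.times do |i|
      key = t * keys_per_thread + i   # each thread writes its own key range
      shared[key] = key
    end
  end
end
threads.each(&:join)

puts "stored #{shared.size} of #{thread_count * keys_per_thread} keys"
```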


Posting base/naive solution, just to boost my Stack Overflow cred:

    require 'thread'

    class ConcurrentHash < Hash
      def initialize
        super
        @mutex = Mutex.new
      end

      def [](*args)
        @mutex.synchronize { super }
      end

      def []=(*args)
        @mutex.synchronize { super }
      end
    end
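
Subclassing Hash leaves every inherited method other than `[]` and `[]=` unsynchronized. As a rough sketch of how the same idea might be extended, the wrapper below uses composition so that only locked methods are exposed. The class name `SynchronizedHash` and the `update` helper are hypothetical, made up for this illustration.

```ruby
# Hypothetical wrapper: every access to the inner Hash goes through one Mutex.
class SynchronizedHash
  def initialize
    @hash = {}
    @mutex = Mutex.new
  end

  def [](key)
    @mutex.synchronize { @hash[key] }
  end

  def []=(key, value)
    @mutex.synchronize { @hash[key] = value }
  end

  def delete(key)
    @mutex.synchronize { @hash.delete(key) }
  end

  # Read-modify-write under the lock, so concurrent updates are not lost.
  # The block receives the current value (or default) and returns the new one.
  def update(key, default)
    @mutex.synchronize { @hash[key] = yield(@hash.fetch(key, default)) }
  end
end

counts = SynchronizedHash.new
threads = Array.new(10) do
  Thread.new { 1_000.times { counts.update(:hits, 0) { |n| n + 1 } } }
end
threads.each(&:join)
puts counts[:hits]
```

Because Mutex is not reentrant, the block passed to `update` must not call back into the same `SynchronizedHash`, or the thread will raise a ThreadError.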


Yehuda, I think you mentioned ivar setting was atomic? What about a simple copy and swap then?

    require 'thread'

    class ConcurrentHash
      def initialize
        @reader, @writer = {}, {}
        @lock = Mutex.new
      end

      def [](key)
        @reader[key]
      end

      def []=(key, value)
        @lock.synchronize {
          @writer[key] = value
          @reader, @writer = @writer, @reader
          @writer[key] = value
        }
      end
    end
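
One thing to watch with swapping two mutable hashes: a reader that picked up `@reader` just before a swap may still be looking at a hash that the writer then goes on to modify. A related variant, offered here as a sketch and not as what the snippet above does, is copy-on-write: the writer copies the current hash, changes the copy, freezes it, and publishes it with a single ivar assignment. The names `CopyOnWriteHash` and `@snapshot` are hypothetical, and the approach assumes ivar assignment is atomic, as the question suggests.

```ruby
# Hypothetical copy-on-write variant: readers only ever see frozen snapshots.
class CopyOnWriteHash
  def initialize
    @snapshot = {}.freeze
    @lock = Mutex.new
  end

  # Lock-free read: one ivar load, then a lookup in an immutable hash.
  def [](key)
    @snapshot[key]
  end

  def size
    @snapshot.size
  end

  # Writers serialize on the lock, copy, modify the copy, then publish it.
  def []=(key, value)
    @lock.synchronize do
      copy = @snapshot.dup   # dup of a frozen Hash is not frozen
      copy[key] = value
      @snapshot = copy.freeze
    end
  end
end

cow = CopyOnWriteHash.new
writers = Array.new(4) do |t|
  Thread.new { 250.times { |i| cow[t * 250 + i] = i } }
end
reader = Thread.new { 1_000.times { |k| cow[k] } }  # reads never take the lock
(writers + [reader]).each(&:join)
puts cow.size
```

Each write copies the whole hash, so this only makes sense when reads heavily outnumber writes.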