Pure-Ruby concurrent Hash
Okay, now that you've specified the actual meaning of 'threadsafe', here are two potential implementations. The following code will run forever in MRI and JRuby (the test loop at the bottom never raises). The lockless implementation follows an eventual consistency model where each thread uses its own view of the hash if the master is in flux. There is a little trickery required to make sure storing all the information in the thread doesn't leak memory, but that is handled and tested: process size does not grow while running this code. Both implementations would need more work to be 'complete' (delete, update, etc. would need some thinking), but either of the two concepts below will meet your requirements.
It's very important for people reading this thread to realize the whole issue is exclusive to JRuby; in MRI the built-in Hash is sufficient.
    module Cash
      def Cash.new(*args, &block)
        env = ENV['CASH_IMPL']
        impl = env ? Cash.const_get(env) : LocklessImpl
        klass = defined?(JRUBY_VERSION) ? impl : ::Hash
        klass.new(*args)
      end

      class LocklessImpl
        def initialize
          @hash = {}
        end

        def thread_hash
          thread = Thread.current
          thread[:cash] ||= {}
          hash = thread[:cash][thread_key]
          if hash
            hash
          else
            hash = thread[:cash][thread_key] = {}
            ObjectSpace.define_finalizer(self){ thread[:cash].delete(thread_key) }
            hash
          end
        end

        def thread_key
          [Thread.current.object_id, object_id]
        end

        def []=(key, val)
          time = Time.now.to_f
          tuple = [time, val]
          @hash[key] = tuple
          thread_hash[key] = tuple
          val
        end

        def [](key)
          # check the master value
          #
          val = @hash[key]

          # someone else is either writing the key or it has never been set.  we
          # need to invalidate our own copy in either case
          #
          if val.nil?
            thread_val = thread_hash.delete(key)
            return(thread_val ? thread_val.last : nil)
          end

          # check our own thread local value
          #
          thread_val = thread_hash[key]

          # in this case someone else has written a value that we have never seen so
          # simply return it
          #
          if thread_val.nil?
            return(val.last)
          end

          # in this case there is a master *and* a thread local value, if the master
          # is newer juke our own cached copy
          #
          if val.first > thread_val.first
            thread_hash.delete(key)
            return val.last
          else
            return thread_val.last
          end
        end
      end

      class LockingImpl < ::Hash
        require 'sync'

        def initialize(*args, &block)
          super
        ensure
          extend Sync_m
        end

        def sync(*args, &block)
          sync_synchronize(*args, &block)
        end

        def [](key)
          sync(:SH){ super }
        end

        def []=(key, val)
          sync(:EX){ super }
        end
      end
    end

    if $0 == __FILE__
      iteration = 0

      loop do
        n = 42
        hash = Cash.new

        threads =
          Array.new(10) {
            Thread.new do
              Thread.current.abort_on_exception = true
              n.times do |key|
                hash[key] = key
                raise "#{ key }=nil" if hash[key].nil?
              end
            end
          }

        threads.map{|thread| thread.join}

        puts "THREADSAFE: #{ iteration += 1 }"
      end
    end
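To make the read rule in `LocklessImpl#[]` easier to reason about, here it is pulled out as a pure function. This is only a sketch: `resolve` is a hypothetical name that does not appear in the code above, and it leaves out the side effect of deleting stale thread-local entries. Both arguments are `[timestamp, value]` tuples or `nil`.

```ruby
# Hypothetical helper (not part of Cash): decide which value a read returns,
# given the master tuple and the calling thread's local tuple.
def resolve(master, local)
  # Master unset or mid-write: fall back to whatever this thread wrote last.
  return(local ? local.last : nil) if master.nil?

  # This thread has never written the key: trust the master.
  return master.last if local.nil?

  # Both exist: the strictly newer timestamp wins, ties go to the local copy.
  master.first > local.first ? master.last : local.last
end

resolve(nil, [1.0, :mine])            # falls back to the local value
resolve([2.0, :theirs], nil)          # returns the master value
resolve([2.0, :theirs], [1.0, :mine]) # master is newer, so it wins
```

Because the comparison is on `Time.now.to_f`, two writes landing on the same timestamp resolve in favour of the reader's own copy, which is consistent with the "eventual" part of the model.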
Posting base/naive solution, just to boost my Stack Overflow cred:
    require 'thread'

    class ConcurrentHash < Hash
      def initialize
        super
        @mutex = Mutex.new
      end

      def [](*args)
        @mutex.synchronize { super }
      end

      def []=(*args)
        @mutex.synchronize { super }
      end
    end
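One thing worth spelling out about the mutex version: each `[]` and `[]=` call is synchronized on its own, so a read followed by a write (say, incrementing a counter) can still interleave with other threads. A rough sketch of one way around that, assuming you are willing to add a method: `update_value` below is a hypothetical name, not something from the answer above or from Ruby's `Hash`.

```ruby
class ConcurrentHash < Hash
  def initialize(*args, &block)
    super
    @mutex = Mutex.new
  end

  def [](key)
    @mutex.synchronize { super }
  end

  def []=(key, value)
    @mutex.synchronize { super }
  end

  # Hypothetical addition: read-modify-write under a single lock acquisition.
  # Uses fetch/store rather than []/[]= because Mutex is not reentrant and
  # the overridden accessors would try to take the lock a second time.
  def update_value(key, default = nil)
    @mutex.synchronize do
      current = fetch(key, default)
      store(key, yield(current))
    end
  end
end

counts = ConcurrentHash.new
threads = Array.new(8) do
  Thread.new { 1_000.times { counts.update_value(:hits, 0) { |n| n + 1 } } }
end
threads.each(&:join)
counts[:hits] # 8 threads * 1000 increments, all serialized by the mutex
```

Doing the same thing as `counts[:hits] = counts[:hits] + 1` would take the lock twice and let another thread slip in between the read and the write.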
Yehuda, I think you mentioned ivar setting was atomic? What about a simple copy and swap then?
    require 'thread'

    class ConcurrentHash
      def initialize
        @reader, @writer = {}, {}
        @lock = Mutex.new
      end

      def [](key)
        @reader[key]
      end

      def []=(key, value)
        @lock.synchronize {
          @writer[key] = value
          @reader, @writer = @writer, @reader
          @writer[key] = value
        }
      end
    end
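One caveat with swapping two live hashes: a reader that picked up `@reader` just before the swap can still be in the middle of a lookup when that same hash becomes `@writer` and gets mutated. A variation that avoids this is plain copy-on-write, where every write builds a fresh frozen hash and readers only ever see immutable snapshots. This is a sketch of a different technique, not the code above; `CopyOnWriteHash` is a made-up name, writes cost O(n), and it still leans on the same assumption that assigning an ivar is atomic.

```ruby
class CopyOnWriteHash
  def initialize
    @snapshot = {}.freeze
    @lock = Mutex.new
  end

  # Readers never lock: they look up the key in whichever frozen
  # snapshot the ivar pointed at when they read it.
  def [](key)
    @snapshot[key]
  end

  # Writers serialize on the lock, copy the current snapshot with the
  # new pair merged in, freeze the copy, then publish it in one assignment.
  def []=(key, value)
    @lock.synchronize do
      @snapshot = @snapshot.merge(key => value).freeze
    end
    value
  end
end

hash = CopyOnWriteHash.new
writers = Array.new(4) do |t|
  Thread.new { 100.times { |i| hash[[t, i]] = i } }
end
writers.each(&:join)
hash[[3, 99]] # every write is visible once the writers have joined
```

The trade-off is the usual one for copy-on-write: reads are as cheap as a plain Hash lookup, writes get slower as the hash grows, so it only makes sense for read-heavy workloads.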