Thread-safe Enumerator in Ruby
If you start the worker threads before filling the queue, they begin consuming items while you are still pushing them. Since, as a rule of thumb, the network is slower than the CPU, each batch should be (mostly) consumed by the time the next one arrives:
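    queue = Queue.new

    # The workers start while the queue is still empty, so they use a blocking
    # pop: it waits for the next item and returns nil once the queue has been
    # closed and drained. (Checking queue.empty? here would let a worker exit
    # before the first item is pushed.)
    t1 = Thread.new do
      while (item = queue.pop)
        p item
        sleep(0.1)
      end
    end

    t2 = Thread.new do
      while (item = queue.pop)
        p item
        sleep(0.1)
      end
    end

    (0..1000).each { |i| queue.push(i) }
    queue.close # Ruby 2.3+: signals that no more items are coming

    t1.join
    t2.join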
If the workers still prove too slow to keep up, you can opt for SizedQueue, which blocks push once the queue reaches its maximum size:
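    queue = SizedQueue.new(100)

    # Blocking pop again: each worker waits for items and stops when pop
    # returns nil, i.e. once the queue is closed and empty.
    t1 = Thread.new do
      while (item = queue.pop)
        p "#{item} - #{queue.size}"
        sleep(0.1)
      end
    end

    t2 = Thread.new do
      while (item = queue.pop)
        p item
        sleep(0.1)
      end
    end

    # push blocks whenever 100 items are already waiting in the queue
    (0..300).each { |i| queue.push(i) }
    queue.close # Ruby 2.3+: signals that no more items are coming

    t1.join
    t2.join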
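As a rough sketch, the same producer/consumer pattern could be wrapped in a reusable method. The name `consume_in_parallel` and its `worker_count` and `max_size` parameters are made up for this illustration, and the `each_slice` call merely stands in for batches arriving over the network. It assumes Ruby 2.3 or later (for `close`) and that no item is `nil` or `false`, since a falsy value would end a worker's loop early.

```ruby
# Hypothetical helper (not part of any library): runs worker threads over the
# items in `batches`, keeping at most `max_size` items buffered at a time.
def consume_in_parallel(batches, worker_count: 2, max_size: 100)
  queue = SizedQueue.new(max_size)
  results = Queue.new # thread-safe collector for the processed values

  workers = Array.new(worker_count) do
    Thread.new do
      # Blocking pop: waits for work and returns nil once the queue is
      # closed and empty, which ends the loop.
      while (item = queue.pop)
        results << yield(item)
      end
    end
  end

  # Producer: push blocks while the queue already holds max_size items.
  batches.each { |batch| batch.each { |item| queue.push(item) } }
  queue.close # tells the workers that no more items are coming
  workers.each(&:join)

  # All workers have finished, so draining the collector cannot block.
  Array.new(results.size) { results.pop }
end

# Stand-in for batches fetched from the network: three batches of ten.
batches = (1..30).each_slice(10)
squares = consume_in_parallel(batches, max_size: 5) { |n| n * n }

# Completion order is not guaranteed, so compare after sorting.
puts squares.sort == (1..30).map { |n| n * n }
```

Closing the queue instead of polling `empty?` avoids both failure modes of the non-blocking version: a worker that quits before the first push, and a `ThreadError` when two workers race for the last item.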