C++11 non-blocking producer/consumer


It seems like the C++11 condition variable might be useful to block the consumer thread. Can anyone show me an example of how to use it, while avoiding the possibility that the consumer sleeps with data still in the queue?

To use a condition variable you need a mutex and a condition. In your case the condition will be "there is data available in the queue". Since the producer will be using lock-free updates to produce work, the consumer has to use the same form of synchronisation to consume the work, so the mutex will not actually be used for synchronisation and is only needed by the consumer thread because there's no other way to wait on a condition variable.

    // these variables are members or otherwise shared between threads
    std::mutex m_mutex;
    std::condition_variable m_cv;
    lockfree_queue m_data;

    // ...

    // in producer thread:
    while (true)
    {
      // add work to queue
      m_data.push(x);
      m_cv.notify_one();
    }

    // in consumer thread:
    while (true)
    {
      std::unique_lock<std::mutex> lock(m_mutex);
      // [&] lets the lambda reach m_data when it is a class member
      m_cv.wait(lock, [&]{ return !m_data.empty(); });
      // remove data from queue and process it
      auto x = m_data.pop();
    }

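The condition variable will only block in the wait call if the queue is empty when the predicate is checked. The condition variable might wake up spuriously, or because it was notified by the producer, but in either case it will only return from the wait call (rather than sleeping again) if the queue is non-empty. That's guaranteed by using the condition_variable::wait overload that takes a predicate, because the condition variable always re-checks the predicate for you.

One caveat: because the producer never locks the mutex, a notification can arrive after the consumer has evaluated the predicate but before it has actually blocked, and that notification is lost. The consumer then sleeps with data in the queue until the next notification or a spurious wakeup. If that matters, either have the producer lock and unlock the mutex between the push and the notify (which makes it blocking), or wait with wait_for and a timeout to bound the delay.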

Since the mutex is only used by the consumer thread, it could in fact be local to that thread (as long as you only have one consumer; with more than one, they all need to share the same mutex to wait on the same condvar).
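To make the pattern above concrete, here is a minimal sketch that should compile as C++11. The names SpscQueue and run_demo are invented for illustration, and SpscQueue is only a hypothetical stand-in for lockfree_queue (a fixed-size ring buffer that is safe for one producer and one consumer). The consumer uses wait_for with an arbitrary 10 ms timeout instead of wait, which is a swapped-in variation so that a missed notification only delays the consumer rather than stalling it.

```cpp
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>

// Hypothetical stand-in for lockfree_queue: a ring buffer that is safe for
// exactly one producer thread and one consumer thread.
class SpscQueue {
public:
    bool push(int value) {
        const std::size_t tail = m_tail.load(std::memory_order_relaxed);
        const std::size_t next = (tail + 1) % kCapacity;
        if (next == m_head.load(std::memory_order_acquire)) return false;  // full
        m_slots[tail] = value;
        m_tail.store(next, std::memory_order_release);
        return true;
    }
    bool pop(int& value) {
        const std::size_t head = m_head.load(std::memory_order_relaxed);
        if (head == m_tail.load(std::memory_order_acquire)) return false;  // empty
        value = m_slots[head];
        m_head.store((head + 1) % kCapacity, std::memory_order_release);
        return true;
    }
    bool empty() const {
        return m_head.load(std::memory_order_acquire) ==
               m_tail.load(std::memory_order_acquire);
    }
private:
    static const std::size_t kCapacity = 1024;
    int m_slots[kCapacity];
    std::atomic<std::size_t> m_head{0};
    std::atomic<std::size_t> m_tail{0};
};

// Pushes 1..count from a producer thread and returns the sum of the values
// received by the consumer, which runs on the calling thread.
long long run_demo(int count) {
    SpscQueue queue;
    std::mutex mutex;  // only ever locked by the consumer
    std::condition_variable cv;

    std::thread producer([&] {
        for (int i = 1; i <= count; ++i) {
            // Retry if the demo queue is full; a real queue may behave differently.
            while (!queue.push(i)) std::this_thread::yield();
            cv.notify_one();  // never touches the mutex
        }
    });

    long long sum = 0;
    int received = 0;
    while (received < count) {
        {
            std::unique_lock<std::mutex> lock(mutex);
            // The timeout bounds the delay if a notification arrives between
            // the predicate check and the thread actually blocking.
            cv.wait_for(lock, std::chrono::milliseconds(10),
                        [&] { return !queue.empty(); });
        }
        int value;
        while (queue.pop(value)) {  // drain everything that is available
            sum += value;
            ++received;
        }
    }
    producer.join();
    return sum;
}
```

Calling run_demo(1000) from main should return the sum of 1 through 1000. The timeout trades a little periodic polling for keeping the producer free of locks; whether that is acceptable depends on your latency requirements.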


One solution I found to this in the past was using Windows events (http://msdn.microsoft.com/en-us/library/windows/desktop/ms682396(v=vs.85).aspx). In this case the event remains signaled until it wakes up a waiting thread, and if no threads are waiting it remains signaled. So the producer simply needs to signal the event after pushing data to the queue. Then we are guaranteed that the consumer will wake up some finite time after this.

I wasn't able to find a way to implement this using the standard library though (at least not without blocking the producer thread).
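For reference, the auto-reset event behaviour described above can be approximated in standard C++11 roughly as follows. This is only a sketch under some assumptions: AutoResetEvent and run_event_demo are made-up names, an atomic counter stands in for the queue, and set() takes a mutex for a short time, so the producer is not strictly non-blocking.

```cpp
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical emulation of an auto-reset event: it stays signaled until one
// waiter consumes the signal, even if nobody is waiting when set() is called.
class AutoResetEvent {
public:
    void set() {
        {
            std::lock_guard<std::mutex> lock(m_mutex);  // brief lock in the producer
            m_signaled = true;
        }
        m_cv.notify_one();
    }
    void wait() {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cv.wait(lock, [this] { return m_signaled; });
        m_signaled = false;  // reset automatically after waking one waiter
    }
private:
    std::mutex m_mutex;
    std::condition_variable m_cv;
    bool m_signaled = false;
};

// Produces `count` items on another thread and returns how many the consumer
// (the calling thread) received.
int run_event_demo(int count) {
    std::atomic<int> pending(0);  // stand-in for the lock-free queue
    AutoResetEvent event;

    std::thread producer([&] {
        for (int i = 0; i < count; ++i) {
            pending.fetch_add(1);  // "push"
            event.set();           // signal after pushing
        }
    });

    int received = 0;
    while (received < count) {
        event.wait();
        // Several signals may have collapsed into one, so drain everything.
        received += pending.exchange(0);
    }
    producer.join();
    return received;
}
```

Because the signaled flag is checked under the mutex, a set() that happens before the consumer starts waiting is not lost, which is the property the Windows event provides.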


I think semaphores could be used to solve this problem safely:

    // in producer thread:
    while (true)
    {
      m_data.push();
      m_semaphore.release();
    }

    // in consumer thread:
    while (true)
    {
      m_semaphore.wait();
      m_data.pop();
    }

Unfortunately, I don't think C++11 includes a semaphore. I have also not been able to confirm that releasing a semaphore is a non-blocking operation. Certainly, implementations built on mutexes (e.g. those in "C++0x has no semaphores? How to synchronize threads?") will not allow for a non-blocking producer thread.
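For illustration, here is roughly what the semaphore pattern looks like with a semaphore built from a mutex and a condition variable in C++11. The names Semaphore and run_semaphore_demo are made up, and a pre-sized vector stands in for m_data. As noted above, release() takes a lock here, so this shows the structure of the idea rather than a truly non-blocking producer. C++20 later added std::counting_semaphore, which could replace the hand-written class if a newer standard is available.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical counting semaphore built from C++11 primitives.
class Semaphore {
public:
    void release() {
        {
            std::lock_guard<std::mutex> lock(m_mutex);  // brief lock in the producer
            ++m_count;
        }
        m_cv.notify_one();
    }
    void wait() {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cv.wait(lock, [this] { return m_count > 0; });
        --m_count;
    }
private:
    std::mutex m_mutex;
    std::condition_variable m_cv;
    unsigned m_count = 0;
};

// Publishes 1..count from a producer thread and returns the sum read by the
// consumer, which runs on the calling thread.
long long run_semaphore_demo(int count) {
    std::vector<int> slots(static_cast<std::size_t>(count));  // stand-in for m_data
    Semaphore semaphore;

    std::thread producer([&] {
        for (int i = 0; i < count; ++i) {
            slots[i] = i + 1;     // "push"
            semaphore.release();  // exactly one release per item
        }
    });

    long long sum = 0;
    for (int i = 0; i < count; ++i) {
        semaphore.wait();  // returns only once item i has been published
        sum += slots[i];   // "pop"
    }
    producer.join();
    return sum;
}
```

Since every release is counted, the consumer can never sleep while unconsumed items remain, which is the guarantee the original question asks for.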