Suspend consumer in producer/consumer pattern Suspend consumer in producer/consumer pattern multithreading multithreading

Suspend consumer in producer/consumer pattern


I think your solution is simple and elegant, and think you should keep it with some modifications. The modifications I propose are synchronization.

Without it, thread interference and memory consistancy errors can (and very often does) occur. On top of that, you can't wait or notify on a lock you don't own (and you own it if you have it inside a synchronized block..). The fix is easy, just add a mLock synchronize block where you wait/notify on it. Also, as you're changing mLocked from a different thread you will want to mark it volatile.

private Object mLock = new Object();private volatile boolean mLocked = false;public void lock() {    mLocked = true;}public void unlock() {    synchronized(mlock) {        mLocked = false;        mLock.notify();    }}public void run() {    ....            Record r = mQueue.take();            synchronized(mLock) {                while (mLocked) {                    mLock.wait();                }            }            process(r);}


You can use java.util.concurrent.locks.Condition Java docs to pause for a while based on same condition.

This is approach looks clean to me and ReentrantLock mechanism has better throughput than synchronized. Read below excerpt from IBM article

As a bonus, the implementation of ReentrantLock is far more scalable under contention than the current implementation of synchronized. (It is likely that there will be improvements to the contended performance of synchronized in a future version of the JVM.) This means that when many threads are all contending for the same lock, the total throughput is generally going to be better with ReentrantLock than with synchronized.


BlockingQueue are best known for solution of producer-consumer problem, and it also uses Condition for waiting.

See below example taken from Java doc's Condition, which is a sample implementation of producer - consumer pattern.

class BoundedBuffer {   final Lock lock = new ReentrantLock();   final Condition notFull  = lock.newCondition();    final Condition notEmpty = lock.newCondition();    final Object[] items = new Object[100];   int putptr, takeptr, count;   public void put(Object x) throws InterruptedException {     lock.lock();     try {       while (count == items.length)         notFull.await();       items[putptr] = x;       if (++putptr == items.length) putptr = 0;       ++count;       notEmpty.signal();     } finally {       lock.unlock();     }   }   public Object take() throws InterruptedException {     lock.lock();     try {       while (count == 0)         notEmpty.await();       Object x = items[takeptr];       if (++takeptr == items.length) takeptr = 0;       --count;       notFull.signal();       return x;     } finally {       lock.unlock();     }   } }

Further reading:


Create a new class that extends an implementation of BlockingQueue. Add two new methods pause() and unpause(). Where needed, consider the paused flag and use the other blockingQueue2 to wait (in my example only in the take() method, and not in put()):

public class BlockingQueueWithPause<E> extends LinkedBlockingQueue<E> {    private static final long serialVersionUID = 184661285402L;    private Object lock1 = new Object();//used in pause() and in take()    private Object lock2 = new Object();//used in pause() and unpause()    //@GuardedBy("lock")    private volatile boolean paused;    private LinkedBlockingQueue<Object> blockingQueue2 = new LinkedBlockingQueue<Object>();    public void pause() {        if (!paused) {            synchronized (lock1) {            synchronized (lock2) {                if (!paused) {                    paused = true;                    blockingQueue2.removeAll();//make sure it is empty, e.g after successive calls to pause() and unpause() without any consumers it will remain unempty                }            }            }        }    }    public void unpause() throws InterruptedException {        if (paused) {            synchronized (lock2) {                paused = false;                blockingQueue2.put(new Object());//release waiting thread, if there is one            }        }    }    @Override    public E take() throws InterruptedException {        E result = super.take();        if (paused) {            synchronized (lock1) {//this guarantees that a single thread will be in the synchronized block, all other threads will be waiting                if (paused) {                    blockingQueue2.take();                }            }        }        return result;    }    //TODO override similarly the poll() method.}