Blocking queue and multi-threaded consumer, how to know when to stop Blocking queue and multi-threaded consumer, how to know when to stop multithreading multithreading

Blocking queue and multi-threaded consumer, how to know when to stop


You should continue to take() from the queue. You can use a poison pill to tell the worker to stop. For example:

private final Object POISON_PILL = new Object();@Overridepublic void run() {    //worker loop keeps taking en element from the queue as long as the producer is still running or as     //long as the queue is not empty:    while(isRunning) {        System.out.println("Consumer "+Thread.currentThread().getName()+" START");        try {            Object queueElement = inputQueue.take();            if(queueElement == POISON_PILL) {                 inputQueue.add(POISON_PILL);//notify other threads to stop                 return;            }            //process queueElement        } catch (Exception e) {            e.printStackTrace();        }    }}//this is used to signal from the main thread that he producer has finished adding stuff to the queuepublic void finish() {    //you can also clear here if you wanted    isRunning = false;    inputQueue.add(POISON_PILL);}


I'd send the workers a special work packet to signal that they should shut down:

public class ConsumerWorker implements Runnable{private static final Produced DONE = new Produced();private BlockingQueue<Produced> inputQueue;public ConsumerWorker(BlockingQueue<Produced> inputQueue) {    this.inputQueue = inputQueue;}@Overridepublic void run() {    for (;;) {        try {            Produced item = inputQueue.take();            if (item == DONE) {                inputQueue.add(item); // keep in the queue so all workers stop                break;            }            // process `item`        } catch (Exception e) {            e.printStackTrace();        }    }}

}

To stop the workers, simply add ConsumerWorker.DONE to the queue.


In your code-block where you attempt to retrive element from the queue , use poll(time,unit) instead of the take().

try {     Object queueElement = inputQueue.poll(timeout,unit);     //process queueElement         } catch (InterruptedException e) {        if(!isRunning && queue.isEmpty())         return ;  } 

By specifying appropriate values of timeout , you ensure that threads wont keep blocking in case there is a unfortunate sequence of

  1. isRunning is true
  2. Queue becomes empty , so threads enter blocked wait ( if using take()
  3. isRunning is set to false