Blocking queue and multi-threaded consumer, how to know when to stop
You should continue to take() from the queue. You can use a poison pill to tell the worker to stop. For example:
    // One pill shared by every worker and by the producer, hence static.
    private static final Object POISON_PILL = new Object();

    @Override
    public void run() {
        // The worker keeps taking elements until it sees the poison pill. Because the
        // pill is enqueued last, everything the producer added before it is processed.
        while (true) {
            System.out.println("Consumer " + Thread.currentThread().getName() + " START");
            try {
                Object queueElement = inputQueue.take();
                if (queueElement == POISON_PILL) {
                    inputQueue.add(POISON_PILL); // notify the other threads to stop
                    return;
                }
                // process queueElement
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt status and stop
                return;
            }
        }
    }

    // Called from the main thread to signal that the producer has finished adding to the queue.
    public void finish() {
        // you can also clear the queue here if you want to drop pending work
        inputQueue.add(POISON_PILL);
    }
I'd send the workers a special work packet to signal that they should shut down:
    import java.util.concurrent.BlockingQueue;

    public class ConsumerWorker implements Runnable {

        // Public so the producer can enqueue it; assumes Produced has a no-arg constructor.
        public static final Produced DONE = new Produced();

        private final BlockingQueue<Produced> inputQueue;

        public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
            this.inputQueue = inputQueue;
        }

        @Override
        public void run() {
            for (;;) {
                try {
                    Produced item = inputQueue.take();
                    if (item == DONE) {
                        inputQueue.add(item); // keep in the queue so all workers stop
                        break;
                    }
                    // process `item`
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the flag and stop
                    break;
                }
            }
        }
    }
To stop the workers, simply add ConsumerWorker.DONE to the queue.
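For anyone who wants to try the sentinel approach end to end, here is a minimal, self-contained sketch. The class name PoisonPillDemo, the runDemo helper, the String payloads and the counter that stands in for real work are all illustrative assumptions, not part of the answers above. It uses an unbounded LinkedBlockingQueue; with a bounded queue, add() can throw IllegalStateException when the queue is full.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

class PoisonPillDemo {
    // Sentinel compared by identity; `new String` guarantees a distinct instance.
    private static final String DONE = new String("DONE");

    /** Starts `workers` consumers, enqueues `items` jobs, returns how many were processed. */
    static int runDemo(int workers, int items) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        AtomicInteger processed = new AtomicInteger();

        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < workers; i++) {
            Thread t = new Thread(() -> {
                try {
                    while (true) {
                        String item = queue.take();
                        if (item == DONE) {
                            queue.add(DONE); // leave the pill for the next worker
                            return;
                        }
                        processed.incrementAndGet(); // stand-in for real work
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            t.start();
            threads.add(t);
        }

        for (int i = 0; i < items; i++) {
            queue.put("job-" + i);
        }
        queue.put(DONE); // enqueued last, so every job is ahead of it

        for (Thread t : threads) {
            t.join(); // returns once each worker has seen the pill
        }
        return processed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("processed = " + runDemo(3, 100));
    }
}
```

Since the queue is FIFO and the pill goes in after the last job, no worker can see the pill before every job has been taken, so the run with 3 workers and 100 jobs should report all 100 as processed. One pill is left in the queue at the end, which is harmless if the queue is discarded afterwards.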
In the code block where you retrieve an element from the queue, use poll(timeout, unit) instead of take().
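    try {
        Object queueElement = inputQueue.poll(timeout, unit);
        if (queueElement != null) {
            // process queueElement
        } else if (!isRunning && inputQueue.isEmpty()) {
            // poll() timed out (it returns null, it does not throw),
            // the producer is done and nothing is left to process
            return;
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt status
        return;
    }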
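By specifying an appropriate timeout, you ensure that the threads won't block forever if this unfortunate sequence occurs:

1. isRunning is true.
2. The queue becomes empty, so the threads enter a blocked wait (if using take()).
3. isRunning is set to false.

With take(), the threads blocked in step 2 never re-check the flag. With poll(timeout, unit), each thread wakes up after at most one timeout, sees that isRunning is false and the queue is empty, and exits.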
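A minimal runnable sketch of the timed-poll approach, for comparison. The class name PollingConsumerDemo, the runDemo helper, the 50 ms timeout and the use of an AtomicBoolean for the isRunning flag are assumptions made for illustration; the flag only has to be visible across threads, so a volatile field would do as well.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class PollingConsumerDemo {
    /** Starts `workers` consumers, enqueues `items` jobs, returns how many were processed. */
    static int runDemo(int workers, int items) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        AtomicBoolean isRunning = new AtomicBoolean(true);
        AtomicInteger processed = new AtomicInteger();

        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    while (true) {
                        // Wait at most 50 ms; null means the wait timed out.
                        Integer item = queue.poll(50, TimeUnit.MILLISECONDS);
                        if (item != null) {
                            processed.incrementAndGet(); // stand-in for real work
                        } else if (!isRunning.get() && queue.isEmpty()) {
                            return; // producer finished and nothing is left
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }

        for (int i = 0; i < items; i++) {
            queue.put(i);
        }
        isRunning.set(false); // cleared only after the last put

        for (Thread t : threads) {
            t.join();
        }
        return processed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("processed = " + runDemo(3, 100));
    }
}
```

The order in the producer matters: the flag is cleared only after the last put(), so a worker that sees the flag cleared and the queue empty can be sure no further items will arrive. The cost compared with a poison pill is that idle workers wake up once per timeout and shutdown can lag by up to one timeout.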