
"Closing" a blocking queue


If you have a handle to the consumer thread, you can interrupt it. With the code you gave, that will kill the consumer. I would not expect the producer to have such a handle; it would probably have to call back to the program controller somehow to let it know it's done. Then the controller would interrupt the consumer thread.

You can always finish doing work before obeying the interrupt. For instance:

    import java.util.LinkedList;
    import java.util.List;

    class QueueConsumer implements Runnable {
        // myBlockingQueue is the BlockingQueue<ComplexObject> from the question.

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    final ComplexObject complexObject = myBlockingQueue.take();
                    this.process(complexObject);
                } catch (InterruptedException e) {
                    // take() cleared the interrupted flag when it threw; set it again
                    // so the loop guard sees it.
                    Thread.currentThread().interrupt();
                }
            }

            // Thread is getting ready to die, but first,
            // drain remaining elements on the queue and process them.
            final List<ComplexObject> remainingObjects = new LinkedList<>();
            myBlockingQueue.drainTo(remainingObjects);
            for (ComplexObject complexObject : remainingObjects) {
                this.process(complexObject);
            }
        }

        private void process(final ComplexObject complexObject) {
            // Do something with the complex object.
        }
    }

I would actually prefer that to somehow poisoning the queue anyway. If you want to kill the thread, ask the thread to kill itself.
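To make the interrupt-then-drain idea concrete, here is a minimal, self-contained sketch. It is not the asker's code: `InterruptDrainDemo`, `runDemo`, and the use of `Integer` items in place of `ComplexObject` are all assumptions made for illustration, and the "controller" is simply the calling thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class; Integer stands in for ComplexObject.
class InterruptDrainDemo {

    // Fills the queue, starts a consumer, interrupts it, and returns
    // everything the consumer processed before it exited.
    static List<Integer> runDemo(int itemCount) throws InterruptedException {
        final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        final List<Integer> processed = new CopyOnWriteArrayList<>();

        Thread consumer = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    processed.add(queue.take());
                } catch (InterruptedException e) {
                    // take() cleared the flag when it threw; restore it
                    // so the loop guard ends the loop.
                    Thread.currentThread().interrupt();
                }
            }
            // Interrupted: process whatever is still queued, then exit.
            List<Integer> remaining = new ArrayList<>();
            queue.drainTo(remaining);
            processed.addAll(remaining);
        });

        // The producer finishes enqueuing BEFORE the interrupt is sent.
        for (int i = 0; i < itemCount; i++) {
            queue.put(i);
        }
        consumer.start();
        consumer.interrupt(); // the "controller" asks the consumer to stop
        consumer.join();
        return processed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo(5));
    }
}
```

Note the ordering this relies on: anything enqueued after the consumer has drained the queue is never processed, so the producer has to be finished before the interrupt is sent.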

(It's nice to see someone handling InterruptedException properly.)


There seems to be some contention about the handling of interruptions here. First, I would like everyone to read this article.

Now, with the understanding that no one actually read that, here's the deal. A thread receives an InterruptedException only from a blocking call: either it is blocked when the interrupt arrives, or it enters a blocking call with its interrupt status already set. Throwing the exception clears the status, so Thread.currentThread().isInterrupted() returns false afterwards. If the thread is not blocking, it does NOT receive the exception; the interrupt status is set instead, and isInterrupted() returns true. Therefore, your loop guard should absolutely, no matter what, check the interrupt status, or otherwise risk missing an interruption to the thread.

So, since you are checking the interrupt status no matter what, and you are forced to catch InterruptedException (and should be dealing with it even if you weren't forced to), you now have two code areas which handle the same event: thread interruption. One way to handle this is to normalize them into one condition, meaning either the boolean state check can throw the exception, or the exception can set the boolean state. I choose the latter.


Edit: Note that the static Thread#interrupted method clears the interrupted status of the current thread, whereas the instance method Thread#isInterrupted leaves it set.
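The difference between the two checks, and the fact that a blocking call clears the status when it throws, can be seen in a small single-threaded sketch. The class and method names (`InterruptStatusDemo`, `observe`, `flagAfterException`) are made up for this illustration; only the `Thread` methods are real API.

```java
// Hypothetical demo class: the current thread interrupts itself
// and then inspects its own interrupt status.
class InterruptStatusDemo {

    // Returns {isInterrupted(), isInterrupted() again,
    //          Thread.interrupted(), isInterrupted() afterwards}.
    static boolean[] observe() {
        Thread.currentThread().interrupt();
        boolean first = Thread.currentThread().isInterrupted();  // reads the flag, leaves it set
        boolean second = Thread.currentThread().isInterrupted(); // still set
        boolean viaStatic = Thread.interrupted();                // reads the flag AND clears it
        boolean after = Thread.currentThread().isInterrupted();  // now cleared
        return new boolean[] {first, second, viaStatic, after};
    }

    // Enters a blocking call with the flag already set and reports the
    // flag's value inside the catch block.
    static boolean flagAfterException() {
        Thread.currentThread().interrupt();
        try {
            Thread.sleep(1000); // throws immediately because the flag is set
            throw new IllegalStateException("sleep was expected to be interrupted");
        } catch (InterruptedException e) {
            // The status was cleared when the exception was thrown.
            return Thread.currentThread().isInterrupted();
        }
    }

    public static void main(String[] args) {
        boolean[] r = observe();
        System.out.println(r[0] + " " + r[1] + " " + r[2] + " " + r[3]);
        System.out.println(flagAfterException());
    }
}
```

This is why the catch block in a consumer loop typically calls `Thread.currentThread().interrupt()` again: without it, a guard based on `isInterrupted()` would not see the interruption.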


Another idea for making this simple:

    class ComplexObject implements QueueableComplexObject {
        /* the meat of your complex object is here as before, just need to
         * add the following line and the "implements" clause above
         */
        @Override
        public ComplexObject asComplexObject() { return this; }
    }

    enum NullComplexObject implements QueueableComplexObject {
        INSTANCE;

        @Override
        public ComplexObject asComplexObject() { return null; }
    }

    interface QueueableComplexObject {
        public ComplexObject asComplexObject();
    }

Then use BlockingQueue<QueueableComplexObject> as the queue. When you wish to end the queue's processing, do queue.offer(NullComplexObject.INSTANCE). On the consumer side, do

    boolean ok = true;
    while (ok) {
        ComplexObject obj = queue.take().asComplexObject();
        if (obj == null)
            ok = false;
        else
            process(obj);
    }
    /* interrupt handling elided: implement this as you see fit,
     * depending on whether you want to swallow interrupts or propagate them
     * as in your original post
     */

No instanceof required, and you don't have to construct a fake ComplexObject which may be expensive/difficult depending on its implementation.
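Put together, the sentinel approach might look like the following runnable sketch. The `name` field on `ComplexObject`, the `SentinelDemo` class, and `runDemo` are assumptions added so the example has something to process; the three types mirror the ones above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

interface QueueableComplexObject {
    ComplexObject asComplexObject();
}

class ComplexObject implements QueueableComplexObject {
    final String name; // hypothetical payload, for illustration only

    ComplexObject(String name) { this.name = name; }

    @Override
    public ComplexObject asComplexObject() { return this; }
}

enum NullComplexObject implements QueueableComplexObject {
    INSTANCE;

    @Override
    public ComplexObject asComplexObject() { return null; }
}

// Hypothetical demo class wiring a producer and a consumer together.
class SentinelDemo {

    // Enqueues one ComplexObject per name, then the sentinel, and returns
    // the names the consumer processed before it saw the sentinel.
    static List<String> runDemo(String... names) throws InterruptedException {
        final BlockingQueue<QueueableComplexObject> queue = new LinkedBlockingQueue<>();
        final List<String> processed = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    ComplexObject obj = queue.take().asComplexObject();
                    if (obj == null) {
                        break; // sentinel reached: stop consuming
                    }
                    processed.add(obj.name);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // propagate the interruption
            }
        });
        consumer.start();

        for (String n : names) {
            queue.put(new ComplexObject(n));
        }
        queue.put(NullComplexObject.INSTANCE); // "close" the queue
        consumer.join(); // join() makes the consumer's writes visible here
        return processed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo("a", "b", "c"));
    }
}
```

The sketch uses `put` rather than `offer` for the sentinel. On a bounded queue `offer` returns false when the queue is full, which would silently drop the shutdown signal; on an unbounded `LinkedBlockingQueue` the two behave the same. With several consumers you would need one sentinel per consumer.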


An alternative would be to wrap the processing you're doing with an ExecutorService, and let the ExecutorService itself control whether or not jobs get added to the queue.

Basically, you take advantage of ExecutorService.shutdown(), which when called stops the executor from accepting any new tasks. Tasks that were already submitted are still executed.

I'm not sure how you're currently submitting tasks to the QueueConsumer in your example. I've made the assumption that you have some sort of submit() method, and used a similar method in the example.

    import java.util.concurrent.*;

    class QueueConsumer {
        private final ExecutorService executor = Executors.newSingleThreadExecutor();

        public void shutdown() {
            // Gracefully shuts down the executor: no new tasks are accepted,
            // but tasks that were already submitted still run.
            executor.shutdown();
        }

        // 'Result' is a class you'll have to write yourself, if you want.
        // If you don't need to provide a result, you can just use Runnable
        // instead of Callable.
        public Future<Result> submit(final ComplexObject complexObject) {
            if (executor.isShutdown()) {
                // Handle tasks submitted after the executor has been told to shut down.
                // If you fall through, executor.submit() below throws
                // RejectedExecutionException.
            }
            return executor.submit(new Callable<Result>() {
                @Override
                public Result call() {
                    return process(complexObject);
                }
            });
        }

        private Result process(final ComplexObject complexObject) {
            // Do something with the complex object.
            return null; // placeholder so the sketch compiles
        }
    }

This example is just an off-the-cuff illustration of what the java.util.concurrent package offers; there are probably some optimizations that could be made to it (e.g., QueueConsumer as its own class probably isn't even necessary; you could just provide the ExecutorService to whatever producers are submitting the tasks).
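For a version you can actually run, here is a small sketch of the shutdown behaviour, assuming the work can be expressed as a `Callable<Integer>`. `ExecutorShutdownDemo` and its two methods are hypothetical names; squaring a number stands in for processing a ComplexObject.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; n * n stands in for real processing.
class ExecutorShutdownDemo {

    // Submits taskCount tasks, shuts the executor down, waits for the
    // already-submitted tasks to finish, and returns the sum of their results.
    static int sumAfterShutdown(int taskCount)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        List<Future<Integer>> futures = new ArrayList<>();
        for (int i = 1; i <= taskCount; i++) {
            final int n = i;
            futures.add(executor.submit(() -> n * n));
        }

        executor.shutdown(); // no new tasks; queued tasks still run
        if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("tasks did not finish in time");
        }

        int sum = 0;
        for (Future<Integer> f : futures) {
            sum += f.get();
        }
        return sum;
    }

    // Returns true if submitting after shutdown() is rejected.
    static boolean rejectsAfterShutdown() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.shutdown();
        try {
            executor.submit(() -> 42);
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sumAfterShutdown(3));
        System.out.println(rejectsAfterShutdown());
    }
}
```

Here `shutdown()` plays the role of "closing the queue", and `awaitTermination` lets the caller wait until the remaining work has drained. If you need to abandon queued work instead, `shutdownNow()` returns the tasks that never started and interrupts the running one.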

Dig through the java.util.concurrent package (starting at some of the links above). You might find that it gives you a lot of great options for what you're trying to do, and you don't even have to worry about regulating the work queue.