Java Executors: how can I set task priority?
The standard thread-pool implementations of the Executor interface are ThreadPoolExecutor and ScheduledThreadPoolExecutor.
Instead of using the factory class Executors, create the executor through one of its constructors.
The ThreadPoolExecutor constructors accept a BlockingQueue.
One BlockingQueue implementation, PriorityBlockingQueue, accepts a Comparator in its constructor, which lets you decide the order in which queued tasks are executed.
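As a rough illustration of the constructor-based approach, here is a minimal sketch. The names `PriorityQueueExecutorSketch`, `PrioritizedTask` and `runInPriorityOrder` are made up for this example, and it assumes that a lower number means "run earlier" and that tasks are handed over with execute() so the queue sees the original objects. A single worker is blocked first so that the remaining tasks really wait in the queue.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PriorityQueueExecutorSketch {

    /** Hypothetical task type: a Runnable that carries its own priority. */
    static final class PrioritizedTask implements Runnable {
        final int priority;
        final Runnable body;

        PrioritizedTask(int priority, Runnable body) {
            this.priority = priority;
            this.body = body;
        }

        @Override
        public void run() {
            body.run();
        }
    }

    /** Runs one task per priority on a single worker and returns the execution order. */
    static List<Integer> runInPriorityOrder(int... priorities) {
        // Assumption: lower number runs first. The cast is only safe because
        // nothing but PrioritizedTask instances are ever queued here.
        Comparator<Runnable> byPriority =
                Comparator.comparingInt(r -> ((PrioritizedTask) r).priority);

        ThreadPoolExecutor exec = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new PriorityBlockingQueue<Runnable>(11, byPriority));

        List<Integer> executed = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch gate = new CountDownLatch(1);

        // Occupy the single worker so the remaining tasks pile up in the queue.
        exec.execute(new PrioritizedTask(Integer.MIN_VALUE, () -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));

        // execute() is used instead of submit(): submit() would wrap each task
        // in a FutureTask and the cast in the comparator would fail.
        for (int p : priorities) {
            exec.execute(new PrioritizedTask(p, () -> executed.add(p)));
        }

        gate.countDown();   // release the worker; queued tasks now run by priority
        exec.shutdown();
        try {
            exec.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return executed;
    }

    public static void main(String[] args) {
        System.out.println(runInPriorityOrder(5, 1, 4, 2, 3)); // prints [1, 2, 3, 4, 5]
    }
}
```

The ordering only applies to tasks that are actually waiting in the queue; a task handed directly to an idle worker starts immediately regardless of its priority.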
The idea here is to use a PriorityBlockingQueue in the executor. For this:
- Create a comparator that would compare our futures.
- Create a proxy for the Future to hold a priority.
- Override 'newTaskFor' in order to wrap every future in our proxy.
First you need to hold priority on your future:
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.RunnableFuture;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    // Wraps the executor's own future and attaches a priority to it.
    class PriorityFuture<T> implements RunnableFuture<T> {
        private final RunnableFuture<T> src;
        private final int priority;

        public PriorityFuture(RunnableFuture<T> other, int priority) {
            this.src = other;
            this.priority = priority;
        }

        public int getPriority() {
            return priority;
        }

        public boolean cancel(boolean mayInterruptIfRunning) {
            return src.cancel(mayInterruptIfRunning);
        }

        public boolean isCancelled() {
            return src.isCancelled();
        }

        public boolean isDone() {
            return src.isDone();
        }

        public T get() throws InterruptedException, ExecutionException {
            return src.get();
        }

        public T get(long timeout, TimeUnit unit)
                throws InterruptedException, ExecutionException, TimeoutException {
            // Pass the timeout through instead of blocking indefinitely.
            return src.get(timeout, unit);
        }

        public void run() {
            src.run();
        }
    }
Next you need to define a comparator that sorts the priority futures correctly:
    import java.util.Comparator;

    class PriorityFutureComparator implements Comparator<Runnable> {
        public int compare(Runnable o1, Runnable o2) {
            if (o1 == null && o2 == null)
                return 0;
            else if (o1 == null)
                return -1;
            else if (o2 == null)
                return 1;
            else {
                // A lower priority value sorts first, so it is executed first.
                int p1 = ((PriorityFuture<?>) o1).getPriority();
                int p2 = ((PriorityFuture<?>) o2).getPriority();
                return Integer.compare(p1, p2);
            }
        }
    }
Next let's assume we have a lengthy job like this:
    import java.util.concurrent.Callable;

    class LenthyJob implements Callable<Long> {
        private int priority;

        public LenthyJob(int priority) {
            this.priority = priority;
        }

        public Long call() throws Exception {
            System.out.println("Executing: " + priority);
            // Busy work so that each job takes a noticeable amount of time.
            long num = 1000000;
            for (int i = 0; i < 1000000; i++) {
                num *= Math.random() * 1000;
                num /= Math.random() * 1000;
                if (num == 0)
                    num = 1000000;
            }
            return num;
        }

        public int getPriority() {
            return priority;
        }
    }
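Then, to execute these jobs in priority order, the code looks like this:
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.PriorityBlockingQueue;
    import java.util.concurrent.RunnableFuture;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class TestPQ {
        public static void main(String[] args) throws InterruptedException, ExecutionException {
            int nThreads = 2;
            int qInitialSize = 10;

            ExecutorService exec = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
                    new PriorityBlockingQueue<Runnable>(qInitialSize, new PriorityFutureComparator())) {

                // Wrap every future in a PriorityFuture so the queue can compare them.
                // The cast assumes that only LenthyJob instances are submitted.
                @Override
                protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
                    RunnableFuture<T> newTaskFor = super.newTaskFor(callable);
                    return new PriorityFuture<T>(newTaskFor, ((LenthyJob) callable).getPriority());
                }
            };

            for (int i = 0; i < 20; i++) {
                int priority = (int) (Math.random() * 100);
                System.out.println("Scheduling: " + priority);
                LenthyJob job = new LenthyJob(priority);
                exec.submit(job);
            }

            // Let the queued jobs finish, then allow the JVM to exit.
            exec.shutdown();
        }
    }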
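This is a lot of code, but it is nearly the only way to accomplish this.
On my machine the output looks like the following. The first two jobs (39 and 90) start immediately on the two idle worker threads; the rest wait in the queue and run in priority order, lowest value first: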
    Scheduling: 39
    Scheduling: 90
    Scheduling: 88
    Executing: 39
    Scheduling: 75
    Executing: 90
    Scheduling: 15
    Scheduling: 2
    Scheduling: 5
    Scheduling: 24
    Scheduling: 82
    Scheduling: 81
    Scheduling: 3
    Scheduling: 23
    Scheduling: 7
    Scheduling: 40
    Scheduling: 77
    Scheduling: 49
    Scheduling: 34
    Scheduling: 22
    Scheduling: 97
    Scheduling: 33
    Executing: 2
    Executing: 3
    Executing: 5
    Executing: 7
    Executing: 15
    Executing: 22
    Executing: 23
    Executing: 24
    Executing: 33
    Executing: 34
    Executing: 40
    Executing: 49
    Executing: 75
    Executing: 77
    Executing: 81
    Executing: 82
    Executing: 88
    Executing: 97
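The comparator in this answer looks only at the priority value, and PriorityBlockingQueue makes no promise about the order of elements that compare as equal. If submission order should be kept among equal priorities, one common approach is to add a sequence number as a tie-breaker. The sketch below shows the idea on a bare queue; `FifoTieBreakSketch`, `Entry` and its `seq` field are hypothetical names, and the same comparator shape could be applied to PriorityFuture if it carried such a field.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

final class FifoTieBreakSketch {

    /** Hypothetical queue entry: a priority plus an increasing sequence number. */
    static final class Entry {
        private static final AtomicLong NEXT_SEQ = new AtomicLong();

        final int priority;
        final long seq = NEXT_SEQ.getAndIncrement(); // assigned in creation order
        final String label;

        Entry(int priority, String label) {
            this.priority = priority;
            this.label = label;
        }
    }

    // Compare by priority first; fall back to creation order for equal priorities.
    static final Comparator<Entry> ORDER =
            Comparator.comparingInt((Entry e) -> e.priority)
                      .thenComparingLong(e -> e.seq);

    /** Adds a few entries with repeated priorities and returns the order they come out in. */
    static List<String> drainOrder() {
        PriorityBlockingQueue<Entry> queue = new PriorityBlockingQueue<>(11, ORDER);
        queue.add(new Entry(2, "b1"));
        queue.add(new Entry(1, "a1"));
        queue.add(new Entry(2, "b2"));
        queue.add(new Entry(1, "a2"));
        queue.add(new Entry(2, "b3"));

        List<String> out = new ArrayList<>();
        Entry e;
        while ((e = queue.poll()) != null) {
            out.add(e.label);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(drainOrder()); // prints [a1, a2, b1, b2, b3]
    }
}
```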
If what you want to control is the priority of the worker threads themselves, you can implement your own ThreadFactory and set it on the ThreadPoolExecutor like this:
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, numOfWorkerThreads, 0L,
            TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    threadPool.setThreadFactory(new OpJobThreadFactory(Thread.NORM_PRIORITY - 2));
where my OpJobThreadFactory looks like the following:
    public final static class OpJobThreadFactory implements ThreadFactory {
        private int priority;
        private boolean daemon;
        private final String namePrefix;
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final AtomicInteger threadNumber = new AtomicInteger(1);

        public OpJobThreadFactory(int priority) {
            this(priority, true);
        }

        public OpJobThreadFactory(int priority, boolean daemon) {
            this.priority = priority;
            this.daemon = daemon;
            namePrefix = "jobpool-" + poolNumber.getAndIncrement() + "-thread-";
        }

        @Override
        public Thread newThread(Runnable r) {
            // Every worker thread of the pool gets the same name prefix, daemon flag and priority.
            Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
            t.setDaemon(daemon);
            t.setPriority(priority);
            return t;
        }
    }
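To see what a factory like this does in practice, here is a small self-contained sketch. `ThreadPrioritySketch` and `observedWorkerPriority` are hypothetical names, and the lambda factory is a stripped-down stand-in for OpJobThreadFactory. It only checks that tasks run on threads with the requested priority. Thread priority is a hint to the thread scheduler for all work done by that thread; it does not change the order in which queued tasks are picked up.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

final class ThreadPrioritySketch {

    /** Submits one task and reports the priority of the worker thread that ran it. */
    static int observedWorkerPriority(int priority) {
        // Stand-in for a custom ThreadFactory: daemon threads with a fixed priority.
        ThreadFactory factory = r -> {
            Thread t = new Thread(r, "sketch-worker");
            t.setDaemon(true);
            t.setPriority(priority);
            return t;
        };

        ExecutorService pool = Executors.newFixedThreadPool(2, factory);
        try {
            Callable<Integer> task = () -> Thread.currentThread().getPriority();
            return pool.submit(task).get(10, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Thread.NORM_PRIORITY is 5, so this prints 3.
        System.out.println(observedWorkerPriority(Thread.NORM_PRIORITY - 2));
    }
}
```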