Java - Define a timeout for Callable within an ExecutorCompletionService
I suggest dividing your problem into two separate ones:
- run on multiple threads
- use a timeout for each operation
For the first (multithreading), you already use an executor service that runs the jobs on 2 threads: Executors.newFixedThreadPool(2). If you apply the timeout there, it covers the run of all tasks together, but you need a timeout for each job.
For the timeout issue, you can give each job its own executor service, wrapped in a class: JobManager.
    package com.stackoverflow.q24473796;

    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    // Wraps a job so that it runs on its own single-thread executor with its own timeout.
    public class JobManager implements Callable<Integer> {

        protected long timeout;
        protected TimeUnit timeUnit;
        protected Callable<Integer> job;

        public JobManager(long timeout, TimeUnit timeUnit, Callable<Integer> job) {
            this.timeout = timeout;
            this.timeUnit = timeUnit;
            this.job = job;
        }

        @Override
        public Integer call() {
            Integer result = -1; // default, this could be adapted
            ExecutorService exec = Executors.newSingleThreadExecutor();
            try {
                result = exec.submit(job).get(timeout, timeUnit);
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                // Whatever you want
                if (e instanceof TimeoutException) {
                    System.out.println("Timeout get for " + job.toString());
                } else {
                    System.out.println("exception get for " + job.toString() + " : " + e.getMessage());
                }
            } finally {
                // shutdown() does not interrupt a timed-out job: it keeps running in the background.
                // Use shutdownNow() here if the job should be interrupted instead.
                exec.shutdown();
            }
            return result;
        }
    }
Then you can call the tasks from your main thread as follows:

    Job job = new Job(i * 1000, i);
    Future<Integer> future = newFixedThreadPool.submit(new JobManager(5, TimeUnit.SECONDS, job));
I adapted your CallableTest:

    package com.stackoverflow.q24473796;

    import java.util.ArrayList;
    import java.util.Date;
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;

    public class CallableTest {

        public static void main(String[] args) {
            ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
            List<Future<Integer>> futures = new ArrayList<Future<Integer>>();
            for (int i = 10; i > 0; i--) {
                Job job = new Job(i * 1000, i);
                // Each job gets its own 5-second timeout through JobManager
                Future<Integer> future = newFixedThreadPool.submit(new JobManager(5, TimeUnit.SECONDS, job));
                futures.add(future);
            }

            ArrayList<Integer> results = new ArrayList<Integer>();
            for (Future<Integer> future : futures) {
                Integer result = -1;
                try {
                    result = future.get();
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
                if (result != -1) { // -1 marks a job that timed out or failed
                    results.add(result);
                }
            }

            newFixedThreadPool.shutdown();
            try {
                newFixedThreadPool.awaitTermination(60, TimeUnit.SECONDS); // Global timeout
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println(new Date() + " results:");
            for (int j : results) {
                System.out.println(new Date() + " " + j);
            }
        }
    }
And you'll get the following output:
    Wed Apr 29 10:51:02 CEST 2015 10 started
    Wed Apr 29 10:51:02 CEST 2015 9 started
    Timeout get for com.stackoverflow.q24473796.Job@249fe45c
    Timeout get for com.stackoverflow.q24473796.Job@249fe45c
    Wed Apr 29 10:51:07 CEST 2015 8 started
    Wed Apr 29 10:51:07 CEST 2015 7 started
    Wed Apr 29 10:51:11 CEST 2015 9 finished
    Timeout get for com.stackoverflow.q24473796.Job@3cd4c5a0
    Timeout get for com.stackoverflow.q24473796.Job@3cd4c5a0
    Wed Apr 29 10:51:12 CEST 2015 6 started
    Wed Apr 29 10:51:12 CEST 2015 5 started
    Wed Apr 29 10:51:12 CEST 2015 10 finished
    Wed Apr 29 10:51:14 CEST 2015 7 finished
    Wed Apr 29 10:51:15 CEST 2015 8 finished
    Wed Apr 29 10:51:17 CEST 2015 5 finished
    Wed Apr 29 10:51:17 CEST 2015 4 started
    Timeout get for com.stackoverflow.q24473796.Job@2a0fded2
    Wed Apr 29 10:51:17 CEST 2015 3 started
    Wed Apr 29 10:51:18 CEST 2015 6 finished
    Wed Apr 29 10:51:20 CEST 2015 3 finished
    Wed Apr 29 10:51:20 CEST 2015 2 started
    Wed Apr 29 10:51:21 CEST 2015 4 finished
    Wed Apr 29 10:51:21 CEST 2015 1 started
    Wed Apr 29 10:51:22 CEST 2015 1 finished
    Wed Apr 29 10:51:22 CEST 2015 2 finished
    Wed Apr 29 10:51:22 CEST 2015 results:
    Wed Apr 29 10:51:22 CEST 2015 5
    Wed Apr 29 10:51:22 CEST 2015 4
    Wed Apr 29 10:51:22 CEST 2015 3
    Wed Apr 29 10:51:22 CEST 2015 2
    Wed Apr 29 10:51:22 CEST 2015 1
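The code above relies on the Job class from the question, which is not reproduced here. For anyone who wants to run it, the following is a hypothetical stand-in, not the original class: it assumes that the first constructor argument is a duration in milliseconds, that the second is the value returned, and that the job prints the "started" / "finished" lines seen in the output.

```java
import java.util.Date;
import java.util.concurrent.Callable;

// Hypothetical stand-in for the question's Job class (an assumption, not the original code).
// Add the same package declaration as JobManager if you place it next to that class.
class Job implements Callable<Integer> {

    private final long sleepMillis; // how long the simulated work takes
    private final int id;           // value returned when the work completes

    Job(long sleepMillis, int id) {
        this.sleepMillis = sleepMillis;
        this.id = id;
    }

    @Override
    public Integer call() throws InterruptedException {
        System.out.println(new Date() + " " + id + " started");
        Thread.sleep(sleepMillis); // simulated work; throws if the thread is interrupted
        System.out.println(new Date() + " " + id + " finished");
        return id;
    }
}
```

Because the simulated work is a Thread.sleep, this stand-in reacts to interruption, which matters for any approach that cancels a job with an interrupt.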
You've got Jobs running on two threads and you're reading results from your main thread. While you're waiting for one Job to finish, another Job is running without you waiting. I think you'll find you get the results you expect if you change your thread pool to a size of 1.
If you have to run the jobs on multiple threads, you'll have to come up with a way to track how long a Job has already been running by the time you start waiting for a result. If the Job has already been running for longer than 5 seconds, reject it. If not, wait for 5 seconds minus the amount of time it's already been running.
One option would be to store the start time in the Job. Then you could do something like
    long elapsedTime = System.currentTimeMillis() - job.getStartTime();
    if (elapsedTime < 5000) {
        future.get(5000 - elapsedTime, TimeUnit.MILLISECONDS);
    }
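To make the idea concrete, here is a minimal sketch under a few assumptions. TimedJob, getWithinBudget and demo are hypothetical names introduced for illustration, the job is simulated with Thread.sleep, and a job that has not started yet is treated as having used none of its budget. A job that has already finished is accepted however long it ran; you would need to record an end time as well if that matters to you.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class RemainingTimeDemo {

    // Waits only for the part of the budget the job has not used yet.
    // Returns the job's result, or null if it overran, failed, or the wait was interrupted.
    static Integer getWithinBudget(TimedJob job, Future<Integer> future, long budgetMillis) {
        long start = job.getStartTime();
        long elapsed = (start < 0) ? 0 : System.currentTimeMillis() - start;
        long remaining = Math.max(0, budgetMillis - elapsed);
        try {
            // With remaining == 0 this only succeeds if the result is already available.
            return future.get(remaining, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the job that overran its budget
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return null;
        } catch (ExecutionException e) {
            return null; // the job itself threw an exception
        }
    }

    // Runs one fast and one slow job on two threads and collects both outcomes.
    static Integer[] demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            TimedJob fast = new TimedJob(20, 1);
            TimedJob slow = new TimedJob(3000, 2);
            Future<Integer> fastFuture = pool.submit(fast);
            Future<Integer> slowFuture = pool.submit(slow);
            return new Integer[] {
                getWithinBudget(fast, fastFuture, 1000),
                getWithinBudget(slow, slowFuture, 200)
            };
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        Integer[] results = demo();
        System.out.println("fast=" + results[0] + ", slow=" + results[1]);
    }
}

// Hypothetical job that records the moment it actually starts running.
class TimedJob implements Callable<Integer> {

    private final long sleepMillis;
    private final int id;
    private volatile long startTime = -1; // -1 means "not started yet"

    TimedJob(long sleepMillis, int id) {
        this.sleepMillis = sleepMillis;
        this.id = id;
    }

    long getStartTime() {
        return startTime;
    }

    @Override
    public Integer call() throws InterruptedException {
        startTime = System.currentTimeMillis();
        Thread.sleep(sleepMillis); // simulated work
        return id;
    }
}
```

The start time is volatile because it is written by a pool thread and read by the thread that waits for the result.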
You can use a ScheduledExecutorService for this. First you would submit each task and retain the futures that are created. After that you can submit a new task that would cancel the retained future after some period of time.
    ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
    for (int i = 10; i > 0; i--) {
        Job job = new Job(i * 1000, i);
        final Future<Integer> handler = executor.submit(job);
        // Schedule a task that cancels this job 5 seconds after submission
        executor.schedule(new Runnable() {
            public void run() {
                handler.cancel(true); // true = interrupt the job if it is running
            }
        }, 5000, TimeUnit.MILLISECONDS);
        futures.add(handler);
    }
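This lets the task behind handler (the main functionality to be interrupted) run for up to 5 seconds after it is submitted, then cancels (i.e. interrupts) that specific task. Note that the 5 seconds are counted from submission, not from the moment the task starts running, and that the cancellation tasks share the two pool threads with the jobs, so they can be delayed while both threads are busy.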
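At this point you know that no Job will be allowed to run for more than 5 seconds, so

    Integer content = future.get(5, TimeUnit.SECONDS);

should work fine. Keep in mind that calling get() on a future that has been cancelled throws a CancellationException, so that case has to be caught.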
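A self-contained variant of this approach might look like the sketch below. It is an illustration under assumptions, not the code from the question: submitWithTimeout, resultOrNull, sleeper and demo are hypothetical helpers, and the jobs are simulated with Thread.sleep. It deliberately departs from the snippet above in one respect: the cancellations run on a separate single-thread scheduler, so they are not queued behind the jobs on the worker pool.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class ScheduledCancelDemo {

    // Submits the task to the worker pool and schedules its cancellation on a separate scheduler.
    static <T> Future<T> submitWithTimeout(ExecutorService workers,
                                           ScheduledExecutorService canceller,
                                           Callable<T> task,
                                           long timeoutMillis) {
        final Future<T> future = workers.submit(task);
        canceller.schedule(new Runnable() {
            @Override
            public void run() {
                future.cancel(true); // has no effect if the task already completed
            }
        }, timeoutMillis, TimeUnit.MILLISECONDS);
        return future;
    }

    // Returns the task's result, or null if it was cancelled, failed, or the wait was interrupted.
    static <T> T resultOrNull(Future<T> future) {
        try {
            return future.get();
        } catch (CancellationException e) {
            return null; // the scheduled cancellation fired first
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return null;
        } catch (ExecutionException e) {
            return null; // the task itself threw an exception
        }
    }

    // Hypothetical job: sleeps for the given time, then returns its id.
    static Callable<Integer> sleeper(final long millis, final int id) {
        return new Callable<Integer>() {
            @Override
            public Integer call() throws InterruptedException {
                Thread.sleep(millis);
                return id;
            }
        };
    }

    // Runs one fast and one slow job and collects both outcomes.
    static Integer[] demo() {
        ExecutorService workers = Executors.newFixedThreadPool(2);
        ScheduledExecutorService canceller = Executors.newSingleThreadScheduledExecutor();
        try {
            Future<Integer> fast = submitWithTimeout(workers, canceller, sleeper(20, 1), 500);
            Future<Integer> slow = submitWithTimeout(workers, canceller, sleeper(3000, 2), 200);
            return new Integer[] { resultOrNull(fast), resultOrNull(slow) };
        } finally {
            workers.shutdownNow();
            canceller.shutdownNow();
        }
    }

    public static void main(String[] args) {
        Integer[] results = demo();
        System.out.println("fast=" + results[0] + ", slow=" + results[1]);
    }
}
```

In this sketch the timeout is still counted from submission. If the worker pool can have jobs waiting in its queue, you would have to schedule the cancellation from inside the job when it starts running to get a true per-execution timeout.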