Java - Define a timeout for Callable within a ExecutorCompletionService Java - Define a timeout for Callable within a ExecutorCompletionService multithreading multithreading

Java - Define a timeout for Callable within a ExecutorCompletionService


I suggest you to divide your problem into 2 separate ones:

  1. run on multiple threads
  2. use a timeout for each operation

For the first (multithreading), you already used the service executor that can manage that on 2 Threads : Executors.newFixedThreadPool(2). If you apply the timeout here, the timeout act for the run of all tasks, but you need a timeout for each job.

For the timout issue, you can manage it thanks to a new service executor per job in a class: JobManager.

package com.stackoverflow.q24473796;import java.util.concurrent.Callable;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;import java.util.concurrent.TimeoutException;public class JobManager implements Callable<Integer> {protected long timeout;protected TimeUnit timeUnit;protected Callable<Integer> job;public JobManager(long timeout, TimeUnit timeUnit, Callable<Integer> job) {this.timeout = timeout;this.timeUnit = timeUnit;this.job = job;}@Overridepublic Integer call() {    Integer result = new Integer(-1); // default, this could be adapted    ExecutorService exec = Executors.newSingleThreadExecutor();    try {        result = exec.submit(job).get(timeout, timeUnit);    } catch (InterruptedException | ExecutionException | TimeoutException e) {        // Whatever you want        if (e instanceof TimeoutException) {            System.out.println("Timeout get for " + job.toString());        } else {            System.out.println("exception get for " + job.toString() + " : " + e.getMessage());        }    }    exec.shutdown();    return result;    }}

Then, you can call the tasks from your main thread as following:

    Job job = new Job(i * 1000, i);    Future<Integer> future = newFixedThreadPool.submit(new JobManager(5, TimeUnit.SECONDS, job));

I addapted your CallableTest: package com.stackoverflow.q24473796;

import java.util.ArrayList;import java.util.Date;import java.util.List;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;import java.util.concurrent.TimeUnit;public class CallableTest {    public static void main(String[] args) {        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);        List<Future<Integer>> futures = new ArrayList<Future<Integer>>();        for (int i = 10; i > 0; i--) {            Job job = new Job(i * 1000, i);            Future<Integer> future = newFixedThreadPool.submit(new   JobManager(5, TimeUnit.SECONDS, job));            futures.add(future);        }        ArrayList<Integer> results = new ArrayList<Integer>();        for (Future<Integer> future : futures) {            Integer result = new Integer(-1);            try {                result = future.get();            } catch (InterruptedException | ExecutionException e) {                e.printStackTrace();            }            if (result != -1) {                results.add(result);            }        }        newFixedThreadPool.shutdown();        try {            newFixedThreadPool.awaitTermination(60, TimeUnit.SECONDS); //Global Timeout        } catch (InterruptedException e) {            e.printStackTrace();        }        System.out.println(new Date() + " results:");        for (int j : results) {            System.out.println(new Date() + " " + j);        }    }}

And you'll get the following output:

Wed Apr 29 10:51:02 CEST 2015 10 startedWed Apr 29 10:51:02 CEST 2015 9 startedTimeout get for com.stackoverflow.q24473796.Job@249fe45cTimeout get for com.stackoverflow.q24473796.Job@249fe45cWed Apr 29 10:51:07 CEST 2015 8 startedWed Apr 29 10:51:07 CEST 2015 7 startedWed Apr 29 10:51:11 CEST 2015 9 finishedTimeout get for com.stackoverflow.q24473796.Job@3cd4c5a0Timeout get for com.stackoverflow.q24473796.Job@3cd4c5a0Wed Apr 29 10:51:12 CEST 2015 6 startedWed Apr 29 10:51:12 CEST 2015 5 startedWed Apr 29 10:51:12 CEST 2015 10 finishedWed Apr 29 10:51:14 CEST 2015 7 finishedWed Apr 29 10:51:15 CEST 2015 8 finishedWed Apr 29 10:51:17 CEST 2015 5 finishedWed Apr 29 10:51:17 CEST 2015 4 startedTimeout get for com.stackoverflow.q24473796.Job@2a0fded2Wed Apr 29 10:51:17 CEST 2015 3 startedWed Apr 29 10:51:18 CEST 2015 6 finishedWed Apr 29 10:51:20 CEST 2015 3 finishedWed Apr 29 10:51:20 CEST 2015 2 startedWed Apr 29 10:51:21 CEST 2015 4 finishedWed Apr 29 10:51:21 CEST 2015 1 startedWed Apr 29 10:51:22 CEST 2015 1 finishedWed Apr 29 10:51:22 CEST 2015 2 finishedWed Apr 29 10:51:22 CEST 2015 results:Wed Apr 29 10:51:22 CEST 2015 5Wed Apr 29 10:51:22 CEST 2015 4Wed Apr 29 10:51:22 CEST 2015 3Wed Apr 29 10:51:22 CEST 2015 2Wed Apr 29 10:51:22 CEST 2015 1


You've got Jobs running on two threads and you're reading results from your main thread. While you're waiting for one Job to finish, another Job is running without you waiting. I think you'll find you get the results you expect if you change your thread pool to a size of 1.

If you have to run the jobs on multiple threads, you'll have to come up with a way to track how long a Job has already been running by the time you start waiting for a result. If the Job has already been running for longer than 5 seconds, reject it. If not, wait for 5 seconds minus the amount of time it's already been running.

One option would be to store the start time in the Job. Then you could do something like

 long elapsedTime = System.currentTimeMillis() - job.getStartTime(); if (elapsedTime < 5000) {     future.get(5000 - elapsedTime, TimeUnit.MILLISECONDS); }


You can use a ScheduledExecutorService for this. First you would submit each task and retain the futures that are created. After that you can submit a new task that would cancel the retained future after some period of time.

 ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);  for (int i = 10; i > 0; i--) {        Job job = new Job(i * 1000, i);        final Future handler = executor.submit(job);        executor.schedule(new Runnable(){            public void run(){                handler.cancel();            }        }, 5000, TimeUnit.MILLISECONDS);        futures.add(handler); }

This will execute your handler (main functionality to be interrupted) for 5 seconds, then will cancel (i.e. interrupt) that specific task.

At this point you know that no Job will be allowed to run for more than 5 seconds, so

Integer content = future.get(5, TimeUnit.SECONDS);

Should work fine.