executing a method in parallel from a call method
As already mentioned in the comments on your question, you can use Java's Fork/Join framework. This saves you the extra thread pool inside your DataFetcherTask.
You simply need to use a ForkJoinPool in your DataClient and convert your DataFetcherTask into a RecursiveTask (one of ForkJoinTask's subtypes). This allows you to easily execute other subtasks in parallel.
So, after these modifications your code will look something like this:
DataFetcherTask
The DataFetcherTask is now a RecursiveTask which first generates the keys and then invokes a subtask for each generated key. These subtasks are executed in the same ForkJoinPool as the parent task.
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RecursiveTask;

import org.springframework.web.client.RestTemplate;

public class DataFetcherTask extends RecursiveTask<List<DataResponse>> {

    private final DataRequest key;
    private final RestTemplate restTemplate;

    public DataFetcherTask(DataRequest key, RestTemplate restTemplate) {
        this.key = key;
        this.restTemplate = restTemplate;
    }

    @Override
    protected List<DataResponse> compute() {
        // Create subtasks for the key and invoke them
        List<DataRequestTask> requestTasks = requestTasks(generateKeys());
        invokeAll(requestTasks);

        // All tasks are finished if invokeAll() returns.
        List<DataResponse> responseList = new ArrayList<>(requestTasks.size());
        for (DataRequestTask task : requestTasks) {
            try {
                responseList.add(task.get());
            } catch (InterruptedException | ExecutionException e) {
                // TODO - Handle exception properly
                Thread.currentThread().interrupt();
                return Collections.emptyList();
            }
        }
        return responseList;
    }

    private List<DataRequestTask> requestTasks(List<DataRequest> keys) {
        List<DataRequestTask> tasks = new ArrayList<>(keys.size());
        for (DataRequest key : keys) {
            tasks.add(new DataRequestTask(key));
        }
        return tasks;
    }

    // In this method I am making a HTTP call to another service
    // and then I will make List<DataRequest> accordingly.
    private List<DataRequest> generateKeys() {
        List<DataRequest> keys = new ArrayList<>();
        // use key object which is passed in constructor to make HTTP call to another service
        // and then make List of DataRequest object and return keys.
        return keys;
    }

    /** Inner class for the subtasks. */
    private static class DataRequestTask extends RecursiveTask<DataResponse> {

        private final DataRequest request;

        public DataRequestTask(DataRequest request) {
            this.request = request;
        }

        @Override
        protected DataResponse compute() {
            return performDataRequest(this.request);
        }

        private DataResponse performDataRequest(DataRequest key) {
            // This will have all LogicA code here which is shown in my original design.
            // everything as it is same..
            return new DataResponse(DataErrorEnum.OK, DataStatusEnum.OK);
        }
    }
}
DataClient
The DataClient will not change much except for the new thread pool:
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;

import org.springframework.web.client.RestTemplate;

public class DataClient implements Client {

    private final RestTemplate restTemplate = new RestTemplate();
    // Replace the ExecutorService with a ForkJoinPool
    private final ForkJoinPool service = new ForkJoinPool(15);

    @Override
    public List<DataResponse> getSyncData(DataRequest key) {
        List<DataResponse> responseList = null;
        Future<List<DataResponse>> responseFuture = null;
        try {
            responseFuture = getAsyncData(key);
            responseList = responseFuture.get(key.getTimeout(), key.getTimeoutUnit());
        } catch (TimeoutException | ExecutionException | InterruptedException ex) {
            responseList = Collections.singletonList(new DataResponse(DataErrorEnum.CLIENT_TIMEOUT, DataStatusEnum.ERROR));
            responseFuture.cancel(true);
            // logging exception here
        }
        return responseList;
    }

    @Override
    public Future<List<DataResponse>> getAsyncData(DataRequest key) {
        DataFetcherTask task = new DataFetcherTask(key, this.restTemplate);
        return this.service.submit(task);
    }
}
Once you are on Java 8 you may consider changing the implementation to CompletableFutures. Then it would look something like this:
DataClientCF
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

import org.springframework.web.client.RestTemplate;

public class DataClientCF {

    private final RestTemplate restTemplate = new RestTemplate();
    private final ExecutorService executor = Executors.newFixedThreadPool(15);

    public List<DataResponse> getData(DataRequest initialKey) {
        return CompletableFuture.supplyAsync(() -> generateKeys(initialKey), this.executor)
            .thenApply(requests -> requests.stream().map(this::supplyRequestAsync).collect(Collectors.toList()))
            .thenApply(responseFutures -> responseFutures.stream().map(future -> future.join()).collect(Collectors.toList()))
            .exceptionally(t -> { throw new RuntimeException(t); })
            .join();
    }

    private List<DataRequest> generateKeys(DataRequest key) {
        return new ArrayList<>();
    }

    private CompletableFuture<DataResponse> supplyRequestAsync(DataRequest key) {
        return CompletableFuture.supplyAsync(() -> new DataResponse(DataErrorEnum.OK, DataStatusEnum.OK), this.executor);
    }
}
As mentioned in the comments, Guava's ListenableFutures would provide similar functionality for Java 7, but without lambdas they tend to get clumsy.
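The CompletableFuture version above waits without a time limit, whereas the original getSyncData honoured a timeout. One way to bring that back is sketched below, using only Java 8 APIs (CompletableFuture.allOf plus a timed get). The class name TimedFanOut, the String payloads and the "NOT_READY" marker are placeholders invented for this illustration; in the real code they would be DataRequest, DataResponse and a suitable error response.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimedFanOut {

    /** Starts one async call per key and waits at most the given time for all of them. */
    static List<String> fetchAll(List<String> keys, ExecutorService executor, long timeout, TimeUnit unit) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (String key : keys) {
            // Stand-in for the real HTTP call (hypothetical payload).
            futures.add(CompletableFuture.supplyAsync(() -> "response-for-" + key, executor));
        }

        CompletableFuture<Void> all = CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
        try {
            all.get(timeout, unit);
        } catch (TimeoutException | ExecutionException e) {
            // Fall through and collect whatever has completed so far.
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        List<String> results = new ArrayList<>(futures.size());
        for (CompletableFuture<String> future : futures) {
            if (future.isDone() && !future.isCompletedExceptionally()) {
                results.add(future.getNow("NOT_READY")); // does not block
            } else {
                results.add("NOT_READY"); // still running, failed or cancelled
            }
        }
        return results;
    }
}
```

The result list keeps the order of the keys, so a caller can tell which requests did not finish in time.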
As far as I know, RestTemplate is blocking, and the ForkJoinTask JavaDoc says:
Computations should avoid synchronized methods or blocks, and should minimize other blocking synchronization apart from joining other tasks or using synchronizers such as Phasers that are advertised to cooperate with fork/join scheduling. ...
Tasks should also not perform blocking IO,...
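If a blocking call has to run inside a ForkJoinPool anyway, the JDK provides ForkJoinPool.ManagedBlocker, which tells the pool that a worker is about to block so it can compensate with a spare thread if needed. The sketch below is only an illustration of that API: the BlockingCall wrapper and its run helper are hypothetical names, and the Supplier stands in for something like a RestTemplate call.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;

class BlockingCall<T> implements ForkJoinPool.ManagedBlocker {

    private final Supplier<T> call;
    private T result;
    private boolean done;

    BlockingCall(Supplier<T> call) {
        this.call = call;
    }

    @Override
    public boolean block() {
        // Perform the blocking work; returning true means no further blocking is needed.
        result = call.get();
        done = true;
        return true;
    }

    @Override
    public boolean isReleasable() {
        return done;
    }

    /** Runs the supplier under ForkJoinPool.managedBlock and returns its result. */
    static <T> T run(Supplier<T> call) {
        BlockingCall<T> blocker = new BlockingCall<>(call);
        try {
            ForkJoinPool.managedBlock(blocker);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while blocking", e);
        }
        return blocker.result;
    }
}
```

Inside a RecursiveTask's compute() this could be used as BlockingCall.run(() -> someBlockingRequest()), where someBlockingRequest is a placeholder. Whether this is preferable to a plain ExecutorService depends on how many blocking calls are in flight at once.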
A call within a call is redundant, and you don't need two executors. You can also return a partial result from getSyncData(DataRequest key). This can be done like this:
DataClient.java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;

import org.springframework.web.client.RestTemplate;

public class DataClient implements Client {

    private RestTemplate restTemplate = new RestTemplate();
    // first executor
    private ExecutorService service = Executors.newFixedThreadPool(15);

    @Override
    public List<DataResponse> getSyncData(DataRequest key) {
        List<DataResponse> responseList = null;
        DataFetcherResult response = null;
        try {
            response = getAsyncData(key);
            responseList = response.get(key.getTimeout(), key.getTimeoutUnit());
        } catch (TimeoutException | ExecutionException ex) {
            response.cancel(true);
            responseList = response.getPartialResult();
        } catch (InterruptedException ex) {
            // get(...) also declares InterruptedException; restore the flag and return what is ready
            Thread.currentThread().interrupt();
            response.cancel(true);
            responseList = response.getPartialResult();
        }
        return responseList;
    }

    @Override
    public DataFetcherResult getAsyncData(DataRequest key) {
        List<DataRequest> keys = generateKeys(key);
        final List<Future<DataResponse>> responseList = new ArrayList<>();
        final CountDownLatch latch = new CountDownLatch(keys.size()); // assume keys is not null
        for (final DataRequest _key : keys) {
            responseList.add(service.submit(new Callable<DataResponse>() {
                @Override
                public DataResponse call() throws Exception {
                    try {
                        return performDataRequest(_key);
                    } finally {
                        // count down only; returning from finally would swallow exceptions
                        latch.countDown();
                    }
                }
            }));
        }
        return new DataFetcherResult(responseList, latch);
    }

    // In this method I am making a HTTP call to another service
    // and then I will make List<DataRequest> accordingly.
    private List<DataRequest> generateKeys(DataRequest key) {
        List<DataRequest> keys = new ArrayList<>();
        // use key object which is passed in constructor to make HTTP call to another service
        // and then make List of DataRequest object and return keys.
        return keys;
    }

    private DataResponse performDataRequest(DataRequest key) {
        // This will have all LogicA code here which is shown in my original design.
        // everything as it is same..
        return null;
    }
}
DataFetcherResult.java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DataFetcherResult implements Future<List<DataResponse>> {

    final List<Future<DataResponse>> futures;
    final CountDownLatch latch;

    public DataFetcherResult(List<Future<DataResponse>> futures, CountDownLatch latch) {
        this.futures = futures;
        this.latch = latch;
    }

    // non-blocking
    public List<DataResponse> getPartialResult() {
        List<DataResponse> result = new ArrayList<>(futures.size());
        for (Future<DataResponse> future : futures) {
            try {
                result.add(future.isDone() ? future.get() : null);
                // instead of null you can return new DataResponse(DataErrorEnum.NOT_READY, DataStatusEnum.ERROR);
            } catch (InterruptedException | ExecutionException | CancellationException e) {
                e.printStackTrace();
                // ExecutionException or CancellationException could be thrown, especially if DataFetcherResult was cancelled
                // you can handle them here and return DataResponse with corresponding DataErrorEnum and DataStatusEnum
            }
        }
        return result;
    }

    @Override
    public List<DataResponse> get() throws ExecutionException, InterruptedException {
        List<DataResponse> result = new ArrayList<>(futures.size());
        for (Future<DataResponse> future : futures) {
            result.add(future.get());
        }
        return result;
    }

    @Override
    public List<DataResponse> get(long timeout, TimeUnit timeUnit)
            throws ExecutionException, InterruptedException, TimeoutException {
        if (latch.await(timeout, timeUnit)) {
            return get();
        }
        throw new TimeoutException(); // or getPartialResult()
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        boolean cancelled = true;
        for (Future<DataResponse> future : futures) {
            cancelled &= future.cancel(mayInterruptIfRunning);
        }
        return cancelled;
    }

    @Override
    public boolean isCancelled() {
        boolean cancelled = true;
        for (Future<DataResponse> future : futures) {
            cancelled &= future.isCancelled();
        }
        return cancelled;
    }

    @Override
    public boolean isDone() {
        boolean done = true;
        for (Future<DataResponse> future : futures) {
            done &= future.isDone();
        }
        return done;
    }

    // and etc.
}
I wrote it with a CountDownLatch and it looks fine, but note that there is a nuance. You can get stuck for a short while in DataFetcherResult.get(long timeout, TimeUnit timeUnit) because the CountDownLatch is not synchronized with the futures' state. It can happen that latch.getCount() == 0 while not all futures return future.isDone() == true yet: the tasks have already passed latch.countDown() inside the Callable's finally {} block, but the futures have not yet changed their internal state, which is still NEW. So calling get() inside get(long timeout, TimeUnit timeUnit) can cause a small delay.
A similar case was described here.
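One possible way to close that gap, sketched here as an assumption rather than as part of the design above, is to count the latch down from FutureTask.done() instead of from the Callable. The JDK invokes done() when the task transitions to the done state, so once the latch reaches zero every future already reports isDone() == true. LatchedTasks and LatchedTask are hypothetical names, and String stands in for DataResponse. Note that done() also fires on cancellation, so cancelled tasks release the latch too.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

class LatchedTasks {

    /** FutureTask that counts down from done(), i.e. after its state has changed. */
    static final class LatchedTask<T> extends FutureTask<T> {
        private final CountDownLatch latch;

        LatchedTask(Callable<T> callable, CountDownLatch latch) {
            super(callable);
            this.latch = latch;
        }

        @Override
        protected void done() {
            latch.countDown();
        }
    }

    /** Runs all callables and reports whether every future was done once the latch opened. */
    static boolean allDoneWhenLatchOpens(List<Callable<String>> calls, ExecutorService executor) {
        CountDownLatch latch = new CountDownLatch(calls.size());
        List<Future<String>> futures = new ArrayList<>(calls.size());
        for (Callable<String> call : calls) {
            LatchedTask<String> task = new LatchedTask<>(call, latch);
            futures.add(task);
            executor.execute(task); // execute, not submit, so the task is not wrapped again
        }
        try {
            if (!latch.await(5, TimeUnit.SECONDS)) {
                return false;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        for (Future<String> future : futures) {
            if (!future.isDone()) {
                return false;
            }
        }
        return true;
    }
}
```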
The timed DataFetcherResult.get(...) could be rewritten using each future's future.get(long timeout, TimeUnit timeUnit), and then you can remove the CountDownLatch from the class.
public List<DataResponse> get(long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException {
    List<DataResponse> result = new ArrayList<>(futures.size());
    long timeoutMs = timeUnit.toMillis(timeout); // remaining time budget shared by all futures
    boolean timedOut = false;                    // set once any future has timed out
    for (Future<DataResponse> future : futures) {
        long beforeGet = System.currentTimeMillis();
        try {
            if (!timedOut && timeoutMs > 0) {
                result.add(future.get(timeoutMs, TimeUnit.MILLISECONDS));
                timeoutMs -= System.currentTimeMillis() - beforeGet;
            } else {
                if (future.isDone()) {
                    result.add(future.get());
                } else {
                    //result.add(new DataResponse(DataErrorEnum.NOT_READY, DataStatusEnum.ERROR)); ?
                }
            }
        } catch (TimeoutException e) {
            result.add(new DataResponse(DataErrorEnum.TIMEOUT, DataStatusEnum.ERROR));
            timedOut = true;
        }
        // you can also handle ExecutionException or CancellationException here
    }
    return result;
}
This code was given as an example and it should be tested before using in production, but seems legit :)
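As an alternative to tracking the time budget by hand, ExecutorService.invokeAll(tasks, timeout, unit) runs a batch with one shared deadline and cancels whatever has not finished when it expires. The sketch below is a minimal illustration of that approach, not a drop-in replacement: InvokeAllWithTimeout is a hypothetical name, and the String values "TIMEOUT" and "ERROR" stand in for DataResponse objects with the corresponding DataErrorEnum.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class InvokeAllWithTimeout {

    /** Runs all calls with one shared deadline and returns one entry per call, in order. */
    static List<String> fetch(List<Callable<String>> calls, ExecutorService executor, long timeout, TimeUnit unit) {
        List<String> results = new ArrayList<>(calls.size());
        List<Future<String>> futures;
        try {
            // On return every future is done; tasks that missed the deadline are cancelled.
            futures = executor.invokeAll(calls, timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return results;
        }
        for (Future<String> future : futures) {
            if (future.isCancelled()) {
                results.add("TIMEOUT");
                continue;
            }
            try {
                results.add(future.get()); // does not block, the future is already done
            } catch (ExecutionException e) {
                results.add("ERROR");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                results.add("ERROR");
            }
        }
        return results;
    }
}
```

Unlike getAsyncData, invokeAll blocks the calling thread, so this fits the synchronous getSyncData path only.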