executing a method in parallel from a call method


As already mentioned in the comments on your question, you can use Java's Fork/Join framework. This saves you the extra thread pool inside your DataFetcherTask.

You simply need to use a ForkJoinPool in your DataClient and convert your DataFetcherTask into a RecursiveTask (one of ForkJoinTask's subtypes). This allows you to easily execute other subtasks in parallel.

So, after these modifications your code will look something like this:

DataFetcherTask

The DataFetcherTask is now a RecursiveTask which first generates the keys and invokes subtasks for each generated key. These subtasks are executed in the same ForkJoinPool as the parent task.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RecursiveTask;

import org.springframework.web.client.RestTemplate;

public class DataFetcherTask extends RecursiveTask<List<DataResponse>> {

  private final DataRequest key;
  private final RestTemplate restTemplate;

  public DataFetcherTask(DataRequest key, RestTemplate restTemplate) {
    this.key = key;
    this.restTemplate = restTemplate;
  }

  @Override
  protected List<DataResponse> compute() {
    // Create subtasks for the key and invoke them
    List<DataRequestTask> requestTasks = requestTasks(generateKeys());
    invokeAll(requestTasks);

    // All tasks are finished if invokeAll() returns.
    List<DataResponse> responseList = new ArrayList<>(requestTasks.size());
    for (DataRequestTask task : requestTasks) {
      try {
        responseList.add(task.get());
      } catch (InterruptedException | ExecutionException e) {
        // TODO - Handle exception properly
        Thread.currentThread().interrupt();
        return Collections.emptyList();
      }
    }
    return responseList;
  }

  private List<DataRequestTask> requestTasks(List<DataRequest> keys) {
    List<DataRequestTask> tasks = new ArrayList<>(keys.size());
    for (DataRequest key : keys) {
      tasks.add(new DataRequestTask(key));
    }
    return tasks;
  }

  // In this method I am making a HTTP call to another service
  // and then I will make List<DataRequest> accordingly.
  private List<DataRequest> generateKeys() {
    List<DataRequest> keys = new ArrayList<>();
    // use key object which is passed in constructor to make HTTP call to another service
    // and then make List of DataRequest object and return keys.
    return keys;
  }

  /** Inner class for the subtasks. */
  private static class DataRequestTask extends RecursiveTask<DataResponse> {

    private final DataRequest request;

    public DataRequestTask(DataRequest request) {
      this.request = request;
    }

    @Override
    protected DataResponse compute() {
      return performDataRequest(this.request);
    }

    private DataResponse performDataRequest(DataRequest key) {
      // This will have all LogicA code here which is shown in my original design.
      // everything as it is same..
      return new DataResponse(DataErrorEnum.OK, DataStatusEnum.OK);
    }
  }
}

DataClient

The DataClient will not change much except for the new thread pool:

import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;

import org.springframework.web.client.RestTemplate;

public class DataClient implements Client {

  private final RestTemplate restTemplate = new RestTemplate();
  // Replace the ExecutorService with a ForkJoinPool
  private final ForkJoinPool service = new ForkJoinPool(15);

  @Override
  public List<DataResponse> getSyncData(DataRequest key) {
    List<DataResponse> responseList = null;
    Future<List<DataResponse>> responseFuture = null;
    try {
      responseFuture = getAsyncData(key);
      responseList = responseFuture.get(key.getTimeout(), key.getTimeoutUnit());
    } catch (TimeoutException | ExecutionException | InterruptedException ex) {
      responseList = Collections.singletonList(new DataResponse(DataErrorEnum.CLIENT_TIMEOUT, DataStatusEnum.ERROR));
      responseFuture.cancel(true);
      // logging exception here
    }
    return responseList;
  }

  @Override
  public Future<List<DataResponse>> getAsyncData(DataRequest key) {
    DataFetcherTask task = new DataFetcherTask(key, this.restTemplate);
    return this.service.submit(task);
  }
}

Once you are on Java 8, you may consider changing the implementation to use CompletableFuture. It would then look something like this:

DataClientCF

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

import org.springframework.web.client.RestTemplate;

public class DataClientCF {

  private final RestTemplate restTemplate = new RestTemplate();
  private final ExecutorService executor = Executors.newFixedThreadPool(15);

  public List<DataResponse> getData(DataRequest initialKey) {
    return CompletableFuture.supplyAsync(() -> generateKeys(initialKey), this.executor)
      // Start one asynchronous request per generated key
      .thenApply(requests -> requests.stream().map(this::supplyRequestAsync).collect(Collectors.toList()))
      // Wait for every request and collect the responses
      .thenApply(responseFutures -> responseFutures.stream().map(future -> future.join()).collect(Collectors.toList()))
      .exceptionally(t -> { throw new RuntimeException(t); })
      .join();
  }

  private List<DataRequest> generateKeys(DataRequest key) {
    return new ArrayList<>();
  }

  private CompletableFuture<DataResponse> supplyRequestAsync(DataRequest key) {
    return CompletableFuture.supplyAsync(() -> new DataResponse(DataErrorEnum.OK, DataStatusEnum.OK), this.executor);
  }
}

As mentioned in the comments, Guava's ListenableFuture would provide similar functionality on Java 7, but without lambdas the code tends to get clumsy.


As far as I know, RestTemplate is blocking, and the ForkJoinTask Javadoc says:

Computations should avoid synchronized methods or blocks, and should minimize other blocking synchronization apart from joining other tasks or using synchronizers such as Phasers that are advertised to cooperate with fork/join scheduling. ...
Tasks should also not perform blocking IO,...
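If you still want to keep a blocking call inside a ForkJoinPool, the JDK offers ForkJoinPool.ManagedBlocker, which lets the pool add a spare worker while a thread is blocked. The following is only a minimal sketch of that idea, assuming Java 8: BlockingCall, BlockingRequestTask and fetch() are hypothetical names, and fetch() merely stands in for the RestTemplate request.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.function.Supplier;

/** Wraps a blocking call so that a ForkJoinPool can compensate for the blocked worker. */
final class BlockingCall<T> implements ForkJoinPool.ManagedBlocker {

    private final Supplier<T> call; // hypothetical stand-in for the blocking HTTP request
    private T result;
    private boolean done;

    BlockingCall(Supplier<T> call) {
        this.call = call;
    }

    @Override
    public boolean block() {
        if (!done) {
            result = call.get(); // the actual blocking work happens here
            done = true;
        }
        return true; // no further blocking is necessary
    }

    @Override
    public boolean isReleasable() {
        return done;
    }

    /** Runs the call through managedBlock() and returns its result. */
    static <T> T run(Supplier<T> call) {
        BlockingCall<T> blocker = new BlockingCall<>(call);
        try {
            ForkJoinPool.managedBlock(blocker);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while blocking", e);
        }
        return blocker.result;
    }
}

/** Hypothetical subtask that performs its blocking request via BlockingCall. */
final class BlockingRequestTask extends RecursiveTask<String> {

    private final String key;

    BlockingRequestTask(String key) {
        this.key = key;
    }

    @Override
    protected String compute() {
        return BlockingCall.run(() -> fetch(key));
    }

    // Placeholder for the real blocking request (assumption, not from the question).
    private static String fetch(String key) {
        return "response-for-" + key;
    }
}
```

Whether this is worth the extra code compared with a plain fixed thread pool depends on how much of the work is actually blocking; for tasks that are almost pure I/O, an ordinary ExecutorService is usually the simpler choice.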

Submitting a Callable from inside another Callable is redundant, and you don't need two executors. You can also return a partial result from getSyncData(DataRequest key). This can be done like this:

DataClient.java

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;

import org.springframework.web.client.RestTemplate;

public class DataClient implements Client {

    private RestTemplate restTemplate = new RestTemplate();
    // the only executor
    private ExecutorService service = Executors.newFixedThreadPool(15);

    @Override
    public List<DataResponse> getSyncData(DataRequest key) {
        DataFetcherResult response = getAsyncData(key);
        List<DataResponse> responseList;
        try {
            responseList = response.get(key.getTimeout(), key.getTimeoutUnit());
        } catch (TimeoutException | ExecutionException ex) {
            response.cancel(true);
            responseList = response.getPartialResult();
        } catch (InterruptedException ex) {
            // restore the interrupt flag and return what is available so far
            Thread.currentThread().interrupt();
            response.cancel(true);
            responseList = response.getPartialResult();
        }
        return responseList;
    }

    @Override
    public DataFetcherResult getAsyncData(DataRequest key) {
        List<DataRequest> keys = generateKeys(key);
        final List<Future<DataResponse>> responseList = new ArrayList<>();
        final CountDownLatch latch = new CountDownLatch(keys.size()); // assume keys is not null
        for (final DataRequest _key : keys) {
            responseList.add(service.submit(new Callable<DataResponse>() {
                @Override
                public DataResponse call() throws Exception {
                    try {
                        return performDataRequest(_key);
                    } finally {
                        // count down only; returning from finally would swallow exceptions
                        latch.countDown();
                    }
                }
            }));
        }
        return new DataFetcherResult(responseList, latch);
    }

    // In this method I am making a HTTP call to another service
    // and then I will make List<DataRequest> accordingly.
    private List<DataRequest> generateKeys(DataRequest key) {
        List<DataRequest> keys = new ArrayList<>();
        // use key object which is passed in constructor to make HTTP call to another service
        // and then make List of DataRequest object and return keys.
        return keys;
    }

    private DataResponse performDataRequest(DataRequest key) {
        // This will have all LogicA code here which is shown in my original design.
        // everything as it is same..
        return null;
    }
}

DataFetcherResult.java

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DataFetcherResult implements Future<List<DataResponse>> {

    final List<Future<DataResponse>> futures;
    final CountDownLatch latch;

    public DataFetcherResult(List<Future<DataResponse>> futures, CountDownLatch latch) {
        this.futures = futures;
        this.latch = latch;
    }

    // non-blocking
    public List<DataResponse> getPartialResult() {
        List<DataResponse> result = new ArrayList<>(futures.size());
        for (Future<DataResponse> future : futures) {
            try {
                result.add(future.isDone() ? future.get() : null);
                // instead of null you can return new DataResponse(DataErrorEnum.NOT_READY, DataStatusEnum.ERROR);
            } catch (InterruptedException | ExecutionException | CancellationException e) {
                e.printStackTrace();
                // CancellationException is thrown for futures cancelled by cancel(true).
                // You can handle these cases here and return a DataResponse with the
                // corresponding DataErrorEnum and DataStatusEnum; null keeps positions aligned.
                result.add(null);
            }
        }
        return result;
    }

    @Override
    public List<DataResponse> get() throws ExecutionException, InterruptedException {
        List<DataResponse> result = new ArrayList<>(futures.size());
        for (Future<DataResponse> future : futures) {
            result.add(future.get());
        }
        return result;
    }

    @Override
    public List<DataResponse> get(long timeout, TimeUnit timeUnit)
            throws ExecutionException, InterruptedException, TimeoutException {
        if (latch.await(timeout, timeUnit)) {
            return get();
        }
        throw new TimeoutException(); // or getPartialResult()
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        boolean cancelled = true;
        for (Future<DataResponse> future : futures) {
            cancelled &= future.cancel(mayInterruptIfRunning);
        }
        return cancelled;
    }

    @Override
    public boolean isCancelled() {
        boolean cancelled = true;
        for (Future<DataResponse> future : futures) {
            cancelled &= future.isCancelled();
        }
        return cancelled;
    }

    @Override
    public boolean isDone() {
        boolean done = true;
        for (Future<DataResponse> future : futures) {
            done &= future.isDone();
        }
        return done;
    }
}

I wrote it with a CountDownLatch and it works, but note one nuance: you can get stuck for a little while in DataFetcherResult.get(long timeout, TimeUnit timeUnit), because the CountDownLatch is not synchronized with the futures' state. It can happen that latch.getCount() == 0 while some futures do not yet return future.isDone() == true. Their Callable has already passed latch.countDown() in its finally block, but the future's internal state has not been updated yet and still equals NEW.
Calling get() inside get(long timeout, TimeUnit timeUnit) can therefore cause a small delay.
A similar case was described here.
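One way to close that gap, sketched here under the assumption that you control how tasks are submitted, is to count down from FutureTask.done() instead of from the Callable. FutureTask invokes done() only after its state is final (completed, failed, or cancelled), so isDone() is already true when the latch opens. LatchedTask and LatchedTaskDemo are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

/** A FutureTask that counts down a latch only after its own state is final. */
final class LatchedTask<T> extends FutureTask<T> {

    private final CountDownLatch latch;

    LatchedTask(Callable<T> callable, CountDownLatch latch) {
        super(callable);
        this.latch = latch;
    }

    @Override
    protected void done() {
        // Called by FutureTask after completion, failure, or cancellation,
        // so isDone() already returns true at this point.
        latch.countDown();
    }
}

final class LatchedTaskDemo {

    /** Returns true if the latch opened in time and every task was done at that moment. */
    static <T> boolean allDoneWhenLatchOpens(List<Callable<T>> calls, long timeout, TimeUnit unit) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            CountDownLatch latch = new CountDownLatch(calls.size());
            List<LatchedTask<T>> tasks = new ArrayList<>(calls.size());
            for (Callable<T> call : calls) {
                LatchedTask<T> task = new LatchedTask<>(call, latch);
                tasks.add(task);
                executor.execute(task); // FutureTask is a Runnable
            }
            if (!latch.await(timeout, unit)) {
                return false; // timed out before all tasks finished
            }
            for (LatchedTask<T> task : tasks) {
                if (!task.isDone()) {
                    return false;
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }
}
```

With this variant, a get() that follows a successful latch.await() should no longer have to wait for a state transition.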

The timed DataFetcherResult.get(...) could instead be rewritten on top of each future's own future.get(long timeout, TimeUnit timeUnit), which lets you remove the CountDownLatch from the class:

@Override
public List<DataResponse> get(long timeout, TimeUnit timeUnit)
        throws ExecutionException, InterruptedException {
    List<DataResponse> result = new ArrayList<>(futures.size());
    long timeoutMs = timeUnit.toMillis(timeout); // remaining time budget shared by all futures
    boolean timedOut = false; // renamed: a local named "timeout" would clash with the parameter
    for (Future<DataResponse> future : futures) {
        long beforeGet = System.currentTimeMillis();
        try {
            if (!timedOut && timeoutMs > 0) {
                result.add(future.get(timeoutMs, TimeUnit.MILLISECONDS));
                timeoutMs -= System.currentTimeMillis() - beforeGet;
            } else {
                if (future.isDone()) {
                    result.add(future.get());
                } else {
                    //result.add(new DataResponse(DataErrorEnum.NOT_READY, DataStatusEnum.ERROR)); ?
                }
            }
        } catch (TimeoutException e) {
            result.add(new DataResponse(DataErrorEnum.TIMEOUT, DataStatusEnum.ERROR));
            timedOut = true;
        }
        // you can also handle ExecutionException or CancellationException here
    }
    return result;
}

This code is given as an example and should be tested before being used in production, but it seems legit :)
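If one shared timeout with partial results is all you need, a simpler alternative to a hand-written Future may be the JDK's own ExecutorService.invokeAll(tasks, timeout, unit): when it returns, every future is done, and tasks that did not finish in time have been cancelled. The sketch below uses a hypothetical PartialResults helper and a caller-supplied fallback value in place of an error DataResponse; it is a different technique from the class above, not a drop-in replacement.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

final class PartialResults {

    /**
     * Runs all calls under one shared timeout. Positions whose call failed or
     * did not finish in time are filled with the given fallback value.
     * If the calling thread is interrupted, the results gathered so far are returned.
     */
    static <T> List<T> collect(ExecutorService executor, List<Callable<T>> calls,
                               long timeout, TimeUnit unit, T fallback) {
        List<T> results = new ArrayList<>(calls.size());
        try {
            // On return every future is done: completed, failed, or cancelled by the timeout.
            List<Future<T>> futures = executor.invokeAll(calls, timeout, unit);
            for (Future<T> future : futures) {
                try {
                    results.add(future.get());
                } catch (CancellationException | ExecutionException e) {
                    results.add(fallback);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return results;
    }
}
```

A call such as PartialResults.collect(service, calls, key.getTimeout(), key.getTimeoutUnit(), fallbackResponse) would then play the role of getSyncData, with fallbackResponse being whatever error DataResponse you choose.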