Execute a few threads in parallel and a few in serial by CompletableFuture Execute a few threads in parallel and a few in serial by CompletableFuture multithreading multithreading

Execute a few threads in parallel and a few in serial by CompletableFuture


Awesome question. Though, technically, it is surely possible to do this using ExecutorService and Future purely, the better way as per me will be to use reactive programming rather than depend purely on Future or CompletableFuture or CompletionService and the like. The main reason is that it may quickly become a difficult-to-read code.

Here is how I did it using RxJava 2.2.16 and ExecutorService:

  1. Execute actions that don't depend on others or all of their dependencies have been completed using ExecutorService to submit() actions.
  2. To know that an action is completed, use BehaviorSubject of RxJava. When an action is complete, trigger step (1) for each of its dependencies.
  3. Shutdown the ExecutorService when all actions are completed. For this, use another BehaviorSubject.

I am sorry, I have written the entire logic in my own way because of the new approach. But it is still around the main requirement given by you. It will be good to first look at the Action model class and createActions() method in AppRxjava. from there, you should be able to follow the code. To simulate some time consumption, I have used the famous Thread.sleep() technique.

public class AppRxJava{    /* To listen to the completion of a task, so that the dependent tasks may be scheduled. */    private Subject<Action> completionSub = io.reactivex.subjects.BehaviorSubject.create();    /* To listen to the completion of all tasks, so that ExecutorService may shut down. */    private Subject<Boolean> allActionCompletedSub = io.reactivex.subjects.BehaviorSubject.create();    private ExecutorService SVC = Executors.newCachedThreadPool();    private List<Action> allActions;    public static void main( String[] args ){        new AppRxJava().start();    }    private void start() {        this.allActions = createActions();        subscribeToActionCompletions();        subscribeToSvcShutdown();        startAllActions( this.allActions );    }    private void subscribeToSvcShutdown(){        /* If all actions have been completed, shut down the ExecutorService. */        this.allActionCompletedSub.subscribe( allScheduled -> {            if( allScheduled ) {                SVC.shutdown();                try {                    SVC.awaitTermination( 2, TimeUnit.SECONDS );                } catch (InterruptedException e) {                    // TODO Auto-generated catch block                    e.printStackTrace();                }            }        });    }    private void subscribeToActionCompletions(){        this.completionSub.subscribe( complAction -> {            /* Get the actions that are dependent on this recently completed action and "attempt" to start them. */            List<Action> deps = getDeps( complAction, this.allActions );            startAllActions( deps );            /* If all actions have got completed, raise the flag. */            if( allActionsCompleted() ) this.allActionCompletedSub.onNext( true );        });    }    /* Attempts to start all actions that are present in the passed list. */    private void startAllActions( List<Action> actions ){        for( Action action : actions ) {            startAction( action, actions );        }    }    /* Attempts to start an action. Only if it is still pending and all of its dependencies are completed. */    private void startAction( Action a, List<Action> list ){        if( !a.isPending() ) return;        if( !allDepsCompleted( a, allActions ) ) return;        if( a.isPending() ) {            synchronized (a.LOCK ) {                if( a.isPending() ) {                    a.setStatus( 1 ); //Set to running, so that it is not picked up twice.                     SVC.submit( () -> {                        try {                            a.getAction().call();                        } catch (Exception e) {                            // TODO Auto-generated catch block                            e.printStackTrace();                        }                        a.setStatus( 2 ); //Set to completed. (We may have to synchronize this.)                        this.completionSub.onNext( a );                    } );                }            }        }    }    private boolean allActionsCompleted(){        for( Action a : this.allActions ) if( !a.isCompleted() ) return false;        return true;    }    private static boolean allDepsCompleted( Action a, List<Action> allActions ){        for( Action dep : allActions ) {            if( a.getDependencies().contains( dep ) && !dep.isCompleted() ) return false;        }        return true;    }    /* Returns the actions that are dependent on Action <code>a</code>. */    private List<Action> getDeps( Action a, List<Action> list ){        List<Action> deps = new ArrayList<>();        for( Action dep : list ) if( dep.getDependencies().contains( a ) ) deps.add( dep );        return deps;    }    /* Creates the action list with respective dependencies. */    private List<Action> createActions(){        List<Action> actions = new ArrayList<>();        Action a = createAction( 5000, "ServiceA", null );        Action b = createAction( 5000, "ServiceB", null );        Action c = createAction( 2000, "ServiceC", a, b );        Action d = createAction( 2000, "ServiceD", c );        Action e = createAction( 2000, "ServiceE", d );        actions.add( a ); actions.add( b ); actions.add( c ); actions.add( d ); actions.add( e );        return actions;    }    private Action createAction( final long sleepMillis, final String name, Action... dependencies ) {        List<Action> deps = null;        if( dependencies != null ) {            deps = new ArrayList<>();            for( Action a : dependencies ) deps.add( a );        }        return Action.of( () -> {            System.out.println( "Service (" + name + ") started" );            try {                Thread.sleep( sleepMillis );            } catch (InterruptedException e) {                // TODO Auto-generated catch block                e.printStackTrace();            }            System.out.println( "Service (" + name + ") completed" );            return true;        }, name, deps );    }}

And the Action model class. This represents one action and a list of actions that it is dependent upon. (A slight difference from your original representation. But either way is OK, if you handle it accordingly, I think.)

public class Action{    Callable<Boolean> action;    String name;    List<Action> dependencies = new ArrayList<>();    AtomicInteger status = new AtomicInteger( 0 ); //0 = Pending, 1 = Scheduled, 2 = Completed    public static final Object LOCK = new Object();    private Action(Callable<Boolean> action, String name, List<Action> dependencies) {        super();        this.action = action;        this.name = name;        if( dependencies != null ) this.dependencies = dependencies;    }    public static Action of( Callable<Boolean> action, String name, List<Action> dependencies ){        return new Action( action, name, dependencies );    }    public Callable<Boolean> getAction(){        return action;    }    public String getName(){        return name;    }    public List<Action> getDependencies(){        return dependencies;    }    public boolean isCompleted(){        return this.status.get() == 2;    }    public boolean isPending(){        return this.status.get() == 0;    }    public boolean isScheduled(){        return this.status.get() == 1;    }    public void setStatus( int status ){        this.status.getAndSet( status );    }    @Override    public int hashCode(){        final int prime = 31;        int result = 1;        result = prime * result + ((name == null) ? 0 : name.hashCode());        return result;    }    @Override    public boolean equals( Object obj ){        if (this == obj) return true;        if (obj == null) return false;        if (getClass() != obj.getClass()) return false;        Action other = (Action) obj;        if (name == null) {            if (other.name != null)                return false;        } else if (!name.equalsIgnoreCase( other.name )) return false;        return true;    }}


thenCombine can be used to express dependencies between CompletionStages allowing you to perform a task after both have completed. You can then preform the subsequent actions with thenApply:

CompletionStage<ServiceAResponse> serviceAResponse = callServiceA();CompletionStage<ServiceBResponse> serviceBResponse = callServiceB();CompletionStage<ServiceEResponse> result = serviceA.thenCombine(serviceBResponse, (aResponse, bResponse) -> serviceC.call(aResponse, bResponse))                                                                                              .thenApply(cResponse -> serviceD.call(cResponse))                                                                                             .thenApply(dResponse -> serviceE.call(eResponse))public CompletionStage<ServiceAResponse> callServiceA() {    return CompletableFuture.supplyAsync(() -> serviceA.call());}public CompletionStage<ServiceBResponse> callServiceB() {    return CompletableFuture.supplyAsync(() -> serviceB.call());}


Just couldn't take my mind off the basic question of doing it with pure Java. So, here is a modified version of my earlier answer. This answer contains both styles - RxJava and ExecutorService. It contains 3 classes:

  1. DependentSeriesOfActionsBase: A base class that contains some reusable methods and common fields. This is just for convenience and easy understanding of the code.
  2. DependentSeriesOfActionsCoreJava: This is the ExecutorService based implementation. I am using Future.get() to wait for the results of an action, with the difference that the waiting itself is happening asynchronously. Take a look at startAction().
  3. DependentSeriesOfActionsRxJava: The earlier RxJava based implementation.

Code: DependentSeriesOfActionsBase

abstract class DependentSeriesOfActionsBase{    protected List<Action> allActions;    protected ExecutorService SVC = Executors.newCachedThreadPool();    protected boolean allActionsCompleted(){        for( Action a : this.allActions ) if( !a.isCompleted() ) return false;        return true;    }    protected static boolean allDepsCompleted( Action a, List<Action> allActions ){        for( Action dep : allActions ) {            if( a.getDependencies().contains( dep ) && !dep.isCompleted() ) return false;        }        return true;    }    /* Returns the actions that are dependent on Action <code>a</code>. */    protected List<Action> getDeps( Action a, List<Action> list ){        List<Action> deps = new ArrayList<>();        for( Action dep : list ) if( dep.getDependencies().contains( a ) ) deps.add( dep );        return deps;    }    /* Creates the action list with respective dependencies. */    protected List<Action> createActions(){        List<Action> actions = new ArrayList<>();        Action a = createAction( 5000, "ServiceA", null );        Action b = createAction( 5000, "ServiceB", null );        Action c = createAction( 2000, "ServiceC", a, b );        Action d = createAction( 2000, "ServiceD", c );        Action e = createAction( 2000, "ServiceE", d );        actions.add( a ); actions.add( b ); actions.add( c ); actions.add( d ); actions.add( e );        return actions;    }    protected Action createAction( final long sleepMillis, final String name, Action... dependencies ) {        List<Action> deps = null;        if( dependencies != null ) {            deps = new ArrayList<>();            for( Action a : dependencies ) deps.add( a );        }        return Action.of( () -> {            System.out.println( "Service (" + name + ") started" );            try {                Thread.sleep( sleepMillis );            } catch (InterruptedException e) {                // TODO Auto-generated catch block                e.printStackTrace();            }            System.out.println( "Service (" + name + ") completed" );            return true;        }, name, deps );    }    /* Attempts to start all actions that are present in the passed list. */    protected void startAllActions( List<Action> actions ){        for( Action action : actions ) {            startAction( action, actions );        }    }    protected abstract void startAction( Action action, List<Action> actions );    protected void shutdown(){        SVC.shutdown();        try {            SVC.awaitTermination( 2, TimeUnit.SECONDS );        } catch (InterruptedException e) {            // TODO Auto-generated catch block            e.printStackTrace();        }    }}

Code: DependentSeriesOfActionsCoreJava

public class DependentSeriesOfActionsCoreJava extends DependentSeriesOfActionsBase{    public static void main( String[] args ){        new DependentSeriesOfActionsCoreJava().start();    }    private void start() {        this.allActions = createActions();        startAllActions( this.allActions );    }    protected void startAction( Action a, List<Action> list ){        if( !a.isPending() ) return;        if( !allDepsCompleted( a, allActions ) ) return;        if( a.isPending() ) {            synchronized (a.LOCK ) {                if( a.isPending() ) {                    a.setStatus( 1 ); //Set to running, so that it is not picked up twice.                     /* Submit the action to the ExecutorService and get the handle to the Future. */                    final Future<?> fut = SVC.submit( () -> a.action.call() );                    /* Submit the Future.get() action to the ExecutorService and execute the dependencies when it returns. */                    SVC.submit( () -> {                        Object returnVal = null;                        /* Wait */                        try {                            fut.get();                             a.setStatus( 2 );                            /* If all actions are completed, shut down the ExecutorService. */                            if( allActionsCompleted() ) shutdown();                        } catch (InterruptedException | ExecutionException e) {                            e.printStackTrace();                        }                        startAllActions( getDeps( a, this.allActions ) );                    } );                }            }        }    }}

Code: DependentSeriesOfActionsRxJava

public class DependentSeriesOfActionsRxJava extends DependentSeriesOfActionsBase{    /* To listen to the completion of a task, so that the dependent tasks may be scheduled. */    private Subject<Action> completionSub = io.reactivex.subjects.BehaviorSubject.create();    /* To listen to the completion of all tasks, so that ExecutorService may shut down. */    private Subject<Boolean> allActionCompletedSub = io.reactivex.subjects.BehaviorSubject.create();    public static void main( String[] args ){        new DependentSeriesOfActionsRxJava().start();    }    private void start() {        this.allActions = createActions();        subscribeToActionCompletions();        subscribeToSvcShutdown();        startAllActions( this.allActions );    }    private void subscribeToSvcShutdown(){        /* If all actions have been completed, shut down the ExecutorService. */        this.allActionCompletedSub.subscribe( allScheduled -> {            if( allScheduled ) shutdown();        });    }    private void subscribeToActionCompletions(){        this.completionSub.subscribe( complAction -> {            /* Get the actions that are dependent on this recently completed action and "attempt" to start them. */            List<Action> deps = getDeps( complAction, this.allActions );            startAllActions( deps );            /* If all actions have got completed, raise the flag. */            if( allActionsCompleted() ) this.allActionCompletedSub.onNext( true );        });    }    /* Attempts to start an action. Only if it is still pending and all of its dependencies are completed. */    protected void startAction( Action a, List<Action> list ){        if( !a.isPending() ) return;        if( !allDepsCompleted( a, allActions ) ) return;        if( a.isPending() ) {            synchronized (a.LOCK ) {                if( a.isPending() ) {                    a.setStatus( 1 ); //Set to running, so that it is not picked up twice.                     SVC.submit( () -> {                        try {                            a.getAction().call();                        } catch (Exception e) {                            // TODO Auto-generated catch block                            e.printStackTrace();                        }                        a.setStatus( 2 ); //Set to completed. (We may have to synchronize this.)                        this.completionSub.onNext( a );                    } );                }            }        }    }}