use FutureTask for concurrency use FutureTask for concurrency multithreading multithreading

use FutureTask for concurrency


In Java 8 you can use CompletableFuture to chain your filters after each other. Use the thenApply and thenCompose family of methods in order to add new asynchronous filters to the CompletableFuture - they will execute after the previous step is finished. thenCombine combines two independent CompletableFutures when both are finished. Use allOf to wait for the result of more than two CompletableFuture objects.

If you can't use Java 8, then the Guava ListenableFuture can do the same, see Listenable Future Explained. With Guava you can wait for multiple independently running filters to finish with Futures.allAsList - this also returns a ListenableFuture.

With both approaches the idea is that after you declare your future actions, their dependencies on each other, and their threads, you get back a single Future object, which encapsulates your end result.

EDIT: The early return could be implemented by explicitly completing the CompletableFuture with the complete() method or using a Guava SettableFuture (which implements ListenableFuture)


You can use a ForkJoinPool for parallelization, which is explicitely thought for that kind of parallel computions:

(...) Method join() and its variants are appropriate for use only when completion dependencies are acyclic; that is, the parallel computation can be described as a directed acyclic graph (DAG) (...)

(see ForkJoinTask)

The advantage of a ForkJoinPool is that every task can spawn new tasks and also wait for them to complete without actually blocking the executing thread (which otherwise might cause a deadlock if more tasks are waiting for others to complete than threads are available).

This is an example that should work so far, although it has some limitations yet:

  1. It ignores filter results
  2. It does not prematurely finish execution if filter 2 returns true
  3. Exception handling is not implemented

The main idea behind this code: Every filter is represented as Node that may depend on other nodes (= filters that must complete before this filter can execute). Dependent nodes are spawned as parallel tasks.

import java.util.Arrays;import java.util.HashSet;import java.util.Set;import java.util.concurrent.Callable;import java.util.concurrent.ForkJoinPool;import java.util.concurrent.RecursiveTask;public class Node<V> extends RecursiveTask<V> {    private static final short VISITED = 1;    private final Callable<V> callable;    private final Set<Node<V>> dependencies = new HashSet<>();    @SafeVarargs    public Node(Callable<V> callable, Node<V>... dependencies) {        this.callable = callable;        this.dependencies.addAll(Arrays.asList(dependencies));    }    public Set<Node<V>> getDependencies() {        return this.dependencies;    }    @Override    protected V compute() {        try {            // resolve dependencies first            for (Node<V> node : dependencies) {                if (node.tryMarkVisited()) {                    node.fork(); // start node                }            }            // wait for ALL nodes to complete            for (Node<V> node : dependencies) {                node.join();            }            return callable.call();        } catch (Exception e) {            // TODO Auto-generated catch block            e.printStackTrace();        }        return null;    }    public boolean tryMarkVisited() {        return compareAndSetForkJoinTaskTag((short) 0, VISITED);    }}

Usage example:

public static void main(String[] args) {    Node<Void> filter1 = new Node<>(filter("filter1"));    Node<Void> filter2 = new Node<>(filter("filter2"));    Node<Void> filter3 = new Node<>(filter("filter3"), filter1, filter2);    Node<Void> filter4 = new Node<>(filter("filter4"), filter1, filter2);    Node<Void> filter5 = new Node<>(filter("filter5"), filter3, filter4);    Node<Void> root = new Node<>(() -> null, filter5);    ForkJoinPool.commonPool().invoke(root);}public static Callable<Void> filter(String name) {    return () -> {        System.out.println(Thread.currentThread().getName() + ": start " + name);        Thread.sleep(1000);        System.out.println(Thread.currentThread().getName() + ": end   " + name);        return null;    };}