How to create a blocking background loader in Java 8? How to create a blocking background loader in Java 8? multithreading multithreading

How to create a blocking background loader in Java 8?


You already have a threadpool to execute the task.It's not necessarily and make thing complicated to run the task in another async executor (ForkJoinPool when you use CompletableFuture)

Make it simple:

public static void loadInBackground(int taskId) {    // create the loading task    BackgroundTask backgroundTask = new BackgroundTask(taskId);    // No need to run in async, as it already in executor    backgroundTask.run();}

The ScheduledExecutorService will ensure only one task is run at a time when you invoked it with scheduleAtFixedRate

Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently with the given period; that is executions will commence after initialDelay then initialDelay+period, then initialDelay + 2 * period, and so on. If any execution of the task encounters an exception, subsequent executions are suppressed. Otherwise, the task will only terminate via cancellation or termination of the executor. If any execution of this task takes longer than its period, then subsequent executions may start late, but will not concurrently execute.


Taking the following as the requirements:

  • data should be loaded in background
  • after the loading the data should be displayed
  • while data are loaded no further requests should be accepted
  • if there were requests while the data were loaded another loading should be scheduled after a certain timeout (e. g. 5 seconds)

The solution ca be build based on the Executors.newSingleThreadExecutor(), CompletableFuture and LinkedBlockingQueue:

public class SingleThreadedLoader {  private static class BackgroundTask extends CompletableFuture<String> {    private final String query;    private BackgroundTask(String query) {      this.query = query;    }    public String getQuery() {      return query;    }  }  private final BlockingQueue<BackgroundTask> tasks = new LinkedBlockingQueue<>();  // while data are loaded no further requests should be accepted  private final Executor executor = Executors.newSingleThreadExecutor();  private final int delaySeconds;  private AtomicReference<Instant> lastExecution = new AtomicReference<>(Instant.EPOCH);  public SingleThreadedLoader(int delaySeconds) {    this.delaySeconds = delaySeconds;    setupLoading();  }  public BackgroundTask loadInBackground(String query) {    log("Enqueued query " + query);    BackgroundTask task = new BackgroundTask(query);    tasks.add(task);    return task;  }  private void setupLoading() {    // data should be loaded in background    executor.execute(() -> {      while (true) {        try {          // if there were requests while the data were loaded          // another loading should be scheduled after a certain timeout (e. g. 5 seconds)          Instant prev = lastExecution.get();          long delay = Duration.between(prev, Instant.now()).toSeconds();          if (delay < delaySeconds) {            log("Waiting for 5 seconds before next data loading");            TimeUnit.SECONDS.sleep(delaySeconds - delay);          }          BackgroundTask task = tasks.take();          try {            String query = task.getQuery();            String data = loadData(query);            task.complete(data);          } catch (Exception e) {            task.completeExceptionally(e);          }          lastExecution.set(Instant.now());        } catch (InterruptedException e) {          log(e.getMessage());          return;        }      }    });  }  private String loadData(String query) {    try {      log("Loading data for " + query);      TimeUnit.SECONDS.sleep(2);      log("Loaded data for " + query);      return "Result " + query;    } catch (InterruptedException e) {      throw new RuntimeException(e);    }  }  private static void log(String str) {    String time = LocalTime.now().truncatedTo(ChronoUnit.SECONDS).format(DateTimeFormatter.ISO_TIME);    String thread = Thread.currentThread().getName();    System.out.println(time + ' ' + thread + ": " + str);  }  public static void main(String[] args) throws Exception {    SingleThreadedLoader loader = new SingleThreadedLoader(5);    // after the loading the data should be displayed    loader.loadInBackground("1").thenAccept(SingleThreadedLoader::log);    loader.loadInBackground("2").thenAccept(SingleThreadedLoader::log);    loader.loadInBackground("3").thenAccept(SingleThreadedLoader::log);    log("Do another work in the main thread");    TimeUnit.SECONDS.sleep(30);  }}

After the execution the stdout will have the following output:

10:29:26 main: Enqueued query 110:29:26 pool-1-thread-1: Loading data for 110:29:26 main: Enqueued query 210:29:26 main: Enqueued query 310:29:26 main: Do another work in the main thread10:29:28 pool-1-thread-1: Loaded data for 110:29:28 pool-1-thread-1: Result 110:29:28 pool-1-thread-1: Waiting for 5 seconds before next data loading10:29:33 pool-1-thread-1: Loading data for 210:29:36 pool-1-thread-1: Loaded data for 210:29:36 pool-1-thread-1: Result 210:29:36 pool-1-thread-1: Waiting for 5 seconds before next data loading10:29:41 pool-1-thread-1: Loading data for 310:29:43 pool-1-thread-1: Loaded data for 310:29:43 pool-1-thread-1: Result 3


I have added an AtomicInteger which will act as a counter for running tasks with simple lock() and unlock() methods with this minor change into your original code I got output:

Start #0: Thread[ForkJoinPool.commonPool-worker-2,5,main]background task cancelled  1background task cancelled  2Finish #0: Thread[ForkJoinPool.commonPool-worker-2,5,main]Background task finished:task 0Start #3: Thread[ForkJoinPool.commonPool-worker-2,5,main]background task cancelled  4Finish #3: Thread[ForkJoinPool.commonPool-worker-2,5,main]background task cancelled  5Background task finished:task 3Start #6: Thread[ForkJoinPool.commonPool-worker-3,5,main]background task cancelled  7Finish #6: Thread[ForkJoinPool.commonPool-worker-3,5,main]background task cancelled  8Background task finished:task 6Start #9: Thread[ForkJoinPool.commonPool-worker-2,5,main]background task cancelled  10Cancelled

Here is my solution for your task:

public class LoadInBackgroundExample {    //Added new exception    public static class AlreadyIsRunningException extends RuntimeException {        long taskId;        public AlreadyIsRunningException(String message, long taskId) {            super(message);            this.taskId = taskId;        }        public long getTaskId() {            return taskId;        }        public void setTaskId(long taskId) {            this.taskId = taskId;        }    }    /**     * A simple background task which should perform the data loading operation. In this minimal example it simply invokes Thread.sleep     */    public static class BackgroundTask implements Runnable {        //this atomicInteger acts as a global lock counter for BackgroundTask objects        private static AtomicInteger counter = new AtomicInteger(0);        private int id;        public BackgroundTask(int id) {            this.id = id;        }        private void unlock() {            counter.decrementAndGet();        }        private void lock() {            //we need to check this way to avoid some unlucky timing between threads            int lockValue = counter.incrementAndGet();            //if we got counter different than 1 that means that some other task is already running (it has already acquired the lock)            if (lockValue != 1) {                //rollback our check                counter.decrementAndGet();                //throw an exception                throw new AlreadyIsRunningException("Some other task already is running", id);            }        }        /**         * Sleep for a given amount of time to simulate loading.         */        @Override        public void run() {            //Check if we can acquire lock            lock();            //we have a lock to            try {                System.out.println("Start #" + id + ": " + Thread.currentThread());                long sleepTime = 2000;                Thread.sleep(sleepTime);            } catch (InterruptedException e) {                e.printStackTrace();            } finally {                System.out.println("Finish #" + id + ": " + Thread.currentThread());                unlock();            }        }    }    /**     * CompletableFuture which simulates loading and showing data.     *     * @param taskId Identifier of the current task     */    public static void loadInBackground(int taskId) {        // create the loading task        BackgroundTask backgroundTask = new BackgroundTask(taskId);        // "load" the data asynchronously        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(new Supplier<String>() {            @Override            public String get() {                CompletableFuture<Void> future = CompletableFuture.runAsync(backgroundTask);                try {                    future.get();                } catch (ExecutionException e) {                    if (e.getCause() instanceof AlreadyIsRunningException) {                        System.out.println("background task cancelled  " + ((AlreadyIsRunningException) e.getCause()).getTaskId());                        throw (AlreadyIsRunningException) e.getCause();                    }                } catch (InterruptedException e) {                    e.printStackTrace();                }                return "task " + backgroundTask.id;            }        });        // display the data after they are loaded        CompletableFuture<Void> future = completableFuture.thenAccept(x -> {            System.out.println("Background task finished:" + x);        });    }    ArrayList<BackgroundTask> backgroundTasks = new ArrayList<>();    public static void main(String[] args) {        // runnable which invokes the background loader every second        Runnable trigger = new Runnable() {            int taskId = 0;            public void run() {                loadInBackground(taskId++);            }        };        // create scheduler        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);        ScheduledFuture<?> beeperHandle = scheduler.scheduleAtFixedRate(trigger, 0, 1, TimeUnit.SECONDS);        // cancel the scheudler and the application after 10 seconds        scheduler.schedule(() -> beeperHandle.cancel(true), 10, TimeUnit.SECONDS);        try {            beeperHandle.get();        } catch (Throwable th) {        }        System.out.println("Cancelled");        System.exit(0);    }

UPDATE

I have changed lock() and unlock() methods to more simple form:

private static  AtomicBoolean atomicBoolean = new AtomicBoolean(false);        private void unlock() {            atomicBoolean.set(false);        }        private void lock() {            //if 'changed' is false that means some other task is already running            boolean changed = atomicBoolean.compareAndSet(false,true);            if (!changed) {                throw new AlreadyIsRunningException("Some other task  is already running", id);            }        }