Java: stop an executor service once one of its assigned tasks fails for any reason (tag: multithreading)

Java: stop an executor service once one of its assigned tasks fails for any reason


The idea is that every task reports its result to a shared TaskCompleteEvent object. As soon as any task reports an error, the scheduler is shut down, so none of the periodic tasks is run again.

You can check the results of every task-iteration in the maps "errors" and "success".

/**
 * Demonstrates stopping a {@link ScheduledExecutorService} as soon as one of its
 * periodic tasks reports a failure.
 *
 * <p>Every task reports its outcome to a shared {@link TaskCompleteEvent}. The first
 * reported error shuts the scheduler down, which ends the wait in the test.
 */
public class SchedulerTest {

    /**
     * Schedules two tasks at a one-second fixed rate and waits up to 60 seconds for the
     * scheduler to terminate, then prints the recorded results.
     *
     * <ul>
     *   <li>{@code task1} draws a random number in [0, 100) and reports an error when it is
     *       below 5; otherwise it reports nothing.</li>
     *   <li>{@code task2} draws a random number in [0, 100) and always reports a success whose
     *       payload is {@code num < 50}.</li>
     * </ul>
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test
    public void scheduler() throws InterruptedException {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
        TaskCompleteEvent taskCompleteEvent = new TaskCompleteEvent(scheduledExecutorService);

        Runnable task1 = () -> {
            int num = new Random().nextInt(100);
            if (num < 5) {
                taskCompleteEvent.message(
                        "task1-" + UUID.randomUUID().toString(),
                        "Num " + num + " was obatined. Breaking all the executions.",
                        true);
            }
        };
        Runnable task2 = () -> {
            int num = new Random().nextInt(100);
            taskCompleteEvent.message("task2-" + UUID.randomUUID().toString(), num < 50, false);
        };

        try {
            scheduledExecutorService.scheduleAtFixedRate(task1, 0, 1, TimeUnit.SECONDS);
            scheduledExecutorService.scheduleAtFixedRate(task2, 0, 1, TimeUnit.SECONDS);
            // Returns early once a task has reported an error and the pool has terminated.
            scheduledExecutorService.awaitTermination(60, TimeUnit.SECONDS);
        } finally {
            // If no task failed within the timeout (or the wait was interrupted) the pool is
            // still running; stop it here so its worker threads do not outlive the test.
            scheduledExecutorService.shutdownNow();
        }

        System.out.println("Success: " + taskCompleteEvent.getSuccess());
        System.out.println("Errors: " + taskCompleteEvent.getErrors());
        System.out.println("Went well?: " + taskCompleteEvent.getErrors().isEmpty());
    }

    /**
     * Collects task results and shuts the scheduler down on the first reported error.
     *
     * <p>All methods synchronize on this instance, so it can be shared between the
     * scheduler's worker threads and the thread that reads the results.
     */
    public static class TaskCompleteEvent {
        private final ScheduledExecutorService scheduledExecutorService;
        // Insertion-ordered so results print in the order they were reported.
        private final Map<String, Object> errors = new LinkedHashMap<>();
        private final Map<String, Object> success = new LinkedHashMap<>();

        /**
         * @param scheduledExecutorService the scheduler to shut down when an error is reported
         */
        public TaskCompleteEvent(ScheduledExecutorService scheduledExecutorService) {
            this.scheduledExecutorService = scheduledExecutorService;
        }

        /**
         * Records the outcome of one task execution.
         *
         * @param id       key under which the outcome is stored
         * @param response payload describing the outcome
         * @param error    {@code true} to record a failure and shut the scheduler down,
         *                 {@code false} to record a success
         */
        public synchronized void message(String id, Object response, boolean error) {
            if (error) {
                errors.put(id, response);
                // shutdown() does not wait for running tasks; an execution already in
                // flight may still record its result after this point.
                scheduledExecutorService.shutdown();
            } else {
                success.put(id, response);
            }
        }

        /**
         * @return a snapshot of the errors recorded so far, in insertion order; later
         *         reports do not change the returned map
         */
        public synchronized Map<String, Object> getErrors() {
            // Copy under the lock: handing out the live map would let callers iterate it
            // while a worker thread is inserting.
            return new LinkedHashMap<>(errors);
        }

        /**
         * @return a snapshot of the successes recorded so far, in insertion order; later
         *         reports do not change the returned map
         */
        public synchronized Map<String, Object> getSuccess() {
            return new LinkedHashMap<>(success);
        }
    }
}


You just need to add an additional task whose job is to monitor all of the other running tasks. When any of the monitored tasks fails, it needs to set a semaphore (flag) that the assassin can inspect.

    // Scheduler with a pool of two threads, shared by the two periodic tasks below.
    ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);

    // INSTANTIATE THE REMOTE-FILE-MONITOR:
    RemoteFileMonitor monitor = new RemoteFileMonitor(remotesource, localtarget);

    // THIS TimerTask PERIODICALLY TRIGGERS THE RemoteFileMonitor.
    // (It is only ever handed to the executor as a Runnable; no java.util.Timer is involved.)
    TimerTask remote = new TimerTask() {
        @Override
        public void run() {
            try {
                kae.trace("TimerTask::run() --> Calling RemoteFileMonitor.check()");
                monitor.check();
            } catch (Exception ex) {
                // Deliberately not rethrown: an exception escaping run() would make the
                // executor cancel every later execution of this periodic task, and we want
                // to retry on the next cycle. Trace it so repeated failures stay visible.
                kae.trace("TimerTask::run() --> RemoteFileMonitor.check() failed, will retry: " + ex);
            }
        }
    };

    // THIS TimerTask PERIODICALLY CHECKS WHETHER THE POLLING CYCLE HAS RUN TOO LONG:
    TimerTask assassin = new TimerTask() {
        // Deadline, fixed when this task object is created: now + the configured polling
        // cycle time, interpreted as minutes.
        private final LocalDateTime death =
                LocalDateTime.now().plus(ConfigurationOptions.getPollingCycleTime(), ChronoUnit.MINUTES);

        @Override
        public void run() {
            // Past the deadline: report that the polling cycle was exceeded.
            if (LocalDateTime.now().isAfter(death)) {
                // NOTE(review): presumably kae.error(...) terminates the program — confirm.
                kae.error(ReturnCode.MONITOR_POLLING_CYCLE_EXCEEDED);
            }
        }
    };

    // SCHEDULE THE PERIODIC EXECUTION OF THE RemoteFileMonitor: (remote --> run() --> monitor.check())
    executor.scheduleAtFixedRate(remote, delay, interval, TimeUnit.MINUTES);

    // SCHEDULE THE PERIODIC DEADLINE CHECK: every 60 seconds after the initial delay.
    executor.scheduleAtFixedRate(assassin, delay, 60L, TimeUnit.SECONDS);

    // LOOP UNTIL THE MONITOR COMPLETES:
    do {
        try {
            // NOTE(review): Thread.sleep takes milliseconds, while `interval` is used as
            // minutes above, so this sleeps interval * 10 ms — confirm that is intended.
            Thread.sleep(interval * 10);
        } catch (InterruptedException e) {
            // NOTE(review): the original comment says "FAIL && THEN cleanexit()", so
            // kae.error presumably exits; if it returns, the loop simply keeps polling.
            kae.error(ReturnCode.MONITORING_ERROR, "Monitoring of the XXXXXX-Ingestion site was interrupted");
        }
        // The loop ends once the monitor no longer reports itself as not finished.
    } while (monitor.isNotFinished());

    // SHUTDOWN THE MONITOR TASK:
    executor.shutdown();