Spring Scheduled Task running in clustered environment Spring Scheduled Task running in clustered environment spring spring

Spring Scheduled Task running in clustered environment


There is a project called ShedLock that serves exactly this purpose. You just annotate the tasks that should be locked while they execute:

/**
 * Example scheduled task that carries a {@code @SchedulerLock} named
 * "scheduledTaskName". The {@code @Scheduled} arguments are left as a
 * placeholder to be filled in by the reader.
 */
@Scheduled( ... )
@SchedulerLock(name = "scheduledTaskName")
public void scheduledTask() {
    // do something
}

Configure Spring and a LockProvider

/**
 * Spring configuration that switches on scheduling and scheduler locking
 * (with a default "lock at most for" of ten minutes) and registers the
 * {@link LockProvider} bean.
 */
@Configuration
@EnableScheduling
@EnableSchedulerLock(defaultLockAtMostFor = "10m")
class MySpringConfiguration {
    ...

    /**
     * Builds the lock provider on top of the injected data source.
     *
     * @param dataSource the data source handed to the provider
     * @return a {@link JdbcTemplateLockProvider} wrapping {@code dataSource}
     */
    @Bean
    public LockProvider lockProvider(DataSource dataSource) {
        final LockProvider provider = new JdbcTemplateLockProvider(dataSource);
        return provider;
    }

    ...
}


There is another simple and robust way to safely execute a job in a cluster. You can rely on the database and execute the task only if the node is the "leader" of the cluster.

Also, when a node fails or is shut down, another node in the cluster becomes the leader.

All you have to do is create a "leader election" mechanism and check, every time the task fires, whether you are the leader:

/**
 * Scheduled job that re-dispatches the failed e-mail tasks.
 * Nothing is done unless {@code checkIfLeader()} returns {@code true}.
 */
@Scheduled(cron = "*/30 * * * * *")
public void executeFailedEmailTasks() {
    // Bail out early when this node is not the leader.
    if (!checkIfLeader()) {
        return;
    }

    final List<EmailTask> failedTasks = emailTaskService.getFailedEmailTasks();
    for (final EmailTask failedTask : failedTasks) {
        dispatchService.sendEmail(failedTask);
    }
}

Follow those steps:

1. Define the entity and the table that hold one entry per node in the cluster:

/**
 * Persistent record of a single cluster node, mapped to the entity named
 * {@code SYS_NODE}. It stores the node's address, the time of its last
 * ping and whether it is currently flagged as leader.
 */
@Entity(name = "SYS_NODE")
public class SystemNode {

    /** Database-generated primary key. */
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    /** Timestamp of the node, kept as a string. */
    @Column(name = "TIMESTAMP")
    private String timestamp;

    /** IP address of the node. */
    @Column(name = "IP")
    private String ip;

    /** Moment of the most recent ping. */
    @Column(name = "LAST_PING")
    private Date lastPing;

    /** Creation date; initialised when the object is instantiated. */
    @Column(name = "CREATED_AT")
    private Date createdAt = new Date();

    /** Leader flag; {@code FALSE} until explicitly set. */
    @Column(name = "IS_LEADER")
    private Boolean isLeader = Boolean.FALSE;

    /** @return the id */
    public Long getId() {
        return this.id;
    }

    /** @param id the id to set */
    public void setId(final Long id) {
        this.id = id;
    }

    /** @return the timestamp */
    public String getTimestamp() {
        return this.timestamp;
    }

    /** @param timestamp the timestamp to set */
    public void setTimestamp(final String timestamp) {
        this.timestamp = timestamp;
    }

    /** @return the IP address */
    public String getIp() {
        return this.ip;
    }

    /** @param ip the IP address to set */
    public void setIp(final String ip) {
        this.ip = ip;
    }

    /** @return the date of the last ping */
    public Date getLastPing() {
        return this.lastPing;
    }

    /** @param lastPing the date of the last ping to set */
    public void setLastPing(final Date lastPing) {
        this.lastPing = lastPing;
    }

    /** @return the creation date */
    public Date getCreatedAt() {
        return this.createdAt;
    }

    /** @param createdAt the creation date to set */
    public void setCreatedAt(final Date createdAt) {
        this.createdAt = createdAt;
    }

    /** @return the leader flag */
    public Boolean getIsLeader() {
        return this.isLeader;
    }

    /** @param isLeader the leader flag to set */
    public void setIsLeader(final Boolean isLeader) {
        this.isLeader = isLeader;
    }

    /** @return a one-line dump of all fields */
    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("SystemNode{");
        text.append("id=").append(this.id);
        text.append(", timestamp='").append(this.timestamp).append('\'');
        text.append(", ip='").append(this.ip).append('\'');
        text.append(", lastPing=").append(this.lastPing);
        text.append(", createdAt=").append(this.createdAt);
        text.append(", isLeader=").append(this.isLeader);
        text.append('}');
        return text.toString();
    }

}

2. Create the service that a) inserts the node into the database and b) checks for the leader:

@Service@Transactionalpublic class SystemNodeServiceImpl implements SystemNodeService,    ApplicationListener {/** The logger. */private static final Logger LOGGER = Logger.getLogger(SystemNodeService.class);/** The constant NO_ALIVE_NODES. */private static final String NO_ALIVE_NODES = "Not alive nodes found in list {0}";/** The ip. */private String ip;/** The system service. */private SystemService systemService;/** The system node repository. */private SystemNodeRepository systemNodeRepository;@Autowiredpublic void setSystemService(final SystemService systemService) {    this.systemService = systemService;}@Autowiredpublic void setSystemNodeRepository(final SystemNodeRepository systemNodeRepository) {    this.systemNodeRepository = systemNodeRepository;}@Overridepublic void pingNode() {    final SystemNode node = systemNodeRepository.findByIp(ip);    if (node == null) {        createNode();    } else {        updateNode(node);    }}@Overridepublic void checkLeaderShip() {    final List<SystemNode> allList = systemNodeRepository.findAll();    final List<SystemNode> aliveList = filterAliveNodes(allList);    SystemNode leader = findLeader(allList);    if (leader != null && aliveList.contains(leader)) {        setLeaderFlag(allList, Boolean.FALSE);        leader.setIsLeader(Boolean.TRUE);        systemNodeRepository.save(allList);    } else {        final SystemNode node = findMinNode(aliveList);        setLeaderFlag(allList, Boolean.FALSE);        node.setIsLeader(Boolean.TRUE);        systemNodeRepository.save(allList);    }}/** * Returns the leaded * @param list *          the list * @return  the leader */private SystemNode findLeader(final List<SystemNode> list) {    for (SystemNode systemNode : list) {        if (systemNode.getIsLeader()) {            return systemNode;        }    }    return null;}@Overridepublic boolean isLeader() {    final SystemNode node = systemNodeRepository.findByIp(ip);    return node != null && node.getIsLeader();}@Overridepublic 
void onApplicationEvent(final ApplicationEvent applicationEvent) {    try {        ip = InetAddress.getLocalHost().getHostAddress();    } catch (Exception e) {        throw new RuntimeException(e);    }    if (applicationEvent instanceof ContextRefreshedEvent) {        pingNode();    }}/** * Creates the node */private void createNode() {    final SystemNode node = new SystemNode();    node.setIp(ip);    node.setTimestamp(String.valueOf(System.currentTimeMillis()));    node.setCreatedAt(new Date());    node.setLastPing(new Date());    node.setIsLeader(CollectionUtils.isEmpty(systemNodeRepository.findAll()));    systemNodeRepository.save(node);}/** * Updates the node */private void updateNode(final SystemNode node) {    node.setLastPing(new Date());    systemNodeRepository.save(node);}/** * Returns the alive nodes. * * @param list *         the list * @return the alive nodes */private List<SystemNode> filterAliveNodes(final List<SystemNode> list) {    int timeout = systemService.getSetting(SettingEnum.SYSTEM_CONFIGURATION_SYSTEM_NODE_ALIVE_TIMEOUT, Integer.class);    final List<SystemNode> finalList = new LinkedList<>();    for (SystemNode systemNode : list) {        if (!DateUtils.hasExpired(systemNode.getLastPing(), timeout)) {            finalList.add(systemNode);        }    }    if (CollectionUtils.isEmpty(finalList)) {        LOGGER.warn(MessageFormat.format(NO_ALIVE_NODES, list));        throw new RuntimeException(MessageFormat.format(NO_ALIVE_NODES, list));    }    return finalList;}/** * Finds the min name node. * * @param list *         the list * @return the min node */private SystemNode findMinNode(final List<SystemNode> list) {    SystemNode min = list.get(0);    for (SystemNode systemNode : list) {        if (systemNode.getTimestamp().compareTo(min.getTimestamp()) < -1) {            min = systemNode;        }    }    return min;}/** * Sets the leader flag. 
* * @param list *         the list * @param value *         the value */private void setLeaderFlag(final List<SystemNode> list, final Boolean value) {    for (SystemNode systemNode : list) {        systemNode.setIsLeader(value);    }}

}

3. Ping the database periodically to signal that you are alive:

/**
 * Scheduled heartbeat: delegates to {@code systemNodeService.pingNode()}.
 */
@Override
@Scheduled(cron = "0 0/5 * * * ?")
public void executeSystemNodePing() {
    systemNodeService.pingNode();
}

/**
 * Scheduled leader resolution: delegates to {@code systemNodeService.checkLeaderShip()}.
 */
@Override
@Scheduled(cron = "0 0/10 * * * ?")
public void executeLeaderResolution() {
    systemNodeService.checkLeaderShip();
}

4. You are ready! Just check whether you are the leader before executing the task:

/**
 * Scheduled job that sends every failed e-mail task again, provided
 * {@code checkIfLeader()} reports {@code true}; otherwise it returns immediately.
 */
@Override
@Scheduled(cron = "*/30 * * * * *")
public void executeFailedEmailTasks() {
    final boolean leader = checkIfLeader();
    if (!leader) {
        return;
    }

    final List<EmailTask> pendingTasks = emailTaskService.getFailedEmailTasks();
    for (final EmailTask pendingTask : pendingTasks) {
        dispatchService.sendEmail(pendingTask);
    }
}