Spring Scheduled Task running in clustered environment
There is a ShedLock project that serves exactly this purpose. You just annotate tasks which should be locked when executed
@Scheduled( ... )@SchedulerLock(name = "scheduledTaskName")public void scheduledTask() { // do something}
Configure Spring and a LockProvider
@Configuration@EnableScheduling@EnableSchedulerLock(defaultLockAtMostFor = "10m")class MySpringConfiguration { ... @Bean public LockProvider lockProvider(DataSource dataSource) { return new JdbcTemplateLockProvider(dataSource); } ...}
The is another simple and robust way to safe execute a job in a cluster. You can based on database and execute the task only if the node is the "leader" in the cluster.
Also when a node is failed or shutdown in the cluster another node became the leader.
All you have is to create a "leader election" mechanism and every time to check if your are the leader:
@Scheduled(cron = "*/30 * * * * *")public void executeFailedEmailTasks() { if (checkIfLeader()) { final List<EmailTask> list = emailTaskService.getFailedEmailTasks(); for (EmailTask emailTask : list) { dispatchService.sendEmail(emailTask); } }}
Follow those steps:
1.Define the object and table that holds one entry per node in the cluster:
@Entity(name = "SYS_NODE")public class SystemNode {/** The id. */@Id@GeneratedValue(strategy = GenerationType.IDENTITY)private Long id;/** The name. */@Column(name = "TIMESTAMP")private String timestamp;/** The ip. */@Column(name = "IP")private String ip;/** The last ping. */@Column(name = "LAST_PING")private Date lastPing;/** The last ping. */@Column(name = "CREATED_AT")private Date createdAt = new Date();/** The last ping. */@Column(name = "IS_LEADER")private Boolean isLeader = Boolean.FALSE;public Long getId() { return id;}public void setId(final Long id) { this.id = id;}public String getTimestamp() { return timestamp;}public void setTimestamp(final String timestamp) { this.timestamp = timestamp;}public String getIp() { return ip;}public void setIp(final String ip) { this.ip = ip;}public Date getLastPing() { return lastPing;}public void setLastPing(final Date lastPing) { this.lastPing = lastPing;}public Date getCreatedAt() { return createdAt;}public void setCreatedAt(final Date createdAt) { this.createdAt = createdAt;}public Boolean getIsLeader() { return isLeader;}public void setIsLeader(final Boolean isLeader) { this.isLeader = isLeader;}@Overridepublic String toString() { return "SystemNode{" + "id=" + id + ", timestamp='" + timestamp + '\'' + ", ip='" + ip + '\'' + ", lastPing=" + lastPing + ", createdAt=" + createdAt + ", isLeader=" + isLeader + '}';}
}
2.Create the service that a) insert the node in database , b) check for leader
@Service@Transactionalpublic class SystemNodeServiceImpl implements SystemNodeService, ApplicationListener {/** The logger. */private static final Logger LOGGER = Logger.getLogger(SystemNodeService.class);/** The constant NO_ALIVE_NODES. */private static final String NO_ALIVE_NODES = "Not alive nodes found in list {0}";/** The ip. */private String ip;/** The system service. */private SystemService systemService;/** The system node repository. */private SystemNodeRepository systemNodeRepository;@Autowiredpublic void setSystemService(final SystemService systemService) { this.systemService = systemService;}@Autowiredpublic void setSystemNodeRepository(final SystemNodeRepository systemNodeRepository) { this.systemNodeRepository = systemNodeRepository;}@Overridepublic void pingNode() { final SystemNode node = systemNodeRepository.findByIp(ip); if (node == null) { createNode(); } else { updateNode(node); }}@Overridepublic void checkLeaderShip() { final List<SystemNode> allList = systemNodeRepository.findAll(); final List<SystemNode> aliveList = filterAliveNodes(allList); SystemNode leader = findLeader(allList); if (leader != null && aliveList.contains(leader)) { setLeaderFlag(allList, Boolean.FALSE); leader.setIsLeader(Boolean.TRUE); systemNodeRepository.save(allList); } else { final SystemNode node = findMinNode(aliveList); setLeaderFlag(allList, Boolean.FALSE); node.setIsLeader(Boolean.TRUE); systemNodeRepository.save(allList); }}/** * Returns the leaded * @param list * the list * @return the leader */private SystemNode findLeader(final List<SystemNode> list) { for (SystemNode systemNode : list) { if (systemNode.getIsLeader()) { return systemNode; } } return null;}@Overridepublic boolean isLeader() { final SystemNode node = systemNodeRepository.findByIp(ip); return node != null && node.getIsLeader();}@Overridepublic void onApplicationEvent(final ApplicationEvent applicationEvent) { try { ip = InetAddress.getLocalHost().getHostAddress(); } catch (Exception e) { throw new RuntimeException(e); } if (applicationEvent instanceof ContextRefreshedEvent) { pingNode(); }}/** * Creates the node */private void createNode() { final SystemNode node = new SystemNode(); node.setIp(ip); node.setTimestamp(String.valueOf(System.currentTimeMillis())); node.setCreatedAt(new Date()); node.setLastPing(new Date()); node.setIsLeader(CollectionUtils.isEmpty(systemNodeRepository.findAll())); systemNodeRepository.save(node);}/** * Updates the node */private void updateNode(final SystemNode node) { node.setLastPing(new Date()); systemNodeRepository.save(node);}/** * Returns the alive nodes. * * @param list * the list * @return the alive nodes */private List<SystemNode> filterAliveNodes(final List<SystemNode> list) { int timeout = systemService.getSetting(SettingEnum.SYSTEM_CONFIGURATION_SYSTEM_NODE_ALIVE_TIMEOUT, Integer.class); final List<SystemNode> finalList = new LinkedList<>(); for (SystemNode systemNode : list) { if (!DateUtils.hasExpired(systemNode.getLastPing(), timeout)) { finalList.add(systemNode); } } if (CollectionUtils.isEmpty(finalList)) { LOGGER.warn(MessageFormat.format(NO_ALIVE_NODES, list)); throw new RuntimeException(MessageFormat.format(NO_ALIVE_NODES, list)); } return finalList;}/** * Finds the min name node. * * @param list * the list * @return the min node */private SystemNode findMinNode(final List<SystemNode> list) { SystemNode min = list.get(0); for (SystemNode systemNode : list) { if (systemNode.getTimestamp().compareTo(min.getTimestamp()) < -1) { min = systemNode; } } return min;}/** * Sets the leader flag. * * @param list * the list * @param value * the value */private void setLeaderFlag(final List<SystemNode> list, final Boolean value) { for (SystemNode systemNode : list) { systemNode.setIsLeader(value); }}
}
3.ping the database to send that your are alive
@Override@Scheduled(cron = "0 0/5 * * * ?")public void executeSystemNodePing() { systemNodeService.pingNode();}@Override@Scheduled(cron = "0 0/10 * * * ?")public void executeLeaderResolution() { systemNodeService.checkLeaderShip();}
4.you are ready! Just check if you are the leader before execute the task:
@Override@Scheduled(cron = "*/30 * * * * *")public void executeFailedEmailTasks() { if (checkIfLeader()) { final List<EmailTask> list = emailTaskService.getFailedEmailTasks(); for (EmailTask emailTask : list) { dispatchService.sendEmail(emailTask); } }}