在前面的博客《Java定时器演进过程和生产级分布式任务调度ElasticJob代码实战》中,我们已经熟悉ElasticJob分布式任务的应用,其核心实现为elasticjob-lite-spring-boot-starter,少量配置开箱即用;还有前面也有博客文档谈谈走进Spring Boot源码学习之路和浅谈入门,了解Spring Boot的原理,没看过伙伴可以先翻看下前面的文章。SpringBoot官网已经提供非常多的starter使用,然而今天我们就来模拟封装一个简易的分布式任务调度实现定时任务选主执行和故障自动转移的starter,本篇主要重心在于基于SpringBoot官网标准start封装的模板和步骤。
Spring Boot 在启动的时候会做这几件事情
其实也就是 Spring Boot 在启动的时候,按照约定去读取 Spring Boot Starter 的配置信息,再根据配置信息对资源进行初始化,并注入到 Spring 容器中。这样 Spring Boot 启动完毕后,就已经准备好了一切资源,使用过程中直接注入对应 Bean 资源即可。
参考GitHub基于分布式任务实现的一些代码,这里核心主要是构建一个light-job自动装配配置文件读取类和一个light-job自动装配配置类。
light-job-spring-boot-starter-autoconfigure模块添加Pom依赖
<?xml version="1.0" encoding="UTF-8"?>
4.0.0
org.springframework.boot
spring-boot-starter-parent
2.6.4
com.itxs
light-job-spring-boot-starter-autoconfigure
1.0
jar
light-job-spring-boot-starter-autoconfigure
Demo project for Spring Boot
UTF-8
UTF-8
1.8
2.6.4
3.4.6
3.4
2.3.2
org.springframework.boot
spring-boot-starter
provided
org.springframework.boot
spring-boot-starter-web
provided
org.springframework.boot
spring-boot-configuration-processor
true
org.springframework
spring-tx
provided
org.springframework
spring-context-support
provided
javax.servlet
javax.servlet-api
provided
org.apache.commons
commons-lang3
${commons-lang3.version}
org.apache.zookeeper
zookeeper
${zookeeper.version}
org.slf4j
slf4j-log4j12
com.google.code.gson
gson
org.quartz-scheduler
quartz
${quartz.version}
provided
org.quartz-scheduler
quartz-jobs
${quartz.version}
provided
折叠
创建LightJobProperties读取配置文件
package com.itxs.lightjob.config;
import com.itxs.lightjob.zk.ZKManager.KEYS;
import org.apache.commons.lang3.StringUtils;
import org.springframework.boot.context.properties.ConfigurationProperties;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ConfigurationProperties(prefix = "light.job",ignoreInvalidFields = true)
public class LightJobProperties {
private String enabled;
private String zkConnect;
private String rootPath = "/light/job";
private int zkSessionTimeout = 60000;
private String zkUsername;
private String zkPassword;
private List ipBlackList;
private List targetBean;
private List targetMethod;
private List cronExpression;
private List startTime;
private List period;
private List delay;
private List params;
private List type;
private List extKeySuffix;
private List beforeMethod;
private List afterMethod;
private List threadNum;
public Map getConfig(){
Map properties = new HashMap();
properties.put(KEYS.zkConnectString.key, zkConnect);
if(StringUtils.isNotBlank(rootPath)){
properties.put(KEYS.rootPath.key, rootPath);
}
if(zkSessionTimeout > 0){
properties.put(KEYS.zkSessionTimeout.key, zkSessionTimeout+"");
}
if(StringUtils.isNotBlank(zkUsername)){
properties.put(KEYS.userName.key, zkUsername);
}
if(StringUtils.isNotBlank(zkPassword)){
properties.put(KEYS.password.key, zkPassword);
}
StringBuilder sb = new StringBuilder();
if(ipBlackList != null && ipBlackList.size() > 0){
for(String ip:ipBlackList){
sb.append(ip).append(",");
}
sb.substring(0,sb.lastIndexOf(","));
}
properties.put(KEYS.ipBlacklist.key, sb.toString());
return properties;
}
public String getEnabled() {
return enabled;
}
public void setEnabled(String enabled) {
this.enabled = enabled;
}
public String getZkConnect() {
return zkConnect;
}
public void setZkConnect(String zkConnect) {
this.zkConnect = zkConnect;
}
public String getRootPath() {
return rootPath;
}
public void setRootPath(String rootPath) {
this.rootPath = rootPath;
}
public int getZkSessionTimeout() {
return zkSessionTimeout;
}
public void setZkSessionTimeout(int zkSessionTimeout) {
this.zkSessionTimeout = zkSessionTimeout;
}
public String getZkUsername() {
return zkUsername;
}
public void setZkUsername(String zkUsername) {
this.zkUsername = zkUsername;
}
public String getZkPassword() {
return zkPassword;
}
public void setZkPassword(String zkPassword) {
this.zkPassword = zkPassword;
}
public List getIpBlackList() {
return ipBlackList;
}
public void setIpBlackList(List ipBlackList) {
this.ipBlackList = ipBlackList;
}
public List getTargetBean() {
return targetBean;
}
public void setTargetBean(List targetBean) {
this.targetBean = targetBean;
}
public List getTargetMethod() {
return targetMethod;
}
public void setTargetMethod(List targetMethod) {
this.targetMethod = targetMethod;
}
public List getCronExpression() {
return cronExpression;
}
public void setCronExpression(List cronExpression) {
this.cronExpression = cronExpression;
}
public List getStartTime() {
return startTime;
}
public void setStartTime(List startTime) {
this.startTime = startTime;
}
public List getPeriod() {
return period;
}
public void setPeriod(List period) {
this.period = period;
}
public List getDelay() {
return delay;
}
public void setDelay(List delay) {
this.delay = delay;
}
public List getParams() {
return params;
}
public void setParams(List params) {
this.params = params;
}
public List getType() {
return type;
}
public void setType(List type) {
this.type = type;
}
public List getExtKeySuffix() {
return extKeySuffix;
}
public void setExtKeySuffix(List extKeySuffix) {
this.extKeySuffix = extKeySuffix;
}
public List getBeforeMethod() {
return beforeMethod;
}
public void setBeforeMethod(List beforeMethod) {
this.beforeMethod = beforeMethod;
}
public List getAfterMethod() {
return afterMethod;
}
public void setAfterMethod(List afterMethod) {
this.afterMethod = afterMethod;
}
public List getThreadNum() {
return threadNum;
}
public void setThreadNum(List threadNum) {
this.threadNum = threadNum;
}
}
折叠
创建自动装配类LightJobAutoConfiguration.java
package com.itxs.lightjob.config;
import com.itxs.lightjob.ZKScheduleManager;
import com.itxs.lightjob.core.TaskDefine;
import com.itxs.lightjob.util.ScheduleUtil;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
@Configuration
@EnableConfigurationProperties({LightJobProperties.class})
@ConditionalOnProperty(value = "light.job.enabled", havingValue = "true")
@ComponentScan()
public class LightJobAutoConfiguration {
private static final Logger LOGGER = LoggerFactory.getLogger(LightJobAutoConfiguration.class);
@Autowired
private LightJobProperties uncodeScheduleConfig;
@Bean(name = "zkScheduleManager", initMethod="init")
public ZKScheduleManager commonMapper(){
ZKScheduleManager zkScheduleManager = new ZKScheduleManager();
zkScheduleManager.setZkConfig(uncodeScheduleConfig.getConfig());
List list = initAllTask();
zkScheduleManager.setInitTaskDefines(list);
LOGGER.info("=====>ZKScheduleManager inited..");
return zkScheduleManager;
}
private List initAllTask(){
List list = new ArrayList();
int total = 0;
if(uncodeScheduleConfig.getTargetBean() != null){
total = uncodeScheduleConfig.getTargetBean().size();
}
for(int i = 0; i < total; i++){
TaskDefine taskDefine = new TaskDefine();
if(uncodeScheduleConfig.getTargetBean() != null){
String value = uncodeScheduleConfig.getTargetBean().get(i);
if(StringUtils.isNotBlank(value)){
taskDefine.setTargetBean(value);
}
}
if(uncodeScheduleConfig.getTargetMethod() != null){
String value = uncodeScheduleConfig.getTargetMethod().get(i);
if(StringUtils.isNotBlank(value)){
taskDefine.setTargetMethod(value);
}
}
if(uncodeScheduleConfig.getCronExpression() != null){
String value = uncodeScheduleConfig.getCronExpression().get(i);
if(StringUtils.isNotBlank(value)){
taskDefine.setCronExpression(value);
}
}
if(uncodeScheduleConfig.getStartTime() != null){
String value = uncodeScheduleConfig.getStartTime().get(i);
if(StringUtils.isNotBlank(value)){
Date time = null;
try {
time = ScheduleUtil.transferStringToDate(value);
} catch (ParseException e) {
e.printStackTrace();
}
if(time != null){
taskDefine.setStartTime(time);
}
}
}
if(uncodeScheduleConfig.getPeriod() != null){
String value = uncodeScheduleConfig.getPeriod().get(i);
if(StringUtils.isNotBlank(value)){
taskDefine.setPeriod(Long.valueOf(value));
}
}
if(uncodeScheduleConfig.getDelay() != null){
String value = uncodeScheduleConfig.getDelay().get(i);
if(StringUtils.isNotBlank(value)){
taskDefine.setDelay(Long.valueOf(value));
}
}
if(uncodeScheduleConfig.getParams() != null){
String value = uncodeScheduleConfig.getParams().get(i);
if(StringUtils.isNotBlank(value)){
taskDefine.setParams(value);
}
}
if(uncodeScheduleConfig.getType() != null){
String value = uncodeScheduleConfig.getType().get(i);
if(StringUtils.isNotBlank(value)){
taskDefine.setType(value);
}
}
if(uncodeScheduleConfig.getExtKeySuffix() != null){
String value = uncodeScheduleConfig.getExtKeySuffix().get(i);
if(StringUtils.isNotBlank(value)){
taskDefine.setExtKeySuffix(value);
}
}
if(uncodeScheduleConfig.getBeforeMethod() != null){
String value = uncodeScheduleConfig.getBeforeMethod().get(i);
if(StringUtils.isNotBlank(value)){
taskDefine.setBeforeMethod(value);
}
}
if(uncodeScheduleConfig.getAfterMethod() != null){
String value = uncodeScheduleConfig.getAfterMethod().get(i);
if(StringUtils.isNotBlank(value)){
taskDefine.setAfterMethod(value);
}
}
if(uncodeScheduleConfig.getThreadNum() != null){
String value = uncodeScheduleConfig.getThreadNum().get(i);
if(StringUtils.isNotBlank(value)){
taskDefine.setThreadNum(Integer.valueOf(value));
}
}
list.add(taskDefine);
}
return list;
}
}
折叠
然后在resources目录下的META-INF目录下创建spring.factories文件,跟SpringBoot其他starter一样,输出自动装配类的全类名;springboot项目默认只会扫描本项目下的带@Configuration注解的类,如果自定义starter,不在本工程中,是无法加载的,所以要配置META-INF/spring.factories配置文件。配置了META-INF/spring.factories配置文件是springboot实现starter的关键点,springboot的这种配置加载方式是一种类SPI(Service Provider Interface)的方式,SPI可以在META-INF/services配置接口扩展的实现类,springboot中原理类似,只是名称换成了spring.factories而已。
# Auto Configure
org.springframework.boot.autoconfigure.EnableAutoConfiguration=
com.itxs.lightjob.config.LightJobAutoConfiguration
其他还有自动装配类的具体实现代码文件,如下面目录,主要利用zookeeper做分布式协调如分布式选主,执行maven install打包和安装到本地maven仓库。
light-job-spring-boot-starter是不做实现,主要管理依赖,Pom文件内容如下
<?xml version="1.0" encoding="UTF-8"?>
4.0.0
com.itxs
light-job-spring-boot-starter
1.0
jar
8
8
com.itxs
light-job-spring-boot-starter-autoconfigure
1.0
最后我们执行maven install打包和安装到本地maven仓库。
示例工程中加入light-job-spring-boot-starter依赖,这里选择前面文章示例的库存微服务模块中添加
com.itxs
light-job-spring-boot-starter
1.0
创建演示任务并放到Spring容器里管理
package cn.itxs.ecom.storage.job;
import org.springframework.stereotype.Component;
@Component
public class DemoTask {
public void execute() {
System.out.println("===========execute start!=========");
System.out.println("===========do job!=========");
System.out.println("===========execute end !=========");
}
}
配置文件增加
light:
job:
enabled: true
zk-connect: 192.168.4.27:2181,192.168.4.28:2181,192.168.4.29:2181
root-path: /ecom/storage
zk-session-timeout: 60000
target-bean:
- demoTask
target-method:
- execute
period:
- 1000
cron-expression:
- 0/10 * * * * ?
启动三个库存微服务模块,在第1个库存微服务模块看到demoTask任务已经根据配置每十秒在运行
关闭第1个库存微服务模块程序后,通过zookeeper重新选举一个节点定时执行,从下面看选择第3个库存微服务模块每十秒实行
zookeeper地址配置可以放到配置中心如Nacos,如果目前我们配置数据是放在Redis中,可以通过System.setProperty设置系统变量的方式来实现,先注释zk-connect的配置,这是启动程序就会报错
RedisConfig配置类中增加实现BeanPostProcessor接口实现其postProcessAfterInitialization方法,在bean初始化后读取redis值设置环境变量值。
package cn.itxs.ecom.storage.config;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
@Slf4j
public class RedisConfig implements BeanPostProcessor{
@Bean
public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate template = new RedisTemplate();
template.setConnectionFactory(redisConnectionFactory);
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
//om.activateDefaultTyping(LaissezFaireSubTypeValidator.instance , ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
// key采用String的序列化方式
template.setKeySerializer(stringRedisSerializer);
// hash的key也采用String的序列化方式
template.setHashKeySerializer(stringRedisSerializer);
// value序列化方式采用jackson
//template.setValueSerializer(jackson2JsonRedisSerializer);
template.setValueSerializer(stringRedisSerializer);
// hash的value序列化方式采用jackson
//template.setHashValueSerializer(jackson2JsonRedisSerializer);
template.setHashValueSerializer(stringRedisSerializer);
template.afterPropertiesSet();
return template;
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException
{
//在redisTemplate Bean初始化之后设置light.job.zk-connect为公共集群的zk地址
if (beanName.equals("redisTemplate")){
log.info("postProcessAfterInitialization match beanName {}",beanName);
try {
RedisTemplate redisObj = (RedisTemplate) bean;
String zkConnect = (String)redisObj.opsForHash().get("clusterinfo", "zookeeper-server");
if (StringUtils.isNotBlank(zkConnect)) {
log.info("postProcessAfterInitialization get zkConnect ={}", zkConnect);
System.setProperty("light.job.zk-connect", zkConnect);
log.info("System.setProperty light.job.zk-connect={}", zkConnect);
}
} catch (Exception e) {
log.error("postProcessAfterInitialization operate redisTemplate {} failed", e);
}
}
return null;
}
}
折叠
启动后可以看到正常每十秒执行定时任务
文章来自https://www.cnblogs.com/itxiaoshen/p/16456694.html
页面更新:2024-05-17
本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828
© CopyRight 2008-2024 All Rights Reserved. Powered By bs178.com 闽ICP备11008920号-3
闽公网安备35020302034844号