SpringBoot自定义starter开发分布式任务调度实践

概述

需求

在前面的博客《Java定时器演进过程和生产级分布式任务调度ElasticJob代码实战》中,我们已经熟悉ElasticJob分布式任务的应用,其核心实现为elasticjob-lite-spring-boot-starter,少量配置开箱即用;还有前面也有博客文档谈谈走进Spring Boot源码学习之路和浅谈入门,了解Spring Boot的原理,没看过伙伴可以先翻看下前面的文章。SpringBoot官网已经提供非常多的starter使用,然而今天我们就来模拟封装一个简易的分布式任务调度实现定时任务选主执行和故障自动转移的starter,本篇主要重心在于基于SpringBoot官网标准start封装的模板和步骤。

相关概念

制作starter基本步骤

SpringBoot启动简述

Spring Boot 在启动的时候会做这几件事情

其实也就是 Spring Boot 在启动的时候,按照约定去读取 Spring Boot Starter 的配置信息,再根据配置信息对资源进行初始化,并注入到 Spring 容器中。这样 Spring Boot 启动完毕后,就已经准备好了一切资源,使用过程中直接注入对应 Bean 资源即可。

实践

创建项目

SpringBoot自定义starter开发分布式任务调度实践

SpringBoot自定义starter开发分布式任务调度实践

autoconfigure实现

参考GitHub基于分布式任务实现的一些代码,这里核心主要是构建一个light-job自动装配配置文件读取类和一个light-job自动装配配置类。

light-job-spring-boot-starter-autoconfigure模块添加Pom依赖

<?xml version="1.0" encoding="UTF-8"?>

    4.0.0
    
        org.springframework.boot
        spring-boot-starter-parent
        2.6.4
         
    
    com.itxs
    light-job-spring-boot-starter-autoconfigure
    1.0
    jar
    light-job-spring-boot-starter-autoconfigure
    Demo project for Spring Boot

    
        UTF-8
        UTF-8
        1.8
        2.6.4
        3.4.6
        3.4
        2.3.2
    

    
        
            org.springframework.boot
            spring-boot-starter
            provided
        
        
            org.springframework.boot
            spring-boot-starter-web
            provided
        
        
            org.springframework.boot
            spring-boot-configuration-processor
            true
        
        
            org.springframework
            spring-tx
            provided
        
        
            org.springframework
            spring-context-support
            provided
        

        
            javax.servlet
            javax.servlet-api
            provided
        

        
            org.apache.commons
            commons-lang3
            ${commons-lang3.version}
        

        
            org.apache.zookeeper
            zookeeper
            ${zookeeper.version}
            
                
                    org.slf4j
                    slf4j-log4j12
                
            
        

        
            com.google.code.gson
            gson
        

        
            org.quartz-scheduler
            quartz
            ${quartz.version}
            provided
        
        
            org.quartz-scheduler
            quartz-jobs
            ${quartz.version}
            provided
        
    


折叠 

创建LightJobProperties读取配置文件

package com.itxs.lightjob.config;

import com.itxs.lightjob.zk.ZKManager.KEYS;
import org.apache.commons.lang3.StringUtils;
import org.springframework.boot.context.properties.ConfigurationProperties;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

@ConfigurationProperties(prefix = "light.job",ignoreInvalidFields = true)
public class LightJobProperties {

	private String enabled;
	private String zkConnect;
	private String rootPath = "/light/job";
	private int zkSessionTimeout = 60000;
	private String zkUsername;
	private String zkPassword;
	private List ipBlackList;
	
	
	private List targetBean;
	private List targetMethod;
	private List cronExpression;
	private List startTime;
	private List period;
	private List delay;
	private List params;
	private List type;
	private List extKeySuffix;
	private List beforeMethod;
	private List afterMethod;
	private List threadNum;
	
	
	public Map getConfig(){
		Map properties = new HashMap();
		properties.put(KEYS.zkConnectString.key, zkConnect);
		if(StringUtils.isNotBlank(rootPath)){
			properties.put(KEYS.rootPath.key, rootPath);
		}
		if(zkSessionTimeout > 0){
			properties.put(KEYS.zkSessionTimeout.key, zkSessionTimeout+"");
		}
		if(StringUtils.isNotBlank(zkUsername)){
			properties.put(KEYS.userName.key, zkUsername);
		}
		if(StringUtils.isNotBlank(zkPassword)){
			properties.put(KEYS.password.key, zkPassword);
		}
		StringBuilder sb = new StringBuilder();
		if(ipBlackList != null && ipBlackList.size() > 0){
			for(String ip:ipBlackList){
				sb.append(ip).append(",");
			}
			sb.substring(0,sb.lastIndexOf(","));
		}
		properties.put(KEYS.ipBlacklist.key, sb.toString());
		return properties;
	}

	public String getEnabled() {
		return enabled;
	}

	public void setEnabled(String enabled) {
		this.enabled = enabled;
	}

	public String getZkConnect() {
		return zkConnect;
	}
	public void setZkConnect(String zkConnect) {
		this.zkConnect = zkConnect;
	}
	public String getRootPath() {
		return rootPath;
	}
	public void setRootPath(String rootPath) {
		this.rootPath = rootPath;
	}
	public int getZkSessionTimeout() {
		return zkSessionTimeout;
	}
	public void setZkSessionTimeout(int zkSessionTimeout) {
		this.zkSessionTimeout = zkSessionTimeout;
	}
	public String getZkUsername() {
		return zkUsername;
	}
	public void setZkUsername(String zkUsername) {
		this.zkUsername = zkUsername;
	}
	public String getZkPassword() {
		return zkPassword;
	}
	public void setZkPassword(String zkPassword) {
		this.zkPassword = zkPassword;
	}
	public List getIpBlackList() {
		return ipBlackList;
	}
	public void setIpBlackList(List ipBlackList) {
		this.ipBlackList = ipBlackList;
	}


	public List getTargetBean() {
		return targetBean;
	}


	public void setTargetBean(List targetBean) {
		this.targetBean = targetBean;
	}


	public List getTargetMethod() {
		return targetMethod;
	}


	public void setTargetMethod(List targetMethod) {
		this.targetMethod = targetMethod;
	}


	public List getCronExpression() {
		return cronExpression;
	}


	public void setCronExpression(List cronExpression) {
		this.cronExpression = cronExpression;
	}


	public List getStartTime() {
		return startTime;
	}


	public void setStartTime(List startTime) {
		this.startTime = startTime;
	}


	public List getPeriod() {
		return period;
	}


	public void setPeriod(List period) {
		this.period = period;
	}


	public List getDelay() {
		return delay;
	}


	public void setDelay(List delay) {
		this.delay = delay;
	}


	public List getParams() {
		return params;
	}


	public void setParams(List params) {
		this.params = params;
	}


	public List getType() {
		return type;
	}


	public void setType(List type) {
		this.type = type;
	}


	public List getExtKeySuffix() {
		return extKeySuffix;
	}


	public void setExtKeySuffix(List extKeySuffix) {
		this.extKeySuffix = extKeySuffix;
	}


	public List getBeforeMethod() {
		return beforeMethod;
	}


	public void setBeforeMethod(List beforeMethod) {
		this.beforeMethod = beforeMethod;
	}


	public List getAfterMethod() {
		return afterMethod;
	}


	public void setAfterMethod(List afterMethod) {
		this.afterMethod = afterMethod;
	}

	public List getThreadNum() {
		return threadNum;
	}


	public void setThreadNum(List threadNum) {
		this.threadNum = threadNum;
	}
}
折叠 

创建自动装配类LightJobAutoConfiguration.java

package com.itxs.lightjob.config;


import com.itxs.lightjob.ZKScheduleManager;
import com.itxs.lightjob.core.TaskDefine;
import com.itxs.lightjob.util.ScheduleUtil;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

import java.text.ParseException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

@Configuration
@EnableConfigurationProperties({LightJobProperties.class})
@ConditionalOnProperty(value = "light.job.enabled", havingValue = "true")
@ComponentScan()
public class LightJobAutoConfiguration {
	
	private static final Logger LOGGER = LoggerFactory.getLogger(LightJobAutoConfiguration.class);
	
	@Autowired
	private LightJobProperties uncodeScheduleConfig;
	
	@Bean(name = "zkScheduleManager", initMethod="init")
	public ZKScheduleManager commonMapper(){
		ZKScheduleManager zkScheduleManager = new ZKScheduleManager();
		zkScheduleManager.setZkConfig(uncodeScheduleConfig.getConfig());
		List list = initAllTask();
		zkScheduleManager.setInitTaskDefines(list);
		LOGGER.info("=====>ZKScheduleManager inited..");
		return zkScheduleManager;
	}
	
	private List initAllTask(){
		List list = new ArrayList();
		int total = 0;
		if(uncodeScheduleConfig.getTargetBean() != null){
			total = uncodeScheduleConfig.getTargetBean().size();
		}
		for(int i = 0; i < total; i++){
			TaskDefine taskDefine = new TaskDefine();
			if(uncodeScheduleConfig.getTargetBean() != null){
				 String value = uncodeScheduleConfig.getTargetBean().get(i);
				if(StringUtils.isNotBlank(value)){
					taskDefine.setTargetBean(value);
				}
			}
			if(uncodeScheduleConfig.getTargetMethod() != null){
				 String value = uncodeScheduleConfig.getTargetMethod().get(i);
				if(StringUtils.isNotBlank(value)){
					taskDefine.setTargetMethod(value);
				}
			}
			if(uncodeScheduleConfig.getCronExpression() != null){
				 String value = uncodeScheduleConfig.getCronExpression().get(i);
				if(StringUtils.isNotBlank(value)){
					taskDefine.setCronExpression(value);
				}
			}
			if(uncodeScheduleConfig.getStartTime() != null){
				 String value = uncodeScheduleConfig.getStartTime().get(i);
				if(StringUtils.isNotBlank(value)){
					Date time = null;
					try {
						time = ScheduleUtil.transferStringToDate(value);
					} catch (ParseException e) {
						e.printStackTrace();
					}
					if(time != null){
						taskDefine.setStartTime(time);
					}
				}
			}
			if(uncodeScheduleConfig.getPeriod() != null){
				 String value = uncodeScheduleConfig.getPeriod().get(i);
				if(StringUtils.isNotBlank(value)){
					taskDefine.setPeriod(Long.valueOf(value));
				}
			}
			if(uncodeScheduleConfig.getDelay() != null){
				 String value = uncodeScheduleConfig.getDelay().get(i);
				if(StringUtils.isNotBlank(value)){
					taskDefine.setDelay(Long.valueOf(value));
				}
			}
			
			if(uncodeScheduleConfig.getParams() != null){
				 String value = uncodeScheduleConfig.getParams().get(i);
				if(StringUtils.isNotBlank(value)){
					taskDefine.setParams(value);
				}
			}
			
			if(uncodeScheduleConfig.getType() != null){
				 String value = uncodeScheduleConfig.getType().get(i);
				if(StringUtils.isNotBlank(value)){
					taskDefine.setType(value);
				}
			}
			
			if(uncodeScheduleConfig.getExtKeySuffix() != null){
				 String value = uncodeScheduleConfig.getExtKeySuffix().get(i);
				if(StringUtils.isNotBlank(value)){
					taskDefine.setExtKeySuffix(value);
				}
			}
			if(uncodeScheduleConfig.getBeforeMethod() != null){
				 String value = uncodeScheduleConfig.getBeforeMethod().get(i);
				if(StringUtils.isNotBlank(value)){
					taskDefine.setBeforeMethod(value);
				}
			}
			if(uncodeScheduleConfig.getAfterMethod() != null){
				 String value = uncodeScheduleConfig.getAfterMethod().get(i);
				if(StringUtils.isNotBlank(value)){
					taskDefine.setAfterMethod(value);
				}
			}
			if(uncodeScheduleConfig.getThreadNum() != null){
				 String value = uncodeScheduleConfig.getThreadNum().get(i);
				if(StringUtils.isNotBlank(value)){
					taskDefine.setThreadNum(Integer.valueOf(value));
				}
			}
			list.add(taskDefine);
		}
		return list;
	}
}
折叠 

然后在resources目录下的META-INF目录下创建spring.factories文件,跟SpringBoot其他starter一样,输出自动装配类的全类名;springboot项目默认只会扫描本项目下的带@Configuration注解的类,如果自定义starter,不在本工程中,是无法加载的,所以要配置META-INF/spring.factories配置文件。配置了META-INF/spring.factories配置文件是springboot实现starter的关键点,springboot的这种配置加载方式是一种类SPI(Service Provider Interface)的方式,SPI可以在META-INF/services配置接口扩展的实现类,springboot中原理类似,只是名称换成了spring.factories而已。

# Auto Configure
org.springframework.boot.autoconfigure.EnableAutoConfiguration=
com.itxs.lightjob.config.LightJobAutoConfiguration

其他还有自动装配类的具体实现代码文件,如下面目录,主要利用zookeeper做分布式协调如分布式选主,执行maven install打包和安装到本地maven仓库。

SpringBoot自定义starter开发分布式任务调度实践

light-job-spring-boot-starter

light-job-spring-boot-starter是不做实现,主要管理依赖,Pom文件内容如下

<?xml version="1.0" encoding="UTF-8"?>

    4.0.0

    com.itxs
    light-job-spring-boot-starter
    1.0
    jar

    
        8
        8
    
    
        
            com.itxs
            light-job-spring-boot-starter-autoconfigure
            1.0
        
    

最后我们执行maven install打包和安装到本地maven仓库。

调用示例

示例工程中加入light-job-spring-boot-starter依赖,这里选择前面文章示例的库存微服务模块中添加

        
            com.itxs
            light-job-spring-boot-starter
            1.0
        

创建演示任务并放到Spring容器里管理

package cn.itxs.ecom.storage.job;

import org.springframework.stereotype.Component;

@Component
public class DemoTask {

    public void execute() {
        System.out.println("===========execute start!=========");
        System.out.println("===========do job!=========");
        System.out.println("===========execute end !=========");
    }
}

配置文件增加

light:
  job:
    enabled: true
    zk-connect: 192.168.4.27:2181,192.168.4.28:2181,192.168.4.29:2181
    root-path: /ecom/storage
    zk-session-timeout: 60000
    target-bean:
      - demoTask
    target-method:
      - execute
    period:
      - 1000
    cron-expression:
      - 0/10 * * * * ?

启动三个库存微服务模块,在第1个库存微服务模块看到demoTask任务已经根据配置每十秒在运行

SpringBoot自定义starter开发分布式任务调度实践

关闭第1个库存微服务模块程序后,通过zookeeper重新选举一个节点定时执行,从下面看选择第3个库存微服务模块每十秒实行

SpringBoot自定义starter开发分布式任务调度实践

Redis读取配置赋值lightjob

zookeeper地址配置可以放到配置中心如Nacos,如果目前我们配置数据是放在Redis中,可以通过System.setProperty设置系统变量的方式来实现,先注释zk-connect的配置,这是启动程序就会报错

SpringBoot自定义starter开发分布式任务调度实践

RedisConfig配置类中增加实现BeanPostProcessor接口实现其postProcessAfterInitialization方法,在bean初始化后读取redis值设置环境变量值。

package cn.itxs.ecom.storage.config;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
@Slf4j
public class RedisConfig implements BeanPostProcessor{
    @Bean
    public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate template = new RedisTemplate();
        template.setConnectionFactory(redisConnectionFactory);
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        //om.activateDefaultTyping(LaissezFaireSubTypeValidator.instance , ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        // key采用String的序列化方式
        template.setKeySerializer(stringRedisSerializer);
        // hash的key也采用String的序列化方式
        template.setHashKeySerializer(stringRedisSerializer);
        // value序列化方式采用jackson
        //template.setValueSerializer(jackson2JsonRedisSerializer);
        template.setValueSerializer(stringRedisSerializer);
        // hash的value序列化方式采用jackson
        //template.setHashValueSerializer(jackson2JsonRedisSerializer);
        template.setHashValueSerializer(stringRedisSerializer);
        template.afterPropertiesSet();
        return template;
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException
    {
        //在redisTemplate Bean初始化之后设置light.job.zk-connect为公共集群的zk地址
        if (beanName.equals("redisTemplate")){
            log.info("postProcessAfterInitialization match beanName {}",beanName);
            try {
                RedisTemplate redisObj = (RedisTemplate) bean;
                String zkConnect = (String)redisObj.opsForHash().get("clusterinfo", "zookeeper-server");
                if (StringUtils.isNotBlank(zkConnect)) {
                    log.info("postProcessAfterInitialization get zkConnect ={}", zkConnect);
                    System.setProperty("light.job.zk-connect", zkConnect);
                    log.info("System.setProperty light.job.zk-connect={}", zkConnect);
                }
            } catch (Exception e) {
                log.error("postProcessAfterInitialization operate redisTemplate {} failed", e);
            }
        }
        return null;
    }
}
折叠 

启动后可以看到正常每十秒执行定时任务

文章来自https://www.cnblogs.com/itxiaoshen/p/16456694.html

展开阅读全文

页面更新:2024-05-17

标签:分布式   上下文   示例   初始化   应用程序   模块   库存   方式   文件   项目

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2008-2024 All Rights Reserved. Powered By bs178.com 闽ICP备11008920号-3
闽公网安备35020302034844号

Top