在分布式系统中,日常开发维护中,我们经常涉及到调用外部接口或者通过RPC去访问其他的业务系统。在这个过程中经常会碰到这样的问题:被调用的第三方或者外部接口的稳定性存在问题,经常会出现请求不通或者超时等现象,造成这种现象的原因可能是由于网络波动或者是系统升级维护导致短暂的不可用。这些可能会导致我们自身的系统处于一种不稳定的状态,不仅会在测试跟产品那经常被抱怨系统地实现问题,还会影响我们自身业务的正常进行。
诸如此类的场景:发送消息失败、调用远程服务失败、争抢锁失败等;
重试机制可以保护系统减少因网络波动、依赖服务短暂性不可用所带来的影响,让系统能更稳定运行的一种保护机制。所以在有些必须的业务中重试机制实用又有效。
当然,在引入重试机制的时候我们需要进行如下几个问题的考虑:
1.重试几次比较合适
通常来说我们单次重试所面临的的结果是有很大的不确定性的,那么到底需要进行多少次重试才是最合理的呢?这个就需要根据具体的业务具体进行分析了,看业务的重要性以及异常报警的等级划分处理及时度。但是一般来说3次重试就基本可以满足大多数的业务需求了,当然这个也需要结合重试间隔进行一起考量。为什么说3次就基本可以说足够了呢,因为如果系统处在长时间不可用的状态下,我们重试多少次都是没有实际意义的,反而徒增系统的压力。
2.每次重试的间隔设置多少合适
如果重试间隔设置得太小,可能会造成这样的情况:被调用的系统还没来得及恢复我们就已经发起了调用,那么得到的结果肯定还是失败,这样相当于快速调用并失败了N次,没有实际意义;
如果重试间隔设置得太大,可能会造成这样的情况:牺牲掉了不少数据的时效性;
所以,重试间隔的设置要根据被调用系统的平均恢复时间来去正确的估量,通常来说这个平均恢复时间如果没有完备的大数据分析系统是很难统计到,所以一般这个值就需要根据经验(一般的经验值3-5min)来进行设置并根据实际情况去不断地修正。
3.重试机会用完之后还是失败应该怎么办
当设置的重试次数全部用完之后系统仍然返回失败,此时此处业务相当于中断,这时候就需要采用一定的补偿措施,以保证系统流程的正常进行,保证数据的准确性时效性。
spring-retry是spring自身提供的一种重试机制,可以帮助我们以标准的方式处理特定操作的重试。在spring-retry中,所有配置都是基于简单注释的。
/**
* value:抛出指定异常才会重试
* include:和value一样,默认为空,当exclude也为空时,默认所有异常
* exclude:指定不处理得异常
* maxAttempts:最大重试次数,默认3次
* backoff:重试等待策略,
* 默认使用@Backoff,@Backoff的value默认为1000L,我们设置为2000; 以毫秒为单位的延迟(默认 1000)
* multiplier(指定延迟倍数)默认为0,表示固定暂停1秒后进行重试,如果把multiplier设置为1.5,则第一次重试为2秒,第二次为3秒,第三次为4.5秒。
* Spring-Retry还提供了@Recover注解,用于@Retryable重试失败后处理方法。
* 如果不需要回调方法,可以直接不写回调方法,那么实现的效果是,重试次数完了后,如果还是没成功没符合业务判断,就抛出异常。
* 可以看到传参里面写的是 Exception e,这个是作为回调的接头暗号(重试次数用完了,还是失败,我们抛出这个Exception e通知触发这个回调方法)。
* 注意事项:
* 方法的返回值必须与@Retryable方法一致
* 方法的第一个参数,必须是Throwable类型的,建议是与@Retryable配置的异常一致,其他的参数,需要哪个参数,写进去就可以了(@Recover方法中有的)
* 该回调方法与重试方法写在同一个实现类里面
*
* 由于是基于AOP实现,所以不支持类里自调用方法
* 如果重试失败需要给@Recover注解的方法做后续处理,那这个重试的方法不能有返回值,只能是void
* 方法内不能使用try catch,只能往外抛异常
* @Recover注解来开启重试失败后调用的方法(注意,需跟重处理方法在同一个类中),此注解注释的方法参数一定要是@Retryable抛出的异常,否则无法识别,可以在该方法中进行日志处理。
*/
spring-retry工具虽然是基于注解的并且很优雅实现重试,但是存在几个不友好的设计:
--可以设置任何任务单次执行的时间限制,如果超时则抛出异常;
--可以设置重试监听器,用来执行额外的处理工作
--可以设置任务阻塞策略,即可设置当前重试完成,下次重试开始前的这段时间做什么事情
--可以通过停止重试策略和等待策略结合使用来设置更加灵活的策略,比如指数等待时长并最多10次调用,随机等待时长并且永不停止等待
自己造轮子:使用AOP来为目标设置切面,即可在目标调用的前后添加一些额外的逻辑。
以kafka为例:
基本的过程如下:
重试主题(retry-topics)带来的问题以及思考
但是:如果被代理的类没有其他的依赖类,直接创建不成问题;如果被代理的类依赖了其他被spring容器 管理的类,则这种方式就会抛出异常,因为没有把被代理的实例注入到创建的代理实例中。这种情况 下,就比较复杂了,需要从Spring容器中获取已经装配好的,需要被代理的实例,然后为其创建代 理类实例,并交给Spring容器来管理,这样就不用每次都重新创建新的代理类实例了。
同时还要考虑容器中的bean类型是Singleton还是Prototype,如果是Singleton则像上面这样进行 操作,如果是Prototype则每次都新建代理类对象。
另外,这里使用的是JDK动态代理,因此就存在一个天然的缺陷,如果想要被代理的类,没有实现 任何接口,那么就无法为其创建代理对象,这种方式就行不通了。
服务端Svr、数据采集中间件Svr、车端Svr进行指令的交互
2.重试需求
3.解决方案
采用redis进行指令、指令索引、远程启动指令、指令重试次数的记录;采用naocs配置指令重试的最大次数; 采用xxl-job进行设置指令重试的间隔时间;
指令缓存(String类型):key:dispatch:vehicle:retry:instruction:instructionId(指令id) value:json字符串(此次下发的指令json)
指令的重试次数(String类型):key:dispatch:vehicle:retry:count:instructionId value:0(默认为0次)
【远程启动】指令缓存(Set类型):key:dispatch:vehicle:retry:remotestart value:instructionId
注:此两处缓存在消息返回成功后或者是重试N次之后进行删除;缓存的默认次数采用nacos进行配置,重试的执行间隔时间采用xxl-job进行设置,可以是每分钟一次或者是间隔的倍数等形式。
消息指令索引集合(Set类型):key:dispatch:vehicle:retry:failmsgindex value:instructionId
获取缓存指令索引的集合,判断该set集合是否为空,如果为空,则【没有需要进行重试的指令】;如果set集合不为空,则遍历集合中的每一个值。
由于是分布式系统,此处需要采用分布式锁,我们在此采用redission架构来进行加锁。至此还需对具体的业务进行判断:有一个特殊的指令为【远程启动】指令,由于此指令在下发还未创建任务,所以无法与任务进行关联,所以如果是这个指令就直接进行重试,如果不是则需要对任务的状态进行判断,我们只对正在执行的任务进行重试,已经完成、暂停的、废弃的任务不进行指令的重试
// 核心代码:
@XxlJob("doRetryHandle")
public void doRetryHandle() {
log.info("...[消息重试]正在进行消息重试...");
// 获取执行结果为失败的消息索引
Set msgSet = commonHandle.getRedisFailMsgIndex();
log.info("...[消息重试]需要进行消息重试的条数为:{},消息索引为:{}", msgSet.size(), msgSet);
if (EmptyUtil.isEmpty(msgSet)) {
log.info("[没有需要进行重试的指令]:doRetryHandle:[{}]", JSON.toJSONString(msgSet));
} else {
msgSet.forEach(instructionId -> {
RLock lock = redissonClient.getLock(DispatchRedisConstants.DISPATCH_MSG_RETRY_LOCK + instructionId);
log.info("...[消息重试]获取到锁{}", DispatchRedisConstants.DISPATCH_MSG_RETRY_LOCK + instructionId);
try {
lock.lock();
//ADD:前置条件,只对执行中的任务进行重试;若下发指令时,验证任务为【作废/完成/暂停=非执行中】状态,则删除该指令重试的缓存,同时更新该指令状态为失败
// todo:问题:1.远程启动命令时,尚未进行创建任务关联 2.需要做指令跟任务关联的缓存(同时注意删除时机)
// 判断指令是否为远程启动的指令,如果是直接进行重试,如果不是则进行判断任务的状态
if (commonHandle.getRedisRetryRemoteStart(instructionId)) {
doRetryExcute(instructionId);
} else {
String dispatchId = commonHandle.getRedisInstrctionIdDispatchId(instructionId);
log.info("...[消息重试]根据指令id:{}获取到该指令所属的任务id:{}", instructionId, dispatchId);
if (EmptyUtil.isNotEmpty(dispatchId)) {
// 获取任务的当前状态
DispatchTask dispatchTask = dispatchTaskRepository.getById(dispatchId);
log.info("...[消息重试]消息为:{},任务状态为{}", JSONObject.toJSONString(dispatchTask), dispatchTask.getTaskState());
if (EmptyUtil.isNotEmpty(dispatchTask)) {
//只针对执行状态的任务进行重试
if (dispatchTask.getTaskState().equals(EnumTaskState.EXECUTING.getCode())) {
log.info("...[消息重试]正在进行消息重试,任务状态为:{}", EnumTaskState.EXECUTING.getDesc());
//执行重试
doRetryExcute(instructionId);
} else {
// 非执行状态:则删除该指令重试的缓存,同时更新该指令状态为失败
log.info("...[消息重试]任务状态为:{},不是执行中的任务不进行重试", dispatchTask.getTaskState());
commonHandle.doRetryFailExcute(instructionId, OperateExecuteResultEnum.FAIL.getCode());
}
} else {
log.info("...[消息重试]查询不到任务,不进行重试", dispatchTask.getTaskState());
// 查询不到任务
commonHandle.doRetryFailExcute(instructionId, OperateExecuteResultEnum.FAIL.getCode());
}
}
}
} catch (Exception e) {
log.error("doRetryHandle消息重试异常:{}", e.getMessage());
e.printStackTrace();
} finally {
lock.unlock();
}
});
}
}
/**
* 执行重试逻辑
*/
private void doRetryExcute(String instructionId) {
// 根据指令索引获取指令已经进行的重试次数
String stringCount = commonHandle.getRedisRetryCount(instructionId);
if (EmptyUtil.isNotEmpty(stringCount)) {
Integer retryCount = Integer.valueOf(stringCount);
if (retryCount >= RETRY_COUNT) {
log.info("...[消息重试]doRetryExcute:重试次数为{},不再进行重试", retryCount);
// 删除缓存:指令索引缓存、指令缓存、指令重试次数缓存
commonHandle.deleteRetryCaches(instructionId);
return;
} else {
// 根据指令索引获取需要进行重试的消息
String messageJson = commonHandle.getRedisInstrction(instructionId);
if (EmptyUtil.isNotEmpty(messageJson)) {
log.info("...[消息重试]doRetryExcute:需要进行重试的消息为{}", messageJson);
// 发送重试消息
kafkaSendMessage.retrySendMessage(messageJson);
// 重试次数+1
commonHandle.addRedisRetryCount(instructionId, String.valueOf(retryCount + 1));
// 写日志文件(暂时:正在进行第N次重试,**秒后进行第N+1次重试)
IntructionLogUpdateResultRequestDTO intructionLogUpdateResultRequestDTO = new IntructionLogUpdateResultRequestDTO()
.setOperateId(Long.parseLong(instructionId))
.setExecuteResult(OperateExecuteResultEnum.RERTY.getCode())
.setLogContext("进行第" + (retryCount + 1) + "次重试");
dispatchTaskOperateLogRepository.ModifyResultByInstructionId(intructionLogUpdateResultRequestDTO);
}
}
}
}
总结:没有更好的方案,只有最合适的方案,一定要根据自己的业务进行选择并不断的进行优化。
欢迎关注公众号:
页面更新:2024-03-20
本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828
© CopyRight 2008-2024 All Rights Reserved. Powered By bs178.com 闽ICP备11008920号-3
闽公网安备35020302034844号