除了RPC基础功能为,RPC还具备八大功能模块
Consumer:连接管理、负载均衡、请求路由、超时处理
Provider:队列/线程池、超时丢弃、优雅关闭、过载保护
完整的consumer功能模块图
将不同类型的请求,放入各自的队列,每个队列分配独立的线程池,进行资源隔离. 这里的Thread Pool并非指java中的线程池对象,这里是真正的线程池。
Provider 作为服务提供方,对调用方的信息是无感知的,随业务发展可能会有N个consumer对provider进行调用,工作线程有限,因而需要设计数据结构(队列)对请求进行存放。
数据结构设计分析:
线程分配的合理数:
线程数 = CPU核数 * 2 + 2;线程过多会造成线程等待和上下文切换的资源开销;线程太少又无法充分利用cpu资源。
这里就不提供压测截图了,关于压测有兴趣可以百度自行了解,或者找测试同学沟通
QPS 队列线程 | 2w+ | 5w+ | 8w+ | 11w+
在QPS 8w+ 时 单队列多线程模型的耗时明显比多队列单线程高
快速失败已超时的请求,缓解队列的压力
入队代码
publick void run(Group group, IAsyncHandler handler){
//构造 AsyncTask 对象放入队列
AsyncTask task = new AsyncTask(taskTimeout,handler);
balance(group,task);
}
//异步任务构造方法
public AsyncTask(int timeout,IAsyncHandler handler){
super();
if(timeout < 0){
timeout = 1000;
}
this.timeout = timeout;//超时时间
this.handler = handler;
this.addTime = System.currentTimeMillis();//入队时间
}
出队代码
public void run() {
while(!isStop){
AsyncTask task = null;
try {
task = taskQueue.poll(1500,TimeUnit.MILLISECONDS);//取出队列任务
if(null != task){
execTimeoutTask(task);//判断任务是否超时
}
}catch (InterruptedException e){
logger.error("has error!",e);
}catch (Throwable ex){
if(task !=null ){
task.getHandler().exceptionCaught(ex);
}
}
}
}
public void execTimeoutTask(AsyncTask task) throw {
//当前时间减去入队时间是否大于超时时间 超时回调超时
if(System.currentTimeMillis() - task.getAddTime() > task.getTimeout()){
task.getHandler().exceptionCaught(new TimeoutException(threadFactoryName + "async task timeout!"));
return;
} else {
//没有超时回调处理任务
Object obj = task.getHandler().run();
task.getHandler().messageReceived();
}
}
对于consumer 而言,一个请求调用的超时时间 = 请求队列排队时间(主要耗时) + 程序处理时间
进程结束前应确保队列中的的请求全部处理完成,这路的处理完成是直接回复 并非处理逻辑。
如何通知调用方?
@Component
public class SignalConfig {
@Autowired
private SignalRegistry signalRegistry
public void init() { signalRegistry.register(); }
}
//监听关闭信号
public void register() {
try {
if(StringUtils.isNotBlank(osName) && (!isMac() && !isWindows())) {
Signal sig = new Signal("USR2");//这里的USR2 其实就是 -12 代表的是用户自定义信号
//这表示当用户通过 kill -12 来 结束某个进程时 回调 operateSignalHandler 函数进行处理
Signal.handle(sig, operateSignalHandler);
Signal sig2 = new Signal("STKFLT");
Signal.handle(sig2, breakHeartbeatSignal);
}
} catch ( Exception e ) {
logger.error("------------ signal register failed ! --------------",e)
}
}
//改变服务状态
@Component
public class OperateSignal implements SignalHandler {
private static Logger logger = LoggerFactory.getLogger(OperateSignal.class);
@Autowired
private RpcContext rpcContetx;
@Override
public void handle(Singal singalName){
logger.info("server : {} current state is : {}",new Object[] {rpcContext.getServiceName(), rpcContext.getServerState()});
//设置当前服务状态为重启
rpcContext.setServerState(ServerStateType.Reboot);
logger.info("server : {} will reboot !", new Object[]{rpcContext.getServiceName()});
}
}
//超时处理判断请求没有超时时调用 service方法处理请求
public Object innerInvoke(RpcContext rpcContext, MethodSignature methodSignature) throw Exception {
//这里使用责任链模式维护了RPC上下文对象
requestFilter(context);
Object response = null;
RPCContext.setThreadLocal(context);
..... 省略.....
context.getResponseProtocol().setSdpEntity(response);
responseFilter(context);
RpcContext.clear();
return context;
}
protected void requestFilter(RpcContext context) throws Excpetion {
logger.debug("begin requestFilters");
for(IFilter filter : requestFilters){
if(context.getExecFilter() == ExecFilterType.ALL ||
context.getExecFilter() == ExecFilterType.RequestOnly){
filter.filter(context);
}
logger.debug("end requestFilters");
}
}
public void filter(RpcContext context) throws Exception {
if(serviceContext.getServerState) == ServerStateType.Reboot &&
(protocol.getPlatformType() == PlatformType.Java||protocol.getPlatformType() == PlatformType.PHP)) {
//封装RPC响应
RpcResponse response = new RpcResponse();
ResetProtocol rp = new ResetProtocol();
rp.setMsg("This server is reboot!");
responseProtocol.setSdpEntity(rp);
response.setResponseBuffer(responseProtocol.toBytes(false,null));
context.setRpcResponse(response);
context.setExecFilter(ExecFilterType.None);
//不再继续过滤
context.setDoInvoke(false);
}
}
程序启动后初始化,初始化时 初始化容器、初始化插件、初始化监听信号(重要)、初始化服务
解释: 通常结束进程我们用 kill -9 命令 这里的 9 其实就是一种信号量,用户可以自定义信号量发送给操作系统,可以自定义实现
public Protocol request(Protocol requestProtocol) throws Exception {
//判断服务节点状态
if(ServerState.Reboot == state || ServerState.Dead == state) {
throw new RebootException();
};
....
//注册窗口事件
socket.registerRec(requestProtocol.getSessionID());
//异步发送请求
socket.send(data);
....
//接收数据,等待数据到达事件
byte[] buffer = socket.receive(requestProtocol.getSessionID().currUserCount);
Protocol receiveProtocol = Protocol.fromBytes(buffer,socket.isRights(),socket.getDESKey());
//接收回包首先判断类型是否是重启
SDPType sdpType = receiveProtocol.getSDPType();
if(sdpType == SDPType.Reset){
//如果是重启 服务节点置为不可用
this.asReboot();
logger.info("server: [{}], address : [{}] was rebooted, will choose another one !",
new Object[]{this.getName(),this.getAddress()});
//抛出服务重启异常
throw new RebootException();
}
}
设置服务器为重启状态
public void asReboot() {
if(ServerState.Reboot != this.getState()) {
this.setState(ServerState.Reboot);
this.setDeadTime(System.currentTimeMillis());
this.setWeight(-1);
this.getSocketPool().destroy();
ServerStateDetector.instance().add(this);
logger.debug("this server is reboot! host: "+ this.getAddress());
}
}
服务提供方为保证正常运行,主动丢弃超出处理能力的请求。
这里的主动丢弃与超时丢弃不同。每个请求入队时判断队列中的请求数是否达到阈值,超过则直接丢弃。
入队任务代码
publick void run(Group group, IAsyncHandler handler){
//构造 AsyncTask 对象放入队列
AsyncTask task = new AsyncTask(taskTimeout,handler);
balance(group,task);
}
public void balance(Group group,AsyncTask task) {
if(group == Group.HIGH) {
balanceTask(highFactor.getAndIncrement(), groupHighWorkers, task);
}else if(group == Group.DEFAULT){
balanceTask(defaultFactor.getAndIncrement(), defaultWorkers, task);
}
}
public void balanceTask(int factor, AsyncWorker[] workers, AsyncTask task) {
int idx = factor % workers.length;
//如果预设值大于0
if(limitSize>0){
workers[idx].addTask(task,limitSize,mode);
}else{
worker[idx].addTask(task);
}
}
void addTask(AsyncTask task,int limitSize,boolean abortNewTask) {
//如果请求队列长度超出了预设值 抛弃新来的任务
if(this.taskQueue.size() >= limitSize){
if(abortNewTask) {
task.getHandler().exceptionCaught(new TimeoutException(threadFactoryName +
"abort this task, because the queue is full!"));
}else{
elimintateOldTask(task);
}
}else{
this.queue.offer(task);
}
}
页面更新:2024-04-25
本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828
© CopyRight 2008-2024 All Rights Reserved. Powered By bs178.com 闽ICP备11008920号-3
闽公网安备35020302034844号