基础架构组件篇之RPC

RPC 基础功能

RPC 产品功能

除了RPC基础功能为,RPC还具备八大功能模块
Consumer:连接管理、负载均衡、请求路由、超时处理
Provider:队列/线程池、超时丢弃、优雅关闭、过载保护

Consumer 核心设计

完整的consumer功能模块图

Provider 核心设计

队列/线程池

将不同类型的请求,放入各自的队列,每个队列分配独立的线程池,进行资源隔离. 这里的Thread Pool并非指java中的线程池对象,这里是真正的线程池。

Provider 作为服务提供方,对调用方的信息是无感知的,随业务发展可能会有N个consumer对provider进行调用,工作线程有限,因而需要设计数据结构(队列)对请求进行存放。
数据结构设计分析:

线程分配的合理数:
线程数 = CPU核数 * 2 + 2;线程过多会造成线程等待和上下文切换的资源开销;线程太少又无法充分利用cpu资源。

这里就不提供压测截图了,关于压测有兴趣可以百度自行了解,或者找测试同学沟通
QPS 队列线程 | 2w+ | 5w+ | 8w+ | 11w+

在QPS 8w+ 时 单队列多线程模型的耗时明显比多队列单线程高

超时丢弃

快速失败已超时的请求,缓解队列的压力

  1. IO线程处理反序列化后将请求放入队列(req Queue)

入队代码

publick void run(Group group, IAsyncHandler handler){
    //构造 AsyncTask 对象放入队列
    AsyncTask task = new AsyncTask(taskTimeout,handler);
    balance(group,task);
}

//异步任务构造方法
public AsyncTask(int timeout,IAsyncHandler handler){
    super();
    if(timeout < 0){
      timeout = 1000;
    }
    this.timeout = timeout;//超时时间
    this.handler = handler;
    this.addTime = System.currentTimeMillis();//入队时间
}
  1. 当线程池中工作线程从队列中取出请求进行处理时,判断当前请求是否超时。超时则丢弃,未超时则调用服务处理

出队代码

public void run() {
    while(!isStop){
        AsyncTask task  = null;
        try {
            task = taskQueue.poll(1500,TimeUnit.MILLISECONDS);//取出队列任务
            if(null != task){
                execTimeoutTask(task);//判断任务是否超时
            }
        }catch (InterruptedException e){
            logger.error("has error!",e);
        }catch (Throwable ex){
            if(task !=null ){
                task.getHandler().exceptionCaught(ex);
            }
        }
    }
}

public void execTimeoutTask(AsyncTask task) throw {
    //当前时间减去入队时间是否大于超时时间 超时回调超时
    if(System.currentTimeMillis() - task.getAddTime() > task.getTimeout()){
        task.getHandler().exceptionCaught(new TimeoutException(threadFactoryName + "async task timeout!"));
        return;
    } else {
       //没有超时回调处理任务
      Object obj = task.getHandler().run();
      task.getHandler().messageReceived();
    }
}

对于consumer 而言,一个请求调用的超时时间 = 请求队列排队时间(主要耗时) + 程序处理时间

优雅关闭

进程结束前应确保队列中的的请求全部处理完成,这路的处理完成是直接回复 并非处理逻辑。
如何通知调用方?

优雅关闭Server端实现

@Component
 public class SignalConfig {
     @Autowired
     private SignalRegistry signalRegistry
     public void init() { signalRegistry.register(); }
 }
 //监听关闭信号
 public void register() {
     try {
         if(StringUtils.isNotBlank(osName) && (!isMac() && !isWindows())) {
             Signal sig = new Signal("USR2");//这里的USR2 其实就是 -12 代表的是用户自定义信号
             //这表示当用户通过 kill -12 来 结束某个进程时 回调  operateSignalHandler 函数进行处理
             Signal.handle(sig, operateSignalHandler);
             
             Signal sig2 = new Signal("STKFLT");
             Signal.handle(sig2, breakHeartbeatSignal);
         }
     } catch ( Exception e ) {
         logger.error("------------ signal register failed ! --------------",e)
     }
 }
 //改变服务状态
 @Component
 public class OperateSignal implements SignalHandler {
     private static Logger logger = LoggerFactory.getLogger(OperateSignal.class);
     @Autowired
     private RpcContext rpcContetx;
     @Override
     public void handle(Singal singalName){
         logger.info("server : {} current state is : {}",new Object[] {rpcContext.getServiceName(), rpcContext.getServerState()});
         //设置当前服务状态为重启
         rpcContext.setServerState(ServerStateType.Reboot);
         logger.info("server : {} will reboot !", new Object[]{rpcContext.getServiceName()});
     }
 }
//超时处理判断请求没有超时时调用 service方法处理请求
public Object innerInvoke(RpcContext rpcContext, MethodSignature methodSignature) throw Exception {
    //这里使用责任链模式维护了RPC上下文对象
    requestFilter(context);
    Object response = null;
    RPCContext.setThreadLocal(context);
    ..... 省略.....
    context.getResponseProtocol().setSdpEntity(response);
    responseFilter(context);
    RpcContext.clear();
    return context;
}

protected void requestFilter(RpcContext context) throws Excpetion {
 logger.debug("begin requestFilters");
 for(IFilter filter : requestFilters){
     if(context.getExecFilter() == ExecFilterType.ALL ||
        context.getExecFilter() == ExecFilterType.RequestOnly){
        filter.filter(context);
     }
     logger.debug("end requestFilters");
 }
}

public void filter(RpcContext context) throws Exception {
    if(serviceContext.getServerState) == ServerStateType.Reboot &&
    (protocol.getPlatformType() == PlatformType.Java||protocol.getPlatformType() == PlatformType.PHP)) {
        //封装RPC响应
        RpcResponse response  = new RpcResponse();
        ResetProtocol rp = new ResetProtocol();
        rp.setMsg("This server is reboot!");
        responseProtocol.setSdpEntity(rp);
        response.setResponseBuffer(responseProtocol.toBytes(false,null));
        context.setRpcResponse(response);
        context.setExecFilter(ExecFilterType.None);
        //不再继续过滤
        context.setDoInvoke(false);
    }
}

程序启动后初始化,初始化时 初始化容器、初始化插件、初始化监听信号(重要)、初始化服务
解释: 通常结束进程我们用 kill -9 命令 这里的 9 其实就是一种信号量,用户可以自定义信号量发送给操作系统,可以自定义实现

优雅关闭client端实现

public Protocol request(Protocol requestProtocol) throws Exception {
    //判断服务节点状态
    if(ServerState.Reboot == state || ServerState.Dead == state) {
        throw new RebootException();
    };
    ....
    //注册窗口事件
    socket.registerRec(requestProtocol.getSessionID());
    //异步发送请求
    socket.send(data);
    ....
    
    //接收数据,等待数据到达事件
    byte[] buffer = socket.receive(requestProtocol.getSessionID().currUserCount);
    Protocol receiveProtocol = Protocol.fromBytes(buffer,socket.isRights(),socket.getDESKey());
    
    //接收回包首先判断类型是否是重启
    SDPType sdpType = receiveProtocol.getSDPType();
    if(sdpType == SDPType.Reset){
        //如果是重启 服务节点置为不可用
        this.asReboot();
        logger.info("server: [{}], address : [{}] was rebooted, will choose another one !",
        new Object[]{this.getName(),this.getAddress()});
        //抛出服务重启异常
        throw new RebootException();
    }
}

设置服务器为重启状态

public void asReboot() {
    if(ServerState.Reboot != this.getState()) {
        this.setState(ServerState.Reboot);
        this.setDeadTime(System.currentTimeMillis());
        this.setWeight(-1);
        this.getSocketPool().destroy();
        ServerStateDetector.instance().add(this);
        logger.debug("this server is reboot! host: "+ this.getAddress());
    }
}

过载保护

服务提供方为保证正常运行,主动丢弃超出处理能力的请求。
这里的主动丢弃与超时丢弃不同。每个请求入队时判断队列中的请求数是否达到阈值,超过则直接丢弃。

入队任务代码

publick void run(Group group, IAsyncHandler handler){
    //构造 AsyncTask 对象放入队列
    AsyncTask task = new AsyncTask(taskTimeout,handler);
    balance(group,task);
}
public void balance(Group group,AsyncTask task) {
    if(group == Group.HIGH) {
        balanceTask(highFactor.getAndIncrement(), groupHighWorkers, task);
    }else if(group == Group.DEFAULT){
        balanceTask(defaultFactor.getAndIncrement(), defaultWorkers, task);
    }
}
public void balanceTask(int factor, AsyncWorker[] workers, AsyncTask task) {
    int idx = factor % workers.length;
    //如果预设值大于0
    if(limitSize>0){
        workers[idx].addTask(task,limitSize,mode);
    }else{
        worker[idx].addTask(task);
    }
}
void addTask(AsyncTask task,int limitSize,boolean abortNewTask) {
    //如果请求队列长度超出了预设值  抛弃新来的任务
    if(this.taskQueue.size() >= limitSize){
        if(abortNewTask) {
            task.getHandler().exceptionCaught(new TimeoutException(threadFactoryName + 
            "abort this task, because the queue is full!"));
        }else{
            elimintateOldTask(task);
        }
    }else{
        this.queue.offer(task);
    }
}
展开阅读全文

页面更新:2024-04-25

标签:数据结构   队列   节点   初始化   线程   架构   组件   实例   状态   通知   基础   数据   资源

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2008-2024 All Rights Reserved. Powered By bs178.com 闽ICP备11008920号-3
闽公网安备35020302034844号

Top