一篇文章玩转RPC通信原理,并使用Netty实现一个PRC(精华简版)

1.什么是RPC

RPC一般指远程过程调用。 RPC是远程过程调用(Remote Procedure Call)的缩写形式。 首先看下服务的演变过程:

接口请求也在慢慢演变:

总体而言就是随着服务的增多,也伴随着服务之间的调用频繁和繁琐,这就有了PRC这代名词。

PRC普通应用在分布式架构中,先看下分布式服务派系

RPC的核心职能,以dubbo图解为例


这个机制现在用的很广泛了,例如cloud中的注册中心和配置中心。 大概了解一下理论后,接下来我们用代码来实操,以便更深入的认识PRC。

2.Netty实现一个RPC

2.1 原理概述

2.2 pom.xml依赖

基于springboot 2.5.6版本,额外引入lombok和fastjson

//netty依赖

 io.netty
 netty-all
 4.1.42.Final

2.3 api jar包

自定义注解,api目录为待发布的API接口,protocol为公用的协议和工具包


2.4. 客户端架构

2.4.1 rpc目录下为公用代码,可以单独抽离的



2.4.2 Controller代码

//注意这两个声明,并没有加@Autowired或@Resource
@RpcReference 
HellService hellService;
@RpcReference
OrderService orderService;

@GetMapping("/hello")
public String hello(@RequestParam String orderId) {
    return orderService.getOrder(orderId);
}

@GetMapping("/add")
public int add(@RequestParam Integer a, @RequestParam Integer b) {
    return hellService.add(a, b);
}

PS说明:上面的两个声明没有加@Autowired或@Resource,所以spring容器在注入的时候不会处理这里两个,本文使用的是反射注入。如果想交由spring处理可以参考mybatis第九话 - 手写实现一个简单的mybatis版本中的Mapper接口注入原理

2.4.3 核心动态代理处理类RpcBeanPostProcessor

    //该类为初始化类之后的回调 还没到注入阶段
    //因此在这里接收环境的回调,读取RPC的配置传递到代理类中
    Environment environment;
    //注册之前 设置坏境变量
    @Override
    public void setEnvironment(Environment environment) {
        this.environment = environment;
    }
//可以在bean初始化之前后返回继承类或者代理类,aop就是典型的例子
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
    Class<?> clazz = bean.getClass();
    //遍历所有的声明
    for (Field field : clazz.getDeclaredFields()) {
     //如果包含这个注解就创建代理类,并用反射注入
        if (field.isAnnotationPresent(RpcReference.class)) {
            Object instance;
            String beanClassName = field.getType().getName();
            try {
             //单例缓存
                if (cacheProxyMap.containsKey(beanClassName)) {
                    instance = cacheProxyMap.get(beanClassName);
                } else {
                    //根据不同的服务名称参数传递不同的rpc调用地址
                    RpcReference annotation = field.getAnnotation(RpcReference.class);
                    //生成动态代理
                    instance = Proxy.newProxyInstance(
                            field.getType().getClassLoader(),
                            new Class[]{field.getType()},
                            //可以配置注解参数以获取不同的RPC连接配置
                            new ProxyHandler(bean, beanClassName,
                                    this.environment.getProperty(annotation.name() + ".rpcHost"),
                                    Integer.valueOf(this.environment.getProperty(annotation.name() + ".rpcPort"))));
                }
                log.info("create proxy bean:{}", beanClassName);
                //反射注入
                field.setAccessible(true);
                field.set(bean, instance);
                cacheProxyMap.put(field.getType().getName(), instance);
            } catch (IllegalAccessException e) {
                log.error("create bean error,beanClassName {}", beanClassName);
            }
        }
    }
    return bean;
}

2.4.4 动态代理调用类ProxyHandler

 @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        //组装协议
        RpcRequest request = new RpcRequest();
        //设置一个唯一ID,用来回调
        request.setReqId(UUID.randomUUID().toString());
        request.setService(this.service);
        request.setMethod(method.getName());
        request.setParamterType(method.getParameterTypes());
        request.setArgs(args);

        //发起服务调用
        NettyClient nettyClient = new NettyClient();
        nettyClient.start(rpcHost, rpcPort, new MyRpcClientHandler());
        //返回结果
        return nettyClient.sendRequest(request);
    }

2.4.5 NettyClient 公共类

 public Channel channel;

    public void start(String host, int port, RpcHandler rpcHandler) {
        String mapKey = "/" + host + ":" + port;
        if (NettyConstans.clientMap.containsKey(mapKey)) {
            this.channel = NettyConstans.clientMap.get(mapKey);
            return;
        }
        NioEventLoopGroup b1 = new NioEventLoopGroup();
        Bootstrap bs = new Bootstrap()
                .group(b1)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer() {
                    @Override
                    protected void initChannel(Channel channel) throws Exception {
                        ChannelPipeline pipeline = channel.pipeline();
                        //这里偷懒就直接用string的编解码了
                        pipeline.addLast(new StringEncoder());
                        pipeline.addLast(new StringDecoder());
                        pipeline.addLast(rpcHandler);
                    }
                });
        try {
         //客户端连接服务端
            ChannelFuture future = bs.connect(host, port).sync();
            future.addListener(listen -> {
                if (listen.isSuccess()) {
                    log.info("connect rpc service success,{}:{}", host, port);
                }
            });
            channel = future.channel();
            //保存为单例
            NettyConstans.clientMap.put(mapKey, channel);
        } catch (Exception e) {
            b1.shutdownGracefully();
            log.error("connect rpc service error,{}:{}", host, port);
        }
    }


    public Object sendRequest(RpcRequest rpcRequest) throws Exception {
        //自定义一个返回结果的回调 保存到单例Map中
        RpcFuture rpcFuture = new RpcFuture<>(
                new DefaultPromise(new DefaultEventLoop()));
        NettyConstans.rpcFutureMap.put(rpcRequest.getReqId(), rpcFuture);

        //消息发送,编解码为string,所以发送的是string
        channel.writeAndFlush(JSONObject.toJSONString(rpcRequest));

        //实际上为阻塞等待回调 由接收消息那里回调 
        //其实还有一个熔断线程处理这些超时或者一直没有回调的
        return rpcFuture.getPromise().get().getContent();
    }

2.4.6 客户端接收消息handler

 //MyRpcClientHandler
    /**
     * 协议 RpcResponse
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        log.info("RpcResponse receive msg:{}", msg);
        RpcResponse response = JSONObject.parseObject(msg, RpcResponse.class);
        //未知的消息直接忽略
        if (response == null || !NettyConstans.rpcFutureMap.containsKey(response.getReqId())) return;
        //给指定的ReqId回调
        NettyConstans.rpcFutureMap.get(response.getReqId()).getPromise().setSuccess(response);
        NettyConstans.rpcFutureMap.remove(response.getReqId());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("连接出现异常,重置连接:{}", ctx.channel().remoteAddress());
        //异常重连 服务端重启之类的
        NettyConstans.clientMap.remove(ctx.channel().remoteAddress().toString());
    }

客户端的代码基本上贴完了,比较复杂,服务端会比较简单,接下来看看服务端的代码

2.5 服务端架构

2.5.2.1bean的初始化回调RpcBeanPostProcessor

 static Map beanMap = new ConcurrentHashMap<>();
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        Class<?> clazz = bean.getClass();
        //只要包含该注解的就报保存到Map中
        if (clazz.isAnnotationPresent(RpcService.class)) {
         //存的是服务发布的接口类名称
            beanMap.put(clazz.getInterfaces()[0].getName(), bean);
            log.info("register rpc service:{}", clazz.getInterfaces()[0].getName());
        }
        return bean;
    }

这里没有往注册中心上发布了,直接以本地Map的形式保存的。主要是为弄懂原理

2.5.2 NettyService初始化

//使用springboot的启动回调开始一个RPC服务
@Override
public void run(String... args) throws Exception {
 //启动代码就不贴了 编解码为String
    NettyService.start(port, new MyRpcHandler());
}

//自定义handler类MyRpcHandler
/**
 * 协议 RpcRequest
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
    log.info("RpcRequest receive msg:{}", msg);

    RpcRequest request = JSONObject.parseObject(msg, RpcRequest.class);
    if (request == null || request.getReqId() == null) return;

    String service = request.getService();
    Object bean = RpcBeanPostProcessor.beanMap.get(service);
    //根据方法名称和参数类型获取类中的方法
    Method method = bean.getClass().getMethod(request.getMethod(), request.getParamterType());
    Object result = method.invoke(bean, request.getArgs());

    //响应协议
    RpcResponse response = new RpcResponse();
    response.setReqId(request.getReqId());
    response.setContent(result);
    //写出 和发送同理
    ctx.writeAndFlush(JSONObject.toJSONString(response));
}

3. RPC测试

分别启动客户端和服务端

3.1 客户端调用

create proxy bean:com.exmaple.demo.api.HellService
create proxy bean:com.exmaple.demo.api.OrderService

//执行http://127.0.0.1:8080/hello?orderId=1234567
connect rpc service success,127.0.0.1:18080
RpcResponse receive msg:{"content":"select order service by orderId: 1234567","reqId":"61a37ef5-6a97-4fe7-9ba9-d8c3a955c8c0"}

3.2 服务端调用日志

start remote service:18080
RpcRequest receive msg:{"args":["1234567"],"method":"getOrder","paramterType":["java.lang.String"],"reqId":"61a37ef5-6a97-4fe7-9ba9-d8c3a955c8c0","service":"com.exmaple.demo.api.OrderService"}
//第二次调用http://127.0.0.1:8080/add?a=4545&b=12日志
RpcRequest receive msg:{"args":[4545,12],"method":"add","paramterType":["int","int"],"reqId":"4f312678-b463-4db9-a861-d8b4b9c9fc4a","service":"com.exmaple.demo.api.HellService"}

4.总结

4.1 关于反射注入

正常应该使用的是FactoryBean的方式注入的,这里只是为了搞懂原理,忽略!

4.2 关于Rpc服务地址

正常的RPC服务,会先从注册中心获取这个服务发布的地址,也就是我们配置中的地址实际上是注册中心的地址 建立连接后,应该会保持心跳,第二次调用不再重新建立连接

4.3 关于阻塞异步回调

实际上还有熔断机制,应该处理掉一直等待的回调

展开阅读全文

页面更新:2024-05-04

标签:注解   初始化   服务端   反射   架构   客户端   接口   原理   协议   消息   代码   通信   精华

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2008-2024 All Rights Reserved. Powered By bs178.com 闽ICP备11008920号-3
闽公网安备35020302034844号

Top