SpringBoot集成Netty,实现聊天功能

一、简介

Netty 是一个利用 Java 的高级网络的能力,隐藏其背后的复杂性而提供一个易于使用的 API 客户端/服务器框架。

二、Netty和tomcat有什么区别

Netty和Tomcat最大的区别就在于通信协议,Tomcat是基于Http协议的,他的实质是一个基于http协议的web容器,但是Netty不一样,他能通过编程自定义各种协议,因为netty能够通过code自己来编码/解码字节流,完成类似redis访问的功能,这就是netty和tomcat最大的不同。

有人说netty的性能就一定比tomcat性能高,其实不然,tomcat从6.x开始就支持了nio模式,并且后续还有APR模式——一种通过jni调用apache网络库的模式,相比于旧的bio模式,并发性能得到了很大提高,特别是APR模式,而netty是否比tomcat性能更高,则要取决于netty程序作者的技术实力了。

三、SpringBoot集成Netty

netty版本:4.1.45.Final protobuf版本:3.11.1

1.项目结构


项目地址:https://github.com/yongliangZhang/WorkSpace-Hadoop

2.实现Netty服务端

import com.yongliang.socket.handler.NettyServerHandlerInitializer;import com.yongliang.socket.protobuf.MessageBase;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Value;import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;import javax.annotation.PreDestroy;import java.net.InetSocketAddress;/** * NettyServer启动服务类 * * @author zhangyongliang * @create 2020-01-17 15:37 **/@Component@Slf4jpublic class NettyServer {    @Value("${netty.port}")    private Integer port;    //线程组用于处理连接工作    private EventLoopGroup boss = new NioEventLoopGroup();    //work 线程组用于数据处理    private EventLoopGroup work = new NioEventLoopGroup();    //向客户端发送消息    public boolean sendClientMsg(Channel channel, MessageBase.Message message) {        if (channel.isActive()) {            ChannelFuture resultFuture = channel.writeAndFlush(message);            resultFuture.addListener((ChannelFutureListener) channelFuture -> log.info("服务端手动发送 Google Protocol 成功={}", message.getContentBytes().toStringUtf8()));            return true;        } else {            log.info("该客户端不在线:{}", message.getRequestIdBytes().toStringUtf8());            return false;        }    }    @PostConstruct    public void start() throws InterruptedException {        ServerBootstrap bootstrap = new ServerBootstrap();        bootstrap.group(boss, work)                //指定channel                .channel(NioServerSocketChannel.class)                //使用指定的端口设置套接字地址                .localAddress(new InetSocketAddress(port))                //服务端可连接队列数,对应TCP/IP协议listen函数中backlog参数                .option(ChannelOption.SO_BACKLOG, 1024)                //设置TCP长连接,一般如果两个小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文                .childOption(ChannelOption.TCP_NODELAY, true)                //设置为长连接                .childOption(ChannelOption.SO_KEEPALIVE, true)                //用于操作接收缓冲区和发送缓冲区                .childOption(ChannelOption.SO_RCVBUF, 256)                //用于操作接收缓冲区和发送缓冲区                .childOption(ChannelOption.SO_SNDBUF, 256)                //将小的数据包包装成更大的帧进行传送,提高网络的负载                .childHandler(new NettyServerHandlerInitializer());        ChannelFuture future = bootstrap.bind().sync();        if (future.isSuccess()) {            log.info("启动 Netty Server,netty端口为:" + port);        }    }    @PreDestroy    public void destory() throws InterruptedException {        boss.shutdownGracefully().sync();        work.shutdownGracefully().sync();        log.info("关闭Netty");    }}

在SpringBoot 项目中使用 Netty ,所以我们将Netty 服务器的启动封装在一个 start()方法,并使用 @PostConstruct注解,在指定的方法上加上 @PostConstruct注解来表示该方法在 Spring 初始化 NettyServer类后调用

3.实现Netty的Handler初始化逻辑

import com.yongliang.socket.protobuf.MessageBase;import io.netty.channel.Channel;import io.netty.channel.ChannelInitializer;import io.netty.channel.group.ChannelGroup;import io.netty.handler.codec.http.HttpObjectAggregator;import io.netty.handler.codec.http.HttpServerCodec;import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;import io.netty.handler.codec.protobuf.ProtobufDecoder;import io.netty.handler.codec.protobuf.ProtobufEncoder;import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;import io.netty.handler.logging.LogLevel;import io.netty.handler.logging.LoggingHandler;import io.netty.handler.stream.ChunkedWriteHandler;import lombok.extern.slf4j.Slf4j;/** * netty业务消息处理类 * * @author zhangyongliang * @create 2020-01-17 15:44 **/@Slf4jpublic class NettyServerHandlerInitializer extends ChannelInitializer {    @Override    protected void initChannel(Channel channel) throws Exception {        channel.pipeline()                //空闲检测                .addLast(new ServerIdleStateHandler())                //增加日志信息                .addLast(new LoggingHandler(LogLevel.INFO))                .addLast(new ProtobufVarint32FrameDecoder())                .addLast(new ProtobufDecoder(MessageBase.Message.getDefaultInstance()))                .addLast(new ProtobufVarint32LengthFieldPrepender())                .addLast(new ProtobufEncoder())                .addLast(new NettyServerHandler());    }}

此类增加来了服务端空闲检测,Protobuf编解码,半包处理以及服务端业务逻辑处理。

具体Protobuf可以参考博客: https://www.cnblogs.com/asminfo/p/6782906.html

4.netty 业务逻辑实现类

import cn.hutool.core.date.DateUtil;import com.yongliang.socket.protobuf.MessageBase;import com.yongliang.socket.protobuf.message.HeartbeatResponsePacket;import com.yongliang.socket.utils.ChannelMapUtil;import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;import lombok.extern.slf4j.Slf4j;import java.net.InetSocketAddress;/** * 服务处理类 * * @author zhangyongliang * @create 2020-01-20 10:21 **/@Slf4j@ChannelHandler.Sharablepublic class NettyServerHandler extends SimpleChannelInboundHandler {    @Override    protected void channelRead0(ChannelHandlerContext channelHandlerContext, MessageBase.Message message) throws Exception {        //实现Channel统一管理和服务端定向向客户端发送消息        String hosCode = message.getRequestIdBytes().toStringUtf8();        if (ChannelMapUtil.getChannelByName(hosCode) == null) {//            log.info("客户端加入了:{}", message.getRequestIdBytes().toStringUtf8());            ChannelMapUtil.addChannel(hosCode, channelHandlerContext);        }        if (message.getCmd().equals(MessageBase.Message.CommandType.HEARTBEAT_REQUEST)) {//            log.info("收到客户端发来的心跳消息:{}", message.toString());            channelHandlerContext.writeAndFlush(new HeartbeatResponsePacket());        } else if (message.getCmd().equals(MessageBase.Message.CommandType.NORMAL)) {            //接收的消息为普通消息            MessageBase.Message result = new MessageBase.Message()                    .toBuilder().setCmd(MessageBase.Message.CommandType.NORMAL)                    .setContent("这是结果消息:" + DateUtil.date())                    .setRequestId("LZ20151203093957").build();            channelHandlerContext.writeAndFlush(result);            log.info("收到客户端的业务消息:{}", message.getRequestIdBytes().toStringUtf8() + "-" + message.getContentBytes().toStringUtf8());        }    }    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {        InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();        String clientIp = insocket.getAddress().getHostAddress();        log.info("收到客户端连接IP:{}", clientIp);    }    @Override    public void channelInactive(ChannelHandlerContext ctx) throws Exception {        ChannelMapUtil.removeChannelContext(ctx);    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        log.error("Netty-Server捕获的异常:{}", cause.getMessage());        ctx.channel().close();    }}

5.实现客户端

import com.yongliang.socket.handler.ClientHandlerInitilizer;import com.yongliang.socket.protobuf.MessageBase;import io.netty.bootstrap.Bootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelFutureListener;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Value;import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;import java.util.concurrent.TimeUnit;/** * netty客户端 * * @author zhangyongliang * @create 2020-01-20 10:28 **/@Component@Slf4jpublic class NettyClient {    private EventLoopGroup group = new NioEventLoopGroup();    @Value("${netty.port}")    private int port;    @Value("${netty.host}")    private String host;    private SocketChannel socketChannel;    public void sendMsg(MessageBase.Message message) {        socketChannel.writeAndFlush(message);    }    @PostConstruct    public void start() {        Bootstrap bootstrap = new Bootstrap();        bootstrap.group(group)                .channel(NioSocketChannel.class)                .remoteAddress(host, port)                .handler(new ClientHandlerInitilizer());        ChannelFuture future = bootstrap.connect();        //客户端断线重连        future.addListener((ChannelFutureListener) channelFuture -> {            if (channelFuture.isSuccess()) {                log.info("连接Netty服务端成功");            } else {                log.info("连接失败,进行断线重连");                channelFuture.channel().eventLoop().schedule(() -> start(), 10, TimeUnit.SECONDS);            }        });        socketChannel = (SocketChannel) future.channel();    }}

6.实现客户端初始化配置

import com.yongliang.socket.protobuf.MessageBase;import io.netty.channel.Channel;import io.netty.channel.ChannelInitializer;import io.netty.handler.codec.protobuf.ProtobufDecoder;import io.netty.handler.codec.protobuf.ProtobufEncoder;import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;import io.netty.handler.timeout.IdleStateHandler;import java.util.concurrent.TimeUnit;/** * 客户端处理类 * * @author zhangyongliang * @create 2020-01-20 10:44 **/public class ClientHandlerInitilizer extends ChannelInitializer {    @Override    protected void initChannel(Channel channel) throws Exception {        channel.pipeline()                .addLast(new IdleStateHandler(0, 10, 10, TimeUnit.SECONDS))                .addLast(new ProtobufVarint32FrameDecoder())                .addLast(new ProtobufDecoder(MessageBase.Message.getDefaultInstance()))                .addLast(new ProtobufVarint32LengthFieldPrepender())                .addLast(new ProtobufEncoder())                .addLast(new HeartbeatHandler())                .addLast(new NettyClientHandler());    }}

此处配置类初始化,实现了心跳监测。

心跳是在TCP长连接中,客户端与服务端之间定期发送的一种特殊的数据包,通知对方在线以确保TCP连接的有效性。

有两种方式实现心跳机制:

7.实现客户端心跳检测实现类

import com.yongliang.socket.client.NettyClient;import com.yongliang.socket.protobuf.MessageBase;import com.yongliang.socket.utils.SpringUtil;import io.netty.channel.ChannelFutureListener;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.channel.EventLoop;import io.netty.handler.timeout.IdleState;import io.netty.handler.timeout.IdleStateEvent;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import java.util.concurrent.TimeUnit;/** * @author zhangyongliang * @create 2018-10-25 17:15 */@Slf4j@Componentpublic class HeartbeatHandler extends ChannelInboundHandlerAdapter {    @Autowired    private NettyClient nettyClient;    @Override    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {        if (evt instanceof IdleStateEvent) {            IdleStateEvent idleStateEvent = (IdleStateEvent) evt;            if (idleStateEvent.state() == IdleState.READER_IDLE) {                log.info("长期未收到服务器推送信息,进行重连");                if(nettyClient==null){                    nettyClient=SpringUtil.getBean(NettyClient.class);                }                EventLoop eventLoop = ctx.channel().eventLoop();                eventLoop.schedule(() -> nettyClient.start(), 5L, TimeUnit.SECONDS);            }            if (idleStateEvent.state() == IdleState.WRITER_IDLE) {                log.info("已经10s没有发送消息给服务端");                //向服务端送心跳包                MessageBase.Message heartbeat = new MessageBase.Message().toBuilder().setCmd(MessageBase.Message.CommandType.HEARTBEAT_REQUEST)                        .setRequestId("LZ20151203093957")                        .setContent("heartbeat").build();                //发送心跳消息,并在发送失败时关闭该连接                ctx.writeAndFlush(heartbeat).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);            }        } else {            super.userEventTriggered(ctx, evt);        }    }    @Override    public void channelInactive(ChannelHandlerContext ctx) throws Exception {        //如果运行过程中服务端挂了,执行重连机制        EventLoop eventLoop = ctx.channel().eventLoop();        log.info("正在进行服务端重连操作...");        if(nettyClient==null){            nettyClient=SpringUtil.getBean(NettyClient.class);        }        eventLoop.schedule(() -> nettyClient.start(), 5L, TimeUnit.SECONDS);        super.channelInactive(ctx);    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        log.error("捕获的异常:{}", cause.getMessage());        ctx.channel().close();    }}

TIPS: 注入netty客户端发现,netty客户端实例为null的情况,因此采用了SpirngUtil工具类手动获取了nettyClient的上下文实例。

我们这里创建了一个ChannelHandler类并重写了userEventTriggered方法,在该方法里实现发送心跳数据包的逻辑,同时将 IdleStateEvent类加入逻辑处理链上。

实际上是当连接空闲时间太长时,将会触发一个 IdleStateEvent事件,然后我们调用 userEventTriggered来处理该 IdleStateEvent事件。

上面我们只讨论了客户端发送心跳消息给服务端,那么服务端还需要发心跳消息给客户端吗?

一般情况是,对于长连接而言,一种方案是两边都发送心跳消息,另一种是服务端作为被动接收一方,如果一段时间内服务端没有收到心跳包那么就直接断开连接。

我们这里采用第二种方案,只需要客户端发送心跳消息,然后服务端被动接收,然后设置一段时间,在这段时间内如果服务端没有收到任何消息,那么就主动断开连接,这也就是后面要说的 空闲检测。

8.客户端断线重连

一般有以下两种情况,Netty 客户端需要重连服务端:

第一种情况实现 ChannelFutureListener用来监测连接是否成功,不成功就进行断连重试机制,代码如下:


hannelFuture添加一个监听器,如果客户端连接服务端失败,调用 channel().eventLoop().schedule()方法执行重试逻辑。

第二种情况是运行过程中 服务端突然挂掉了,这种情况我们在处理数据读写的Handler中实现,代码如下:

我们这里直接在实现心跳机制的 Handler中重写channelInactive方法,然后在该方法中执行重试逻辑,这里注入了 NettyClient类,目的是方便调用 NettyClient的start()方法重新连接服务端

channelInactive()方法是指如果当前Channel没有连接到远程节点,那么该方法将会被调用。

9.服务端空闲检测

空闲检测是什么?实际上空闲检测是每隔一段时间,检测这段时间内是否有数据读写。比如,服务端检测一段时间内,是否收到客户端发送来的数据,如果没有,就及时释放资源,关闭连接。

对于空闲检测,Netty 特地提供了 IdleStateHandler 来实现这个功能。

import io.netty.channel.ChannelHandlerContext;import io.netty.handler.timeout.IdleStateEvent;import io.netty.handler.timeout.IdleStateHandler;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.TimeUnit;/** * 空闲检测 * * @author zhangyongliang * @create 2018-10-25 16:21 */@Slf4jpublic class ServerIdleStateHandler extends IdleStateHandler {    /**     * 设置空闲检测时间为 30s     */    private static final int READER_IDLE_TIME = 30;    public ServerIdleStateHandler() {        super(READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS);    }    @Override    protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {        log.info("{} 秒内没有读取到数据,关闭连接", READER_IDLE_TIME);        ctx.channel().close();    }}

10.服务端向特定客户端推送消息

当需要服务端给特定客户端连接推送消息时,需要我们找到特定的channel并推送信息,因此需要全局管理channel,并取出特定channel,写回消息给客户端。

服务端实现类代码如下:

当客户端连接关闭时,要及时移除释放的channel

将channel管理抽象成一个工具类,完整代码如下:

/** * ChannelMap管理类 * * @author zhangyongliang * @create 2020-01-21 10:04 **/public class ChannelMapUtil {    private static int channelNum = 0;    private static ConcurrentHashMap channelHashMap = null;    public static ConcurrentHashMap getChannelHashMap() {        return channelHashMap;    }    //获取特定channel    public static ChannelHandlerContext getChannelByName(String name) {        if (CollectionUtil.isEmpty(channelHashMap)) {            return null;        }        return channelHashMap.get(name);    }    //增加Channel    public static void addChannel(String name, ChannelHandlerContext channelContext) {        if (channelHashMap == null) {            channelHashMap = new ConcurrentHashMap<>(200);        }        channelHashMap.put(name, channelContext);        channelNum++;    }    //移除channel    public static int removeChannelByName(String name) {        if (CollectionUtil.isNotEmpty(channelHashMap) && channelHashMap.containsKey(name)) {            channelHashMap.remove(name);            return 0;        } else {            return -1;        }    }    //根据channelContext移除channel    public static int removeChannelContext(ChannelHandlerContext ctx) {        if (CollectionUtil.isNotEmpty(channelHashMap)) {            for (Map.Entry channelInfo : channelHashMap.entrySet()) {                String hosCodeKey = channelInfo.getKey();                ChannelHandlerContext channelResult = channelInfo.getValue();                if (channelResult.equals(ctx)) {                    channelHashMap.remove(hosCodeKey);                    return 0;                }            }        }        return -1;    }}

四、controller层调用示例

1.客户端发送消息

import com.yongliang.socket.client.NettyClient;import com.yongliang.socket.protobuf.MessageBase;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;/** * @author zhangyongliang * @create 2020-01-20 11:35 **/@RestControllerpublic class ConsumerController {    @Autowired    private NettyClient nettyClient;    @GetMapping("/send")    public String send() {        MessageBase.Message message = new MessageBase.Message()                .toBuilder().setCmd(MessageBase.Message.CommandType.NORMAL)                .setContent("中华人民共和国")                .setRequestId("LZ20151203093957").build();        nettyClient.sendMsg(message);        return "send ok";    }}

2.服务端发送特定消息给客户端

/** * 服务端控制类 * * @author zhangyongliang * @create 2020-01-21 10:37 **/@RestControllerpublic class ProducerController {    @Autowired    public NettyServer nettyServer;    @GetMapping("/sendClient")    public String sendClientMsg() {        MessageBase.Message message = new MessageBase.Message()                .toBuilder().setCmd(MessageBase.Message.CommandType.NORMAL)                .setContent("这是服务端发送给客户端的消息:" + DateUtil.now())                .setRequestId("LZ20151203093957").build();        ChannelHandlerContext handlerContext = ChannelMapUtil.getChannelByName("LZ20151203093957");        if (handlerContext != null) {            boolean sendFlag = nettyServer.sendClientMsg(handlerContext.channel(), message);            return "send ok";        }        return "send Failed";    }}

五、多Handler处理

创建多个handler的方式有很多,最标准的做法是这样的,你需要创建个继承ChannelInitializer的类,当然,这个ChannelInitializer其实也是一个handler,下面是一个演示我自己创建了三个handler的示例:

public class MyServerHandler0 extends ChannelInitializer {@Overrideprotected void initChannel(Channel ch) throws Exception {ch.pipeline().addLast(new MyServerHandler1());ch.pipeline().addLast(new MyServerHandler2());ch.pipeline().addLast(new MyServerHandler3());}}

handler添加完毕之后,当触发第一个handler的事件之后,并不是自动触发第二个handler的相同事件,而是需要手动指定事件,比如下面的代码,演示了触发第一个handler的read事件之后,再触发下一个handler的active事件。

@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("触发了读事件");ctx.fireChannelActive();}

其中context中,方法以fire开头的都是inbound事件,也就是输入事件 综上所述,下面是一个服务器端接收到一个字节的时候,触发三个handler的完整事例,触发三个handler+一个配置handler四个handler,其中MyServerHandler0已经在上述例子中,下面是另外三个

public class MyServerHandler1 implements ChannelInboundHandler {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("MyServerHandler1");ctx.fireChannelRead(msg);}//省略其他Override方法}
public class MyServerHandler2 implements ChannelInboundHandler {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("MyServerHandler2");ctx.fireChannelRead(msg);}//省略其他Override方法
public class MyServerHandler3 implements ChannelInboundHandler {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("MyServerHandler3");ctx.fireChannelRead(msg);}//省略其他Override方法

结束语

至此,你已经搞定了服务端的通信,如此简单快尝试一下吧!

展开阅读全文

页面更新:2024-05-10

标签:缓冲区   断线   服务端   客户端   逻辑   机制   消息   事件   代码   功能   方法

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2008-2024 All Rights Reserved. Powered By bs178.com 闽ICP备11008920号-3
闽公网安备35020302034844号

Top