Netty 是一个利用 Java 的高级网络的能力,隐藏其背后的复杂性而提供一个易于使用的 API 客户端/服务器框架。
Netty和Tomcat最大的区别就在于通信协议,Tomcat是基于Http协议的,他的实质是一个基于http协议的web容器,但是Netty不一样,他能通过编程自定义各种协议,因为netty能够通过code自己来编码/解码字节流,完成类似redis访问的功能,这就是netty和tomcat最大的不同。
有人说netty的性能就一定比tomcat性能高,其实不然,tomcat从6.x开始就支持了nio模式,并且后续还有APR模式——一种通过jni调用apache网络库的模式,相比于旧的bio模式,并发性能得到了很大提高,特别是APR模式,而netty是否比tomcat性能更高,则要取决于netty程序作者的技术实力了。
netty版本:4.1.45.Final protobuf版本:3.11.1
项目地址:https://github.com/yongliangZhang/WorkSpace-Hadoop
import com.yongliang.socket.handler.NettyServerHandlerInitializer;import com.yongliang.socket.protobuf.MessageBase;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Value;import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;import javax.annotation.PreDestroy;import java.net.InetSocketAddress;/** * NettyServer启动服务类 * * @author zhangyongliang * @create 2020-01-17 15:37 **/@Component@Slf4jpublic class NettyServer { @Value("${netty.port}") private Integer port; //线程组用于处理连接工作 private EventLoopGroup boss = new NioEventLoopGroup(); //work 线程组用于数据处理 private EventLoopGroup work = new NioEventLoopGroup(); //向客户端发送消息 public boolean sendClientMsg(Channel channel, MessageBase.Message message) { if (channel.isActive()) { ChannelFuture resultFuture = channel.writeAndFlush(message); resultFuture.addListener((ChannelFutureListener) channelFuture -> log.info("服务端手动发送 Google Protocol 成功={}", message.getContentBytes().toStringUtf8())); return true; } else { log.info("该客户端不在线:{}", message.getRequestIdBytes().toStringUtf8()); return false; } } @PostConstruct public void start() throws InterruptedException { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(boss, work) //指定channel .channel(NioServerSocketChannel.class) //使用指定的端口设置套接字地址 .localAddress(new InetSocketAddress(port)) //服务端可连接队列数,对应TCP/IP协议listen函数中backlog参数 .option(ChannelOption.SO_BACKLOG, 1024) //设置TCP长连接,一般如果两个小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文 .childOption(ChannelOption.TCP_NODELAY, true) //设置为长连接 .childOption(ChannelOption.SO_KEEPALIVE, true) //用于操作接收缓冲区和发送缓冲区 .childOption(ChannelOption.SO_RCVBUF, 256) //用于操作接收缓冲区和发送缓冲区 .childOption(ChannelOption.SO_SNDBUF, 256) //将小的数据包包装成更大的帧进行传送,提高网络的负载 .childHandler(new NettyServerHandlerInitializer()); ChannelFuture future = bootstrap.bind().sync(); if (future.isSuccess()) { log.info("启动 Netty Server,netty端口为:" + port); } } @PreDestroy public void destory() throws InterruptedException { boss.shutdownGracefully().sync(); work.shutdownGracefully().sync(); log.info("关闭Netty"); }}
在SpringBoot 项目中使用 Netty ,所以我们将Netty 服务器的启动封装在一个 start()方法,并使用 @PostConstruct注解,在指定的方法上加上 @PostConstruct注解来表示该方法在 Spring 初始化 NettyServer类后调用
import com.yongliang.socket.protobuf.MessageBase;import io.netty.channel.Channel;import io.netty.channel.ChannelInitializer;import io.netty.channel.group.ChannelGroup;import io.netty.handler.codec.http.HttpObjectAggregator;import io.netty.handler.codec.http.HttpServerCodec;import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;import io.netty.handler.codec.protobuf.ProtobufDecoder;import io.netty.handler.codec.protobuf.ProtobufEncoder;import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;import io.netty.handler.logging.LogLevel;import io.netty.handler.logging.LoggingHandler;import io.netty.handler.stream.ChunkedWriteHandler;import lombok.extern.slf4j.Slf4j;/** * netty业务消息处理类 * * @author zhangyongliang * @create 2020-01-17 15:44 **/@Slf4jpublic class NettyServerHandlerInitializer extends ChannelInitializer { @Override protected void initChannel(Channel channel) throws Exception { channel.pipeline() //空闲检测 .addLast(new ServerIdleStateHandler()) //增加日志信息 .addLast(new LoggingHandler(LogLevel.INFO)) .addLast(new ProtobufVarint32FrameDecoder()) .addLast(new ProtobufDecoder(MessageBase.Message.getDefaultInstance())) .addLast(new ProtobufVarint32LengthFieldPrepender()) .addLast(new ProtobufEncoder()) .addLast(new NettyServerHandler()); }}
此类增加来了服务端空闲检测,Protobuf编解码,半包处理以及服务端业务逻辑处理。
具体Protobuf可以参考博客: https://www.cnblogs.com/asminfo/p/6782906.html
import cn.hutool.core.date.DateUtil;import com.yongliang.socket.protobuf.MessageBase;import com.yongliang.socket.protobuf.message.HeartbeatResponsePacket;import com.yongliang.socket.utils.ChannelMapUtil;import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;import lombok.extern.slf4j.Slf4j;import java.net.InetSocketAddress;/** * 服务处理类 * * @author zhangyongliang * @create 2020-01-20 10:21 **/@Slf4j@ChannelHandler.Sharablepublic class NettyServerHandler extends SimpleChannelInboundHandler { @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, MessageBase.Message message) throws Exception { //实现Channel统一管理和服务端定向向客户端发送消息 String hosCode = message.getRequestIdBytes().toStringUtf8(); if (ChannelMapUtil.getChannelByName(hosCode) == null) {// log.info("客户端加入了:{}", message.getRequestIdBytes().toStringUtf8()); ChannelMapUtil.addChannel(hosCode, channelHandlerContext); } if (message.getCmd().equals(MessageBase.Message.CommandType.HEARTBEAT_REQUEST)) {// log.info("收到客户端发来的心跳消息:{}", message.toString()); channelHandlerContext.writeAndFlush(new HeartbeatResponsePacket()); } else if (message.getCmd().equals(MessageBase.Message.CommandType.NORMAL)) { //接收的消息为普通消息 MessageBase.Message result = new MessageBase.Message() .toBuilder().setCmd(MessageBase.Message.CommandType.NORMAL) .setContent("这是结果消息:" + DateUtil.date()) .setRequestId("LZ20151203093957").build(); channelHandlerContext.writeAndFlush(result); log.info("收到客户端的业务消息:{}", message.getRequestIdBytes().toStringUtf8() + "-" + message.getContentBytes().toStringUtf8()); } } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIp = insocket.getAddress().getHostAddress(); log.info("收到客户端连接IP:{}", clientIp); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { ChannelMapUtil.removeChannelContext(ctx); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.error("Netty-Server捕获的异常:{}", cause.getMessage()); ctx.channel().close(); }}
import com.yongliang.socket.handler.ClientHandlerInitilizer;import com.yongliang.socket.protobuf.MessageBase;import io.netty.bootstrap.Bootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelFutureListener;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Value;import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;import java.util.concurrent.TimeUnit;/** * netty客户端 * * @author zhangyongliang * @create 2020-01-20 10:28 **/@Component@Slf4jpublic class NettyClient { private EventLoopGroup group = new NioEventLoopGroup(); @Value("${netty.port}") private int port; @Value("${netty.host}") private String host; private SocketChannel socketChannel; public void sendMsg(MessageBase.Message message) { socketChannel.writeAndFlush(message); } @PostConstruct public void start() { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .remoteAddress(host, port) .handler(new ClientHandlerInitilizer()); ChannelFuture future = bootstrap.connect(); //客户端断线重连 future.addListener((ChannelFutureListener) channelFuture -> { if (channelFuture.isSuccess()) { log.info("连接Netty服务端成功"); } else { log.info("连接失败,进行断线重连"); channelFuture.channel().eventLoop().schedule(() -> start(), 10, TimeUnit.SECONDS); } }); socketChannel = (SocketChannel) future.channel(); }}
import com.yongliang.socket.protobuf.MessageBase;import io.netty.channel.Channel;import io.netty.channel.ChannelInitializer;import io.netty.handler.codec.protobuf.ProtobufDecoder;import io.netty.handler.codec.protobuf.ProtobufEncoder;import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;import io.netty.handler.timeout.IdleStateHandler;import java.util.concurrent.TimeUnit;/** * 客户端处理类 * * @author zhangyongliang * @create 2020-01-20 10:44 **/public class ClientHandlerInitilizer extends ChannelInitializer { @Override protected void initChannel(Channel channel) throws Exception { channel.pipeline() .addLast(new IdleStateHandler(0, 10, 10, TimeUnit.SECONDS)) .addLast(new ProtobufVarint32FrameDecoder()) .addLast(new ProtobufDecoder(MessageBase.Message.getDefaultInstance())) .addLast(new ProtobufVarint32LengthFieldPrepender()) .addLast(new ProtobufEncoder()) .addLast(new HeartbeatHandler()) .addLast(new NettyClientHandler()); }}
此处配置类初始化,实现了心跳监测。
心跳是在TCP长连接中,客户端与服务端之间定期发送的一种特殊的数据包,通知对方在线以确保TCP连接的有效性。
有两种方式实现心跳机制:
import com.yongliang.socket.client.NettyClient;import com.yongliang.socket.protobuf.MessageBase;import com.yongliang.socket.utils.SpringUtil;import io.netty.channel.ChannelFutureListener;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.channel.EventLoop;import io.netty.handler.timeout.IdleState;import io.netty.handler.timeout.IdleStateEvent;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import java.util.concurrent.TimeUnit;/** * @author zhangyongliang * @create 2018-10-25 17:15 */@Slf4j@Componentpublic class HeartbeatHandler extends ChannelInboundHandlerAdapter { @Autowired private NettyClient nettyClient; @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent idleStateEvent = (IdleStateEvent) evt; if (idleStateEvent.state() == IdleState.READER_IDLE) { log.info("长期未收到服务器推送信息,进行重连"); if(nettyClient==null){ nettyClient=SpringUtil.getBean(NettyClient.class); } EventLoop eventLoop = ctx.channel().eventLoop(); eventLoop.schedule(() -> nettyClient.start(), 5L, TimeUnit.SECONDS); } if (idleStateEvent.state() == IdleState.WRITER_IDLE) { log.info("已经10s没有发送消息给服务端"); //向服务端送心跳包 MessageBase.Message heartbeat = new MessageBase.Message().toBuilder().setCmd(MessageBase.Message.CommandType.HEARTBEAT_REQUEST) .setRequestId("LZ20151203093957") .setContent("heartbeat").build(); //发送心跳消息,并在发送失败时关闭该连接 ctx.writeAndFlush(heartbeat).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } } else { super.userEventTriggered(ctx, evt); } } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { //如果运行过程中服务端挂了,执行重连机制 EventLoop eventLoop = ctx.channel().eventLoop(); log.info("正在进行服务端重连操作..."); if(nettyClient==null){ nettyClient=SpringUtil.getBean(NettyClient.class); } eventLoop.schedule(() -> nettyClient.start(), 5L, TimeUnit.SECONDS); super.channelInactive(ctx); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.error("捕获的异常:{}", cause.getMessage()); ctx.channel().close(); }}
TIPS: 注入netty客户端发现,netty客户端实例为null的情况,因此采用了SpirngUtil工具类手动获取了nettyClient的上下文实例。
我们这里创建了一个ChannelHandler类并重写了userEventTriggered方法,在该方法里实现发送心跳数据包的逻辑,同时将 IdleStateEvent类加入逻辑处理链上。
实际上是当连接空闲时间太长时,将会触发一个 IdleStateEvent事件,然后我们调用 userEventTriggered来处理该 IdleStateEvent事件。
上面我们只讨论了客户端发送心跳消息给服务端,那么服务端还需要发心跳消息给客户端吗?
一般情况是,对于长连接而言,一种方案是两边都发送心跳消息,另一种是服务端作为被动接收一方,如果一段时间内服务端没有收到心跳包那么就直接断开连接。
我们这里采用第二种方案,只需要客户端发送心跳消息,然后服务端被动接收,然后设置一段时间,在这段时间内如果服务端没有收到任何消息,那么就主动断开连接,这也就是后面要说的 空闲检测。
一般有以下两种情况,Netty 客户端需要重连服务端:
第一种情况实现 ChannelFutureListener用来监测连接是否成功,不成功就进行断连重试机制,代码如下:
hannelFuture添加一个监听器,如果客户端连接服务端失败,调用 channel().eventLoop().schedule()方法执行重试逻辑。
第二种情况是运行过程中 服务端突然挂掉了,这种情况我们在处理数据读写的Handler中实现,代码如下:
我们这里直接在实现心跳机制的 Handler中重写channelInactive方法,然后在该方法中执行重试逻辑,这里注入了 NettyClient类,目的是方便调用 NettyClient的start()方法重新连接服务端
channelInactive()方法是指如果当前Channel没有连接到远程节点,那么该方法将会被调用。
空闲检测是什么?实际上空闲检测是每隔一段时间,检测这段时间内是否有数据读写。比如,服务端检测一段时间内,是否收到客户端发送来的数据,如果没有,就及时释放资源,关闭连接。
对于空闲检测,Netty 特地提供了 IdleStateHandler 来实现这个功能。
import io.netty.channel.ChannelHandlerContext;import io.netty.handler.timeout.IdleStateEvent;import io.netty.handler.timeout.IdleStateHandler;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.TimeUnit;/** * 空闲检测 * * @author zhangyongliang * @create 2018-10-25 16:21 */@Slf4jpublic class ServerIdleStateHandler extends IdleStateHandler { /** * 设置空闲检测时间为 30s */ private static final int READER_IDLE_TIME = 30; public ServerIdleStateHandler() { super(READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS); } @Override protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception { log.info("{} 秒内没有读取到数据,关闭连接", READER_IDLE_TIME); ctx.channel().close(); }}
当需要服务端给特定客户端连接推送消息时,需要我们找到特定的channel并推送信息,因此需要全局管理channel,并取出特定channel,写回消息给客户端。
服务端实现类代码如下:
当客户端连接关闭时,要及时移除释放的channel
将channel管理抽象成一个工具类,完整代码如下:
/** * ChannelMap管理类 * * @author zhangyongliang * @create 2020-01-21 10:04 **/public class ChannelMapUtil { private static int channelNum = 0; private static ConcurrentHashMap channelHashMap = null; public static ConcurrentHashMap getChannelHashMap() { return channelHashMap; } //获取特定channel public static ChannelHandlerContext getChannelByName(String name) { if (CollectionUtil.isEmpty(channelHashMap)) { return null; } return channelHashMap.get(name); } //增加Channel public static void addChannel(String name, ChannelHandlerContext channelContext) { if (channelHashMap == null) { channelHashMap = new ConcurrentHashMap<>(200); } channelHashMap.put(name, channelContext); channelNum++; } //移除channel public static int removeChannelByName(String name) { if (CollectionUtil.isNotEmpty(channelHashMap) && channelHashMap.containsKey(name)) { channelHashMap.remove(name); return 0; } else { return -1; } } //根据channelContext移除channel public static int removeChannelContext(ChannelHandlerContext ctx) { if (CollectionUtil.isNotEmpty(channelHashMap)) { for (Map.Entry channelInfo : channelHashMap.entrySet()) { String hosCodeKey = channelInfo.getKey(); ChannelHandlerContext channelResult = channelInfo.getValue(); if (channelResult.equals(ctx)) { channelHashMap.remove(hosCodeKey); return 0; } } } return -1; }}
import com.yongliang.socket.client.NettyClient;import com.yongliang.socket.protobuf.MessageBase;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;/** * @author zhangyongliang * @create 2020-01-20 11:35 **/@RestControllerpublic class ConsumerController { @Autowired private NettyClient nettyClient; @GetMapping("/send") public String send() { MessageBase.Message message = new MessageBase.Message() .toBuilder().setCmd(MessageBase.Message.CommandType.NORMAL) .setContent("中华人民共和国") .setRequestId("LZ20151203093957").build(); nettyClient.sendMsg(message); return "send ok"; }}
/** * 服务端控制类 * * @author zhangyongliang * @create 2020-01-21 10:37 **/@RestControllerpublic class ProducerController { @Autowired public NettyServer nettyServer; @GetMapping("/sendClient") public String sendClientMsg() { MessageBase.Message message = new MessageBase.Message() .toBuilder().setCmd(MessageBase.Message.CommandType.NORMAL) .setContent("这是服务端发送给客户端的消息:" + DateUtil.now()) .setRequestId("LZ20151203093957").build(); ChannelHandlerContext handlerContext = ChannelMapUtil.getChannelByName("LZ20151203093957"); if (handlerContext != null) { boolean sendFlag = nettyServer.sendClientMsg(handlerContext.channel(), message); return "send ok"; } return "send Failed"; }}
创建多个handler的方式有很多,最标准的做法是这样的,你需要创建个继承ChannelInitializer的类,当然,这个ChannelInitializer其实也是一个handler,下面是一个演示我自己创建了三个handler的示例:
public class MyServerHandler0 extends ChannelInitializer {@Overrideprotected void initChannel(Channel ch) throws Exception {ch.pipeline().addLast(new MyServerHandler1());ch.pipeline().addLast(new MyServerHandler2());ch.pipeline().addLast(new MyServerHandler3());}}
handler添加完毕之后,当触发第一个handler的事件之后,并不是自动触发第二个handler的相同事件,而是需要手动指定事件,比如下面的代码,演示了触发第一个handler的read事件之后,再触发下一个handler的active事件。
@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("触发了读事件");ctx.fireChannelActive();}
其中context中,方法以fire开头的都是inbound事件,也就是输入事件 综上所述,下面是一个服务器端接收到一个字节的时候,触发三个handler的完整事例,触发三个handler+一个配置handler四个handler,其中MyServerHandler0已经在上述例子中,下面是另外三个
public class MyServerHandler1 implements ChannelInboundHandler {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("MyServerHandler1");ctx.fireChannelRead(msg);}//省略其他Override方法}
public class MyServerHandler2 implements ChannelInboundHandler {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("MyServerHandler2");ctx.fireChannelRead(msg);}//省略其他Override方法
public class MyServerHandler3 implements ChannelInboundHandler {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("MyServerHandler3");ctx.fireChannelRead(msg);}//省略其他Override方法
至此,你已经搞定了服务端的通信,如此简单快尝试一下吧!
页面更新:2024-05-10
本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828
© CopyRight 2008-2024 All Rights Reserved. Powered By bs178.com 闽ICP备11008920号-3
闽公网安备35020302034844号