本文通过一个实际的场景来介绍在前后端分离的项目中通过 WebSocket 来实现服务器端主动向客户端发送消息的应用。主要内容如下
Websocket 是一种在单个 TCP 连接上进行全双工通信的协议。WebSocket 连接成功后,服务端与客户端可以双向通信。在需要消息推送的场景,Websocket 相对于轮询能更好的节省服务器资源和带宽,并且能够更实时地进行通讯。
具体如下特点
在客户端的列表数据中有个 status 字段,服务器端需要花费较长的时间进行处理,处理完成后才会更新对应数据的 status 字段值,通过 Websocket 的处理流程如下:
org.springframework.boot
spring-boot-starter-websocket
通过注入 ServerEndpointExporter 类,用于在项目启动的时候自动将使用了 @ServerEndpoint 注解声明的 Websocket endpoint 注册到 WebSocketContainer 中。
package com.ckjava.config;
/**
* Function:
*
* @author chenkui 2022/4/6 17:55
*/
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
@Configuration
public class WebSocketConfig {
/**
* 注入 ServerEndpointExporter,
* 这个 bean 会自动注册使用了 @ServerEndpoint 注解声明的 Websocket endpoint
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
package com.ckjava.websocket;
import com.ckjava.xutils.JsonUtils;
import com.ckjava.entity.TSysPubEntity;
import com.ckjava.service.TSysPubService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
* Function:
*
* @author chenkui 2022/4/6 18:00
*/
@Slf4j
@Component
@ServerEndpoint("/websocket/{pubId}")
public class PubStatusWS {
/**
* 每个数据对应一个 session
*/
private static final ConcurrentHashMap sessionPool = new ConcurrentHashMap<>();
/**
* 定时任务的线程池
*/
private static final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
@Resource
private TSysPubService pubService;
/**
* 定时从数据库中获取数据的最新状态,然后返回给前端
*/
@PostConstruct
public void init() {
scheduledExecutorService.scheduleAtFixedRate(() -> {
sessionPool.forEach((pubId, session) -> {
final TSysPubEntity entity = pubService.findById(pubId);
// 返回最新的状态
session.getAsyncRemote().sendText(JsonUtils.toJSONString(entity));
log.debug(String.format("websocket消息 server send to pubId %s, text:%s", session.getId(), entity.getPubStatus()));
});
}, 10, 30, TimeUnit.SECONDS);
}
@OnOpen
public void onOpen(final Session session, final EndpointConfig endpointConfig,
@PathParam(value = "pubId") final Integer pubId) {
sessionPool.put(pubId, session);
log.info(String.format("【websocket消息】 pubId:%s 加入连接,当前总数为:%s", pubId, sessionPool.size()));
}
@OnClose
public void onClose(final Session session, final CloseReason closeReason) {
final AtomicReference atomicReference = new AtomicReference<>();
sessionPool.forEach((pubId, s) -> {
if (Objects.equals(s.getId(), session.getId())) {
atomicReference.set(pubId);
}
});
sessionPool.remove(atomicReference.get());
log.info(String.format("【websocket消息】pubId:%s 连接断开,当前总数为:%s", atomicReference.get(), sessionPool.size()));
}
@OnMessage
public void onMessage(final Session session, final String message) {
log.error(String.format("【websocket消息】收到客户端 pubId:%s 消息:%s", session.getId(), message));
}
@OnError
public void onError(final Session session, final Throwable throwable) {
log.error(String.format("【websocket消息】pubId:%s 出现异常:%s", session.getId(), throwable));
}
/**
* 向客户端群发消息
* @param message 文本消息
*/
public void sendAllMessage(final String message) {
sessionPool.forEach((pubId, session) -> {
session.getAsyncRemote().sendText(message);
});
}
/**
* 向某个客户端发送消息
*
* @param pubId 客户端id
* @param message 文本消息
*/
public void sendOneMessage(final Integer pubId, final String message) {
final Session session = sessionPool.get(pubId);
if (session != null) {
session.getAsyncRemote().sendText(message);
}
}
}
为什么增加一个 ServerEndpointExporter Bean,并通过在一个类上增加 @ServerEndpoint 和 @Component 注解就可以实现服务器端 Websocket 功能,这里简单解析一下。
@Override
public void afterSingletonsInstantiated() {
registerEndpoints();
}
/**
* Actually register the endpoints. Called by {@link #afterSingletonsInstantiated()}.
*/
protected void registerEndpoints() {
Set> endpointClasses = new LinkedHashSet<>();
if (this.annotatedEndpointClasses != null) {
endpointClasses.addAll(this.annotatedEndpointClasses);
}
ApplicationContext context = getApplicationContext();
if (context != null) {
String[] endpointBeanNames = context.getBeanNamesForAnnotation(ServerEndpoint.class);
for (String beanName : endpointBeanNames) {
endpointClasses.add(context.getType(beanName));
}
}
for (Class<?> endpointClass : endpointClasses) {
registerEndpoint(endpointClass);
}
if (context != null) {
Map endpointConfigMap = context.getBeansOfType(ServerEndpointConfig.class);
for (ServerEndpointConfig endpointConfig : endpointConfigMap.values()) {
registerEndpoint(endpointConfig);
}
}
}
private void registerEndpoint(Class<?> endpointClass) {
ServerContainer serverContainer = getServerContainer();
Assert.state(serverContainer != null,
"No ServerContainer set. Most likely the server's own WebSocket ServletContainerInitializer " +
"has not run yet. Was the Spring ApplicationContext refreshed through a " +
"org.springframework.web.context.ContextLoaderListener, " +
"i.e. after the ServletContext has been fully initialized?");
try {
if (logger.isDebugEnabled()) {
logger.debug("Registering @ServerEndpoint class: " + endpointClass);
}
serverContainer.addEndpoint(endpointClass);
}
catch (DeploymentException ex) {
throw new IllegalStateException("Failed to register @ServerEndpoint class: " + endpointClass, ex);
}
}
java 定义了一套 javax.servlet-api, 一个 HttpServlet 就是一个 HTTP 服务。java websocket 并非基于 servlet-api 简单扩展, 而是新定义了一套 javax.websocket-api。
一个 websocket 服务对应一个 Endpoint。与 ServletContext 对应, websocket-api 也定义了 WebSocketContainer, 而编程方式注册 websocket 的接口是继承自 WebSocketContainer 的 ServerContainer。
一个 websocket 可以接受并管理多个连接, 因此可被视作一个 server。主流 servlet 容器都支持 websocket, 如 tomcat, jetty 等。看 ServerContainer api 文档, 可从 ServletContext attribute 找到 ServerContainer。
页面更新:2024-04-29
本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828
© CopyRight 2008-2024 All Rights Reserved. Powered By bs178.com 闽ICP备11008920号-3
闽公网安备35020302034844号