SpringBoot+Vue+Websocket 实现服务器端向客户端主动发送消息

概述

本文通过一个实际的场景来介绍在前后端分离的项目中通过 WebSocket 来实现服务器端主动向客户端发送消息的应用。主要内容如下

  1. WebSocket 是什么
  2. 服务器端 向 客户端 主动发送消息的案例说明
  3. SpringBoot 后端中 Websocket 的配置和使用
  4. 后端 Websocket 实现原理
  5. Vue 前端 Websocket 的配置和使用

WebSocket 是什么

Websocket 是一种在单个 TCP 连接上进行全双工通信的协议。WebSocket 连接成功后,服务端与客户端可以双向通信。在需要消息推送的场景,Websocket 相对于轮询能更好的节省服务器资源和带宽,并且能够更实时地进行通讯。

SpringBoot+Vue+Websocket 实现服务器端向客户端主动发送消息

具体如下特点

  1. 与 HTTP 协议有着良好的兼容性。默认端口也是 80 和 443,并且握手阶段采用 HTTP 协议,因此握手时不容易屏蔽,能通过各种 HTTP 代理服务器。
  2. 依赖于 TCP 协议
  3. 数据格式比较轻量,性能开销小,通信高效。
  4. 可以发送文本,也可以发送二进制数据。
  5. 没有同源限制,客户端可以与任意服务器通信。
  6. 协议标识符是 ws(如果加密,则为 wss),服务器网址就是 URL

服务器端 向 客户端 主动发送消息的案例说明

在客户端的列表数据中有个 status 字段,服务器端需要花费较长的时间进行处理,处理完成后才会更新对应数据的 status 字段值,通过 Websocket 的处理流程如下:

  1. 前端页面列表数据加载后,初始化一组 Websocket 客户端对象
  2. 服务器端 接收到 前端数据状态的查询请求
  3. 服务器端 每隔一段时间查询一下数据库,然后返回给客户端
  4. 客户端 根据返回的数据状态,再更新页面数据

后端 SpringBoot 中 Websocket 的配置和使用

Maven 依赖


   org.springframework.boot
   spring-boot-starter-websocket

配置

通过注入 ServerEndpointExporter 类,用于在项目启动的时候自动将使用了 @ServerEndpoint 注解声明的 Websocket endpoint 注册到 WebSocketContainer 中。

package com.ckjava.config;

/**
 * Function:
 *
 * @author chenkui 2022/4/6 17:55
 */

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

@Configuration
public class WebSocketConfig {
    /**
     * 注入 ServerEndpointExporter,
     * 这个 bean 会自动注册使用了 @ServerEndpoint 注解声明的 Websocket endpoint
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

}

通过@ServerEndpoint注解标注实现类

  1. 通过在类上增加 @ServerEndpoint 和 @Component 注解,用于标注 Websocket 的实现类
  2. 通过 ConcurrentHashMap 管理多个客户端的 Session
  3. 通过 ScheduledExecutorService 和 init 方法实现定时对客户端进行消息发送
  4. @OnOpen 标注的方法,用于接收客户端的 连接请求,其中的 @PathParam 用于接收 url 中的参数
  5. @OnClose 标注的方法,用于接收客户端的 关闭请求。
  6. @OnMessage标注的方法,用于接收客户端的 消息。
  7. @OnError标注的方法,用于错误处理
package com.ckjava.websocket;

import com.ckjava.xutils.JsonUtils;
import com.ckjava.entity.TSysPubEntity;
import com.ckjava.service.TSysPubService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Function:
 *
 * @author chenkui 2022/4/6 18:00
 */
@Slf4j
@Component
@ServerEndpoint("/websocket/{pubId}")
public class PubStatusWS {
    /**
     * 每个数据对应一个 session
     */
    private static final ConcurrentHashMap sessionPool = new ConcurrentHashMap<>();
    /**
     * 定时任务的线程池
     */
    private static final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);

    @Resource
    private TSysPubService pubService;

    /**
     * 定时从数据库中获取数据的最新状态,然后返回给前端
     */
    @PostConstruct
    public void init() {
        scheduledExecutorService.scheduleAtFixedRate(() -> {
            sessionPool.forEach((pubId, session) -> {
                final TSysPubEntity entity = pubService.findById(pubId);

                // 返回最新的状态
                session.getAsyncRemote().sendText(JsonUtils.toJSONString(entity));
                log.debug(String.format("websocket消息 server send to pubId %s, text:%s", session.getId(), entity.getPubStatus()));
            });
        }, 10, 30, TimeUnit.SECONDS);
    }

    @OnOpen
    public void onOpen(final Session session, final EndpointConfig endpointConfig,
                       @PathParam(value = "pubId") final Integer pubId) {
        sessionPool.put(pubId, session);
        log.info(String.format("【websocket消息】 pubId:%s 加入连接,当前总数为:%s", pubId, sessionPool.size()));
    }

    @OnClose
    public void onClose(final Session session, final CloseReason closeReason) {
        final AtomicReference atomicReference = new AtomicReference<>();
        sessionPool.forEach((pubId, s) -> {
            if (Objects.equals(s.getId(), session.getId())) {
                atomicReference.set(pubId);
            }
        });

        sessionPool.remove(atomicReference.get());
        log.info(String.format("【websocket消息】pubId:%s 连接断开,当前总数为:%s", atomicReference.get(), sessionPool.size()));
    }

    @OnMessage
    public void onMessage(final Session session, final String message) {
        log.error(String.format("【websocket消息】收到客户端 pubId:%s 消息:%s", session.getId(), message));
    }

    @OnError
    public void onError(final Session session, final Throwable throwable) {
        log.error(String.format("【websocket消息】pubId:%s 出现异常:%s", session.getId(), throwable));
    }

    /**
     * 向客户端群发消息
     * @param message 文本消息
     */
    public void sendAllMessage(final String message) {
        sessionPool.forEach((pubId, session) -> {
            session.getAsyncRemote().sendText(message);
        });
    }

    /**
     * 向某个客户端发送消息
     *
     * @param pubId 客户端id
     * @param message 文本消息
     */
    public void sendOneMessage(final Integer pubId, final String message) {
        final Session session = sessionPool.get(pubId);
        if (session != null) {
            session.getAsyncRemote().sendText(message);
        }
    }


}

后端 Websocket 实现原理

为什么增加一个 ServerEndpointExporter Bean,并通过在一个类上增加 @ServerEndpoint 和 @Component 注解就可以实现服务器端 Websocket 功能,这里简单解析一下。

ServerEndpointExporter 的核心方法

  1. ServerEndpointExporter 实现了 spring 中的 SmartInitializingSingleton 接口,并重写了 afterSingletonsInstantiated 方法,具体如下
@Override
public void afterSingletonsInstantiated() {
	registerEndpoints();
}
  1. 在 registerEndpoints 方法中可以发现,通过 ApplicationContext 中的 getBeanNamesForAnnotation 方法,从 spring 的 ioc 容器中获取含有 @ServerEndpoint 注解的类。
/**
 * Actually register the endpoints. Called by {@link #afterSingletonsInstantiated()}.
 */
protected void registerEndpoints() {
	Set> endpointClasses = new LinkedHashSet<>();
	if (this.annotatedEndpointClasses != null) {
		endpointClasses.addAll(this.annotatedEndpointClasses);
	}

	ApplicationContext context = getApplicationContext();
	if (context != null) {
		String[] endpointBeanNames = context.getBeanNamesForAnnotation(ServerEndpoint.class);
		for (String beanName : endpointBeanNames) {
			endpointClasses.add(context.getType(beanName));
		}
	}

	for (Class<?> endpointClass : endpointClasses) {
		registerEndpoint(endpointClass);
	}

	if (context != null) {
		Map endpointConfigMap = context.getBeansOfType(ServerEndpointConfig.class);
		for (ServerEndpointConfig endpointConfig : endpointConfigMap.values()) {
			registerEndpoint(endpointConfig);
		}
	}
}
  1. 在 registerEndpoint 方法中,通过 ServerContainer 的 addEndpoint 方法,最终将 endpoint 实现类注册到 ServerContainer 中。
private void registerEndpoint(Class<?> endpointClass) {
	ServerContainer serverContainer = getServerContainer();
	Assert.state(serverContainer != null,
			"No ServerContainer set. Most likely the server's own WebSocket ServletContainerInitializer " +
			"has not run yet. Was the Spring ApplicationContext refreshed through a " +
			"org.springframework.web.context.ContextLoaderListener, " +
			"i.e. after the ServletContext has been fully initialized?");
	try {
		if (logger.isDebugEnabled()) {
			logger.debug("Registering @ServerEndpoint class: " + endpointClass);
		}
		serverContainer.addEndpoint(endpointClass);
	}
	catch (DeploymentException ex) {
		throw new IllegalStateException("Failed to register @ServerEndpoint class: " + endpointClass, ex);
	}
}

ServerContainer

java 定义了一套 javax.servlet-api, 一个 HttpServlet 就是一个 HTTP 服务。java websocket 并非基于 servlet-api 简单扩展, 而是新定义了一套 javax.websocket-api。

一个 websocket 服务对应一个 Endpoint。与 ServletContext 对应, websocket-api 也定义了 WebSocketContainer, 而编程方式注册 websocket 的接口是继承自 WebSocketContainer 的 ServerContainer。

一个 websocket 可以接受并管理多个连接, 因此可被视作一个 server。主流 servlet 容器都支持 websocket, 如 tomcat, jetty 等。看 ServerContainer api 文档, 可从 ServletContext attribute 找到 ServerContainer。

Vue 前端 Websocket 的配置和使用

  1. 在 created 方法中调用了 getPageData 方法,用于接收到列表数据后,通过 initWs 方法 给 每个数据 id 初始化一个 WebSocket 客户端
  2. 通过 onerror 事件绑定 出现异常 时候的回调方法
  3. 通过 onopen 事件绑定 连接成功 的回调方法
  4. 通过 onmessage 事件绑定 接收到服务器端消息 的回调方法,这里收到消息后,再更新前端的数据
  5. 通过 onclose 事件绑定 关闭连接 的回调方法
  6. 在 unmounted 方法中调用了 onbeforeunload 方法,用于关闭所有的 WebSocket 连接
展开阅读全文

页面更新:2024-04-29

标签:服务器端   客户端   注解   绑定   主动   状态   协议   消息   事件   方法   数据

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2008-2024 All Rights Reserved. Powered By bs178.com 闽ICP备11008920号-3
闽公网安备35020302034844号

Top