解决项目前端页面响应问题

本篇文章记录的是前端页面数据需要动态更新数据,解决页面状态和实际设备状态不一致的问题,引进了新的一个协议—Websocket(websocket是html5提供的一种浏览器与服务器进行全面双工通讯的网络技术,是基于TCP传输的应用层协议)协议。

项目背景

我们的web页面要去展示我们设备的在线离线状态、客流量载运情况、和第三方厂家设备的连接状态等等,这些数据都需要实时的去更新,保证能在第一时间获取到最新的数据信息。为了满足这个需求,于是就增加了websocket协议。

技术实现

封装消息

首先第一步就要去封装好消息格式,根据上述背景可以得知,并不是这个协议只用于某一个消息的,所以要将消息体之类的设计好。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package com.yuyi.pis.common.websocket;

import lombok.Data;

/**
* @description: 对发送websocket消息体进行封装
* @author: wzl
* @date: 2022/11/05
*/
@Data
public class WebSocketMessage<T> {
/**
* 消息类型,前端根据这个消息类型去匹配不同的消息
*/
private String messageType;
/**
* 实际的消息体
*/
private T body;
}

实际的消息体就不去举例了,这只是前端需要的不同数据而已。
下一步就是组装发送数据了,怎么组装,怎么发送的逻辑了。由于有多个模块,都需要使用这个websocket服务端,将数据主动推送给前端,所以我将websocket单独设计了一个模块,通过消息中心将各个模块的数据都发送到这个模块,然后由这个模块同一去转发数据给前端页面。
/img/img.png
/img/img_1.png

websocket服务端实现

当然了,这些都是设计前的准备,当我们把这些准备工作做完了,重头戏也就是要将websocket的服务端开发完成,这个模块中,我们要监听消息中心的队列,去完成消息转发给前端页面。
/img/img_2.png
这个模块特别的简单,就只是需要去完成数据转发的工作,抽出来的目的就是为了使用消息中心达到解耦的作用而已。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.yuyi.websocket.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
* @author wzl
* @date: 2022/11/06
*/
@Configuration
public class WebSocketConfig {
/**
* 注入ServerEndpointExporter,
* 这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}

}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.yuyi.websocket.mq;

import com.yuyi.pis.common.constants.RabbitMqQueueValue;
import com.yuyi.pis.common.constants.ServerIdConstant;
import com.yuyi.websocket.websocket.WebSocket;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

/**
* @description: rabbitmq监听者
* @author: wzl
* @date: 2022/11/06
*/
@Slf4j
@Component
public class RabbitMqListener {

@Autowired
private WebSocket webSocket;

/**
* 指定要监听的队列 ,消费实例
*/
@RabbitListener(queuesToDeclare = @Queue(RabbitMqQueueValue.WEBSOCKET))
public void consumeMsg(@Payload String msg) {
log.debug("websocket模块收到消息,并进行转发:{}",msg);
webSocket.sendAllMsgToWebSocket(msg);
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package com.yuyi.websocket.websocket;


import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;

/**
* @author 接口路径 ws://localhost:8086/webSocket/serverId;
* @author: wzl
* @date: 2022/11/06
*/
@Component
@Slf4j
@ServerEndpoint("/ws/{serverId}")
public class WebSocket {

/**
* 与某个客户端的连接会话,需要通过它来给客户端发送数据
*/
private Session session;
/**
* 服务ID
*/
private String serverId;

/**
* concurrent包的线程安全Set,用来存放每个客户端对应的WebSocket对象。
* * 虽然@Component默认是单例模式的,但springboot还是会为每个websocket连接初始化一个bean,所以可以用一个静态set保存起来。
* * 注:底下WebSocket是当前类名
*/
private static CopyOnWriteArraySet<WebSocket> webSockets = new CopyOnWriteArraySet<>();
/**
* 用来存在线连接用户信息
*/
private static ConcurrentHashMap<String, Session> sessionPool = new ConcurrentHashMap<String, Session>();

/**
* 链接成功调用的方法
*/
@OnOpen
public void onOpen(Session session, @PathParam(value = "serverId") String serverId) {
try {
this.session = session;
this.serverId = serverId;
webSockets.add(this);
sessionPool.put(serverId, session);
log.info("【websocket消息】有新的连接,总数为:" + webSockets.size());
} catch (Exception e) {
}
}

/**
* 链接关闭调用的方法
*/
@OnClose
public void onClose() {
try {
webSockets.remove(this);
sessionPool.remove(this.serverId);
log.info("【websocket消息】连接断开,总数为:" + webSockets.size());
} catch (Exception e) {
}
}

/**
* 收到客户端消息后调用的方法
*
* @param message
*/
@OnMessage
public void onMessage(String message) {
log.info("【websocket消息】收到客户端消息:" + message);
}

/**
* 发送错误时的处理
*
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("用户错误,原因:" + error.getMessage());
error.printStackTrace();
}

/**
* 此为单点消息
*/
public synchronized void sendMessage(String serverId, String message) {
Session session = sessionPool.get(serverId);

if (session != null && session.isOpen()) {
synchronized (session) {
try {
log.info("【{}】服务发送websocket消息,消息内容是:{}", serverId, message);
session.getAsyncRemote().sendText(message);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

/**
* 此为单点消息
*/
public synchronized void sendMessageClient(String serverId, String message) {
Session session = this.session;

if (session != null && session.isOpen()) {
synchronized (session) {
try {
log.info("【{}】服务发送websocket消息,消息内容是:{}", serverId, message);
session.getAsyncRemote().sendText(message);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

/**
* 将数据主动向前端推送
*
* @param serverId 服务id 用作前端作为区别
* @param message 消息体 JSONObject.toJSONString( 实体类, SerializerFeature.DisableCircularReferenceDetect)
*/
public void sendMsgToWebSocket(String serverId, String message) {
// 主动推送数据给前端
this.sendMessage(serverId, message);
}

/**
* 将数据主动向前端推送
*/
public void sendAllMsgToWebSocket(String message) {
webSockets.forEach((webSocket) -> {
webSocket.sendMessageClient(webSocket.serverId, message);
});
}


}

sendAllMsgToWebSocket方法说明:由于web页面有可能会出现多开的问题,这样就会导致某些客户端数据丢失,因为一个客户端数据将消息接收了,其他客户端就接收不到websocket的消息了,这样web页面数据就会出现不一致的问题了。这个的原理也是很简单的,就将所有的连接都进行发送一次数据而已。

然后讲讲为啥要将所有的数据都通过这一个websocket来给前端传输数据,这样的目的有两点:

  1. 解耦,各个模块负责各个模块的事情,不能让每个模块都产生一个websocket的服务,减少后端资源的消耗。
  2. 减少前端的资源消耗,这样前端也只需要也一个客户端,根据消息类型就能处理不同的消息请求了。

测试工具

为了测试websocket我这推荐一个网站:http://wstool.js.org/ 可以写好直接就测试用的。

总结

之前旧版本是让前端每10秒刷新一次数据,这样每多一个任务就会多产生一个线程,造成线程的浪费。切换到这种websocket的协议上来使用,就可以极大的避免资源的浪费了,而且具有更好的实时性。