解决项目前端页面响应问题
本篇文章记录的是前端页面数据需要动态更新数据,解决页面状态和实际设备状态不一致的问题,引进了新的一个协议—Websocket(websocket是html5提供的一种浏览器与服务器进行全面双工通讯的网络技术,是基于TCP传输的应用层协议)协议。
项目背景
我们的web页面要去展示我们设备的在线离线状态、客流量载运情况、和第三方厂家设备的连接状态等等,这些数据都需要实时的去更新,保证能在第一时间获取到最新的数据信息。为了满足这个需求,于是就增加了websocket协议。
技术实现
封装消息
首先第一步就要去封装好消息格式,根据上述背景可以得知,并不是这个协议只用于某一个消息的,所以要将消息体之类的设计好。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| package com.yuyi.pis.common.websocket;
import lombok.Data;
@Data public class WebSocketMessage<T> {
private String messageType;
private T body; }
|
实际的消息体就不去举例了,这只是前端需要的不同数据而已。
下一步就是组装发送数据了,怎么组装,怎么发送的逻辑了。由于有多个模块,都需要使用这个websocket服务端,将数据主动推送给前端,所以我将websocket单独设计了一个模块,通过消息中心将各个模块的数据都发送到这个模块,然后由这个模块同一去转发数据给前端页面。
websocket服务端实现
当然了,这些都是设计前的准备,当我们把这些准备工作做完了,重头戏也就是要将websocket的服务端开发完成,这个模块中,我们要监听消息中心的队列,去完成消息转发给前端页面。
这个模块特别的简单,就只是需要去完成数据转发的工作,抽出来的目的就是为了使用消息中心达到解耦的作用而已。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| package com.yuyi.websocket.config;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.server.standard.ServerEndpointExporter;
@Configuration public class WebSocketConfig {
@Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| package com.yuyi.websocket.mq;
import com.yuyi.pis.common.constants.RabbitMqQueueValue; import com.yuyi.pis.common.constants.ServerIdConstant; import com.yuyi.websocket.websocket.WebSocket; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component;
@Slf4j @Component public class RabbitMqListener {
@Autowired private WebSocket webSocket;
@RabbitListener(queuesToDeclare = @Queue(RabbitMqQueueValue.WEBSOCKET)) public void consumeMsg(@Payload String msg) { log.debug("websocket模块收到消息,并进行转发:{}",msg); webSocket.sendAllMsgToWebSocket(msg); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151
| package com.yuyi.websocket.websocket;
import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component;
import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet;
@Component @Slf4j @ServerEndpoint("/ws/{serverId}") public class WebSocket {
private Session session;
private String serverId;
private static CopyOnWriteArraySet<WebSocket> webSockets = new CopyOnWriteArraySet<>();
private static ConcurrentHashMap<String, Session> sessionPool = new ConcurrentHashMap<String, Session>();
@OnOpen public void onOpen(Session session, @PathParam(value = "serverId") String serverId) { try { this.session = session; this.serverId = serverId; webSockets.add(this); sessionPool.put(serverId, session); log.info("【websocket消息】有新的连接,总数为:" + webSockets.size()); } catch (Exception e) { } }
@OnClose public void onClose() { try { webSockets.remove(this); sessionPool.remove(this.serverId); log.info("【websocket消息】连接断开,总数为:" + webSockets.size()); } catch (Exception e) { } }
@OnMessage public void onMessage(String message) { log.info("【websocket消息】收到客户端消息:" + message); }
@OnError public void onError(Session session, Throwable error) { log.error("用户错误,原因:" + error.getMessage()); error.printStackTrace(); }
public synchronized void sendMessage(String serverId, String message) { Session session = sessionPool.get(serverId);
if (session != null && session.isOpen()) { synchronized (session) { try { log.info("【{}】服务发送websocket消息,消息内容是:{}", serverId, message); session.getAsyncRemote().sendText(message); } catch (Exception e) { e.printStackTrace(); } } } }
public synchronized void sendMessageClient(String serverId, String message) { Session session = this.session;
if (session != null && session.isOpen()) { synchronized (session) { try { log.info("【{}】服务发送websocket消息,消息内容是:{}", serverId, message); session.getAsyncRemote().sendText(message); } catch (Exception e) { e.printStackTrace(); } } } }
public void sendMsgToWebSocket(String serverId, String message) { this.sendMessage(serverId, message); }
public void sendAllMsgToWebSocket(String message) { webSockets.forEach((webSocket) -> { webSocket.sendMessageClient(webSocket.serverId, message); }); }
}
|
sendAllMsgToWebSocket方法说明:由于web页面有可能会出现多开的问题,这样就会导致某些客户端数据丢失,因为一个客户端数据将消息接收了,其他客户端就接收不到websocket的消息了,这样web页面数据就会出现不一致的问题了。这个的原理也是很简单的,就将所有的连接都进行发送一次数据而已。
然后讲讲为啥要将所有的数据都通过这一个websocket来给前端传输数据,这样的目的有两点:
- 解耦,各个模块负责各个模块的事情,不能让每个模块都产生一个websocket的服务,减少后端资源的消耗。
- 减少前端的资源消耗,这样前端也只需要也一个客户端,根据消息类型就能处理不同的消息请求了。
测试工具
为了测试websocket我这推荐一个网站:http://wstool.js.org/ 可以写好直接就测试用的。
总结
之前旧版本是让前端每10秒刷新一次数据,这样每多一个任务就会多产生一个线程,造成线程的浪费。切换到这种websocket的协议上来使用,就可以极大的避免资源的浪费了,而且具有更好的实时性。