WebSocket实现设计
WebSocket 是 HTML5 开始提供的一种在单个 TCP 连接上进行全双工通讯的协议。
有时会遇到服务器需要主动推动消息的需求,或者有频繁的少量消息需要交换。因为HTTP请求的头部可能很长,这种场景下,会带来很多的带宽损耗。
WebSocket 协议,能更好的节省服务器资源和带宽,并且能够更实时地进行通讯。
使用
最简单的例子
1 | const ws = new WebSocket("ws://localhost:8080"); |
2 | ws.onopen = function(evt) { |
3 | console.log("Connection open ..."); |
4 | ws.send("Hello WebSockets!"); |
5 | }; |
6 | |
7 | ws.onmessage = function(evt) { |
8 | console.log("Received Message: " + evt.data); |
9 | ws.close(); |
10 | }; |
11 | |
12 | ws.onclose = function(evt) { |
13 | console.log("Connection closed."); |
14 | }; |
ws和wss
ws表示纯文本通信,而wss表示使用加密信道通信(TCP+TLS)。
- ws协议:普通请求,占用与HTTP相同的80端口
- wss协议:基于SSL的安全传输,占用与TLS相同的443端口。
从WebSocket到Socket.IO
简单来说Socket.IO就是对WebSocket的封装,并且实现了WebSocket的服务端代码。Socket.IO将WebSocket和轮询(Polling)机制以及其它的实时通信方式封装成了通用的接口,并且在服务端实现了这些实时机制的相应代码。也就是说,WebSocket仅仅是Socket.IO实现实时通信的一个子集。
Socket.IO简化了WebSocket API,统一了返回传输的API,具有更好的兼容性。
Socket.IO传输种类
- WebSocket
- Flash Socket
- AJAX long-polling
- AJAX multipart streaming
- IFrame
- JSONP polling
例子
1 | const instance = io(url); |
2 | // 连接成功 |
3 | instance.on('connect', () => { |
4 | console.log('Socket connect success'); |
5 | }); |
6 | // error |
7 | instance.on('error', (error) => { |
8 | console.log('Socket error', error); |
9 | }); |
10 | // 断开连接 |
11 | instance.on('disconnect', (info) => { |
12 | console.log('Socket disconnect', info); |
13 | }); |
14 | // 重联 |
15 | instance.on('reconnect', (attemptNumber) => { |
16 | console.log('Socket reconnect', attemptNumber); |
17 | }); |
18 | // 监听channel |
19 | instance.on(channel, (message) => { |
20 | // handle message |
21 | }); |
WebSocket封装实践
有时我们可能在业务中会有大量的ws使用需求,如果各自处理的话会很混乱,这时候可以采用类似于封装request工具的方式来进行处理。
常见场景
- 发送一条消息,需要在返回对应结果时进行响应处理,且一次性返回结果。实际的例子有导出报表,生成文件后返回url等。
- 发送一条消息,需要在返回对应结果时进行响应处理,且多次返回结果。实际的例子有行情推送,运行状态监控等。
- 直接监听channel,等待数据推送持续处理。实际的例子有消息通知等。
- 直接发送消息,不需要返回或不需要处理返回结果。
要解决的问题
- 统一使用方式,避免业务层处理socket的连接,断开,重联等复杂情况
- 提供丰富的消息处理机制,满足常见场景
- 对连接异常提供重连机制,保证连接稳定
- 重连后可以保持之前的消息处理继续生效
- 对复杂场景提供更灵活的使用方式,暴露实例以满足业务需求
设计思想
通过单例模式实现websocket的实例,随页面加载初始化。
封装API覆盖常见场景,简化操作流程。通过参数调整是否持续处理,提供处理机制等。
因为ws的消息跟ajax请求不同,不是一一对应的。所以对于前端主动请求的结果,可以前端生成唯一标识作为id,随消息发送,后台返回的时候带上对应的id。
断开后主动重连,恢复断开前状态,重新监听之前的channel,收到消息继续之前的处理机制。
自动管理channel的监听,消息使用新的channel和重联时,自动开启。
框架设计
1 | export class SocketFactory { |
2 | constructor() { |
3 | this.instance = null; |
4 | this.handleMQ = {}; |
5 | this.handleMQPersist = {}; |
6 | this.noIdChannelList = {}; |
7 | this.errorCallback = {}; |
8 | } |
9 | // 初始化 |
10 | init = () => {}; |
11 | // 关闭实例 |
12 | close = () => {}; |
13 | // 暴露实例 |
14 | getSocket = () => { |
15 | return this.instance; |
16 | }; |
17 | // 错误处理 |
18 | showError = (channel, msg, requestId) => {}; |
19 | // 消息处理 |
20 | handleMessage = (channel, msg, noId) => {}; |
21 | // 判定是否开启通道监听 |
22 | listenChannel = (channel, noId) => { |
23 | if (condition) { this.openListenChannel(channel, noId) } |
24 | else { ... } |
25 | }; |
26 | // 开启通道监听 |
27 | openListenChannel = (channel, noId) => {}; |
28 | // 发送消息,并绑定后续的处理逻辑 |
29 | sendMessage = (channel, message, handleFun, persist, errorCallback) => {}; |
30 | // 直接监听某条消息 |
31 | listenMessage = (channel, messageId, handleFun, persist) => {}; |
32 | // 直接监听通道 |
33 | listenMessageNoId = (channel, messagetype, handleFun, persist) => {}; |
34 | // 停止通道或者消息的监听 |
35 | stopListenMessage = (channel, messageId) => {}; |
36 | } |
37 | // 单例模式 |
38 | const GlobalSocket = new SocketFactory(); |
39 | window.GlobalSocket = GlobalSocket; |
40 | export default GlobalSocket; |
发送消息的设计
1 | sendMessage = (channel, message, handleFun, persist, errorCallback) => { |
2 | const uuid = UUID(); // 生成唯一标识 |
3 | this.listenChannel(channel); // 通知需要监听通道 |
4 | if (handleFun) { // 绑定处理函数 |
5 | this.handleMQ[channel][uuid] = handleFun; |
6 | } |
7 | if (persist) { // 是否需要持续处理 |
8 | this.handleMQPersist[channel][uuid] = true; |
9 | } |
10 | if (errorCallback) { // 是否需要特殊的错误处理 |
11 | this.errorCallback[uuid] = errorCallback; |
12 | } |
13 | this.instance.emit(channel, { requestId: uuid, content: message }); // 发送消息 |
14 | } |
处理消息的设计
1 | handleMessage = (channel, msg, noId) => { |
2 | if (this.handleMQ[channel]) { // 判定是否处理消息 |
3 | if (noId) { // 监听通道的处理,如果消息通知 |
4 | for (const type of Object.keys(this.handleMQ[channel])) { |
5 | if (msg.code === 0) { |
6 | this.handleMQ[channel][type](msg.data); |
7 | if (!(this.handleMQPersist[channel] && this.handleMQPersist[channel][type])) { |
8 | delete this.handleMQ[channel][type]; |
9 | } |
10 | } else if (!(this.handleMQPersist[channel] |
11 | && this.handleMQPersist[channel][type])) { |
12 | this.showError(channel, msg); |
13 | delete this.handleMQ[channel][type]; |
14 | } else { |
15 | this.showError(channel, msg); |
16 | } |
17 | } |
18 | } else if (this.handleMQ[channel][msg.requestId]) { // 业务的处理 |
19 | if (msg.code === 0) { // 正常数据 |
20 | this.handleMQ[channel][msg.requestId](msg.data); |
21 | if (!(this.handleMQPersist[channel] && this.handleMQPersist[channel][msg.requestId])) { |
22 | delete this.handleMQ[channel][msg.requestId]; // 移除一次处理的任务 |
23 | } |
24 | } else if (!(this.handleMQPersist[channel] // 非持续处理任务,发生错误后移除 |
25 | && this.handleMQPersist[channel][msg.requestId])) { |
26 | this.showError(channel, msg, msg.requestId); |
27 | delete this.handleMQ[channel][msg.requestId]; |
28 | } else { |
29 | this.showError(channel, msg, msg.requestId); // 持续处理任务,发生错误后保持 |
30 | } |
31 | } |
32 | } |
33 | } |
重连后的设计
重连之后需要恢复之前的状态,需要在连接后判断需要开启的通道。
1 | this.instance.on('connect', () => { |
2 | console.log('Socket connect success'); |
3 | for (const channel of Object.keys(this.handleMQ)) { |
4 | if (this.noIdChannelList[channel]) { |
5 | this.openListenChannel(channel, true); |
6 | } else { |
7 | this.openListenChannel(channel); |
8 | } |
9 | } |
10 | }); |
因为重连之后某些通道可能会自动开启,为了避免重复监听导致处理两次,需要判断。
1 | openListenChannel = (channel, noId) => { |
2 | if (this.instance && !this.instance['_callbacks'][`$${channel}`]) { |
3 | this.instance.on(channel, (message) => { |
4 | this.handleMessage(channel, message, noId); |
5 | }); |
6 | console.log(channel, 'listen'); |
7 | } |
8 | } |