WebSocket实现设计

WebSocket 是 HTML5 开始提供的一种在单个 TCP 连接上进行全双工通讯的协议。

有时会遇到服务器需要主动推动消息的需求,或者有频繁的少量消息需要交换。因为HTTP请求的头部可能很长,这种场景下,会带来很多的带宽损耗。

WebSocket 协议,能更好的节省服务器资源和带宽,并且能够更实时地进行通讯。

使用

最简单的例子

1
const ws = new WebSocket("ws://localhost:8080");
2
ws.onopen = function(evt) { 
3
  console.log("Connection open ..."); 
4
  ws.send("Hello WebSockets!");
5
};
6
7
ws.onmessage = function(evt) {
8
  console.log("Received Message: " + evt.data);
9
  ws.close();
10
};
11
12
ws.onclose = function(evt) {
13
  console.log("Connection closed.");
14
};

ws和wss

ws表示纯文本通信,而wss表示使用加密信道通信(TCP+TLS)。

  • ws协议:普通请求,占用与HTTP相同的80端口
  • wss协议:基于SSL的安全传输,占用与TLS相同的443端口。

从WebSocket到Socket.IO

简单来说Socket.IO就是对WebSocket的封装,并且实现了WebSocket的服务端代码。Socket.IO将WebSocket和轮询(Polling)机制以及其它的实时通信方式封装成了通用的接口,并且在服务端实现了这些实时机制的相应代码。也就是说,WebSocket仅仅是Socket.IO实现实时通信的一个子集。

Socket.IO简化了WebSocket API,统一了返回传输的API,具有更好的兼容性。

Socket.IO传输种类

  • WebSocket
  • Flash Socket
  • AJAX long-polling
  • AJAX multipart streaming
  • IFrame
  • JSONP polling

例子

1
const instance = io(url);
2
// 连接成功
3
instance.on('connect', () => {
4
  console.log('Socket connect success');
5
});
6
// error
7
instance.on('error', (error) => {
8
  console.log('Socket error', error);
9
});
10
// 断开连接
11
instance.on('disconnect', (info) => {
12
  console.log('Socket disconnect', info);
13
});
14
// 重联
15
instance.on('reconnect', (attemptNumber) => {
16
  console.log('Socket reconnect', attemptNumber);
17
});
18
// 监听channel
19
instance.on(channel, (message) => {
20
  // handle message
21
});

WebSocket封装实践

有时我们可能在业务中会有大量的ws使用需求,如果各自处理的话会很混乱,这时候可以采用类似于封装request工具的方式来进行处理。

常见场景

  • 发送一条消息,需要在返回对应结果时进行响应处理,且一次性返回结果。实际的例子有导出报表,生成文件后返回url等。
  • 发送一条消息,需要在返回对应结果时进行响应处理,且多次返回结果。实际的例子有行情推送,运行状态监控等。
  • 直接监听channel,等待数据推送持续处理。实际的例子有消息通知等。
  • 直接发送消息,不需要返回或不需要处理返回结果。

要解决的问题

  • 统一使用方式,避免业务层处理socket的连接,断开,重联等复杂情况
  • 提供丰富的消息处理机制,满足常见场景
  • 对连接异常提供重连机制,保证连接稳定
  • 重连后可以保持之前的消息处理继续生效
  • 对复杂场景提供更灵活的使用方式,暴露实例以满足业务需求

设计思想

通过单例模式实现websocket的实例,随页面加载初始化。

封装API覆盖常见场景,简化操作流程。通过参数调整是否持续处理,提供处理机制等。

因为ws的消息跟ajax请求不同,不是一一对应的。所以对于前端主动请求的结果,可以前端生成唯一标识作为id,随消息发送,后台返回的时候带上对应的id。

断开后主动重连,恢复断开前状态,重新监听之前的channel,收到消息继续之前的处理机制。

自动管理channel的监听,消息使用新的channel和重联时,自动开启。

框架设计

1
export class SocketFactory {
2
  constructor() {
3
    this.instance = null;
4
    this.handleMQ = {};
5
    this.handleMQPersist = {};
6
    this.noIdChannelList = {};
7
    this.errorCallback = {};
8
  }
9
	// 初始化
10
  init = () => {};
11
	// 关闭实例
12
  close = () => {};
13
	// 暴露实例
14
  getSocket = () => {
15
    return this.instance;
16
  };
17
	// 错误处理
18
  showError = (channel, msg, requestId) => {};
19
	// 消息处理
20
  handleMessage = (channel, msg, noId) => {};
21
	// 判定是否开启通道监听
22
  listenChannel = (channel, noId) => {
23
    if (condition) { this.openListenChannel(channel, noId) }
24
    else { ... }
25
  };
26
	// 开启通道监听
27
  openListenChannel = (channel, noId) => {};
28
	// 发送消息,并绑定后续的处理逻辑
29
  sendMessage = (channel, message, handleFun, persist, errorCallback) => {};
30
	// 直接监听某条消息
31
  listenMessage = (channel, messageId, handleFun, persist) => {};
32
	// 直接监听通道
33
  listenMessageNoId = (channel, messagetype, handleFun, persist) => {};
34
	// 停止通道或者消息的监听
35
  stopListenMessage = (channel, messageId) => {};
36
}
37
// 单例模式
38
const GlobalSocket = new SocketFactory();
39
window.GlobalSocket = GlobalSocket;
40
export default GlobalSocket;

发送消息的设计

1
sendMessage = (channel, message, handleFun, persist, errorCallback) => {
2
  const uuid = UUID(); // 生成唯一标识
3
  this.listenChannel(channel); // 通知需要监听通道
4
  if (handleFun) { // 绑定处理函数
5
    this.handleMQ[channel][uuid] = handleFun;
6
  }
7
  if (persist) { // 是否需要持续处理
8
    this.handleMQPersist[channel][uuid] = true;
9
  }
10
  if (errorCallback) { // 是否需要特殊的错误处理
11
    this.errorCallback[uuid] = errorCallback;
12
  }
13
  this.instance.emit(channel, { requestId: uuid, content: message }); // 发送消息
14
}

处理消息的设计

1
handleMessage = (channel, msg, noId) => {
2
    if (this.handleMQ[channel]) { // 判定是否处理消息
3
      if (noId) { // 监听通道的处理,如果消息通知
4
        for (const type of Object.keys(this.handleMQ[channel])) {
5
          if (msg.code === 0) {
6
            this.handleMQ[channel][type](msg.data);
7
            if (!(this.handleMQPersist[channel] && this.handleMQPersist[channel][type])) {
8
              delete this.handleMQ[channel][type];
9
            }
10
          } else if (!(this.handleMQPersist[channel]
11
            && this.handleMQPersist[channel][type])) {
12
            this.showError(channel, msg);
13
            delete this.handleMQ[channel][type];
14
          } else {
15
            this.showError(channel, msg);
16
          }
17
        }
18
      } else if (this.handleMQ[channel][msg.requestId]) { // 业务的处理
19
        if (msg.code === 0) { // 正常数据
20
          this.handleMQ[channel][msg.requestId](msg.data);
21
          if (!(this.handleMQPersist[channel] && this.handleMQPersist[channel][msg.requestId])) {
22
            delete this.handleMQ[channel][msg.requestId]; // 移除一次处理的任务
23
          }
24
        } else if (!(this.handleMQPersist[channel] // 非持续处理任务,发生错误后移除
25
          && this.handleMQPersist[channel][msg.requestId])) {
26
          this.showError(channel, msg, msg.requestId);
27
          delete this.handleMQ[channel][msg.requestId];
28
        } else {
29
          this.showError(channel, msg, msg.requestId); // 持续处理任务,发生错误后保持
30
        }
31
      }
32
    }
33
  }

重连后的设计

重连之后需要恢复之前的状态,需要在连接后判断需要开启的通道。

1
this.instance.on('connect', () => {
2
  console.log('Socket connect success');
3
  for (const channel of Object.keys(this.handleMQ)) {
4
    if (this.noIdChannelList[channel]) {
5
      this.openListenChannel(channel, true);
6
    } else {
7
      this.openListenChannel(channel);
8
    }
9
  }
10
});

因为重连之后某些通道可能会自动开启,为了避免重复监听导致处理两次,需要判断。

1
openListenChannel = (channel, noId) => {
2
	if (this.instance && !this.instance['_callbacks'][`$${channel}`]) {
3
    this.instance.on(channel, (message) => {
4
    	this.handleMessage(channel, message, noId);
5
    });
6
    console.log(channel, 'listen');
7
  }
8
}