舒晟技术专栏

E-Mail(269844948@qq.com)

websocket协议

websocket协议

应用层

websocket 其实是一种应用层的协议,client是http协议的变种,基于http协议 改变了请求头信息 Upgrade:websocket

server端 也是基于http协议解析报文基于Upgrade:websocket 头部信息识别长连接 返回status:101 并开始建立长连接

socket

server端socket采用阻塞模式,高并发一般采用协程、线程等等方式

websocket报文
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
"""
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len | Extended payload length |
|I|S|S|S| (4) |A| (7) | (16/64) |
|N|V|V|V| |S| | (if payload len==126/127) |
| |1|2|3| |K| | |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
| Extended payload length continued, if payload len == 127 |
+ - - - - - - - - - - - - - - - +-------------------------------+
| |Masking-key, if MASK set to 1 |
+-------------------------------+-------------------------------+
| Masking-key (continued) | Payload Data |
+-------------------------------- - - - - - - - - - - - - - - - +
: Payload Data continued ... :
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
| Payload Data continued ... |
+---------------------------------------------------------------+
~~~~~~~~~~
websocket 报文协议

"""

建立socket长连接之后 client server 基于上序报文发送请求和响应

报文组成部分:

  1. FIN 标识是否是最后一个报文是否结束 1代表结束
  2. RSV1、RSV2、RSV3 暂时没有使用
  3. opcode代表消息类型:0:附加数据 1:文本数据 2: 二进制数据 8: close消息 9: ping消息 10: pong消息
  4. MASK body内容是否做了掩码处理
  5. Payload len 有多种含义 总共是7个字节 如果是126 后面两个字节代表body长度 如果是127 后面八个字节代表body长度
  6. MASK 如果mask标识为1 这个是mask加密值
header读取代码片段
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def decode_header(cls, stream):
read = stream.read
# 读前两个字节 16bit
data = read(2)

if len(data) != 2:
raise WebSocketError("Unexpected EOF while decoding header")

first_byte, second_byte = struct.unpack('!BB', data)

header = cls(
fin=first_byte & cls.FIN_MASK == cls.FIN_MASK, # fin 为1 代表结束
opcode=first_byte & cls.OPCODE_MASK, # 获取opcode
flags=first_byte & cls.HEADER_FLAG_MASK, # 获取rsv 操作信息
length=second_byte & cls.LENGTH_MASK) # 获取payload len 数值

has_mask = second_byte & cls.MASK_MASK == cls.MASK_MASK # 判断是否对payloadData 进行掩码处理

# 0x07 之后都是 连接关闭 ping pong 所以都是fin都应该是消息结果 并且length不应该超过125
if header.opcode > 0x07:
if not header.fin:
raise ProtocolError(
"Received fragmented control frame: {0!r}".format(data))

# Control frames MUST have a payload length of 125 bytes or less
if header.length > 125:
raise FrameTooLargeException(
"Control frame cannot be larger than 125 bytes: "
"{0!r}".format(data))

# 如果126 读取后2个字节为长度
if header.length == 126:
data = read(2)

if len(data) != 2:
raise WebSocketError('Unexpected EOF while decoding header')

# 获取无符号整形数字
header.length = struct.unpack('!H', data)[0]
# 如果126 读取后8个字节为长度
elif header.length == 127:
# 64 bit length
data = read(8)

if len(data) != 8:
raise WebSocketError('Unexpected EOF while decoding header')

# 获取无符号长整形数字
header.length = struct.unpack('!Q', data)[0]

if has_mask:
mask = read(4)

if len(mask) != 4:
raise WebSocketError('Unexpected EOF while decoding header')

header.mask = mask

return header
长连接模式

任何一个socket client请求到了server端 一般会用线程方式处理导致考虑到长连接的模式 一直要维护这个链接

线程是不能无限制的开启 所以一般情况下 采用的都是协程或者一步回掉的方式 python:gevent java:CompletableFuture等 (最好是加机器)

webcoket 函数方法

websocket 一般会有 on_open on_message on_close 等方法 其实实现方式都较为简单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def handle(self):
self.on_open()
while True:
try:
message = self.ws.receive()
stime = time.time()
except WebSocketError:
if not getattr(self, 'server_close', False):
self.on_close('close')
break
if message:
resp = self.on_message(message)
log.info(
'time=%s|origin=%s|req=%s|resp=%s',
time.time()-stime,
self.environ.get('HTTP_X_REAL_IP', '') or self.environ.get(
'REMOTE_ADDR', '') or '-',
message, resp or ''
)
if resp:
self.ws.send(resp)

链接请求过来之后 会开启一个线程或者协程的方式实例化socketHander对象 并且调用handle方法 第一次on_open 然后进入死循环

self.ws.receive() 调用的是socket recv方法 由于socket为阻塞所以一直会阻塞 如果读取消息之后会调用on_message方法, 同时也会处理on_close事件

self.ws.receive()

读取websocket报文数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def read_message(self):
opcode = None
message = bytearray()
while True:
header, payload = self.read_frame()
log.debug('opcode:%s|payload:%s', header.opcode, payload)
f_opcode = header.opcode

if f_opcode in (self.OPCODE_TEXT, self.OPCODE_BINARY):
opcode = f_opcode
message += payload
if f_opcode == self.OPCODE_TEXT:
return self._decode_bytes(message)
else:
return message
# 多帧 后面都是持续帧 目前不处理多帧情况
elif f_opcode == self.OPCODE_CONTINUATION:
if not opcode:
raise ProtocolError("Unexpected frame with opcode=0")

elif f_opcode == self.OPCODE_PING:
self.handle_ping(header, payload)
continue

elif f_opcode == self.OPCODE_PONG:
self.handle_pong(header, payload)
continue

elif f_opcode == self.OPCODE_CLOSE:
self.handle_close(header, payload)
return

else:
raise ProtocolError("Unexpected opcode={0!r}".format(f_opcode))
if header.fin:
break

原理类似 也是死循环读取数据 一个完成的websocket消息 先读取header 再读取payload 然后基于op_code处理不同的消息类型

  1. 如果是正常的请求报文则返回数据 让on_message处理
  2. 如果是ping或者pong返回ping pong帧
  3. 如果是close也返回close幁 并且处方websocketCloseError on_close事件消息

心跳机制

很多协议都含有心跳机制、websocket因为基于长连接 但是有一种可能 client socket 可能挂了 并且没有发送socket close包 这个时候服务端是不知道的 所以服务端需要有一个主动删除连接的机制

  1. client 主动发送ping包 server记录每一次更新时间 维护链接如果时间大于指定timeout就断开连接 释放资源
  2. 一般情况都是域名访问 可能会走nginx 这个时候设置keepalive 并且注意时间的设置 ping包一定要大于nginx配置时间