websocket协议 应用层 websocket 其实是一种应用层的协议,client是http协议的变种,基于http协议 改变了请求头信息 Upgrade:websocket
server端 也是基于http协议解析报文基于Upgrade:websocket 头部信息识别长连接 返回status:101 并开始建立长连接
socket server端socket采用阻塞模式,高并发一般采用协程、线程等等方式
websocket报文 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 """ 0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-+-+-+-+-------+-+-------------+-------------------------------+ |F|R|R|R| opcode|M| Payload len | Extended payload length | |I|S|S|S| (4) |A| (7) | (16/64) | |N|V|V|V| |S| | (if payload len==126/127) | | |1|2|3| |K| | | +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - + | Extended payload length continued, if payload len == 127 | + - - - - - - - - - - - - - - - +-------------------------------+ | |Masking-key, if MASK set to 1 | +-------------------------------+-------------------------------+ | Masking-key (continued) | Payload Data | +-------------------------------- - - - - - - - - - - - - - - - + : Payload Data continued ... : + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + | Payload Data continued ... | +---------------------------------------------------------------+ ~~~~~~~~~~ websocket 报文协议 """
建立socket长连接之后 client server 基于上序报文发送请求和响应
报文组成部分:
FIN 标识是否是最后一个报文是否结束 1代表结束
RSV1、RSV2、RSV3 暂时没有使用
opcode代表消息类型:0:附加数据 1:文本数据 2: 二进制数据 8: close消息 9: ping消息 10: pong消息
MASK body内容是否做了掩码处理
Payload len 有多种含义 总共是7个字节 如果是126 后面两个字节代表body长度 如果是127 后面八个字节代表body长度
MASK 如果mask标识为1 这个是mask加密值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 def decode_header (cls, stream ): read = stream.read data = read(2 ) if len (data) != 2 : raise WebSocketError("Unexpected EOF while decoding header" ) first_byte, second_byte = struct.unpack('!BB' , data) header = cls( fin=first_byte & cls.FIN_MASK == cls.FIN_MASK, opcode=first_byte & cls.OPCODE_MASK, flags=first_byte & cls.HEADER_FLAG_MASK, length=second_byte & cls.LENGTH_MASK) has_mask = second_byte & cls.MASK_MASK == cls.MASK_MASK if header.opcode > 0x07 : if not header.fin: raise ProtocolError( "Received fragmented control frame: {0!r}" .format (data)) if header.length > 125 : raise FrameTooLargeException( "Control frame cannot be larger than 125 bytes: " "{0!r}" .format (data)) if header.length == 126 : data = read(2 ) if len (data) != 2 : raise WebSocketError('Unexpected EOF while decoding header' ) header.length = struct.unpack('!H' , data)[0 ] elif header.length == 127 : data = read(8 ) if len (data) != 8 : raise WebSocketError('Unexpected EOF while decoding header' ) header.length = struct.unpack('!Q' , data)[0 ] if has_mask: mask = read(4 ) if len (mask) != 4 : raise WebSocketError('Unexpected EOF while decoding header' ) header.mask = mask return header
长连接模式 任何一个socket client请求到了server端 一般会用线程方式处理导致考虑到长连接的模式 一直要维护这个链接
线程是不能无限制的开启 所以一般情况下 采用的都是协程或者一步回掉的方式 python:gevent java:CompletableFuture等 (最好是加机器)
webcoket 函数方法 websocket 一般会有 on_open on_message on_close 等方法 其实实现方式都较为简单
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 def handle (self ): self.on_open() while True : try : message = self.ws.receive() stime = time.time() except WebSocketError: if not getattr (self, 'server_close' , False ): self.on_close('close' ) break if message: resp = self.on_message(message) log.info( 'time=%s|origin=%s|req=%s|resp=%s' , time.time()-stime, self.environ.get('HTTP_X_REAL_IP' , '' ) or self.environ.get( 'REMOTE_ADDR' , '' ) or '-' , message, resp or '' ) if resp: self.ws.send(resp)
链接请求过来之后 会开启一个线程或者协程的方式实例化socketHander对象 并且调用handle方法 第一次on_open 然后进入死循环
self.ws.receive() 调用的是socket recv方法 由于socket为阻塞所以一直会阻塞 如果读取消息之后会调用on_message方法, 同时也会处理on_close事件
self.ws.receive() 读取websocket报文数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 def read_message (self ): opcode = None message = bytearray () while True : header, payload = self.read_frame() log.debug('opcode:%s|payload:%s' , header.opcode, payload) f_opcode = header.opcode if f_opcode in (self.OPCODE_TEXT, self.OPCODE_BINARY): opcode = f_opcode message += payload if f_opcode == self.OPCODE_TEXT: return self._decode_bytes(message) else : return message elif f_opcode == self.OPCODE_CONTINUATION: if not opcode: raise ProtocolError("Unexpected frame with opcode=0" ) elif f_opcode == self.OPCODE_PING: self.handle_ping(header, payload) continue elif f_opcode == self.OPCODE_PONG: self.handle_pong(header, payload) continue elif f_opcode == self.OPCODE_CLOSE: self.handle_close(header, payload) return else : raise ProtocolError("Unexpected opcode={0!r}" .format (f_opcode)) if header.fin: break
原理类似 也是死循环读取数据 一个完成的websocket消息 先读取header 再读取payload 然后基于op_code处理不同的消息类型
如果是正常的请求报文则返回数据 让on_message处理
如果是ping或者pong返回ping pong帧
如果是close也返回close幁 并且处方websocketCloseError on_close事件消息
心跳机制 很多协议都含有心跳机制、websocket因为基于长连接 但是有一种可能 client socket 可能挂了 并且没有发送socket close包 这个时候服务端是不知道的 所以服务端需要有一个主动删除连接的机制
client 主动发送ping包 server记录每一次更新时间 维护链接如果时间大于指定timeout就断开连接 释放资源
一般情况都是域名访问 可能会走nginx 这个时候设置keepalive 并且注意时间的设置 ping包一定要大于nginx配置时间