Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/websocket/_abnf.py: 32%

225 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:48 +0000

1import array 

2import os 

3import struct 

4import sys 

5 

6from ._exceptions import * 

7from ._utils import validate_utf8 

8from threading import Lock 

9 

10""" 

11_abnf.py 

12websocket - WebSocket client library for Python 

13 

14Copyright 2022 engn33r 

15 

16Licensed under the Apache License, Version 2.0 (the "License"); 

17you may not use this file except in compliance with the License. 

18You may obtain a copy of the License at 

19 

20 http://www.apache.org/licenses/LICENSE-2.0 

21 

22Unless required by applicable law or agreed to in writing, software 

23distributed under the License is distributed on an "AS IS" BASIS, 

24WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

25See the License for the specific language governing permissions and 

26limitations under the License. 

27""" 

28 

29try: 

30 # If wsaccel is available, use compiled routines to mask data. 

31 # wsaccel only provides around a 10% speed boost compared 

32 # to the websocket-client _mask() implementation. 

33 # Note that wsaccel is unmaintained. 

34 from wsaccel.xormask import XorMaskerSimple 

35 

36 def _mask(_m, _d) -> bytes: 

37 return XorMaskerSimple(_m).process(_d) 

38 

39except ImportError: 

40 # wsaccel is not available, use websocket-client _mask() 

41 native_byteorder = sys.byteorder 

42 

43 def _mask(mask_value: int, data_value: int) -> bytes: 

44 datalen = len(data_value) 

45 data_value = int.from_bytes(data_value, native_byteorder) 

46 mask_value = int.from_bytes(mask_value * (datalen // 4) + mask_value[: datalen % 4], native_byteorder) 

47 return (data_value ^ mask_value).to_bytes(datalen, native_byteorder) 

48 

49 

# Public names exported by a star-import of this module. Every STATUS_*
# constant defined in this module is listed, including
# STATUS_SERVICE_RESTART and STATUS_TRY_AGAIN_LATER.
__all__ = [
    'ABNF', 'continuous_frame', 'frame_buffer',
    'STATUS_NORMAL',
    'STATUS_GOING_AWAY',
    'STATUS_PROTOCOL_ERROR',
    'STATUS_UNSUPPORTED_DATA_TYPE',
    'STATUS_STATUS_NOT_AVAILABLE',
    'STATUS_ABNORMAL_CLOSED',
    'STATUS_INVALID_PAYLOAD',
    'STATUS_POLICY_VIOLATION',
    'STATUS_MESSAGE_TOO_BIG',
    'STATUS_INVALID_EXTENSION',
    'STATUS_UNEXPECTED_CONDITION',
    'STATUS_SERVICE_RESTART',
    'STATUS_TRY_AGAIN_LATER',
    'STATUS_BAD_GATEWAY',
    'STATUS_TLS_HANDSHAKE_ERROR',
]

66 

# Status codes carried in a close frame (see RFC 6455, section 7.4.1).
STATUS_NORMAL = 1000
STATUS_GOING_AWAY = 1001
STATUS_PROTOCOL_ERROR = 1002
STATUS_UNSUPPORTED_DATA_TYPE = 1003
STATUS_STATUS_NOT_AVAILABLE = 1005
STATUS_ABNORMAL_CLOSED = 1006
STATUS_INVALID_PAYLOAD = 1007
STATUS_POLICY_VIOLATION = 1008
STATUS_MESSAGE_TOO_BIG = 1009
STATUS_INVALID_EXTENSION = 1010
STATUS_UNEXPECTED_CONDITION = 1011
STATUS_SERVICE_RESTART = 1012
STATUS_TRY_AGAIN_LATER = 1013
STATUS_BAD_GATEWAY = 1014
STATUS_TLS_HANDSHAKE_ERROR = 1015

# Codes treated as valid when a close frame is validated.
# STATUS_STATUS_NOT_AVAILABLE (1005), STATUS_ABNORMAL_CLOSED (1006) and
# STATUS_TLS_HANDSHAKE_ERROR (1015) are not part of this tuple.
VALID_CLOSE_STATUS = (
    STATUS_NORMAL, STATUS_GOING_AWAY,
    STATUS_PROTOCOL_ERROR, STATUS_UNSUPPORTED_DATA_TYPE,
    STATUS_INVALID_PAYLOAD, STATUS_POLICY_VIOLATION,
    STATUS_MESSAGE_TOO_BIG, STATUS_INVALID_EXTENSION,
    STATUS_UNEXPECTED_CONDITION, STATUS_SERVICE_RESTART,
    STATUS_TRY_AGAIN_LATER, STATUS_BAD_GATEWAY,
)

98 

99 

class ABNF:
    """
    ABNF frame class.
    See http://tools.ietf.org/html/rfc5234
    and http://tools.ietf.org/html/rfc6455#section-5.2
    """

    # operation code values.
    OPCODE_CONT = 0x0
    OPCODE_TEXT = 0x1
    OPCODE_BINARY = 0x2
    OPCODE_CLOSE = 0x8
    OPCODE_PING = 0x9
    OPCODE_PONG = 0xa

    # available operation code value tuple
    OPCODES = (OPCODE_CONT, OPCODE_TEXT, OPCODE_BINARY, OPCODE_CLOSE,
               OPCODE_PING, OPCODE_PONG)

    # opcode human readable string
    OPCODE_MAP = {
        OPCODE_CONT: "cont",
        OPCODE_TEXT: "text",
        OPCODE_BINARY: "binary",
        OPCODE_CLOSE: "close",
        OPCODE_PING: "ping",
        OPCODE_PONG: "pong"
    }

    # data length threshold.
    # Payloads shorter than LENGTH_7 fit in the 7-bit length field, payloads
    # shorter than LENGTH_16 use the 16-bit extended length, anything else
    # (below LENGTH_63) uses the 64-bit extended length.
    LENGTH_7 = 0x7e
    LENGTH_16 = 1 << 16
    LENGTH_63 = 1 << 63

    def __init__(self, fin: int = 0, rsv1: int = 0, rsv2: int = 0, rsv3: int = 0,
                 opcode: int = OPCODE_TEXT, mask: int = 1, data: str = "") -> None:
        """
        Constructor for ABNF. Please check RFC for arguments.

        Parameters
        ----------
        fin, rsv1, rsv2, rsv3: int
            header flag bits; format() requires each to be 0 or 1.
        opcode: int
            operation code, one of the OPCODE_XXX values.
        mask: int
            when truthy, format() masks the payload with a fresh key.
        data: str or bytes
            frame payload; None is stored as an empty string.
        """
        self.fin = fin
        self.rsv1 = rsv1
        self.rsv2 = rsv2
        self.rsv3 = rsv3
        self.opcode = opcode
        self.mask = mask
        if data is None:
            data = ""
        self.data = data
        # Called by format() with the number of key bytes wanted (4).
        self.get_mask_key = os.urandom

    def validate(self, skip_utf8_validation: bool = False) -> None:
        """
        Validate the ABNF frame.

        Parameters
        ----------
        skip_utf8_validation: skip utf8 validation.

        Raises
        ------
        WebSocketProtocolException
            if a reserved bit is set, the opcode is unknown, a ping frame is
            fragmented, or a close frame carries an invalid payload or
            status code.
        """
        if self.rsv1 or self.rsv2 or self.rsv3:
            raise WebSocketProtocolException("rsv is not implemented, yet")

        if self.opcode not in ABNF.OPCODES:
            # Format the message here; passing the value as a second
            # exception argument would leave the "%r" placeholder unfilled.
            raise WebSocketProtocolException("Invalid opcode %r" % (self.opcode,))

        if self.opcode == ABNF.OPCODE_PING and not self.fin:
            raise WebSocketProtocolException("Invalid ping frame.")

        if self.opcode == ABNF.OPCODE_CLOSE:
            payload_len = len(self.data)
            if not payload_len:
                # A close frame without a body is valid.
                return
            # A single byte cannot hold the 2-byte status code, and payloads
            # of 126 bytes or more are rejected.
            if payload_len == 1 or payload_len >= 126:
                raise WebSocketProtocolException("Invalid close frame.")
            # Everything after the 2-byte status code is the close reason.
            if payload_len > 2 and not skip_utf8_validation and not validate_utf8(self.data[2:]):
                raise WebSocketProtocolException("Invalid close frame.")

            # The status code is the first two payload bytes, big-endian.
            code = 256 * self.data[0] + self.data[1]
            if not self._is_valid_close_status(code):
                raise WebSocketProtocolException("Invalid close opcode %r" % (code,))

    @staticmethod
    def _is_valid_close_status(code: int) -> bool:
        """Return True if ``code`` is in VALID_CLOSE_STATUS or in 3000-4999."""
        return code in VALID_CLOSE_STATUS or (3000 <= code < 5000)

    def __str__(self) -> str:
        return "fin=" + str(self.fin) \
            + " opcode=" + str(self.opcode) \
            + " data=" + str(self.data)

    @staticmethod
    def create_frame(data: str, opcode: int, fin: int = 1) -> 'ABNF':
        """
        Create frame to send text, binary and other data.

        Parameters
        ----------
        data: str or bytes
            data to send.
            If opcode is OPCODE_TEXT and this value is a str,
            it is encoded to UTF-8 bytes automatically.
        opcode: int
            operation code. please see OPCODE_XXX.
        fin: int
            fin flag. if set to 0, create continue fragmentation.

        Returns
        -------
        ABNF
            a new frame with the mask flag set.
        """
        if opcode == ABNF.OPCODE_TEXT and isinstance(data, str):
            data = data.encode("utf-8")
        # mask must be set if send data from client
        return ABNF(fin, 0, 0, 0, opcode, 1, data)

    def format(self) -> bytes:
        """
        Format this object to string(byte array) to send data to server.

        Raises
        ------
        ValueError
            if fin/rsv1/rsv2/rsv3 is not 0 or 1, the opcode is unknown, or
            the payload is LENGTH_63 bytes or longer.
        """
        if any(x not in (0, 1) for x in [self.fin, self.rsv1, self.rsv2, self.rsv3]):
            raise ValueError("not 0 or 1")
        if self.opcode not in ABNF.OPCODES:
            raise ValueError("Invalid OPCODE")
        length = len(self.data)
        if length >= ABNF.LENGTH_63:
            raise ValueError("data is too long")

        # First header byte: FIN, RSV1-3 and the 4-bit opcode.
        frame_header = chr(self.fin << 7 |
                           self.rsv1 << 6 | self.rsv2 << 5 | self.rsv3 << 4 |
                           self.opcode).encode('latin-1')
        # Second header byte: mask flag plus either the length itself or a
        # marker (0x7e / 0x7f) announcing a 16-bit / 64-bit extended length.
        if length < ABNF.LENGTH_7:
            frame_header += chr(self.mask << 7 | length).encode('latin-1')
        elif length < ABNF.LENGTH_16:
            frame_header += chr(self.mask << 7 | 0x7e).encode('latin-1')
            frame_header += struct.pack("!H", length)
        else:
            frame_header += chr(self.mask << 7 | 0x7f).encode('latin-1')
            frame_header += struct.pack("!Q", length)

        if not self.mask:
            return frame_header + self.data
        else:
            mask_key = self.get_mask_key(4)
            return frame_header + self._get_masked(mask_key)

    def _get_masked(self, mask_key: bytes) -> bytes:
        """Return ``mask_key`` followed by the payload masked with it."""
        s = ABNF.mask(mask_key, self.data)

        if isinstance(mask_key, str):
            # Use the same encoding as ABNF.mask() so that the key written
            # to the wire is byte-for-byte the key used for the XOR.
            mask_key = mask_key.encode('latin-1')

        return mask_key + s

    @staticmethod
    def mask(mask_key: bytes, data: bytes) -> bytes:
        """
        Mask or unmask data. Just do xor for each byte

        Parameters
        ----------
        mask_key: bytes or str
            4 byte mask.
        data: bytes or str
            data to mask/unmask. None is treated as empty data.

        Returns
        -------
        bytes
            the masked (or unmasked) data.
        """
        if data is None:
            data = ""

        # str inputs are converted with latin-1, one byte per character.
        if isinstance(mask_key, str):
            mask_key = mask_key.encode('latin-1')

        if isinstance(data, str):
            data = data.encode('latin-1')

        return _mask(array.array("B", mask_key), array.array("B", data))

270 

271 

class frame_buffer:
    """
    Incremental reader that assembles one ABNF frame at a time from a
    ``recv`` callable, keeping partially parsed state between calls.
    """

    # Positions of the mask flag and the 7-bit length field inside the
    # tuple stored in ``self.header`` by recv_header().
    _HEADER_MASK_INDEX = 5
    _HEADER_LENGTH_INDEX = 6

    def __init__(self, recv_fn, skip_utf8_validation: bool) -> None:
        """
        Parameters
        ----------
        recv_fn: callable
            called as ``recv_fn(bufsize)``; must return the bytes received.
        skip_utf8_validation: bool
            forwarded to ABNF.validate() for every received frame.
        """
        self.recv = recv_fn
        self.skip_utf8_validation = skip_utf8_validation
        # Buffers over the packets from the layer beneath until desired amount
        # bytes of bytes are received.
        self.recv_buffer = []
        self.clear()
        self.lock = Lock()

    def clear(self) -> None:
        """Forget the parsed header, length and mask of the current frame."""
        self.header = None
        self.length = None
        self.mask = None

    def has_received_header(self) -> bool:
        """
        Return True while the header still has to be read.

        NOTE: despite the name, the result is True when no header is stored
        yet; recv_frame() relies on exactly this meaning.
        """
        return self.header is None

    def recv_header(self) -> None:
        """Read the 2-byte frame header and store its fields in ``self.header``."""
        header = self.recv_strict(2)
        b1 = header[0]
        fin = b1 >> 7 & 1
        rsv1 = b1 >> 6 & 1
        rsv2 = b1 >> 5 & 1
        rsv3 = b1 >> 4 & 1
        opcode = b1 & 0xf
        b2 = header[1]
        has_mask = b2 >> 7 & 1
        length_bits = b2 & 0x7f

        self.header = (fin, rsv1, rsv2, rsv3, opcode, has_mask, length_bits)

    def has_mask(self) -> int:
        """Return the mask flag of the stored header, or False without one."""
        if not self.header:
            return False
        return self.header[frame_buffer._HEADER_MASK_INDEX]

    def has_received_length(self) -> bool:
        """
        Return True while the payload length still has to be read.

        NOTE: despite the name, the result is True when no length is stored.
        """
        return self.length is None

    def recv_length(self) -> None:
        """Determine the payload length, reading the extended field if needed."""
        bits = self.header[frame_buffer._HEADER_LENGTH_INDEX]
        length_bits = bits & 0x7f
        if length_bits == 0x7e:
            # 0x7e announces a 16-bit big-endian extended length.
            v = self.recv_strict(2)
            self.length = struct.unpack("!H", v)[0]
        elif length_bits == 0x7f:
            # 0x7f announces a 64-bit big-endian extended length.
            v = self.recv_strict(8)
            self.length = struct.unpack("!Q", v)[0]
        else:
            self.length = length_bits

    def has_received_mask(self) -> bool:
        """
        Return True while the masking key still has to be read.

        NOTE: despite the name, the result is True when no mask is stored.
        """
        return self.mask is None

    def recv_mask(self) -> None:
        """Read the 4-byte masking key, or store "" for an unmasked frame."""
        self.mask = self.recv_strict(4) if self.has_mask() else ""

    def recv_frame(self) -> 'ABNF':
        """
        Receive, unmask and validate the next frame.

        Header, length and mask are only read when not already stored, so
        if reading fails part-way the fields parsed so far are kept and a
        later call continues from there.

        Returns
        -------
        ABNF
            the received frame, after ABNF.validate() accepted it.
        """

        with self.lock:
            # Header
            if self.has_received_header():
                self.recv_header()
            (fin, rsv1, rsv2, rsv3, opcode, has_mask, _) = self.header

            # Frame length
            if self.has_received_length():
                self.recv_length()
            length = self.length

            # Mask
            if self.has_received_mask():
                self.recv_mask()
            mask = self.mask

            # Payload
            payload = self.recv_strict(length)
            if has_mask:
                payload = ABNF.mask(mask, payload)

            # Reset for next frame
            self.clear()

            frame = ABNF(fin, rsv1, rsv2, rsv3, opcode, has_mask, payload)
            frame.validate(self.skip_utf8_validation)

        return frame

    def recv_strict(self, bufsize: int) -> bytes:
        """
        Return exactly ``bufsize`` bytes, calling ``self.recv`` as often as
        needed. Surplus bytes stay in ``self.recv_buffer`` for the next call.
        """
        shortage = bufsize - sum(map(len, self.recv_buffer))
        # NOTE(review): this loop only ends once enough bytes arrived; it
        # presumably relies on ``self.recv`` raising, rather than returning
        # b"", when the connection is closed -- confirm against the caller.
        while shortage > 0:
            # Limit buffer size that we pass to socket.recv() to avoid
            # fragmenting the heap -- the number of bytes recv() actually
            # reads is limited by socket buffer and is relatively small,
            # yet passing large numbers repeatedly causes lots of large
            # buffers allocated and then shrunk, which results in
            # fragmentation.
            bytes_ = self.recv(min(16384, shortage))
            self.recv_buffer.append(bytes_)
            shortage -= len(bytes_)

        unified = b"".join(self.recv_buffer)

        if shortage == 0:
            self.recv_buffer = []
            return unified
        else:
            # More than requested is buffered: keep the tail for later.
            self.recv_buffer = [unified[bufsize:]]
            return unified[:bufsize]

385 

386 

class continuous_frame:
    """
    Collects the frames of a fragmented message and hands the accumulated
    data back through extract().
    """

    def __init__(self, fire_cont_frame: bool, skip_utf8_validation: bool) -> None:
        """
        Parameters
        ----------
        fire_cont_frame: bool
            when true, is_fire() is truthy for every frame, not only the
            final one, and extract() skips the UTF-8 check.
        skip_utf8_validation: bool
            when true, extract() never validates text data as UTF-8.
        """
        self.fire_cont_frame = fire_cont_frame
        self.skip_utf8_validation = skip_utf8_validation
        # [opcode, data] of the message being assembled, or None.
        self.cont_data = None
        # Opcode of the fragmented message in progress, or None.
        self.recving_frames = None

    def validate(self, frame: 'ABNF') -> None:
        """
        Check that ``frame`` is legal given the fragmentation state.

        Raises
        ------
        WebSocketProtocolException
            for a continuation frame when no message is in progress, or a
            new text/binary frame while a message is still in progress.
        """
        if not self.recving_frames and frame.opcode == ABNF.OPCODE_CONT:
            raise WebSocketProtocolException("Illegal frame")
        if self.recving_frames and \
                frame.opcode in (ABNF.OPCODE_TEXT, ABNF.OPCODE_BINARY):
            raise WebSocketProtocolException("Illegal frame")

    def add(self, frame: 'ABNF') -> None:
        """Append ``frame``'s data to the message being assembled."""
        if self.cont_data:
            self.cont_data[1] += frame.data
        else:
            # First frame seen since the last extract(): remember its
            # opcode and start collecting data.
            if frame.opcode in (ABNF.OPCODE_TEXT, ABNF.OPCODE_BINARY):
                self.recving_frames = frame.opcode
            self.cont_data = [frame.opcode, frame.data]

        if frame.fin:
            # Final fragment: no message is in progress any more.
            self.recving_frames = None

    def is_fire(self, frame: 'ABNF') -> int:
        """
        Return a truthy value when the collected data should be extracted:
        ``frame.fin`` if it is truthy, otherwise ``fire_cont_frame``.
        """
        return frame.fin or self.fire_cont_frame

    def extract(self, frame: 'ABNF') -> list:
        """
        Move the collected data into ``frame`` and reset the collector.

        Returns
        -------
        list
            ``[opcode, frame]`` where opcode is the one stored with the
            collected data and ``frame.data`` holds that data.

        Raises
        ------
        WebSocketPayloadException
            if the data belongs to a text message and is not valid UTF-8
            (only checked when neither fire_cont_frame nor
            skip_utf8_validation is set).
        """
        data = self.cont_data
        self.cont_data = None
        frame.data = data[1]
        if not self.fire_cont_frame and data[0] == ABNF.OPCODE_TEXT and not self.skip_utf8_validation and not validate_utf8(frame.data):
            raise WebSocketPayloadException(
                "cannot decode: " + repr(frame.data))

        return [data[0], frame]