Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/websocket/_abnf.py: 30%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

249 statements  

1import array 

2import os 

3import struct 

4import sys 

5from threading import Lock 

6from typing import Callable, Optional, Union, Any 

7 

8from ._exceptions import WebSocketPayloadException, WebSocketProtocolException 

9from ._utils import validate_utf8 

10 

11""" 

12_abnf.py 

13websocket - WebSocket client library for Python 

14 

15Copyright 2025 engn33r 

16 

17Licensed under the Apache License, Version 2.0 (the "License"); 

18you may not use this file except in compliance with the License. 

19You may obtain a copy of the License at 

20 

21 http://www.apache.org/licenses/LICENSE-2.0 

22 

23Unless required by applicable law or agreed to in writing, software 

24distributed under the License is distributed on an "AS IS" BASIS, 

25WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

26See the License for the specific language governing permissions and 

27limitations under the License. 

28""" 

29 

try:
    # Prefer the compiled XOR routine from wsaccel when it can be imported.
    # It is only about 10% faster than the pure-Python fallback defined
    # below, and wsaccel itself is unmaintained.
    from wsaccel.xormask import XorMaskerSimple

    def _mask(mask_value: array.array, data_value: array.array) -> bytes:
        """XOR ``data_value`` with the repeating ``mask_value`` using wsaccel."""
        masker = XorMaskerSimple(mask_value)
        mask_result: bytes = masker.process(data_value)
        return mask_result

except ImportError:
    # wsaccel is not installed: use the websocket-client implementation.
    native_byteorder = sys.byteorder

    def _mask(mask_value: array.array, data_value: array.array) -> bytes:
        """
        XOR ``data_value`` with the repeating 4-item ``mask_value``.

        Both operands are converted to one big integer each so the XOR is a
        single operation instead of a per-byte Python loop.
        """
        datalen = len(data_value)
        # Stretch the mask to exactly the payload length: whole repetitions
        # followed by the leading part of one more copy.
        whole_repeats, leftover = divmod(datalen, 4)
        stretched_mask = mask_value * whole_repeats + mask_value[:leftover]

        # The same byte order is used for both conversions and for the
        # conversion back, so the choice does not affect the result.
        data_as_int = int.from_bytes(data_value, native_byteorder)
        mask_as_int = int.from_bytes(stretched_mask, native_byteorder)
        return (data_as_int ^ mask_as_int).to_bytes(datalen, native_byteorder)

52 

53 

# Names exported by a star-import of this module. Every STATUS_* close code
# defined in this file is listed, in ascending numeric order.
__all__ = [
    "ABNF",
    "continuous_frame",
    "frame_buffer",
    "STATUS_NORMAL",
    "STATUS_GOING_AWAY",
    "STATUS_PROTOCOL_ERROR",
    "STATUS_UNSUPPORTED_DATA_TYPE",
    "STATUS_STATUS_NOT_AVAILABLE",
    "STATUS_ABNORMAL_CLOSED",
    "STATUS_INVALID_PAYLOAD",
    "STATUS_POLICY_VIOLATION",
    "STATUS_MESSAGE_TOO_BIG",
    "STATUS_INVALID_EXTENSION",
    "STATUS_UNEXPECTED_CONDITION",
    "STATUS_SERVICE_RESTART",
    "STATUS_TRY_AGAIN_LATER",
    "STATUS_BAD_GATEWAY",
    "STATUS_TLS_HANDSHAKE_ERROR",
]

72 

# Closing frame status codes (the 2-byte code carried at the start of a
# close frame's payload).
STATUS_NORMAL = 1000
STATUS_GOING_AWAY = 1001
STATUS_PROTOCOL_ERROR = 1002
STATUS_UNSUPPORTED_DATA_TYPE = 1003
STATUS_STATUS_NOT_AVAILABLE = 1005
STATUS_ABNORMAL_CLOSED = 1006
STATUS_INVALID_PAYLOAD = 1007
STATUS_POLICY_VIOLATION = 1008
STATUS_MESSAGE_TOO_BIG = 1009
STATUS_INVALID_EXTENSION = 1010
STATUS_UNEXPECTED_CONDITION = 1011
STATUS_SERVICE_RESTART = 1012
STATUS_TRY_AGAIN_LATER = 1013
STATUS_BAD_GATEWAY = 1014
STATUS_TLS_HANDSHAKE_ERROR = 1015

# Status codes that ABNF.validate() accepts in a close frame (codes in the
# range 3000-4999 are accepted separately by ABNF._is_valid_close_status).
# 1005, 1006 and 1015 are defined above but deliberately not listed here.
# NOTE(review): presumably because RFC 6455 section 7.4.1 reserves them and
# forbids sending them on the wire -- confirm against the RFC.
VALID_CLOSE_STATUS = (
    STATUS_NORMAL,
    STATUS_GOING_AWAY,
    STATUS_PROTOCOL_ERROR,
    STATUS_UNSUPPORTED_DATA_TYPE,
    STATUS_INVALID_PAYLOAD,
    STATUS_POLICY_VIOLATION,
    STATUS_MESSAGE_TOO_BIG,
    STATUS_INVALID_EXTENSION,
    STATUS_UNEXPECTED_CONDITION,
    STATUS_SERVICE_RESTART,
    STATUS_TRY_AGAIN_LATER,
    STATUS_BAD_GATEWAY,
)

104 

105 

class ABNF:
    """
    ABNF frame class.
    See http://tools.ietf.org/html/rfc5234
    and http://tools.ietf.org/html/rfc6455#section-5.2
    """

    # operation code values.
    OPCODE_CONT = 0x0
    OPCODE_TEXT = 0x1
    OPCODE_BINARY = 0x2
    OPCODE_CLOSE = 0x8
    OPCODE_PING = 0x9
    OPCODE_PONG = 0xA

    # available operation code value tuple
    OPCODES = (
        OPCODE_CONT,
        OPCODE_TEXT,
        OPCODE_BINARY,
        OPCODE_CLOSE,
        OPCODE_PING,
        OPCODE_PONG,
    )

    # opcode human readable string
    OPCODE_MAP = {
        OPCODE_CONT: "cont",
        OPCODE_TEXT: "text",
        OPCODE_BINARY: "binary",
        OPCODE_CLOSE: "close",
        OPCODE_PING: "ping",
        OPCODE_PONG: "pong",
    }

    # data length thresholds: format() picks the 7-bit, 16-bit or 64-bit
    # length encoding depending on which of these the payload size is below.
    LENGTH_7 = 0x7E
    LENGTH_16 = 1 << 16
    LENGTH_63 = 1 << 63

    def __init__(
        self,
        fin: int = 0,
        rsv1: int = 0,
        rsv2: int = 0,
        rsv3: int = 0,
        opcode: int = OPCODE_TEXT,
        mask_value: int = 1,
        data: Optional[Union[str, bytes]] = "",
    ) -> None:
        """
        Constructor for ABNF. Please check RFC for arguments.

        Parameters
        ----------
        fin, rsv1, rsv2, rsv3: int
            header flag bits; format() requires each to be 0 or 1.
        opcode: int
            operation code, one of ``ABNF.OPCODES``.
        mask_value: int
            mask flag; when truthy, format() masks the payload.
        data: str or bytes
            payload. ``None`` is stored as an empty string.
        """
        self.fin = fin
        self.rsv1 = rsv1
        self.rsv2 = rsv2
        self.rsv3 = rsv3
        self.opcode = opcode
        self.mask_value = mask_value
        if data is None:
            data = ""
        self.data = data
        # Called as get_mask_key(4) by format(); an instance attribute so a
        # different key source can be assigned per frame.
        self.get_mask_key = os.urandom

    def validate(self, skip_utf8_validation: bool = False) -> None:
        """
        Validate the ABNF frame.

        Parameters
        ----------
        skip_utf8_validation: skip utf8 validation.

        Raises
        ------
        WebSocketProtocolException
            if a reserved bit is set, the opcode is unknown, a ping frame
            does not have ``fin`` set, or a close frame carries a malformed
            payload or an unacceptable status code.
        """
        if self.rsv1 or self.rsv2 or self.rsv3:
            raise WebSocketProtocolException("rsv is not implemented, yet")

        if self.opcode not in ABNF.OPCODES:
            # Interpolate here: extra positional arguments passed to an
            # exception constructor are never substituted into the message.
            raise WebSocketProtocolException(f"Invalid opcode {self.opcode!r}")

        if self.opcode == ABNF.OPCODE_PING and not self.fin:
            raise WebSocketProtocolException("Invalid ping frame.")

        if self.opcode == ABNF.OPCODE_CLOSE:
            data_length = len(self.data)
            if not data_length:
                # An empty close payload (no status code) is acceptable.
                return
            # A status code needs two bytes, so a single byte is malformed;
            # payloads of 126 bytes or more are rejected as well.
            if data_length == 1 or data_length >= 126:
                raise WebSocketProtocolException("Invalid close frame.")
            # Everything after the 2-byte status code is the close reason
            # and has to be valid UTF-8.
            if (
                data_length > 2
                and not skip_utf8_validation
                and not validate_utf8(self.data[2:])
            ):
                raise WebSocketProtocolException("Invalid close frame.")

            data_bytes = (
                self.data[:2]
                if isinstance(self.data, bytes)
                else self.data[:2].encode("utf-8")
            )
            # The status code is an unsigned 16-bit big-endian integer.
            code = struct.unpack("!H", data_bytes)[0]
            if not self._is_valid_close_status(code):
                raise WebSocketProtocolException(f"Invalid close opcode {code!r}")

    @staticmethod
    def _is_valid_close_status(code: int) -> bool:
        """Return True if ``code`` is in VALID_CLOSE_STATUS or in 3000-4999."""
        return code in VALID_CLOSE_STATUS or (3000 <= code < 5000)

    def __str__(self) -> str:
        data_repr = self.data if isinstance(self.data, str) else repr(self.data)
        return f"fin={self.fin} opcode={self.opcode} data={data_repr}"

    @staticmethod
    def create_frame(data: Union[bytes, str], opcode: int, fin: int = 1) -> "ABNF":
        """
        Create frame to send text, binary and other data.

        Parameters
        ----------
        data: str or bytes
            data to send.
            If opcode is OPCODE_TEXT and this value is a str,
            it is encoded to UTF-8 bytes automatically.
        opcode: int
            operation code. please see OPCODE_MAP.
        fin: int
            fin flag. if set to 0, create continue fragmentation.

        Returns
        -------
        ABNF
            a frame with the rsv bits cleared and the mask flag set.
        """
        if opcode == ABNF.OPCODE_TEXT and isinstance(data, str):
            data = data.encode("utf-8")
        # mask must be set if send data from client
        return ABNF(fin, 0, 0, 0, opcode, 1, data)

    def format(self) -> bytes:
        """
        Format this object to string(byte array) to send data to server.

        Returns
        -------
        bytes
            header, optional 4-byte mask key, and the (possibly masked)
            payload.

        Raises
        ------
        ValueError
            if a flag bit is not 0 or 1, the opcode is unknown, or the
            payload is too long.
        """
        if any(x not in (0, 1) for x in [self.fin, self.rsv1, self.rsv2, self.rsv3]):
            raise ValueError("not 0 or 1")
        if self.opcode not in ABNF.OPCODES:
            raise ValueError("Invalid OPCODE")

        # An unmasked str payload goes out UTF-8 encoded, so encode it before
        # measuring: the header must carry the byte count, not the character
        # count. As before, the encoded bytes are stored back on the frame.
        # (A masked str payload is latin-1 encoded by mask(), where the byte
        # count equals the character count.)
        if not self.mask_value and isinstance(self.data, str):
            self.data = self.data.encode("utf-8")

        length = len(self.data)
        if length >= ABNF.LENGTH_63:
            raise ValueError("data is too long")

        # First byte: FIN, RSV1-3 and the 4-bit opcode.
        frame_header = bytes(
            (
                self.fin << 7
                | self.rsv1 << 6
                | self.rsv2 << 5
                | self.rsv3 << 4
                | self.opcode,
            )
        )
        # Second byte: mask flag plus either the length itself or a marker
        # (0x7E / 0x7F) announcing a 16-bit / 64-bit extended length field.
        if length < ABNF.LENGTH_7:
            frame_header += bytes((self.mask_value << 7 | length,))
        elif length < ABNF.LENGTH_16:
            frame_header += bytes((self.mask_value << 7 | 0x7E,))
            frame_header += struct.pack("!H", length)
        else:
            frame_header += bytes((self.mask_value << 7 | 0x7F,))
            frame_header += struct.pack("!Q", length)

        if not self.mask_value:
            return frame_header + self.data
        mask_key = self.get_mask_key(4)
        return frame_header + self._get_masked(mask_key)

    def _get_masked(self, mask_key: Union[str, bytes]) -> bytes:
        """
        Return ``mask_key`` followed by this frame's payload masked with it.

        A str key is converted to bytes once, with the same latin-1 encoding
        mask() applies, so the key written to the wire is exactly the key the
        payload was masked with.
        """
        if isinstance(mask_key, str):
            mask_key = mask_key.encode("latin-1")

        return mask_key + ABNF.mask(mask_key, self.data)

    @staticmethod
    def mask(mask_key: Union[str, bytes], data: Union[str, bytes]) -> bytes:
        """
        Mask or unmask data. Just do xor for each byte

        Parameters
        ----------
        mask_key: bytes or str
            4 byte mask.
        data: bytes or str
            data to mask/unmask.

        Returns
        -------
        bytes
            ``data`` XOR-ed with the repeating ``mask_key``. str arguments
            are latin-1 encoded first; ``None`` data is treated as empty.
        """
        if data is None:
            data = ""

        if isinstance(mask_key, str):
            mask_key = mask_key.encode("latin-1")

        if isinstance(data, str):
            data = data.encode("latin-1")

        return _mask(array.array("B", mask_key), array.array("B", data))

303 

304 

class frame_buffer:
    """
    Reads WebSocket frames piece by piece from a ``recv`` callable.

    The parsed header, length and mask key are kept on the instance until the
    payload has been read, so a recv_frame() call that raised part-way does
    not read those parts again on the next call.
    """

    # Positions of the mask flag and the 7-bit length field inside the tuple
    # stored in ``self.header`` by recv_header().
    _HEADER_MASK_INDEX = 5
    _HEADER_LENGTH_INDEX = 6

    def __init__(
        self, recv_fn: Callable[[int], bytes], skip_utf8_validation: bool
    ) -> None:
        """
        Parameters
        ----------
        recv_fn: callable
            called with the maximum number of bytes wanted; expected to
            return the received bytes.
        skip_utf8_validation: bool
            passed to ABNF.validate() for every received frame.
        """
        self.recv = recv_fn
        self.skip_utf8_validation = skip_utf8_validation
        # Buffers over the packets from the layer beneath until desired amount
        # bytes of bytes are received.
        self.recv_buffer: list = []
        self.clear()
        self.lock = Lock()

    def clear(self) -> None:
        """Forget the header, length and mask key of the current frame."""
        self.header: Optional[tuple] = None
        self.length: Optional[int] = None
        self.mask_value: Optional[Union[bytes, str]] = None

    def needs_header(self) -> bool:
        return self.header is None

    def recv_header(self) -> None:
        """Read the first two frame bytes and store their fields in ``self.header``."""
        header = self.recv_strict(2)
        b1 = header[0]
        fin = b1 >> 7 & 1
        rsv1 = b1 >> 6 & 1
        rsv2 = b1 >> 5 & 1
        rsv3 = b1 >> 4 & 1
        opcode = b1 & 0xF
        b2 = header[1]
        has_mask = b2 >> 7 & 1
        length_bits = b2 & 0x7F

        self.header = (fin, rsv1, rsv2, rsv3, opcode, has_mask, length_bits)

    def has_mask(self) -> Union[bool, int]:
        """Return the header's mask flag, or False if no header is stored."""
        if not self.header:
            return False
        header_val: int = self.header[frame_buffer._HEADER_MASK_INDEX]
        return header_val

    def needs_length(self) -> bool:
        return self.length is None

    def recv_length(self) -> None:
        """
        Determine the payload length and store it in ``self.length``.

        Raises
        ------
        WebSocketProtocolException
            if no header has been received yet.
        """
        if self.header is None:
            raise WebSocketProtocolException("Header not received")
        bits = self.header[frame_buffer._HEADER_LENGTH_INDEX]
        length_bits = bits & 0x7F
        if length_bits == 0x7E:
            # Marker for a 16-bit big-endian extended length.
            v = self.recv_strict(2)
            self.length = struct.unpack("!H", v)[0]
        elif length_bits == 0x7F:
            # Marker for a 64-bit big-endian extended length.
            v = self.recv_strict(8)
            self.length = struct.unpack("!Q", v)[0]
        else:
            self.length = length_bits

    def needs_mask(self) -> bool:
        return self.mask_value is None

    def recv_mask(self) -> None:
        """Read the 4-byte mask key if the header announces one, else store ""."""
        self.mask_value = self.recv_strict(4) if self.has_mask() else ""

    def recv_frame(self) -> "ABNF":
        """
        Receive one complete frame and return it after validation.

        Raises
        ------
        WebSocketProtocolException
            if a required part of the frame is missing, or if the frame
            fails ABNF.validate().
        """
        with self.lock:
            # Header
            if self.needs_header():
                self.recv_header()
            if self.header is None:
                raise WebSocketProtocolException("Header not received")
            (fin, rsv1, rsv2, rsv3, opcode, has_mask, _) = self.header

            # Frame length
            if self.needs_length():
                self.recv_length()
            length = self.length

            # Mask
            if self.needs_mask():
                self.recv_mask()
            mask_value = self.mask_value

            # Payload
            if length is None:
                raise WebSocketProtocolException("Length not received")
            payload = self.recv_strict(length)
            if has_mask:
                if mask_value is None:
                    raise WebSocketProtocolException("Mask not received")
                payload = ABNF.mask(mask_value, payload)

            # Reset for next frame
            self.clear()

            frame = ABNF(fin, rsv1, rsv2, rsv3, opcode, has_mask, payload)
            frame.validate(self.skip_utf8_validation)

        return frame

    def recv_strict(self, bufsize: int) -> bytes:
        """
        Return ``bufsize`` bytes, calling ``self.recv`` until enough arrived.

        Bytes received beyond ``bufsize`` stay in ``self.recv_buffer`` for the
        next call. If ``self.recv`` returns something that is not bytes, or
        returns empty bytes, reading stops and the result is shorter than
        ``bufsize``.

        Raises
        ------
        ValueError
            if ``bufsize`` is not an integer.
        """
        if not isinstance(bufsize, int):
            raise ValueError("bufsize must be an integer")
        shortage = bufsize - sum(len(buf) for buf in self.recv_buffer)
        while shortage > 0:
            # Limit buffer size that we pass to socket.recv() to avoid
            # fragmenting the heap -- the number of bytes recv() actually
            # reads is limited by socket buffer and is relatively small,
            # yet passing large numbers repeatedly causes lots of large
            # buffers allocated and then shrunk, which results in
            # fragmentation.
            bytes_ = self.recv(min(16384, shortage))
            if not isinstance(bytes_, bytes) or not bytes_:
                # Non-bytes result, or empty bytes: no progress is possible.
                # Without the empty-bytes check this loop would never end,
                # because ``shortage`` would stay unchanged.
                break
            self.recv_buffer.append(bytes_)
            shortage -= len(bytes_)

        unified = b"".join(self.recv_buffer)

        if shortage == 0:
            self.recv_buffer = []
            return unified

        # Either more than requested is buffered (keep the surplus for the
        # next call) or the read stopped short (the slice is then empty).
        self.recv_buffer = [unified[bufsize:]]
        return unified[:bufsize]

434 

435 

class continuous_frame:
    """Accumulates the payloads of a fragmented message across its frames."""

    def __init__(self, fire_cont_frame: bool, skip_utf8_validation: bool) -> None:
        """
        Parameters
        ----------
        fire_cont_frame: bool
            when true, is_fire() reports every frame, not only final ones.
        skip_utf8_validation: bool
            when true, extract() does not UTF-8 check text payloads.
        """
        self.fire_cont_frame = fire_cont_frame
        self.skip_utf8_validation = skip_utf8_validation
        # [opcode of the first frame, payload accumulated so far], or None
        # when nothing is pending.
        self.cont_data: Optional[list[Any]] = None
        # Opcode of the message currently being reassembled, None otherwise.
        self.recving_frames: Optional[int] = None

    def validate(self, frame: "ABNF") -> None:
        """
        Check that ``frame`` fits the current fragmentation state.

        Raises
        ------
        WebSocketProtocolException
            for a continuation frame while no message is in progress, or a
            new text/binary frame while one is.
        """
        if not self.recving_frames and frame.opcode == ABNF.OPCODE_CONT:
            raise WebSocketProtocolException("Illegal frame")
        if self.recving_frames and frame.opcode in (
            ABNF.OPCODE_TEXT,
            ABNF.OPCODE_BINARY,
        ):
            raise WebSocketProtocolException("Illegal frame")

    def add(self, frame: "ABNF") -> None:
        """Append ``frame``'s payload to the pending data, or start it."""
        if self.cont_data:
            self.cont_data[1] += frame.data
        else:
            if frame.opcode in (ABNF.OPCODE_TEXT, ABNF.OPCODE_BINARY):
                self.recving_frames = frame.opcode
            self.cont_data = [frame.opcode, frame.data]

        if frame.fin:
            # Final fragment: no message is in progress any more.
            self.recving_frames = None

    def is_fire(self, frame: "ABNF") -> Union[bool, int]:
        """Return a truthy value if ``frame`` is final or every frame is reported."""
        return frame.fin or self.fire_cont_frame

    def extract(self, frame: "ABNF") -> tuple:
        """
        Hand over the accumulated payload and reset the pending data.

        The payload is stored in ``frame.data``.

        Returns
        -------
        tuple
            ``(opcode of the first frame, frame)``.

        Raises
        ------
        WebSocketProtocolException
            if no data is pending.
        WebSocketPayloadException
            if a text payload is not valid UTF-8 (checked only when
            neither ``fire_cont_frame`` nor ``skip_utf8_validation`` is set).
        """
        data = self.cont_data
        if data is None:
            raise WebSocketProtocolException("No continuation data available")
        self.cont_data = None
        frame.data = data[1]
        # ``data`` cannot be None past the guard above, so it needs no
        # further None checks.
        if (
            not self.fire_cont_frame
            and data[0] == ABNF.OPCODE_TEXT
            and not self.skip_utf8_validation
            and not validate_utf8(frame.data)
        ):
            raise WebSocketPayloadException(f"cannot decode: {repr(frame.data)}")
        return data[0], frame