Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/websockets/frames.py: 48%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

227 statements  

1from __future__ import annotations 

2 

3import dataclasses 

4import enum 

5import io 

6import os 

7import secrets 

8import struct 

9from collections.abc import Generator, Sequence 

10from typing import Callable 

11 

12from .exceptions import PayloadTooBig, ProtocolError 

13from .typing import BytesLike 

14 

15 

16try: 

17 from .speedups import apply_mask 

18except ImportError: 

19 from .utils import apply_mask 

20 

21 

# Public names of this module, in the order they are defined below.
__all__ = [
    # Opcode enumeration, its module-level aliases, and opcode groups.
    "Opcode",
    "OP_CONT",
    "OP_TEXT",
    "OP_BINARY",
    "OP_CLOSE",
    "OP_PING",
    "OP_PONG",
    "DATA_OPCODES",
    "CTRL_OPCODES",
    # Close codes.
    "CloseCode",
    # Frame and close-payload data classes.
    "Frame",
    "Close",
]

36 

37 

class Opcode(enum.IntEnum):
    """Opcode values for WebSocket frames."""

    # Opcodes listed in DATA_OPCODES.
    CONT = 0x00
    TEXT = 0x01
    BINARY = 0x02

    # Opcodes listed in CTRL_OPCODES.
    CLOSE = 0x08
    PING = 0x09
    PONG = 0x0A

43 

44 

# Module-level aliases for the members of Opcode.
OP_CONT, OP_TEXT, OP_BINARY = Opcode.CONT, Opcode.TEXT, Opcode.BINARY
OP_CLOSE, OP_PING, OP_PONG = Opcode.CLOSE, Opcode.PING, Opcode.PONG

# Opcode groups, as tuples.
DATA_OPCODES = (OP_CONT, OP_TEXT, OP_BINARY)
CTRL_OPCODES = (OP_CLOSE, OP_PING, OP_PONG)

54 

55 

class CloseCode(enum.IntEnum):
    """Close code values for WebSocket close frames."""

    # 1000-1003
    NORMAL_CLOSURE = 1000
    GOING_AWAY = 1001
    PROTOCOL_ERROR = 1002
    UNSUPPORTED_DATA = 1003

    # 1004 is reserved, hence the gap.

    # 1005-1009
    NO_STATUS_RCVD = 1005
    ABNORMAL_CLOSURE = 1006
    INVALID_DATA = 1007
    POLICY_VIOLATION = 1008
    MESSAGE_TOO_BIG = 1009

    # 1010-1015
    MANDATORY_EXTENSION = 1010
    INTERNAL_ERROR = 1011
    SERVICE_RESTART = 1012
    TRY_AGAIN_LATER = 1013
    BAD_GATEWAY = 1014
    TLS_HANDSHAKE = 1015

75 

76 

# Short explanation of each close code. Explanations ending with
# "[internal]" belong to the codes left out of EXTERNAL_CLOSE_CODES.
# See https://www.iana.org/assignments/websocket/websocket.xhtml
CLOSE_CODE_EXPLANATIONS: dict[int, str] = {
    # 1000-1003
    CloseCode.NORMAL_CLOSURE: "OK",
    CloseCode.GOING_AWAY: "going away",
    CloseCode.PROTOCOL_ERROR: "protocol error",
    CloseCode.UNSUPPORTED_DATA: "unsupported data",
    # 1005-1009
    CloseCode.NO_STATUS_RCVD: "no status received [internal]",
    CloseCode.ABNORMAL_CLOSURE: "abnormal closure [internal]",
    CloseCode.INVALID_DATA: "invalid frame payload data",
    CloseCode.POLICY_VIOLATION: "policy violation",
    CloseCode.MESSAGE_TOO_BIG: "message too big",
    # 1010-1015
    CloseCode.MANDATORY_EXTENSION: "mandatory extension",
    CloseCode.INTERNAL_ERROR: "internal error",
    CloseCode.SERVICE_RESTART: "service restart",
    CloseCode.TRY_AGAIN_LATER: "try again later",
    CloseCode.BAD_GATEWAY: "bad gateway",
    CloseCode.TLS_HANDSHAKE: "TLS handshake failure [internal]",
}


# Close codes that are allowed in a close frame.
# A set keeps `code in EXTERNAL_CLOSE_CODES` fast.
EXTERNAL_CLOSE_CODES = {
    CloseCode.NORMAL_CLOSURE,
    CloseCode.GOING_AWAY,
    CloseCode.PROTOCOL_ERROR,
    CloseCode.UNSUPPORTED_DATA,
    CloseCode.INVALID_DATA,
    CloseCode.POLICY_VIOLATION,
    CloseCode.MESSAGE_TOO_BIG,
    CloseCode.MANDATORY_EXTENSION,
    CloseCode.INTERNAL_ERROR,
    CloseCode.SERVICE_RESTART,
    CloseCode.TRY_AGAIN_LATER,
    CloseCode.BAD_GATEWAY,
}


# Close codes grouped under the name "OK".
OK_CLOSE_CODES = {
    CloseCode.NORMAL_CLOSURE,
    CloseCode.GOING_AWAY,
    CloseCode.NO_STATUS_RCVD,
}

120 

121 

@dataclasses.dataclass
class Frame:
    """
    WebSocket frame.

    Attributes:
        opcode: Opcode.
        data: Payload data.
        fin: FIN bit.
        rsv1: RSV1 bit.
        rsv2: RSV2 bit.
        rsv3: RSV3 bit.

    No other field is stored. The MASK bit, the payload length, and the
    masking key are handled on the fly by :meth:`parse` and :meth:`serialize`.

    """

    opcode: Opcode
    data: BytesLike
    fin: bool = True
    rsv1: bool = False
    rsv2: bool = False
    rsv3: bool = False

    # Maximum size of the payload representation built for logs. Read from the
    # WEBSOCKETS_MAX_LOG_SIZE environment variable when the class is created.
    # Should be a multiple of 3.
    MAX_LOG_SIZE = int(os.environ.get("WEBSOCKETS_MAX_LOG_SIZE", "75"))

    # Whether the payload is expected to be text, for the opcodes where this
    # is known. Looking up any other opcode with .get() yields None.
    DEFAULT_IS_TEXT = {OP_TEXT: True, OP_BINARY: False, OP_CLOSE: True}

    def __str__(self) -> str:
        """
        Return a human-readable representation of a frame.

        The output is meant for logs and debugging, not for round-tripping:
        long payloads are shortened to their beginning and their end. It is
        robust to incorrect data.

        The payload is shown as text whenever it decodes as UTF-8, whatever
        the opcode, because binary and control frames often carry UTF-8 data
        and because a continuation frame doesn't tell whether it continues a
        text frame or a binary frame.

        """
        payload, looks_like_text = self._data_repr()
        size = len(self.data)

        details = []
        # Name the payload type only when it differs from what the opcode
        # leads to expect.
        if self.DEFAULT_IS_TEXT.get(self.opcode) != looks_like_text:
            details.append("text" if looks_like_text else "binary")
        details.append(f"{size} byte{'' if size == 1 else 's'}")
        if not self.fin:
            details.append("continued")

        return f"{self.opcode.name} {payload} [{', '.join(details)}]"

    def _data_repr(self) -> tuple[str, bool | None]:
        """
        Return a human-readable representation of the payload.

        The second item of the result tells whether the payload is text.

        The representation is elided to fit ``MAX_LOG_SIZE``.

        This is a helper for the __str__ method.

        """
        payload = self.data
        limit = self.MAX_LOG_SIZE

        if not payload:
            return "''", self.DEFAULT_IS_TEXT.get(self.opcode)

        # Close frames get special treatment: show the parsed code and reason.
        # A malformed payload falls through to the generic rendering below.
        if self.opcode is OP_CLOSE:
            try:
                return str(Close.parse(payload)), True
            except (ProtocolError, UnicodeDecodeError):
                pass

        # Guess whether the payload is UTF-8 or binary regardless of the
        # opcode, so that UTF-8 text in binary frames is displayed nicely.
        # Frames fragmented in the middle of a UTF-8 sequence are supported:
        # only a non-continuation frame must start cleanly and only a final
        # frame must end cleanly.
        starts_clean = self.opcode != OP_CONT
        ends_clean = self.fin

        if len(payload) > 4 * limit:
            # The middle will be elided anyway: look at both ends only.
            # bytes() is needed because the payload could be a memoryview.
            head = bytes(payload[: 8 * limit // 3])
            tail = bytes(payload[-4 * limit // 3 :])
            is_text = is_utf8_fragment(
                head,
                must_start_clean=starts_clean,
            ) and is_utf8_fragment(
                tail,
                must_end_clean=ends_clean,
            )
            sample = head + tail
        else:
            # bytes() is needed because the payload could be a memoryview.
            sample = bytes(payload)
            is_text = is_utf8_fragment(
                sample,
                must_start_clean=starts_clean,
                must_end_clean=ends_clean,
            )

        if is_text:
            # Partial sequences at the boundaries become replacement
            # characters instead of raising.
            rendered = repr(sample.decode(errors="replace"))
        else:
            raw = payload
            if len(raw) > limit // 3:
                keep = (limit // 3 - 1) // 3  # 8 with the default limit
                # The two dummy bytes in the middle force the elision below,
                # which replaces them with an ellipsis.
                raw = b"".join((raw[: 2 * keep], b"\x00\x00", raw[-keep:]))
            rendered = " ".join(format(octet, "02x") for octet in raw)

        # Elide the middle of the representation to fit the maximum log size.
        if len(rendered) > limit:
            keep = limit // 3 - 1  # 24 with the default limit
            rendered = f"{rendered[: 2 * keep]}...{rendered[-keep:]}"

        return rendered, is_text

    @classmethod
    def parse(
        cls,
        read_exact: Callable[[int], Generator[None, None, bytes | bytearray]],
        *,
        mask: bool,
        max_size: int | None = None,
        extensions: Sequence[extensions.Extension] | None = None,
    ) -> Generator[None, None, Frame]:
        """
        Parse a WebSocket frame.

        This is a generator-based coroutine.

        Args:
            read_exact: Generator-based coroutine that reads the requested
                bytes or raises an exception if there isn't enough data.
            mask: Whether the frame should be masked i.e. whether the read
                happens on the server side.
            max_size: Maximum payload size in bytes.
            extensions: List of extensions, applied in reverse order.

        Raises:
            EOFError: If the connection is closed without a full WebSocket frame.
            PayloadTooBig: If the frame's payload size exceeds ``max_size``.
            ProtocolError: If the frame contains incorrect values.

        """
        # The first two bytes carry the flags, the opcode, the MASK bit and
        # the 7-bit payload length.
        header = yield from read_exact(2)
        head1, head2 = struct.unpack("!BB", header)

        fin = (head1 & 0b10000000) != 0
        rsv1 = (head1 & 0b01000000) != 0
        rsv2 = (head1 & 0b00100000) != 0
        rsv3 = (head1 & 0b00010000) != 0

        try:
            opcode = Opcode(head1 & 0b00001111)
        except ValueError as exc:
            raise ProtocolError("invalid opcode") from exc

        masked = (head2 & 0b10000000) != 0
        if masked != mask:
            raise ProtocolError("incorrect masking")

        # 126 and 127 announce an extended length on 2 and 8 bytes.
        length = head2 & 0b01111111
        if length == 126:
            (length,) = struct.unpack("!H", (yield from read_exact(2)))
        elif length == 127:
            (length,) = struct.unpack("!Q", (yield from read_exact(8)))
        if max_size is not None and length > max_size:
            raise PayloadTooBig(length, max_size)

        mask_bytes = (yield from read_exact(4)) if mask else None

        # Read the payload and unmask it if needed.
        payload = yield from read_exact(length)
        if mask:
            payload = apply_mask(payload, mask_bytes)

        frame = cls(opcode, payload, fin, rsv1, rsv2, rsv3)

        for extension in reversed([] if extensions is None else extensions):
            frame = extension.decode(frame, max_size=max_size)

        frame.check()

        return frame

    def serialize(
        self,
        *,
        mask: bool,
        extensions: Sequence[extensions.Extension] | None = None,
    ) -> bytes:
        """
        Serialize a WebSocket frame.

        Args:
            mask: Whether the frame should be masked i.e. whether the write
                happens on the client side.
            extensions: List of extensions, applied in order.

        Raises:
            ProtocolError: If the frame contains incorrect values.

        """
        self.check()

        # Each extension returns the frame that the next one encodes.
        frame = self
        for extension in [] if extensions is None else extensions:
            frame = extension.encode(frame)

        # First header byte: the four flag bits followed by the opcode.
        head1 = frame.opcode
        for flag, bit in (
            (frame.fin, 0b10000000),
            (frame.rsv1, 0b01000000),
            (frame.rsv2, 0b00100000),
            (frame.rsv3, 0b00010000),
        ):
            if flag:
                head1 |= bit

        # Second header byte: the MASK bit followed by the 7-bit length, with
        # 126 and 127 announcing an extended length on 2 and 8 bytes.
        head2 = 0b10000000 if mask else 0
        length = len(frame.data)
        if length < 126:
            header = struct.pack("!BB", head1, head2 | length)
        elif length < 65536:
            header = struct.pack("!BBH", head1, head2 | 126, length)
        else:
            header = struct.pack("!BBQ", head1, head2 | 127, length)

        output = io.BytesIO()
        output.write(header)
        if mask:
            mask_bytes = secrets.token_bytes(4)
            output.write(mask_bytes)
            output.write(apply_mask(frame.data, mask_bytes))
        else:
            output.write(frame.data)

        return output.getvalue()

    def check(self) -> None:
        """
        Check that reserved bits and opcode have acceptable values.

        Raises:
            ProtocolError: If a reserved bit or the opcode is invalid.

        """
        if any((self.rsv1, self.rsv2, self.rsv3)):
            raise ProtocolError("reserved bits must be 0")

        # The remaining checks only concern control frames.
        if self.opcode not in CTRL_OPCODES:
            return

        if len(self.data) > 125:
            raise ProtocolError("control frame too long")
        if not self.fin:
            raise ProtocolError("fragmented control frame")

399 

400 

@dataclasses.dataclass
class Close:
    """
    Code and reason for WebSocket close frames.

    Attributes:
        code: Close code.
        reason: Close reason.

    """

    code: CloseCode | int
    reason: str

    def __str__(self) -> str:
        """
        Return a human-readable representation of a close code and reason.

        """
        code = self.code
        if 3000 <= code < 4000:
            label = "registered"
        elif 4000 <= code < 5000:
            label = "private use"
        else:
            label = CLOSE_CODE_EXPLANATIONS.get(code, "unknown")

        text = f"{code} ({label})"
        # The reason is appended only when there is one.
        return f"{text} {self.reason}" if self.reason else text

    @classmethod
    def parse(cls, data: BytesLike) -> Close:
        """
        Parse the payload of a close frame.

        Args:
            data: Payload of the close frame.

        Raises:
            ProtocolError: If data is ill-formed.
            UnicodeDecodeError: If the reason isn't valid UTF-8.

        """
        if isinstance(data, memoryview):
            raise AssertionError("only compressed outgoing frames use memoryview")

        size = len(data)
        if size == 0:
            # An empty payload carries neither a code nor a reason.
            return cls(CloseCode.NO_STATUS_RCVD, "")
        if size == 1:
            # One byte cannot hold the two-byte close code.
            raise ProtocolError("close frame too short")

        # Two-byte big-endian close code, then the UTF-8 encoded reason. The
        # reason is decoded before the code is validated.
        code = int.from_bytes(data[:2], "big")
        reason = data[2:].decode()
        close = cls(code, reason)
        close.check()
        return close

    def serialize(self) -> bytes:
        """
        Serialize the payload of a close frame.

        Raises:
            ProtocolError: If the close code is invalid.

        """
        self.check()
        encoded_code = struct.pack("!H", self.code)
        encoded_reason = self.reason.encode()
        return encoded_code + encoded_reason

    def check(self) -> None:
        """
        Check that the close code has a valid value for a close frame.

        Raises:
            ProtocolError: If the close code is invalid.

        """
        # Valid codes are those in EXTERNAL_CLOSE_CODES and the 3000-4999
        # range.
        if self.code not in EXTERNAL_CLOSE_CODES and not 3000 <= self.code < 5000:
            raise ProtocolError("invalid status code")

477 

478 

def is_utf8_fragment(
    data: bytes,
    must_start_clean: bool = False,
    must_end_clean: bool = False,
) -> bool:
    """
    Guess if data is a fragment of UTF-8 text.

    Args:
        data: Bytes to examine.
        must_start_clean: Whether data must start at a character boundary.
            When false, up to three leading continuation bytes are ignored.
        must_end_clean: Whether data must end at a character boundary. When
            false, an incomplete multibyte sequence at the end is ignored.

    Returns:
        :obj:`True` if the bytes that remain decode as UTF-8 and the decoded
        text contains no character that ``repr()`` would show as a ``\\x``
        escape; :obj:`False` otherwise.

    """
    # Possible byte sequences for UTF-8 characters are:
    # 0xxxxxxx
    # 110xxxxx 10xxxxxx
    # 1110xxxx 10xxxxxx 10xxxxxx
    # 11110xxx 10xxxxxx 10xxxxxx 10xxxxxx

    # The algorithm determines ``start`` and ``end`` so that ``data[start:end]``
    # must be a valid UTF-8 sequence for data to be a valid UTF-8 fragment.

    start, end = 0, len(data)

    if not must_start_clean:
        # Skip continuation bytes at the beginning. A character has at most
        # three of them, so don't skip more.
        max_start = min(3, len(data))
        while start < max_start and (data[start] & 0b11000000) == 0b10000000:
            start += 1

    if not must_end_clean:
        # Look for the leading byte of the last character within the last
        # four bytes, without going back into the skipped prefix.
        last = len(data) - 1
        min_last = max(len(data) - 4, start)
        while last >= min_last:
            byte = data[last]

            # Continuation byte: keep walking back.
            if (byte & 0b11000000) == 0b10000000:
                last -= 1
                continue

            # ASCII byte
            if (byte & 0b10000000) == 0b00000000:
                seq_len = 1
            # Leading byte of a 2-byte sequence
            elif (byte & 0b11100000) == 0b11000000:
                seq_len = 2
            # Leading byte of a 3-byte sequence
            elif (byte & 0b11110000) == 0b11100000:
                seq_len = 3
            # Leading byte of a 4-byte sequence
            elif (byte & 0b11111000) == 0b11110000:
                seq_len = 4
            # Invalid byte
            else:
                seq_len = 0

            # Cut only when there's an incomplete sequence at the end.
            if seq_len > len(data) - last:
                end = last

            break

        # When the walk finds nothing but continuation bytes, there is no
        # partial sequence to cut: ``end`` stays at ``len(data)`` so that the
        # stray continuation bytes make decoding fail below instead of being
        # silently dropped.

    try:
        text = data[start:end].decode()
    except UnicodeDecodeError:
        return False

    # Non-printable characters signal binary data. These are the characters
    # that repr() renders as ``\x..`` escapes: non-printable code points
    # below U+0100 other than tab, newline and carriage return. Inspecting
    # the characters themselves, rather than searching repr() for ``\x``,
    # avoids mistaking a literal backslash followed by "x" for binary data.
    return not any(
        char < "\u0100" and char not in "\t\n\r" and not char.isprintable()
        for char in text
    )

549 

550 

551# At the bottom to break import cycles created by type annotations. 

552from . import extensions # noqa: E402