Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/websockets/frames.py: 56%

190 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-25 06:20 +0000

1from __future__ import annotations 

2 

3import dataclasses 

4import enum 

5import io 

6import secrets 

7import struct 

8from typing import Callable, Generator, Optional, Sequence, Tuple 

9 

10from . import exceptions, extensions 

11from .typing import Data 

12 

13 

14try: 

15 from .speedups import apply_mask 

16except ImportError: 

17 from .utils import apply_mask 

18 

19 

# Names exported by ``from <this module> import *``.
# "CloseCode" is listed because the enum is a public class defined in this
# module, next to "Opcode" and "Close".
__all__ = [
    "Opcode",
    "OP_CONT",
    "OP_TEXT",
    "OP_BINARY",
    "OP_CLOSE",
    "OP_PING",
    "OP_PONG",
    "DATA_OPCODES",
    "CTRL_OPCODES",
    "CloseCode",
    "Frame",
    "prepare_data",
    "prepare_ctrl",
    "Close",
]

35 

36 

class Opcode(enum.IntEnum):
    """Integer opcodes identifying the type of a WebSocket frame."""

    # Opcodes in the 0x00-0x02 range.
    CONT = 0x00
    TEXT = 0x01
    BINARY = 0x02

    # Opcodes in the 0x08-0x0A range.
    CLOSE = 0x08
    PING = 0x09
    PONG = 0x0A

42 

43 

# Module-level aliases for the members of Opcode.
OP_CONT, OP_TEXT, OP_BINARY = Opcode.CONT, Opcode.TEXT, Opcode.BINARY
OP_CLOSE, OP_PING, OP_PONG = Opcode.CLOSE, Opcode.PING, Opcode.PONG

# Opcodes grouped into tuples: data opcodes and control opcodes.
DATA_OPCODES = (OP_CONT, OP_TEXT, OP_BINARY)
CTRL_OPCODES = (OP_CLOSE, OP_PING, OP_PONG)

53 

54 

class CloseCode(enum.IntEnum):
    """Integer status codes describing why a WebSocket connection closed."""

    NORMAL_CLOSURE = 1000
    GOING_AWAY = 1001
    PROTOCOL_ERROR = 1002
    UNSUPPORTED_DATA = 1003
    # 1004 is reserved, hence the gap in the numbering.
    NO_STATUS_RCVD = 1005
    ABNORMAL_CLOSURE = 1006
    INVALID_DATA = 1007
    POLICY_VIOLATION = 1008
    MESSAGE_TOO_BIG = 1009
    MANDATORY_EXTENSION = 1010
    INTERNAL_ERROR = 1011
    SERVICE_RESTART = 1012
    TRY_AGAIN_LATER = 1013
    BAD_GATEWAY = 1014
    TLS_HANDSHAKE = 1015

74 

75 

# Short human-readable explanation for each close code.
# See https://www.iana.org/assignments/websocket/websocket.xhtml
CLOSE_CODE_EXPLANATIONS: dict[int, str] = {
    CloseCode.NORMAL_CLOSURE: "OK",
    CloseCode.GOING_AWAY: "going away",
    CloseCode.PROTOCOL_ERROR: "protocol error",
    CloseCode.UNSUPPORTED_DATA: "unsupported data",
    CloseCode.NO_STATUS_RCVD: "no status received [internal]",
    CloseCode.ABNORMAL_CLOSURE: "abnormal closure [internal]",
    CloseCode.INVALID_DATA: "invalid frame payload data",
    CloseCode.POLICY_VIOLATION: "policy violation",
    CloseCode.MESSAGE_TOO_BIG: "message too big",
    CloseCode.MANDATORY_EXTENSION: "mandatory extension",
    CloseCode.INTERNAL_ERROR: "internal error",
    CloseCode.SERVICE_RESTART: "service restart",
    CloseCode.TRY_AGAIN_LATER: "try again later",
    CloseCode.BAD_GATEWAY: "bad gateway",
    CloseCode.TLS_HANDSHAKE: "TLS handshake failure [internal]",
}


# Close codes that are allowed in a close frame.
# Using a set optimizes `code in EXTERNAL_CLOSE_CODES`.
EXTERNAL_CLOSE_CODES = {
    CloseCode.NORMAL_CLOSURE,
    CloseCode.GOING_AWAY,
    CloseCode.PROTOCOL_ERROR,
    CloseCode.UNSUPPORTED_DATA,
    CloseCode.INVALID_DATA,
    CloseCode.POLICY_VIOLATION,
    CloseCode.MESSAGE_TOO_BIG,
    CloseCode.MANDATORY_EXTENSION,
    CloseCode.INTERNAL_ERROR,
    CloseCode.SERVICE_RESTART,
    CloseCode.TRY_AGAIN_LATER,
    CloseCode.BAD_GATEWAY,
}


# Set of close codes named "OK" by this module.
OK_CLOSE_CODES = {
    CloseCode.NORMAL_CLOSURE,
    CloseCode.GOING_AWAY,
    CloseCode.NO_STATUS_RCVD,
}


# Tuple of types accepted as "bytes-like", usable directly with isinstance().
BytesLike = (bytes, bytearray, memoryview)

122 

123 

@dataclasses.dataclass
class Frame:
    """
    WebSocket frame.

    Attributes:
        opcode: Opcode.
        data: Payload data.
        fin: FIN bit.
        rsv1: RSV1 bit.
        rsv2: RSV2 bit.
        rsv3: RSV3 bit.

    Only these fields are needed. The MASK bit, payload length and masking-key
    are handled on the fly when parsing and serializing frames.

    """

    opcode: Opcode
    data: bytes
    fin: bool = True
    rsv1: bool = False
    rsv2: bool = False
    rsv3: bool = False

    @staticmethod
    def _hex_preview(payload: bytes) -> str:
        """
        Render ``payload`` as space-separated two-digit hexadecimal bytes.

        When the payload is longer than 25 bytes, only the first 16 bytes and
        the last 8 bytes are rendered, separated by two dummy zero bytes. The
        dummy bytes make the result longer than 75 characters, so the eliding
        step in :meth:`__str__` replaces them with ``...``.

        """
        if len(payload) > 25:
            payload = b"".join([payload[:16], b"\x00\x00", payload[-8:]])
        return " ".join(f"{byte:02x}" for byte in payload)

    def __str__(self) -> str:
        """
        Return a human-readable representation of a frame.

        The result has the form ``"<OPCODE NAME> <payload> [<metadata>]"``
        where the metadata lists the detected coding (only when it had to be
        guessed), the payload length, and ``continued`` for non-final frames.

        """
        coding = None
        length = f"{len(self.data)} byte{'' if len(self.data) == 1 else 's'}"
        non_final = "" if self.fin else "continued"

        if self.opcode is OP_TEXT:
            # Decoding only the beginning and the end is needlessly hard.
            # Decode the entire payload then elide later if necessary.
            # bytes() lets a memoryview payload, which has no decode()
            # method, be rendered instead of raising AttributeError.
            data = repr(bytes(self.data).decode())
        elif self.opcode is OP_BINARY:
            # We'll show at most the first 16 bytes and the last 8 bytes.
            data = self._hex_preview(self.data)
        elif self.opcode is OP_CLOSE:
            data = str(Close.parse(self.data))
        elif self.data:
            # We don't know if a Continuation frame contains text or binary.
            # Ping and Pong frames could contain UTF-8.
            # Attempt to decode as UTF-8 and display it as text; fallback to
            # binary. If self.data is a memoryview, it has no decode() method,
            # which raises AttributeError.
            try:
                data = repr(self.data.decode())
                coding = "text"
            except (UnicodeDecodeError, AttributeError):
                data = self._hex_preview(self.data)
                coding = "binary"
        else:
            data = "''"

        # Keep the representation short: 48 leading and 24 trailing characters.
        if len(data) > 75:
            data = data[:48] + "..." + data[-24:]

        # filter(None, ...) drops the entries that are None or empty.
        metadata = ", ".join(filter(None, [coding, length, non_final]))

        return f"{self.opcode.name} {data} [{metadata}]"

    @classmethod
    def parse(
        cls,
        read_exact: Callable[[int], Generator[None, None, bytes]],
        *,
        mask: bool,
        max_size: Optional[int] = None,
        extensions: Optional[Sequence[extensions.Extension]] = None,
    ) -> Generator[None, None, Frame]:
        """
        Parse a WebSocket frame.

        This is a generator-based coroutine.

        Args:
            read_exact: generator-based coroutine that reads the requested
                bytes or raises an exception if there isn't enough data.
            mask: whether the frame should be masked i.e. whether the read
                happens on the server side.
            max_size: maximum payload size in bytes.
            extensions: list of extensions, applied in reverse order.

        Returns:
            The parsed frame, after decoding by the extensions and validation
            by :meth:`check`.

        Raises:
            EOFError: if the connection is closed without a full WebSocket
                frame; presumably propagated from ``read_exact`` (confirm
                against its implementation).
            UnicodeDecodeError: if the frame contains invalid UTF-8; nothing
                in this method decodes text itself, so this could only come
                from an extension (to be confirmed).
            PayloadTooBig: if the frame's payload size exceeds ``max_size``.
            ProtocolError: if the frame contains incorrect values.

        """
        # Read the header.
        data = yield from read_exact(2)
        head1, head2 = struct.unpack("!BB", data)

        # While not Pythonic, this is marginally faster than calling bool().
        fin = True if head1 & 0b10000000 else False
        rsv1 = True if head1 & 0b01000000 else False
        rsv2 = True if head1 & 0b00100000 else False
        rsv3 = True if head1 & 0b00010000 else False

        try:
            opcode = Opcode(head1 & 0b00001111)
        except ValueError as exc:
            raise exceptions.ProtocolError("invalid opcode") from exc

        # The high bit of the second byte is the MASK bit.
        if (True if head2 & 0b10000000 else False) != mask:
            raise exceptions.ProtocolError("incorrect masking")

        # Lengths 126 and 127 announce a 2-byte or 8-byte extended length.
        length = head2 & 0b01111111
        if length == 126:
            data = yield from read_exact(2)
            (length,) = struct.unpack("!H", data)
        elif length == 127:
            data = yield from read_exact(8)
            (length,) = struct.unpack("!Q", data)
        # Enforce the limit before reading the payload.
        if max_size is not None and length > max_size:
            raise exceptions.PayloadTooBig(
                f"over size limit ({length} > {max_size} bytes)"
            )
        if mask:
            mask_bytes = yield from read_exact(4)

        # Read the data.
        data = yield from read_exact(length)
        if mask:
            data = apply_mask(data, mask_bytes)

        frame = cls(opcode, data, fin, rsv1, rsv2, rsv3)

        if extensions is None:
            extensions = []
        for extension in reversed(extensions):
            frame = extension.decode(frame, max_size=max_size)

        frame.check()

        return frame

    def serialize(
        self,
        *,
        mask: bool,
        extensions: Optional[Sequence[extensions.Extension]] = None,
    ) -> bytes:
        """
        Serialize a WebSocket frame.

        Args:
            mask: whether the frame should be masked i.e. whether the write
                happens on the client side.
            extensions: list of extensions, applied in order.

        Returns:
            The header, the masking key when ``mask`` is true, and the
            payload, concatenated.

        Raises:
            ProtocolError: if the frame contains incorrect values.

        """
        self.check()

        if extensions is None:
            extensions = []
        # Each extension returns the frame to hand to the next one; use a
        # local name rather than rebinding ``self``.
        frame = self
        for extension in extensions:
            frame = extension.encode(frame)

        output = io.BytesIO()

        # Prepare the header.
        head1 = (
            (0b10000000 if frame.fin else 0)
            | (0b01000000 if frame.rsv1 else 0)
            | (0b00100000 if frame.rsv2 else 0)
            | (0b00010000 if frame.rsv3 else 0)
            | frame.opcode
        )

        head2 = 0b10000000 if mask else 0

        # Payloads of 126 bytes or more need a 2-byte or 8-byte length field.
        length = len(frame.data)
        if length < 126:
            output.write(struct.pack("!BB", head1, head2 | length))
        elif length < 65536:
            output.write(struct.pack("!BBH", head1, head2 | 126, length))
        else:
            output.write(struct.pack("!BBQ", head1, head2 | 127, length))

        if mask:
            mask_bytes = secrets.token_bytes(4)
            output.write(mask_bytes)

        # Prepare the data.
        if mask:
            data = apply_mask(frame.data, mask_bytes)
        else:
            data = frame.data
        output.write(data)

        return output.getvalue()

    def check(self) -> None:
        """
        Check that reserved bits and opcode have acceptable values.

        Control frames must additionally carry at most 125 bytes of payload
        and have the FIN bit set.

        Raises:
            ProtocolError: if a reserved bit or the opcode is invalid.

        """
        if self.rsv1 or self.rsv2 or self.rsv3:
            raise exceptions.ProtocolError("reserved bits must be 0")

        if self.opcode in CTRL_OPCODES:
            if len(self.data) > 125:
                raise exceptions.ProtocolError("control frame too long")
            if not self.fin:
                raise exceptions.ProtocolError("fragmented control frame")

348 

349 

def prepare_data(data: Data) -> Tuple[int, bytes]:
    """
    Convert a string or bytes-like object to an opcode and a bytes-like object.

    This function is designed for data frames.

    A :class:`str` yields ``OP_TEXT`` together with its UTF-8 encoding as
    :class:`bytes`.

    A bytes-like object yields ``OP_BINARY`` together with the object itself,
    unchanged.

    Raises:
        TypeError: if ``data`` doesn't have a supported type.

    """
    if isinstance(data, str):
        return OP_TEXT, data.encode("utf-8")
    if isinstance(data, BytesLike):
        # Returned as-is: no copy and no conversion to bytes.
        return OP_BINARY, data
    raise TypeError("data must be str or bytes-like")

372 

373 

def prepare_ctrl(data: Data) -> bytes:
    """
    Convert a string or bytes-like object to bytes.

    This function is designed for ping and pong frames.

    A :class:`str` is encoded in UTF-8.

    A bytes-like object is converted to a :class:`bytes` object.

    Raises:
        TypeError: if ``data`` doesn't have a supported type.

    """
    if isinstance(data, str):
        return data.encode("utf-8")
    if isinstance(data, BytesLike):
        return bytes(data)
    raise TypeError("data must be str or bytes-like")

395 

396 

@dataclasses.dataclass
class Close:
    """
    Code and reason carried by a WebSocket close frame.

    Attributes:
        code: Close code.
        reason: Close reason.

    """

    code: int
    reason: str

    def __str__(self) -> str:
        """
        Return a human-readable representation of a close code and reason.

        The format is ``"<code> (<explanation>)"``, followed by a space and
        the reason when the reason isn't empty.

        """
        if 3000 <= self.code < 4000:
            label = "registered"
        elif 4000 <= self.code < 5000:
            label = "private use"
        else:
            label = CLOSE_CODE_EXPLANATIONS.get(self.code, "unknown")

        summary = f"{self.code} ({label})"
        return f"{summary} {self.reason}" if self.reason else summary

    @classmethod
    def parse(cls, data: bytes) -> Close:
        """
        Parse the payload of a close frame.

        An empty payload yields ``CloseCode.NO_STATUS_RCVD`` with an empty
        reason. Otherwise the first two bytes hold the code and the rest is
        the reason encoded in UTF-8.

        Args:
            data: payload of the close frame.

        Raises:
            ProtocolError: if data is ill-formed.
            UnicodeDecodeError: if the reason isn't valid UTF-8.

        """
        size = len(data)
        if size == 0:
            return cls(CloseCode.NO_STATUS_RCVD, "")
        if size < 2:
            # A single byte cannot hold the two-byte close code.
            raise exceptions.ProtocolError("close frame too short")

        (code,) = struct.unpack("!H", data[:2])
        parsed = cls(code, data[2:].decode("utf-8"))
        parsed.check()
        return parsed

    def serialize(self) -> bytes:
        """
        Serialize the payload of a close frame.

        Raises:
            ProtocolError: if the close code is invalid.

        """
        self.check()
        packed_code = struct.pack("!H", self.code)
        return packed_code + self.reason.encode("utf-8")

    def check(self) -> None:
        """
        Check that the close code has a valid value for a close frame.

        Valid codes are the members of ``EXTERNAL_CLOSE_CODES`` and every
        code from 3000 to 4999 inclusive.

        Raises:
            ProtocolError: if the close code is invalid.

        """
        if self.code not in EXTERNAL_CLOSE_CODES and not 3000 <= self.code < 5000:
            raise exceptions.ProtocolError("invalid status code")