Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/websockets/frames.py: 56%
190 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:20 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:20 +0000
1from __future__ import annotations
3import dataclasses
4import enum
5import io
6import secrets
7import struct
8from typing import Callable, Generator, Optional, Sequence, Tuple
10from . import exceptions, extensions
11from .typing import Data
14try:
15 from .speedups import apply_mask
16except ImportError:
17 from .utils import apply_mask
20__all__ = [
21 "Opcode",
22 "OP_CONT",
23 "OP_TEXT",
24 "OP_BINARY",
25 "OP_CLOSE",
26 "OP_PING",
27 "OP_PONG",
28 "DATA_OPCODES",
29 "CTRL_OPCODES",
30 "Frame",
31 "prepare_data",
32 "prepare_ctrl",
33 "Close",
34]
37class Opcode(enum.IntEnum):
38 """Opcode values for WebSocket frames."""
40 CONT, TEXT, BINARY = 0x00, 0x01, 0x02
41 CLOSE, PING, PONG = 0x08, 0x09, 0x0A
44OP_CONT = Opcode.CONT
45OP_TEXT = Opcode.TEXT
46OP_BINARY = Opcode.BINARY
47OP_CLOSE = Opcode.CLOSE
48OP_PING = Opcode.PING
49OP_PONG = Opcode.PONG
51DATA_OPCODES = OP_CONT, OP_TEXT, OP_BINARY
52CTRL_OPCODES = OP_CLOSE, OP_PING, OP_PONG
55class CloseCode(enum.IntEnum):
56 """Close code values for WebSocket close frames."""
58 NORMAL_CLOSURE = 1000
59 GOING_AWAY = 1001
60 PROTOCOL_ERROR = 1002
61 UNSUPPORTED_DATA = 1003
62 # 1004 is reserved
63 NO_STATUS_RCVD = 1005
64 ABNORMAL_CLOSURE = 1006
65 INVALID_DATA = 1007
66 POLICY_VIOLATION = 1008
67 MESSAGE_TOO_BIG = 1009
68 MANDATORY_EXTENSION = 1010
69 INTERNAL_ERROR = 1011
70 SERVICE_RESTART = 1012
71 TRY_AGAIN_LATER = 1013
72 BAD_GATEWAY = 1014
73 TLS_HANDSHAKE = 1015
76# See https://www.iana.org/assignments/websocket/websocket.xhtml
77CLOSE_CODE_EXPLANATIONS: dict[int, str] = {
78 CloseCode.NORMAL_CLOSURE: "OK",
79 CloseCode.GOING_AWAY: "going away",
80 CloseCode.PROTOCOL_ERROR: "protocol error",
81 CloseCode.UNSUPPORTED_DATA: "unsupported data",
82 CloseCode.NO_STATUS_RCVD: "no status received [internal]",
83 CloseCode.ABNORMAL_CLOSURE: "abnormal closure [internal]",
84 CloseCode.INVALID_DATA: "invalid frame payload data",
85 CloseCode.POLICY_VIOLATION: "policy violation",
86 CloseCode.MESSAGE_TOO_BIG: "message too big",
87 CloseCode.MANDATORY_EXTENSION: "mandatory extension",
88 CloseCode.INTERNAL_ERROR: "internal error",
89 CloseCode.SERVICE_RESTART: "service restart",
90 CloseCode.TRY_AGAIN_LATER: "try again later",
91 CloseCode.BAD_GATEWAY: "bad gateway",
92 CloseCode.TLS_HANDSHAKE: "TLS handshake failure [internal]",
93}
96# Close code that are allowed in a close frame.
97# Using a set optimizes `code in EXTERNAL_CLOSE_CODES`.
98EXTERNAL_CLOSE_CODES = {
99 CloseCode.NORMAL_CLOSURE,
100 CloseCode.GOING_AWAY,
101 CloseCode.PROTOCOL_ERROR,
102 CloseCode.UNSUPPORTED_DATA,
103 CloseCode.INVALID_DATA,
104 CloseCode.POLICY_VIOLATION,
105 CloseCode.MESSAGE_TOO_BIG,
106 CloseCode.MANDATORY_EXTENSION,
107 CloseCode.INTERNAL_ERROR,
108 CloseCode.SERVICE_RESTART,
109 CloseCode.TRY_AGAIN_LATER,
110 CloseCode.BAD_GATEWAY,
111}
114OK_CLOSE_CODES = {
115 CloseCode.NORMAL_CLOSURE,
116 CloseCode.GOING_AWAY,
117 CloseCode.NO_STATUS_RCVD,
118}
121BytesLike = bytes, bytearray, memoryview
124@dataclasses.dataclass
125class Frame:
126 """
127 WebSocket frame.
129 Attributes:
130 opcode: Opcode.
131 data: Payload data.
132 fin: FIN bit.
133 rsv1: RSV1 bit.
134 rsv2: RSV2 bit.
135 rsv3: RSV3 bit.
137 Only these fields are needed. The MASK bit, payload length and masking-key
138 are handled on the fly when parsing and serializing frames.
140 """
142 opcode: Opcode
143 data: bytes
144 fin: bool = True
145 rsv1: bool = False
146 rsv2: bool = False
147 rsv3: bool = False
149 def __str__(self) -> str:
150 """
151 Return a human-readable representation of a frame.
153 """
154 coding = None
155 length = f"{len(self.data)} byte{'' if len(self.data) == 1 else 's'}"
156 non_final = "" if self.fin else "continued"
158 if self.opcode is OP_TEXT:
159 # Decoding only the beginning and the end is needlessly hard.
160 # Decode the entire payload then elide later if necessary.
161 data = repr(self.data.decode())
162 elif self.opcode is OP_BINARY:
163 # We'll show at most the first 16 bytes and the last 8 bytes.
164 # Encode just what we need, plus two dummy bytes to elide later.
165 binary = self.data
166 if len(binary) > 25:
167 binary = b"".join([binary[:16], b"\x00\x00", binary[-8:]])
168 data = " ".join(f"{byte:02x}" for byte in binary)
169 elif self.opcode is OP_CLOSE:
170 data = str(Close.parse(self.data))
171 elif self.data:
172 # We don't know if a Continuation frame contains text or binary.
173 # Ping and Pong frames could contain UTF-8.
174 # Attempt to decode as UTF-8 and display it as text; fallback to
175 # binary. If self.data is a memoryview, it has no decode() method,
176 # which raises AttributeError.
177 try:
178 data = repr(self.data.decode())
179 coding = "text"
180 except (UnicodeDecodeError, AttributeError):
181 binary = self.data
182 if len(binary) > 25:
183 binary = b"".join([binary[:16], b"\x00\x00", binary[-8:]])
184 data = " ".join(f"{byte:02x}" for byte in binary)
185 coding = "binary"
186 else:
187 data = "''"
189 if len(data) > 75:
190 data = data[:48] + "..." + data[-24:]
192 metadata = ", ".join(filter(None, [coding, length, non_final]))
194 return f"{self.opcode.name} {data} [{metadata}]"
196 @classmethod
197 def parse(
198 cls,
199 read_exact: Callable[[int], Generator[None, None, bytes]],
200 *,
201 mask: bool,
202 max_size: Optional[int] = None,
203 extensions: Optional[Sequence[extensions.Extension]] = None,
204 ) -> Generator[None, None, Frame]:
205 """
206 Parse a WebSocket frame.
208 This is a generator-based coroutine.
210 Args:
211 read_exact: generator-based coroutine that reads the requested
212 bytes or raises an exception if there isn't enough data.
213 mask: whether the frame should be masked i.e. whether the read
214 happens on the server side.
215 max_size: maximum payload size in bytes.
216 extensions: list of extensions, applied in reverse order.
218 Raises:
219 EOFError: if the connection is closed without a full WebSocket frame.
220 UnicodeDecodeError: if the frame contains invalid UTF-8.
221 PayloadTooBig: if the frame's payload size exceeds ``max_size``.
222 ProtocolError: if the frame contains incorrect values.
224 """
225 # Read the header.
226 data = yield from read_exact(2)
227 head1, head2 = struct.unpack("!BB", data)
229 # While not Pythonic, this is marginally faster than calling bool().
230 fin = True if head1 & 0b10000000 else False
231 rsv1 = True if head1 & 0b01000000 else False
232 rsv2 = True if head1 & 0b00100000 else False
233 rsv3 = True if head1 & 0b00010000 else False
235 try:
236 opcode = Opcode(head1 & 0b00001111)
237 except ValueError as exc:
238 raise exceptions.ProtocolError("invalid opcode") from exc
240 if (True if head2 & 0b10000000 else False) != mask:
241 raise exceptions.ProtocolError("incorrect masking")
243 length = head2 & 0b01111111
244 if length == 126:
245 data = yield from read_exact(2)
246 (length,) = struct.unpack("!H", data)
247 elif length == 127:
248 data = yield from read_exact(8)
249 (length,) = struct.unpack("!Q", data)
250 if max_size is not None and length > max_size:
251 raise exceptions.PayloadTooBig(
252 f"over size limit ({length} > {max_size} bytes)"
253 )
254 if mask:
255 mask_bytes = yield from read_exact(4)
257 # Read the data.
258 data = yield from read_exact(length)
259 if mask:
260 data = apply_mask(data, mask_bytes)
262 frame = cls(opcode, data, fin, rsv1, rsv2, rsv3)
264 if extensions is None:
265 extensions = []
266 for extension in reversed(extensions):
267 frame = extension.decode(frame, max_size=max_size)
269 frame.check()
271 return frame
273 def serialize(
274 self,
275 *,
276 mask: bool,
277 extensions: Optional[Sequence[extensions.Extension]] = None,
278 ) -> bytes:
279 """
280 Serialize a WebSocket frame.
282 Args:
283 mask: whether the frame should be masked i.e. whether the write
284 happens on the client side.
285 extensions: list of extensions, applied in order.
287 Raises:
288 ProtocolError: if the frame contains incorrect values.
290 """
291 self.check()
293 if extensions is None:
294 extensions = []
295 for extension in extensions:
296 self = extension.encode(self)
298 output = io.BytesIO()
300 # Prepare the header.
301 head1 = (
302 (0b10000000 if self.fin else 0)
303 | (0b01000000 if self.rsv1 else 0)
304 | (0b00100000 if self.rsv2 else 0)
305 | (0b00010000 if self.rsv3 else 0)
306 | self.opcode
307 )
309 head2 = 0b10000000 if mask else 0
311 length = len(self.data)
312 if length < 126:
313 output.write(struct.pack("!BB", head1, head2 | length))
314 elif length < 65536:
315 output.write(struct.pack("!BBH", head1, head2 | 126, length))
316 else:
317 output.write(struct.pack("!BBQ", head1, head2 | 127, length))
319 if mask:
320 mask_bytes = secrets.token_bytes(4)
321 output.write(mask_bytes)
323 # Prepare the data.
324 if mask:
325 data = apply_mask(self.data, mask_bytes)
326 else:
327 data = self.data
328 output.write(data)
330 return output.getvalue()
332 def check(self) -> None:
333 """
334 Check that reserved bits and opcode have acceptable values.
336 Raises:
337 ProtocolError: if a reserved bit or the opcode is invalid.
339 """
340 if self.rsv1 or self.rsv2 or self.rsv3:
341 raise exceptions.ProtocolError("reserved bits must be 0")
343 if self.opcode in CTRL_OPCODES:
344 if len(self.data) > 125:
345 raise exceptions.ProtocolError("control frame too long")
346 if not self.fin:
347 raise exceptions.ProtocolError("fragmented control frame")
350def prepare_data(data: Data) -> Tuple[int, bytes]:
351 """
352 Convert a string or byte-like object to an opcode and a bytes-like object.
354 This function is designed for data frames.
356 If ``data`` is a :class:`str`, return ``OP_TEXT`` and a :class:`bytes`
357 object encoding ``data`` in UTF-8.
359 If ``data`` is a bytes-like object, return ``OP_BINARY`` and a bytes-like
360 object.
362 Raises:
363 TypeError: if ``data`` doesn't have a supported type.
365 """
366 if isinstance(data, str):
367 return OP_TEXT, data.encode("utf-8")
368 elif isinstance(data, BytesLike):
369 return OP_BINARY, data
370 else:
371 raise TypeError("data must be str or bytes-like")
374def prepare_ctrl(data: Data) -> bytes:
375 """
376 Convert a string or byte-like object to bytes.
378 This function is designed for ping and pong frames.
380 If ``data`` is a :class:`str`, return a :class:`bytes` object encoding
381 ``data`` in UTF-8.
383 If ``data`` is a bytes-like object, return a :class:`bytes` object.
385 Raises:
386 TypeError: if ``data`` doesn't have a supported type.
388 """
389 if isinstance(data, str):
390 return data.encode("utf-8")
391 elif isinstance(data, BytesLike):
392 return bytes(data)
393 else:
394 raise TypeError("data must be str or bytes-like")
397@dataclasses.dataclass
398class Close:
399 """
400 Code and reason for WebSocket close frames.
402 Attributes:
403 code: Close code.
404 reason: Close reason.
406 """
408 code: int
409 reason: str
411 def __str__(self) -> str:
412 """
413 Return a human-readable representation of a close code and reason.
415 """
416 if 3000 <= self.code < 4000:
417 explanation = "registered"
418 elif 4000 <= self.code < 5000:
419 explanation = "private use"
420 else:
421 explanation = CLOSE_CODE_EXPLANATIONS.get(self.code, "unknown")
422 result = f"{self.code} ({explanation})"
424 if self.reason:
425 result = f"{result} {self.reason}"
427 return result
429 @classmethod
430 def parse(cls, data: bytes) -> Close:
431 """
432 Parse the payload of a close frame.
434 Args:
435 data: payload of the close frame.
437 Raises:
438 ProtocolError: if data is ill-formed.
439 UnicodeDecodeError: if the reason isn't valid UTF-8.
441 """
442 if len(data) >= 2:
443 (code,) = struct.unpack("!H", data[:2])
444 reason = data[2:].decode("utf-8")
445 close = cls(code, reason)
446 close.check()
447 return close
448 elif len(data) == 0:
449 return cls(CloseCode.NO_STATUS_RCVD, "")
450 else:
451 raise exceptions.ProtocolError("close frame too short")
453 def serialize(self) -> bytes:
454 """
455 Serialize the payload of a close frame.
457 """
458 self.check()
459 return struct.pack("!H", self.code) + self.reason.encode("utf-8")
461 def check(self) -> None:
462 """
463 Check that the close code has a valid value for a close frame.
465 Raises:
466 ProtocolError: if the close code is invalid.
468 """
469 if not (self.code in EXTERNAL_CLOSE_CODES or 3000 <= self.code < 5000):
470 raise exceptions.ProtocolError("invalid status code")