Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/websockets/frames.py: 48%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import dataclasses
4import enum
5import io
6import os
7import secrets
8import struct
9from collections.abc import Generator, Sequence
10from typing import Callable
12from .exceptions import PayloadTooBig, ProtocolError
13from .typing import BytesLike
16try:
17 from .speedups import apply_mask
18except ImportError:
19 from .utils import apply_mask
# Public names exported by ``from <this module> import *``.
__all__ = [
    "Opcode",
    "OP_CONT",
    "OP_TEXT",
    "OP_BINARY",
    "OP_CLOSE",
    "OP_PING",
    "OP_PONG",
    "DATA_OPCODES",
    "CTRL_OPCODES",
    "CloseCode",
    "Frame",
    "Close",
]
class Opcode(enum.IntEnum):
    """Opcode values for WebSocket frames."""

    # First row: the opcodes grouped as DATA_OPCODES in this module.
    # Second row: the opcodes grouped as CTRL_OPCODES in this module.
    CONT, TEXT, BINARY = 0x00, 0x01, 0x02
    CLOSE, PING, PONG = 0x08, 0x09, 0x0A
# Module-level aliases for the Opcode members.
OP_CONT = Opcode.CONT
OP_TEXT = Opcode.TEXT
OP_BINARY = Opcode.BINARY
OP_CLOSE = Opcode.CLOSE
OP_PING = Opcode.PING
OP_PONG = Opcode.PONG

# Opcode groups. Frame.check() applies the control-frame restrictions
# (payload of at most 125 bytes, no fragmentation) to CTRL_OPCODES.
DATA_OPCODES = OP_CONT, OP_TEXT, OP_BINARY
CTRL_OPCODES = OP_CLOSE, OP_PING, OP_PONG
class CloseCode(enum.IntEnum):
    """Close code values for WebSocket close frames."""

    NORMAL_CLOSURE = 1000
    GOING_AWAY = 1001
    PROTOCOL_ERROR = 1002
    UNSUPPORTED_DATA = 1003
    # 1004 is reserved
    NO_STATUS_RCVD = 1005
    ABNORMAL_CLOSURE = 1006
    INVALID_DATA = 1007
    POLICY_VIOLATION = 1008
    MESSAGE_TOO_BIG = 1009
    MANDATORY_EXTENSION = 1010
    INTERNAL_ERROR = 1011
    SERVICE_RESTART = 1012
    TRY_AGAIN_LATER = 1013
    BAD_GATEWAY = 1014
    TLS_HANDSHAKE = 1015
# Human-readable explanation for each known close code, used by
# Close.__str__(). Codes tagged "[internal]" are the ones left out of
# EXTERNAL_CLOSE_CODES, i.e. rejected by Close.check().
# See https://www.iana.org/assignments/websocket/websocket.xhtml
CLOSE_CODE_EXPLANATIONS: dict[int, str] = {
    CloseCode.NORMAL_CLOSURE: "OK",
    CloseCode.GOING_AWAY: "going away",
    CloseCode.PROTOCOL_ERROR: "protocol error",
    CloseCode.UNSUPPORTED_DATA: "unsupported data",
    CloseCode.NO_STATUS_RCVD: "no status received [internal]",
    CloseCode.ABNORMAL_CLOSURE: "abnormal closure [internal]",
    CloseCode.INVALID_DATA: "invalid frame payload data",
    CloseCode.POLICY_VIOLATION: "policy violation",
    CloseCode.MESSAGE_TOO_BIG: "message too big",
    CloseCode.MANDATORY_EXTENSION: "mandatory extension",
    CloseCode.INTERNAL_ERROR: "internal error",
    CloseCode.SERVICE_RESTART: "service restart",
    CloseCode.TRY_AGAIN_LATER: "try again later",
    CloseCode.BAD_GATEWAY: "bad gateway",
    CloseCode.TLS_HANDSHAKE: "TLS handshake failure [internal]",
}
# Close codes that are allowed in a close frame.
# Using a set optimizes `code in EXTERNAL_CLOSE_CODES`.
EXTERNAL_CLOSE_CODES = {
    CloseCode.NORMAL_CLOSURE,
    CloseCode.GOING_AWAY,
    CloseCode.PROTOCOL_ERROR,
    CloseCode.UNSUPPORTED_DATA,
    CloseCode.INVALID_DATA,
    CloseCode.POLICY_VIOLATION,
    CloseCode.MESSAGE_TOO_BIG,
    CloseCode.MANDATORY_EXTENSION,
    CloseCode.INTERNAL_ERROR,
    CloseCode.SERVICE_RESTART,
    CloseCode.TRY_AGAIN_LATER,
    CloseCode.BAD_GATEWAY,
}
# Close codes grouped as "OK".
# NOTE(review): presumably the codes treated as a clean shutdown by callers;
# this set is not used in this module, so confirm against its consumers.
OK_CLOSE_CODES = {
    CloseCode.NORMAL_CLOSURE,
    CloseCode.GOING_AWAY,
    CloseCode.NO_STATUS_RCVD,
}
@dataclasses.dataclass
class Frame:
    """
    WebSocket frame.

    Attributes:
        opcode: Opcode.
        data: Payload data.
        fin: FIN bit.
        rsv1: RSV1 bit.
        rsv2: RSV2 bit.
        rsv3: RSV3 bit.

    Only these fields are needed. The MASK bit, payload length and masking-key
    are handled on the fly when parsing and serializing frames.

    """

    opcode: Opcode
    data: BytesLike
    fin: bool = True
    rsv1: bool = False
    rsv2: bool = False
    rsv3: bool = False

    # Configure if you want to see more in logs. Should be a multiple of 3.
    # Not annotated, so this is a plain class attribute, not a dataclass field.
    MAX_LOG_SIZE = int(os.environ.get("WEBSOCKETS_MAX_LOG_SIZE", "75"))

    # Whether the payload of a given opcode is expected to be text. Opcodes
    # missing from this mapping (CONT, PING, PONG) have no expectation.
    DEFAULT_IS_TEXT = {OP_TEXT: True, OP_BINARY: False, OP_CLOSE: True}

    def __str__(self) -> str:
        """
        Return a human-readable representation of a frame.

        This function is intended for logging and debugging. It doesn't aim to
        support round-tripping because payloads can be too long for displaying
        conveniently. Instead, it shows the beginning and the end. It's robust
        to incorrect data.

        It attempts to decode UTF-8 payloads whenever possible, even for binary
        frames and control frames, because those frequently contain UTF-8 data.
        It applies the same logic to continuation frames, because we don't know
        if they continue a text frame or a binary frame.

        """
        expect_text = self.DEFAULT_IS_TEXT.get(self.opcode)
        data_repr, is_text = self._data_repr()

        # Mention the data type only when it differs from what the opcode
        # suggests, or when the opcode suggests nothing.
        data_type = "" if expect_text == is_text else ("text" if is_text else "binary")
        length = f"{len(self.data)} byte{'' if len(self.data) == 1 else 's'}"
        non_final = "" if self.fin else "continued"
        metadata = ", ".join(filter(None, [data_type, length, non_final]))

        return f"{self.opcode.name} {data_repr} [{metadata}]"

    def _data_repr(self) -> tuple[str, bool | None]:
        """
        Return a human-readable representation of the payload.

        Also returns whether the payload is text.

        The representation is elided to fit ``MAX_LOG_SIZE``.

        This is a helper for the __str__ method.

        """
        if not self.data:
            return "''", self.DEFAULT_IS_TEXT.get(self.opcode)

        # Special case for close frames: parse close code and reason.
        # Fall back to the standard case if the payload is malformed.

        if self.opcode is OP_CLOSE:
            try:
                return str(Close.parse(self.data)), True
            except (ProtocolError, UnicodeDecodeError):
                pass

        # Guess whether the payload is UTF-8 or binary, regardless of opcode, to
        # display UTF-8 text in binary frames nicely and generally to be helpful
        # and robust. Also support frames fragmented within UTF-8 sequences.

        if len(self.data) > 4 * self.MAX_LOG_SIZE:
            # Process only the start and the end, as the middle will be elided.
            # Cast to bytes because self.data could be a memoryview.
            data_start = bytes(self.data[: 8 * self.MAX_LOG_SIZE // 3])
            data_end = bytes(self.data[-4 * self.MAX_LOG_SIZE // 3 :])
            # Each slice is cut at an arbitrary byte on its inner side, so only
            # the outer sides are subject to the clean-boundary requirements.
            is_text = is_utf8_fragment(
                data_start,
                must_start_clean=self.opcode != OP_CONT,
            ) and is_utf8_fragment(
                data_end,
                must_end_clean=self.fin,
            )
            if is_text:
                data_repr = repr((data_start + data_end).decode(errors="replace"))

        else:
            # Cast to bytes because self.data could be a memoryview.
            data = bytes(self.data)
            is_text = is_utf8_fragment(
                data,
                must_start_clean=self.opcode != OP_CONT,
                must_end_clean=self.fin,
            )
            if is_text:
                data_repr = repr(data.decode(errors="replace"))

        # When the payload is text (except perhaps for boundaries), we decoded
        # enough in ``data_repr``. Now, do the same when the payload is binary.

        if not is_text:
            binary = self.data
            if len(binary) > self.MAX_LOG_SIZE // 3:
                cut = (self.MAX_LOG_SIZE // 3 - 1) // 3  # by default cut = 8
                # Encode two dummy bytes to force eliding and adding an ellipsis.
                binary = b"".join([binary[: 2 * cut], b"\x00\x00", binary[-cut:]])
            data_repr = " ".join(f"{byte:02x}" for byte in binary)

        # Elide the middle of the representation to fit the maximum log size.

        if len(data_repr) > self.MAX_LOG_SIZE:
            cut = self.MAX_LOG_SIZE // 3 - 1  # by default cut = 24
            data_repr = data_repr[: 2 * cut] + "..." + data_repr[-cut:]

        return data_repr, is_text

    @classmethod
    def parse(
        cls,
        read_exact: Callable[[int], Generator[None, None, bytes | bytearray]],
        *,
        mask: bool,
        max_size: int | None = None,
        extensions: Sequence[extensions.Extension] | None = None,
    ) -> Generator[None, None, Frame]:
        """
        Parse a WebSocket frame.

        This is a generator-based coroutine.

        Args:
            read_exact: Generator-based coroutine that reads the requested
                bytes or raises an exception if there isn't enough data.
            mask: Whether the frame should be masked i.e. whether the read
                happens on the server side.
            max_size: Maximum payload size in bytes.
            extensions: List of extensions, applied in reverse order.

        Raises:
            EOFError: If the connection is closed without a full WebSocket frame.
            PayloadTooBig: If the frame's payload size exceeds ``max_size``.
            ProtocolError: If the frame contains incorrect values.

        """
        # Read the header.
        data = yield from read_exact(2)
        head1, head2 = struct.unpack("!BB", data)

        # While not Pythonic, this is marginally faster than calling bool().
        fin = True if head1 & 0b10000000 else False
        rsv1 = True if head1 & 0b01000000 else False
        rsv2 = True if head1 & 0b00100000 else False
        rsv3 = True if head1 & 0b00010000 else False

        try:
            opcode = Opcode(head1 & 0b00001111)
        except ValueError as exc:
            raise ProtocolError("invalid opcode") from exc

        if (True if head2 & 0b10000000 else False) != mask:
            raise ProtocolError("incorrect masking")

        # A 7-bit length of 126 or 127 announces a 16-bit or 64-bit extended
        # length, respectively.
        length = head2 & 0b01111111
        if length == 126:
            data = yield from read_exact(2)
            (length,) = struct.unpack("!H", data)
        elif length == 127:
            data = yield from read_exact(8)
            (length,) = struct.unpack("!Q", data)
        # Enforce the limit before reading the payload.
        if max_size is not None and length > max_size:
            raise PayloadTooBig(length, max_size)
        if mask:
            mask_bytes = yield from read_exact(4)

        # Read the data.
        data = yield from read_exact(length)
        if mask:
            data = apply_mask(data, mask_bytes)

        frame = cls(opcode, data, fin, rsv1, rsv2, rsv3)

        if extensions is None:
            extensions = []
        for extension in reversed(extensions):
            frame = extension.decode(frame, max_size=max_size)

        # Validate after decoding, once extensions have processed the frame.
        frame.check()

        return frame

    def serialize(
        self,
        *,
        mask: bool,
        extensions: Sequence[extensions.Extension] | None = None,
    ) -> bytes:
        """
        Serialize a WebSocket frame.

        Args:
            mask: Whether the frame should be masked i.e. whether the write
                happens on the client side.
            extensions: List of extensions, applied in order.

        Raises:
            ProtocolError: If the frame contains incorrect values.

        """
        # Validate before encoding, while the frame is still unprocessed.
        self.check()

        if extensions is None:
            extensions = []
        # Extensions return a frame; track it in a local rather than
        # rebinding ``self``.
        frame = self
        for extension in extensions:
            frame = extension.encode(frame)

        output = io.BytesIO()

        # Prepare the header.
        head1 = (
            (0b10000000 if frame.fin else 0)
            | (0b01000000 if frame.rsv1 else 0)
            | (0b00100000 if frame.rsv2 else 0)
            | (0b00010000 if frame.rsv3 else 0)
            | frame.opcode
        )

        head2 = 0b10000000 if mask else 0

        # Pick the shortest length encoding: 7-bit, 16-bit or 64-bit.
        length = len(frame.data)
        if length < 126:
            output.write(struct.pack("!BB", head1, head2 | length))
        elif length < 65536:
            output.write(struct.pack("!BBH", head1, head2 | 126, length))
        else:
            output.write(struct.pack("!BBQ", head1, head2 | 127, length))

        if mask:
            mask_bytes = secrets.token_bytes(4)
            output.write(mask_bytes)

        # Prepare the data.
        data: BytesLike
        if mask:
            data = apply_mask(frame.data, mask_bytes)
        else:
            data = frame.data
        output.write(data)

        return output.getvalue()

    def check(self) -> None:
        """
        Check that reserved bits and control frames have acceptable values.

        Raises:
            ProtocolError: If a reserved bit is set, or if a control frame
                carries more than 125 bytes or is fragmented.

        """
        if self.rsv1 or self.rsv2 or self.rsv3:
            raise ProtocolError("reserved bits must be 0")

        if self.opcode in CTRL_OPCODES:
            if len(self.data) > 125:
                raise ProtocolError("control frame too long")
            if not self.fin:
                raise ProtocolError("fragmented control frame")
@dataclasses.dataclass
class Close:
    """
    Code and reason for WebSocket close frames.

    Attributes:
        code: Close code.
        reason: Close reason.

    """

    code: CloseCode | int
    reason: str

    def __str__(self) -> str:
        """
        Return a human-readable representation of a close code and reason.

        """
        # Codes in 3000-4999 have no entry in CLOSE_CODE_EXPLANATIONS; they
        # are labelled by range instead.
        if 3000 <= self.code < 4000:
            explanation = "registered"
        elif 4000 <= self.code < 5000:
            explanation = "private use"
        else:
            explanation = CLOSE_CODE_EXPLANATIONS.get(self.code, "unknown")
        result = f"{self.code} ({explanation})"

        if self.reason:
            result = f"{result} {self.reason}"

        return result

    @classmethod
    def parse(cls, data: BytesLike) -> Close:
        """
        Parse the payload of a close frame.

        Args:
            data: Payload of the close frame.

        Raises:
            ProtocolError: If data is ill-formed.
            UnicodeDecodeError: If the reason isn't valid UTF-8.

        """
        if isinstance(data, memoryview):
            raise AssertionError("only compressed outgoing frames use memoryview")
        if len(data) >= 2:
            # Two-byte big-endian close code, then the UTF-8 reason.
            (code,) = struct.unpack("!H", data[:2])
            reason = data[2:].decode()
            close = cls(code, reason)
            close.check()
            return close
        elif len(data) == 0:
            # An empty payload carries no code; report it as NO_STATUS_RCVD.
            return cls(CloseCode.NO_STATUS_RCVD, "")
        else:
            # A single byte cannot hold the two-byte close code.
            raise ProtocolError("close frame too short")

    def serialize(self) -> bytes:
        """
        Serialize the payload of a close frame.

        Raises:
            ProtocolError: If the close code is invalid.

        """
        self.check()
        return struct.pack("!H", self.code) + self.reason.encode()

    def check(self) -> None:
        """
        Check that the close code has a valid value for a close frame.

        Raises:
            ProtocolError: If the close code is invalid.

        """
        # Valid codes are those in 3000-4999 and the members of
        # EXTERNAL_CLOSE_CODES.
        if 3000 <= self.code < 5000 or self.code in EXTERNAL_CLOSE_CODES:
            return
        raise ProtocolError("invalid status code")
def is_utf8_fragment(
    data: bytes,
    must_start_clean: bool = False,
    must_end_clean: bool = False,
) -> bool:
    """
    Guess if data is a fragment of UTF-8 text.

    Args:
        data: Bytes to inspect.
        must_start_clean: If false, up to three continuation bytes are
            tolerated at the beginning, as the tail of a character that
            started before ``data``.
        must_end_clean: If false, an incomplete multibyte sequence is
            tolerated at the end, as the head of a character that continues
            after ``data``.

    Returns:
        :obj:`True` if the bytes between the tolerated boundaries decode as
        UTF-8 and contain no character that ``repr()`` renders as a ``\\x``
        escape; :obj:`False` otherwise.

    """
    # Possible byte sequences for UTF-8 characters are:
    # 0xxxxxxx
    # 110xxxxx 10xxxxxx
    # 1110xxxx 10xxxxxx 10xxxxxx
    # 11110xxx 10xxxxxx 10xxxxxx 10xxxxxx

    # The algorithm determines ``start`` and ``end`` so that ``data[start:end]``
    # must be a valid UTF-8 sequence for data to be a valid UTF-8 fragment.

    start, end = 0, len(data)

    if not must_start_clean:
        # Remove continuation bytes from the beginning.
        max_start = min(3, len(data))
        while start < max_start and data[start] & 0b11000000 == 0b10000000:
            start += 1

    if not must_end_clean:
        # Remove a partial multibyte sequence from the end. Walk backwards
        # from the last byte, over at most four bytes, to find the byte that
        # starts the last sequence.
        index = len(data) - 1
        min_end = max(len(data) - 4, start)
        while index >= min_end:
            byte = data[index]

            # Continuation byte
            if byte & 0b11000000 == 0b10000000:
                index -= 1
                continue

            # ASCII byte
            if byte & 0b10000000 == 0b00000000:
                seq_len = 1
            # Leading byte of a 2-byte sequence
            elif byte & 0b11100000 == 0b11000000:
                seq_len = 2
            # Leading byte of a 3-byte sequence
            elif byte & 0b11110000 == 0b11100000:
                seq_len = 3
            # Leading byte of a 4-byte sequence
            elif byte & 0b11111000 == 0b11110000:
                seq_len = 4
            # Invalid byte
            else:
                seq_len = 0

            # Cut only when there's an incomplete sequence at the end.
            # Otherwise keep everything and let decode() judge it.
            if seq_len > len(data) - index:
                end = index

            break
        else:
            # No byte starting a sequence was found. That is fine when nothing
            # is left after trimming the start. Otherwise the scanned tail is
            # a run of continuation bytes that no leading byte can own, so
            # data cannot be a UTF-8 fragment. Returning here also avoids
            # slicing with a negative ``end``.
            if start < len(data):
                return False

    try:
        text = data[start:end].decode()
    except UnicodeDecodeError:
        return False
    else:
        # Non-printable characters signal binary data. repr() renders them as
        # ``\x..`` escapes. Drop literal backslashes first because repr()
        # doubles them, which would make text such as ``C:\xampp`` look like
        # it contains an escape.
        return "\\x" not in repr(text.replace("\\", ""))
551# At the bottom to break import cycles created by type annotations.
552from . import extensions # noqa: E402