Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/_websocket/helpers.py: 37%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""Helpers for WebSocket protocol versions 13 and 8."""
3import functools
4import re
5from re import Pattern
6from struct import Struct
7from typing import TYPE_CHECKING, Final
9from ..helpers import NO_EXTENSIONS
10from .models import WSHandshakeError
# Pre-bound struct codecs.  The "!" prefix selects network (big-endian)
# byte order with standard sizes: B = 1 byte, H = 2, L = 4, Q = 8 (all unsigned).

# Decoders
UNPACK_LEN3 = Struct("!Q").unpack_from
UNPACK_CLOSE_CODE = Struct("!H").unpack

# Encoders
PACK_LEN1 = Struct("!BB").pack
PACK_LEN2 = Struct("!BBH").pack
PACK_LEN3 = Struct("!BBQ").pack
PACK_CLOSE_CODE = Struct("!H").pack
PACK_RANDBITS = Struct("!L").pack

MSG_SIZE: Final[int] = 1 << 14  # 16384
MASK_LEN: Final[int] = 4  # a masking key is four bytes long

# Same value as the fixed GUID that RFC 6455 defines for the opening handshake.
WS_KEY: Final[bytes] = b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
25# Used by _websocket_mask_python
26@functools.lru_cache
27def _xor_table() -> list[bytes]:
28 return [bytes(a ^ b for a in range(256)) for b in range(256)]
def _websocket_mask_python(mask: bytes, data: bytearray) -> None:
    """Apply WebSocket masking to ``data`` in place.

    ``mask`` must be a 4-byte ``bytes`` object and ``data`` a ``bytearray``
    of any length.  Byte ``i`` of ``data`` is XORed with ``mask[i % 4]``,
    the masking scheme described in section 5.3 of RFC 6455.

    The function returns nothing; ``data`` itself is modified.

    This is the pure-Python implementation; an optimized version may be
    used in its place when available.
    """
    assert isinstance(data, bytearray), data
    assert len(mask) == 4, mask

    if not data:
        # Nothing to mask; also avoids building the XOR tables needlessly.
        return

    tables = _xor_table()
    # Resolve all four translation tables before touching ``data``.
    lut0, lut1, lut2, lut3 = [tables[key] for key in mask]
    # Every fourth byte, starting at ``offset``, shares the same mask byte,
    # so each strided slice can be translated in a single call.
    for offset, lut in enumerate((lut0, lut1, lut2, lut3)):
        data[offset::4] = data[offset::4].translate(lut)
# Pick the masking implementation exposed as ``websocket_mask``.
# The pure-Python version is used when type checking, when NO_EXTENSIONS
# is set, or when the compiled ``.mask`` module cannot be imported.
if TYPE_CHECKING or NO_EXTENSIONS:
    websocket_mask = _websocket_mask_python
else:
    try:
        from .mask import _websocket_mask_cython  # type: ignore[import-not-found]
    except ImportError:  # pragma: no cover
        websocket_mask = _websocket_mask_python
    else:
        websocket_mask = _websocket_mask_cython
67_WS_EXT_RE: Final[Pattern[str]] = re.compile(
68 r"^(?:;\s*(?:"
69 r"(server_no_context_takeover)|"
70 r"(client_no_context_takeover)|"
71 r"(server_max_window_bits(?:=(\d+))?)|"
72 r"(client_max_window_bits(?:=(\d+))?)))*$"
73)
75_WS_EXT_RE_SPLIT: Final[Pattern[str]] = re.compile(r"permessage-deflate([^,]+)?")
78def ws_ext_parse(extstr: str | None, isserver: bool = False) -> tuple[int, bool]:
79 if not extstr:
80 return 0, False
82 compress = 0
83 notakeover = False
84 for ext in _WS_EXT_RE_SPLIT.finditer(extstr):
85 defext = ext.group(1)
86 # Return compress = 15 when get `permessage-deflate`
87 if not defext:
88 compress = 15
89 break
90 match = _WS_EXT_RE.match(defext)
91 if match:
92 compress = 15
93 if isserver:
94 # Server never fail to detect compress handshake.
95 # Server does not need to send max wbit to client
96 if match.group(4):
97 compress = int(match.group(4))
98 # Group3 must match if group4 matches
99 # Compress wbit 8 does not support in zlib
100 # If compress level not support,
101 # CONTINUE to next extension
102 if compress > 15 or compress < 9:
103 compress = 0
104 continue
105 if match.group(1):
106 notakeover = True
107 # Ignore regex group 5 & 6 for client_max_window_bits
108 break
109 else:
110 if match.group(6):
111 compress = int(match.group(6))
112 # Group5 must match if group6 matches
113 # Compress wbit 8 does not support in zlib
114 # If compress level not support,
115 # FAIL the parse progress
116 if compress > 15 or compress < 9:
117 raise WSHandshakeError("Invalid window size")
118 if match.group(2):
119 notakeover = True
120 # Ignore regex group 5 & 6 for client_max_window_bits
121 break
122 # Return Fail if client side and not match
123 elif not isserver:
124 raise WSHandshakeError("Extension for deflate not supported" + ext.group(1))
126 return compress, notakeover
def ws_ext_gen(
    compress: int = 15, isserver: bool = False, server_notakeover: bool = False
) -> str:
    """Build a ``permessage-deflate`` extension string.

    :param compress: window size in bits; must be within 9..15
        (wbits=8 is not supported by zlib).
    :param isserver: when ``False``, ``client_max_window_bits`` is
        included in the result.
    :param server_notakeover: when ``True``,
        ``server_no_context_takeover`` is included in the result.
    :return: the parameters joined with ``"; "``.
    :raises ValueError: if ``compress`` is outside 9..15.

    ``server_max_window_bits=N`` is included only when ``compress`` is
    below 15.  ``client_no_context_takeover`` is never emitted.
    """
    if compress > 15 or compress < 9:
        raise ValueError(
            "Compress wbits must between 9 and 15, zlib does not support wbits=8"
        )

    parts = ["permessage-deflate"]
    if not isserver:
        parts.append("client_max_window_bits")
    if compress < 15:
        parts.append(f"server_max_window_bits={compress}")
    if server_notakeover:
        parts.append("server_no_context_takeover")
    return "; ".join(parts)