Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/_websocket/helpers.py: 37%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

81 statements  

1"""Helpers for WebSocket protocol versions 13 and 8.""" 

2 

3import functools 

4import re 

5from re import Pattern 

6from struct import Struct 

7from typing import TYPE_CHECKING, Final 

8 

9from ..helpers import NO_EXTENSIONS 

10from .models import WSHandshakeError 

11 

12UNPACK_LEN3 = Struct("!Q").unpack_from 

13UNPACK_CLOSE_CODE = Struct("!H").unpack 

14PACK_LEN1 = Struct("!BB").pack 

15PACK_LEN2 = Struct("!BBH").pack 

16PACK_LEN3 = Struct("!BBQ").pack 

17PACK_CLOSE_CODE = Struct("!H").pack 

18PACK_RANDBITS = Struct("!L").pack 

19MSG_SIZE: Final[int] = 2**14 

20MASK_LEN: Final[int] = 4 

21 

22WS_KEY: Final[bytes] = b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11" 

23 

24 

25# Used by _websocket_mask_python 

26@functools.lru_cache 

27def _xor_table() -> list[bytes]: 

28 return [bytes(a ^ b for a in range(256)) for b in range(256)] 

29 

30 

31def _websocket_mask_python(mask: bytes, data: bytearray) -> None: 

32 """Websocket masking function. 

33 

34 `mask` is a `bytes` object of length 4; `data` is a `bytearray` 

35 object of any length. The contents of `data` are masked with `mask`, 

36 as specified in section 5.3 of RFC 6455. 

37 

38 Note that this function mutates the `data` argument. 

39 

40 This pure-python implementation may be replaced by an optimized 

41 version when available. 

42 

43 """ 

44 assert isinstance(data, bytearray), data 

45 assert len(mask) == 4, mask 

46 

47 if data: 

48 _XOR_TABLE = _xor_table() 

49 a, b, c, d = (_XOR_TABLE[n] for n in mask) 

50 data[::4] = data[::4].translate(a) 

51 data[1::4] = data[1::4].translate(b) 

52 data[2::4] = data[2::4].translate(c) 

53 data[3::4] = data[3::4].translate(d) 

54 

55 

56if TYPE_CHECKING or NO_EXTENSIONS: 

57 websocket_mask = _websocket_mask_python 

58else: 

59 try: 

60 from .mask import _websocket_mask_cython # type: ignore[import-not-found] 

61 

62 websocket_mask = _websocket_mask_cython 

63 except ImportError: # pragma: no cover 

64 websocket_mask = _websocket_mask_python 

65 

66 

67_WS_EXT_RE: Final[Pattern[str]] = re.compile( 

68 r"^(?:;\s*(?:" 

69 r"(server_no_context_takeover)|" 

70 r"(client_no_context_takeover)|" 

71 r"(server_max_window_bits(?:=(\d+))?)|" 

72 r"(client_max_window_bits(?:=(\d+))?)))*$" 

73) 

74 

75_WS_EXT_RE_SPLIT: Final[Pattern[str]] = re.compile(r"permessage-deflate([^,]+)?") 

76 

77 

78def ws_ext_parse(extstr: str | None, isserver: bool = False) -> tuple[int, bool]: 

79 if not extstr: 

80 return 0, False 

81 

82 compress = 0 

83 notakeover = False 

84 for ext in _WS_EXT_RE_SPLIT.finditer(extstr): 

85 defext = ext.group(1) 

86 # Return compress = 15 when get `permessage-deflate` 

87 if not defext: 

88 compress = 15 

89 break 

90 match = _WS_EXT_RE.match(defext) 

91 if match: 

92 compress = 15 

93 if isserver: 

94 # Server never fail to detect compress handshake. 

95 # Server does not need to send max wbit to client 

96 if match.group(4): 

97 compress = int(match.group(4)) 

98 # Group3 must match if group4 matches 

99 # Compress wbit 8 does not support in zlib 

100 # If compress level not support, 

101 # CONTINUE to next extension 

102 if compress > 15 or compress < 9: 

103 compress = 0 

104 continue 

105 if match.group(1): 

106 notakeover = True 

107 # Ignore regex group 5 & 6 for client_max_window_bits 

108 break 

109 else: 

110 if match.group(6): 

111 compress = int(match.group(6)) 

112 # Group5 must match if group6 matches 

113 # Compress wbit 8 does not support in zlib 

114 # If compress level not support, 

115 # FAIL the parse progress 

116 if compress > 15 or compress < 9: 

117 raise WSHandshakeError("Invalid window size") 

118 if match.group(2): 

119 notakeover = True 

120 # Ignore regex group 5 & 6 for client_max_window_bits 

121 break 

122 # Return Fail if client side and not match 

123 elif not isserver: 

124 raise WSHandshakeError("Extension for deflate not supported" + ext.group(1)) 

125 

126 return compress, notakeover 

127 

128 

129def ws_ext_gen( 

130 compress: int = 15, isserver: bool = False, server_notakeover: bool = False 

131) -> str: 

132 # client_notakeover=False not used for server 

133 # compress wbit 8 does not support in zlib 

134 if compress < 9 or compress > 15: 

135 raise ValueError( 

136 "Compress wbits must between 9 and 15, zlib does not support wbits=8" 

137 ) 

138 enabledext = ["permessage-deflate"] 

139 if not isserver: 

140 enabledext.append("client_max_window_bits") 

141 

142 if compress < 15: 

143 enabledext.append("server_max_window_bits=" + str(compress)) 

144 if server_notakeover: 

145 enabledext.append("server_no_context_takeover") 

146 # if client_notakeover: 

147 # enabledext.append('client_no_context_takeover') 

148 return "; ".join(enabledext)