Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/yarl/_quoting_py.py: 61%

155 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:40 +0000

1import codecs 

2import re 

3from string import ascii_letters, ascii_lowercase, digits 

4from typing import Optional, cast 

5 

6BASCII_LOWERCASE = ascii_lowercase.encode("ascii") 

7BPCT_ALLOWED = {f"%{i:02X}".encode("ascii") for i in range(256)} 

8GEN_DELIMS = ":/?#[]@" 

9SUB_DELIMS_WITHOUT_QS = "!$'()*," 

10SUB_DELIMS = SUB_DELIMS_WITHOUT_QS + "+&=;" 

11RESERVED = GEN_DELIMS + SUB_DELIMS 

12UNRESERVED = ascii_letters + digits + "-._~" 

13ALLOWED = UNRESERVED + SUB_DELIMS_WITHOUT_QS 

14 

15 

16_IS_HEX = re.compile(b"[A-Z0-9][A-Z0-9]") 

17_IS_HEX_STR = re.compile("[A-Fa-f0-9][A-Fa-f0-9]") 

18 

19utf8_decoder = codecs.getincrementaldecoder("utf-8") 

20 

21 

class _Quoter:
    """Callable that percent-encodes a string.

    Bytes of the UTF-8 encoded input that are not in the safe set are
    written as upper-case ``%XX`` escapes.  The safe set is built from
    ``safe``, ``ALLOWED``, ``"+&=;"`` (only when ``qs`` is false) and
    ``protected``.

    When ``requote`` is true, ``%XX`` escapes already present in the input
    are recognised: their hex digits are upper-cased, escapes of safe
    characters that are not ``protected`` are decoded back to the plain
    character, and a ``%`` that does not start a valid escape is written
    as ``%25``.  When ``qs`` is true a space is written as ``+``.
    """

    def __init__(
        self,
        *,
        safe: str = "",
        protected: str = "",
        qs: bool = False,
        requote: bool = True,
    ) -> None:
        """Store the quoting options; no work is done until the call."""
        self._safe, self._protected = safe, protected
        self._qs, self._requote = qs, requote

    def __call__(self, val: Optional[str]) -> Optional[str]:
        """Return the quoted form of *val*.

        ``None`` and ``""`` are returned unchanged.  If quoting changes
        nothing, the original *val* object is returned.

        Raises:
            TypeError: if *val* is neither ``None`` nor a ``str``.
        """
        if val is None:
            return None
        if not isinstance(val, str):
            raise TypeError("Argument should be str")
        if not val:
            return ""

        # Characters that cannot be encoded as UTF-8 are silently dropped.
        raw = cast(str, val).encode("utf8", errors="ignore")
        total = len(raw)

        query_extras = "" if self._qs else "+&=;"
        safe_chars = self._safe + ALLOWED + query_extras + self._protected
        safe_bytes = safe_chars.encode("ascii")
        percent, space, plus = ord("%"), ord(" "), ord("+")

        out = bytearray()
        escape = bytearray()  # bytes of the "%XX" escape being collected
        pos = 0
        while pos < total:
            byte = raw[pos]
            pos += 1

            if escape:
                # Inside an escape: collect hex digits in upper case.
                if byte in BASCII_LOWERCASE:
                    byte -= 32
                escape.append(byte)
                if len(escape) == 3:
                    hex_pair = escape[1:]
                    decoded = None
                    if _IS_HEX.match(hex_pair):
                        try:
                            decoded = chr(int(hex_pair.decode("ascii"), base=16))
                        except ValueError:
                            decoded = None
                    if decoded is None:
                        # Not a real escape: quote the "%" itself and
                        # re-read the two bytes after it as ordinary input.
                        out.extend(b"%25")
                        escape.clear()
                        pos -= 2
                    else:
                        if decoded in safe_chars and decoded not in self._protected:
                            out.append(ord(decoded))
                        else:
                            out.extend(escape)
                        escape.clear()
                elif len(escape) == 2 and pos == total:
                    # Input ends one character after "%".
                    out.extend(b"%25")
                    escape.clear()
                    pos -= 1
                continue

            if byte == percent and self._requote:
                escape.append(byte)
                if pos == total:
                    # "%" is the very last byte of the input.
                    out.extend(b"%25")
                continue

            if self._qs and byte == space:
                out.append(plus)
            elif byte in safe_bytes:
                out.append(byte)
            else:
                out.extend(f"%{byte:02X}".encode("ascii"))

        quoted = out.decode("ascii")
        return val if quoted == val else quoted

116 

117 

class _Unquoter:
    """Callable that decodes ``%XX`` escapes in a string.

    Consecutive escapes are fed byte by byte to an incremental UTF-8
    decoder; escapes that do not form valid UTF-8 are copied to the output
    unchanged.  Characters listed in ``unsafe`` are kept (or put) in
    percent-encoded form.  When ``qs`` is true a literal ``+`` becomes a
    space (unless ``+`` is unsafe) and decoded ``+``, ``=``, ``&``, ``;``
    are re-quoted.
    """

    def __init__(self, *, unsafe: str = "", qs: bool = False) -> None:
        """Store the options and build the quoters used for re-quoting."""
        self._unsafe = unsafe
        self._qs = qs
        self._quoter = _Quoter()
        self._qs_quoter = _Quoter(qs=True)

    def __call__(self, val: Optional[str]) -> Optional[str]:
        """Return the unquoted form of *val*.

        ``None`` and ``""`` are returned unchanged.  If unquoting changes
        nothing, the original *val* object is returned.

        Raises:
            TypeError: if *val* is neither ``None`` nor a ``str``.
        """
        if val is None:
            return None
        if not isinstance(val, str):
            raise TypeError("Argument should be str")
        if not val:
            return ""
        decoder = cast(codecs.BufferedIncrementalDecoder, utf8_decoder())
        ret = []
        idx = 0
        while idx < len(val):
            ch = val[idx]
            idx += 1
            # A "%" needs two more characters after it to be an escape.
            if ch == "%" and idx <= len(val) - 2:
                pct = val[idx : idx + 2]
                if _IS_HEX_STR.fullmatch(pct):
                    b = bytes([int(pct, base=16)])
                    idx += 2
                    try:
                        unquoted = decoder.decode(b)
                    except UnicodeDecodeError:
                        # The buffered bytes plus this one are not valid
                        # UTF-8: emit the buffered escapes verbatim (three
                        # characters each) and retry this byte on its own.
                        start_pct = idx - 3 - len(decoder.buffer) * 3
                        ret.append(val[start_pct : idx - 3])
                        decoder.reset()
                        try:
                            unquoted = decoder.decode(b)
                        except UnicodeDecodeError:
                            ret.append(val[idx - 3 : idx])
                            continue
                    if not unquoted:
                        # Partial multi-byte sequence; wait for more bytes.
                        continue
                    if self._qs and unquoted in "+=&;":
                        to_add = self._qs_quoter(unquoted)
                        if to_add is None:  # pragma: no cover
                            raise RuntimeError("Cannot quote None")
                        ret.append(to_add)
                    elif unquoted in self._unsafe:
                        to_add = self._quoter(unquoted)
                        if to_add is None:  # pragma: no cover
                            raise RuntimeError("Cannot quote None")
                        ret.append(to_add)
                    else:
                        ret.append(unquoted)
                    continue

            # An ordinary character ends any unfinished multi-byte
            # sequence; emit its escapes verbatim.
            if decoder.buffer:
                start_pct = idx - 1 - len(decoder.buffer) * 3
                ret.append(val[start_pct : idx - 1])
                decoder.reset()

            if ch == "+":
                if not self._qs or ch in self._unsafe:
                    ret.append("+")
                else:
                    ret.append(" ")
                continue

            if ch in self._unsafe:
                # Percent-encode each UTF-8 byte as exactly two upper-case
                # hex digits, so "\n" becomes "%0A" (not "%A") and
                # non-ASCII characters become valid multi-byte escapes.
                # Unencodable characters are dropped, as in _Quoter.
                ret.extend(
                    f"%{byte:02X}" for byte in ch.encode("utf8", errors="ignore")
                )
                continue

            ret.append(ch)

        # Input ended in the middle of a multi-byte sequence: keep the
        # trailing escapes as they were written.
        if decoder.buffer:
            ret.append(val[-len(decoder.buffer) * 3 :])

        ret2 = "".join(ret)
        if ret2 == val:
            return val
        return ret2

197 return ret2