Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/yarl/_quoting_py.py: 61%
155 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:40 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:40 +0000
1import codecs
2import re
3from string import ascii_letters, ascii_lowercase, digits
4from typing import Optional, cast
# Lowercase ASCII letters as bytes; _Quoter tests raw bytes against this to
# upper-case the hex digits of an existing percent-escape.
BASCII_LOWERCASE = ascii_lowercase.encode("ascii")
# Every possible upper-case percent-escape (b"%00" .. b"%FF").
BPCT_ALLOWED = {f"%{i:02X}".encode("ascii") for i in range(256)}

# Character classes following the RFC 3986 grammar.  The sub-delimiters are
# split in two so that the ones with a meaning inside a query string
# ("+&=;") can be included or left out independently.
GEN_DELIMS = ":/?#[]@"
SUB_DELIMS_WITHOUT_QS = "!$'()*,"
SUB_DELIMS = SUB_DELIMS_WITHOUT_QS + "+&=;"
RESERVED = GEN_DELIMS + SUB_DELIMS
UNRESERVED = ascii_letters + digits + "-._~"
# Characters that never need escaping, regardless of query-string mode.
ALLOWED = UNRESERVED + SUB_DELIMS_WITHOUT_QS

# Two upper-case hexadecimal digits (bytes).  The previous pattern used
# "[A-Z0-9]", which also accepted non-hex letters such as b"ZZ".
_IS_HEX = re.compile(b"[A-F0-9][A-F0-9]")
# Two hexadecimal digits of either case (str).
_IS_HEX_STR = re.compile("[A-Fa-f0-9][A-Fa-f0-9]")

# Factory for incremental UTF-8 decoders; call it to get a fresh decoder.
utf8_decoder = codecs.getincrementaldecoder("utf-8")
class _Quoter:
    """Callable that percent-encodes a string.

    The input is encoded to UTF-8 and every byte outside the safe set is
    written as an upper-case ``%XX`` escape.  The safe set is built from
    ``safe`` + ``ALLOWED`` + ``protected``, plus ``"+&=;"`` when ``qs`` is
    false.

    Keyword arguments:
        safe: extra characters to leave unescaped.
        protected: characters left as they are; a literal occurrence stays
            literal and an existing ``%XX`` escape of one stays escaped.
        qs: query-string mode; a space becomes ``+`` and ``"+&=;"`` are not
            added to the safe set.
        requote: when true, existing ``%XX`` escapes are recognised: their
            hex digits are upper-cased, escapes of safe (non-protected)
            characters are decoded, and a ``%`` that does not start a valid
            escape is written as ``%25``.  When false, ``%`` is handled like
            any other byte.
    """

    def __init__(
        self,
        *,
        safe: str = "",
        protected: str = "",
        qs: bool = False,
        requote: bool = True,
    ) -> None:
        self._safe, self._protected = safe, protected
        self._qs, self._requote = qs, requote

    def __call__(self, val: Optional[str]) -> Optional[str]:
        """Return the quoted form of *val*.

        ``None`` is passed through, an empty string yields ``""`` and any
        other non-``str`` argument raises ``TypeError``.  When quoting
        changes nothing, the original *val* object is returned.
        """
        if val is None:
            return None
        if not isinstance(val, str):
            raise TypeError("Argument should be str")
        if not val:
            return ""

        # Characters that cannot be encoded (lone surrogates) are dropped.
        raw = cast(str, val).encode("utf8", errors="ignore")
        total = len(raw)
        out = bytearray()
        pending = bytearray()  # bytes of the "%XX" escape being collected

        qs_delims = "" if self._qs else "+&=;"
        safe = self._safe + ALLOWED + qs_delims + self._protected
        safe_bytes = safe.encode("ascii")

        percent = ord("%")
        space = ord(" ")

        pos = 0
        while pos < total:
            byte = raw[pos]
            pos += 1

            if pending:
                # Inside an escape: normalise hex digits to upper case.
                if byte in BASCII_LOWERCASE:
                    byte -= 32
                pending.append(byte)
                size = len(pending)

                if size == 3:  # pragma: no branch # peephole optimizer
                    hex_pair = pending[1:]
                    decoded = None
                    if _IS_HEX.match(hex_pair):
                        try:
                            decoded = chr(int(hex_pair.decode("ascii"), base=16))
                        except ValueError:
                            decoded = None
                    if decoded is None:
                        # Not a valid escape: escape the "%" itself and
                        # re-scan the two bytes that followed it.
                        out.extend(b"%25")
                        pending.clear()
                        pos -= 2
                        continue

                    if decoded not in self._protected and decoded in safe:
                        out.append(ord(decoded))
                    else:
                        out.extend(pending)
                    pending.clear()
                elif size == 2 and pos == total:
                    # Only one character follows "%" before the input ends.
                    out.extend(b"%25")
                    pending.clear()
                    pos -= 1
                continue

            if byte == percent and self._requote:
                pending.clear()
                pending.append(byte)
                if pos == total:
                    # "%" is the very last character.
                    out.extend(b"%25")
                continue

            if self._qs and byte == space:
                out.append(ord("+"))
                continue

            if byte in safe_bytes:
                out.append(byte)
            else:
                out.extend((f"%{byte:02X}").encode("ascii"))

        quoted = out.decode("ascii")
        # Hand back the caller's own object when nothing had to change.
        return val if quoted == val else quoted
class _Unquoter:
    """Callable that decodes ``%XX`` escapes in a string.

    Consecutive escapes are decoded as UTF-8; escapes that do not form
    valid UTF-8 are copied to the output unchanged.

    Keyword arguments:
        unsafe: characters that must not appear literally in the result.
            A literal occurrence is percent-encoded, and an escape that
            decodes to one of them is passed back through a default
            ``_Quoter``.
        qs: query-string mode; a literal ``+`` becomes a space (unless
            ``+`` is in ``unsafe``) and escapes decoding to one of
            ``"+=&;"`` are passed back through a ``_Quoter(qs=True)``.
    """

    def __init__(self, *, unsafe: str = "", qs: bool = False) -> None:
        self._unsafe = unsafe
        self._qs = qs
        self._quoter = _Quoter()
        self._qs_quoter = _Quoter(qs=True)

    def __call__(self, val: Optional[str]) -> Optional[str]:
        """Return the unquoted form of *val*.

        ``None`` is passed through, an empty string yields ``""`` and any
        other non-``str`` argument raises ``TypeError``.  When unquoting
        changes nothing, the original *val* object is returned.
        """
        if val is None:
            return None
        if not isinstance(val, str):
            raise TypeError("Argument should be str")
        if not val:
            return ""
        decoder = cast(codecs.BufferedIncrementalDecoder, utf8_decoder())
        ret = []
        idx = 0
        while idx < len(val):
            ch = val[idx]
            idx += 1
            if ch == "%" and idx <= len(val) - 2:
                pct = val[idx : idx + 2]
                if _IS_HEX_STR.fullmatch(pct):
                    b = bytes([int(pct, base=16)])
                    idx += 2
                    try:
                        unquoted = decoder.decode(b)
                    except UnicodeDecodeError:
                        # The bytes buffered so far plus this one are not
                        # valid UTF-8: emit the buffered escapes verbatim
                        # (three characters per buffered byte) and retry
                        # this byte on its own.
                        start_pct = idx - 3 - len(decoder.buffer) * 3
                        ret.append(val[start_pct : idx - 3])
                        decoder.reset()
                        try:
                            unquoted = decoder.decode(b)
                        except UnicodeDecodeError:
                            # This byte cannot start a sequence either;
                            # keep its escape as written.
                            ret.append(val[idx - 3 : idx])
                            continue
                    if not unquoted:
                        # Incomplete multi-byte sequence; wait for more.
                        continue
                    if self._qs and unquoted in "+=&;":
                        to_add = self._qs_quoter(unquoted)
                        if to_add is None:  # pragma: no cover
                            raise RuntimeError("Cannot quote None")
                        ret.append(to_add)
                    elif unquoted in self._unsafe:
                        to_add = self._quoter(unquoted)
                        if to_add is None:  # pragma: no cover
                            raise RuntimeError("Cannot quote None")
                        ret.append(to_add)
                    else:
                        ret.append(unquoted)
                    continue

            if decoder.buffer:
                # A multi-byte sequence was cut short by a character that
                # is not an escape: emit the buffered escapes verbatim.
                start_pct = idx - 1 - len(decoder.buffer) * 3
                ret.append(val[start_pct : idx - 1])
                decoder.reset()

            if ch == "+":
                if not self._qs or ch in self._unsafe:
                    ret.append("+")
                else:
                    ret.append(" ")
                continue

            if ch in self._unsafe:
                # Escape each UTF-8 byte as a zero-padded "%XX".  Formatting
                # the code point with hex() gave one digit below 0x10
                # ("\n" -> "%A") and a single escape for non-ASCII
                # characters.  Unencodable characters (lone surrogates) are
                # dropped, matching _Quoter's errors="ignore".
                for byte in ch.encode("utf8", errors="ignore"):
                    ret.append(f"%{byte:02X}")
                continue

            ret.append(ch)

        if decoder.buffer:
            # Input ended inside a multi-byte sequence: keep the trailing
            # escapes as written.
            ret.append(val[-len(decoder.buffer) * 3 :])

        ret2 = "".join(ret)
        # Hand back the caller's own object when nothing had to change.
        if ret2 == val:
            return val
        return ret2