Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/_internal.py: 30%

276 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

1import logging 

2import operator 

3import re 

4import string 

5import sys 

6import typing 

7import typing as t 

8from datetime import date 

9from datetime import datetime 

10from datetime import timezone 

11from itertools import chain 

12from weakref import WeakKeyDictionary 

13 

14if t.TYPE_CHECKING: 

15 from _typeshed.wsgi import StartResponse 

16 from _typeshed.wsgi import WSGIApplication 

17 from _typeshed.wsgi import WSGIEnvironment 

18 from .wrappers.request import Request # noqa: F401 

19 

# Shared logger handle; it starts out unset and is filled in by ``_log``.
_logger: t.Optional[logging.Logger] = None
_signature_cache = WeakKeyDictionary()  # type: ignore
# Ordinal (as returned by ``date.toordinal``) of 1970-01-01.
_epoch_ord = date(1970, 1, 1).toordinal()

# Every single-byte value that ``_cookie_quote`` passes through untouched.
_legal_cookie_chars = frozenset(
    bytes((code,))
    for code in f"{string.ascii_letters}{string.digits}/=!#$%&'*+-.^_`|~:".encode(
        "ascii"
    )
)

# Replacement sequences used by ``_cookie_quote``. The four printable
# specials are listed explicitly; control bytes (0-31) and everything from
# 127 upwards are added below as backslash + three octal digits.
_cookie_quoting_map = {
    b",": b"\\054",
    b";": b"\\073",
    b'"': b'\\"',
    b"\\": b"\\\\",
}
for _i in chain(range(32), range(127, 256)):
    _cookie_quoting_map[bytes((_i,))] = f"\\{_i:03o}".encode("latin1")

# A backslash followed by a three digit octal number in the range 000-377.
_octal_re = re.compile(rb"\\[0-3][0-7][0-7]")
# A backslash followed by any single byte.
_quote_re = re.compile(rb"[\\].")
# Pattern source only (not compiled); nothing in this module's visible code
# references it.
_legal_cookie_chars_re = rb"[\w\d!#%&\'~_`><@,:/\$\*\+\-\.\^\|\)\(\?\}\{\=]"
# One ``key[=value];`` item. The value is either a double-quoted string
# (backslash escapes allowed) or the shortest run up to the next ``;``.
_cookie_re = re.compile(
    rb"""
    (?P<key>[^=;]*)
    (?:\s*=\s*
        (?P<val>
            "(?:[^\\"]|\\.)*" |
             (?:.*?)
        )
    )?
    \s*;
""",
    re.VERBOSE,
)

48 

49 

class _Missing:
    """Type of the module-level ``_missing`` sentinel object."""

    def __repr__(self) -> str:
        # Fixed text so the sentinel is recognisable in debug output.
        return "no value"

    def __reduce__(self) -> str:
        # A string result makes pickle store a reference to the module
        # global of that name instead of serialising instance state.
        return "_missing"


# The one shared instance of the sentinel.
_missing = _Missing()

59 

60 

@typing.overload
def _make_encode_wrapper(reference: str) -> t.Callable[[str], str]:
    ...


@typing.overload
def _make_encode_wrapper(reference: bytes) -> t.Callable[[str], bytes]:
    ...


def _make_encode_wrapper(reference: t.AnyStr) -> t.Callable[[str], t.AnyStr]:
    """Build a converter whose output type follows ``reference``.

    :param reference: Sample value; only its type is inspected.
    :return: A callable taking a ``str``. When ``reference`` is a ``str``
        the callable returns its argument unchanged; otherwise it calls
        ``.encode("latin1")`` on the argument.
    """
    if not isinstance(reference, str):
        return operator.methodcaller("encode", "latin1")

    return lambda x: x

79 

80 

def _check_str_tuple(value: t.Tuple[t.AnyStr, ...]) -> None:
    """Verify that a tuple does not mix ``str`` and ``bytes`` items.

    The first item decides the expected type: ``str`` if it is a ``str``,
    ``bytes`` otherwise. An empty tuple is accepted as is.

    :param value: Tuple of items to check.
    :raises TypeError: If any item is not an instance of the expected type.
    """
    if not value:
        return

    expected = str if isinstance(value[0], str) else bytes

    for item in value:
        if not isinstance(item, expected):
            raise TypeError(f"Cannot mix str and bytes arguments (got {value!r})")

90 

91 

# Interpreter default encoding, captured once and used as the default
# ``charset`` by the conversion helpers in this module.
_default_encoding = sys.getdefaultencoding()


def _to_bytes(
    x: t.Union[str, bytes], charset: str = _default_encoding, errors: str = "strict"
) -> bytes:
    """Coerce ``x`` to ``bytes``.

    :param x: Value to convert.
    :param charset: Encoding used when ``x`` is a ``str``.
    :param errors: Error handler passed to ``str.encode``.
    :return: ``x`` encoded for ``str`` input, a ``bytes`` copy for
        ``bytearray``/``memoryview`` input, and ``x`` itself when it is
        already ``bytes`` or is ``None``.
    :raises TypeError: For any other input type.
    """
    if isinstance(x, str):
        return x.encode(charset, errors)

    if isinstance(x, (bytearray, memoryview)):
        return bytes(x)

    if x is None or isinstance(x, bytes):
        # ``None`` is deliberately passed through rather than rejected.
        return x

    raise TypeError("Expected bytes")

108 

109 

@typing.overload
def _to_str(  # type: ignore
    x: None,
    charset: t.Optional[str] = ...,
    errors: str = ...,
    allow_none_charset: bool = ...,
) -> None:
    ...


@typing.overload
def _to_str(
    x: t.Any,
    charset: t.Optional[str] = ...,
    errors: str = ...,
    allow_none_charset: bool = ...,
) -> str:
    ...


def _to_str(
    x: t.Optional[t.Any],
    charset: t.Optional[str] = _default_encoding,
    errors: str = "strict",
    allow_none_charset: bool = False,
) -> t.Optional[t.Union[str, bytes]]:
    """Coerce ``x`` to ``str``.

    :param x: Value to convert. ``None`` and ``str`` are returned as is.
    :param charset: Encoding used to decode ``bytes``/``bytearray`` input.
    :param errors: Error handler passed to ``decode``.
    :param allow_none_charset: When true and ``charset`` is ``None``,
        binary input is returned undecoded.
    :return: ``str(x)`` for non-binary objects, the decoded text for
        binary input, or the input itself in the pass-through cases above.
    """
    if x is None or isinstance(x, str):
        return x

    if isinstance(x, (bytes, bytearray)):
        if charset is None and allow_none_charset:
            return x

        # NOTE: with ``charset=None`` and ``allow_none_charset`` false the
        # ``None`` is handed to ``decode`` unchanged.
        return x.decode(charset, errors)  # type: ignore

    return str(x)

147 

148 

def _wsgi_decoding_dance(
    s: str, charset: str = "utf-8", errors: str = "replace"
) -> str:
    """Re-interpret a latin1-decoded string using ``charset``.

    :param s: Text whose code points each stand for one raw byte.
    :param charset: Encoding used to decode the recovered bytes.
    :param errors: Error handler for that decode step.
    :return: The re-decoded text.
    """
    raw = s.encode("latin1")
    return raw.decode(charset, errors)

153 

154 

def _wsgi_encoding_dance(
    s: str, charset: str = "utf-8", errors: str = "replace"
) -> str:
    """Convert text to the latin1-decoded form, one code point per byte.

    :param s: Text to convert. ``bytes`` input is also accepted and is
        decoded as latin1 directly.
    :param charset: Encoding used to turn ``s`` into bytes.
    :param errors: Error handler applied while encoding ``s`` with
        ``charset``.
    :return: A ``str`` in which every code point is below 256.
    """
    if isinstance(s, bytes):
        # latin1 maps every byte value, so no error handler is needed here.
        return s.decode("latin1")

    # The handler belongs on the encode step: that is the only step that
    # can fail. Attaching it to the latin1 decode made it a no-op.
    return s.encode(charset, errors).decode("latin1")

162 

163 

def _get_environ(obj: t.Union["WSGIEnvironment", "Request"]) -> "WSGIEnvironment":
    """Return the environ dict for ``obj``.

    :param obj: Either the environ itself or an object exposing it as an
        ``environ`` attribute.
    :return: The ``environ`` attribute if present, otherwise ``obj``.
    """
    environ = getattr(obj, "environ", obj)
    # Kept as an ``assert`` so it is skipped when asserts are disabled.
    assert isinstance(
        environ, dict
    ), f"{type(obj).__name__!r} is not a WSGI environment (has to be a dict)"
    return environ

170 

171 

def _has_level_handler(logger: logging.Logger) -> bool:
    """Tell whether some handler would process records at the logger's
    effective level.

    Starting at ``logger`` and following ``parent`` links, each logger's
    handlers are inspected. The walk ends at the first logger that does not
    propagate, or when there is no parent left.

    :param logger: Logger at which to start the search.
    :return: ``True`` if any visited handler has a level less than or equal
        to the effective level of ``logger``.
    """
    threshold = logger.getEffectiveLevel()
    node = logger

    while node:
        for handler in node.handlers:
            if handler.level <= threshold:
                return True

        if not node.propagate:
            return False

        node = node.parent  # type: ignore

    return False

189 

190 

class _ColorStreamHandler(logging.StreamHandler):
    """On Windows, wrap stream with Colorama for ANSI style support."""

    def __init__(self) -> None:
        # ``None`` lets ``StreamHandler`` choose its own default stream.
        stream = None

        try:
            import colorama
        except ImportError:
            pass
        else:
            # Colorama is importable: route output through its wrapper
            # around ``sys.stderr``.
            stream = colorama.AnsiToWin32(sys.stderr)

        super().__init__(stream)

203 

204 

def _log(type: str, message: str, *args: t.Any, **kwargs: t.Any) -> None:
    """Send a message to the ``'werkzeug'`` logger.

    The logger is set up lazily on first use: a level of
    :data:`logging.INFO` is applied if none is set, and a
    :class:`_ColorStreamHandler` is attached if no handler covers the
    logger's effective level.

    :param type: Name of the logger method to call, such as ``"info"``.
    :param message: Log message; trailing whitespace is stripped.
    :param args: Extra positional arguments for the logger method.
    :param kwargs: Extra keyword arguments for the logger method.
    """
    global _logger

    if _logger is None:
        # Publish the logger before configuring it, then configure through
        # a local alias.
        _logger = logger = logging.getLogger("werkzeug")

        if logger.level == logging.NOTSET:
            logger.setLevel(logging.INFO)

        if not _has_level_handler(logger):
            logger.addHandler(_ColorStreamHandler())

    emit = getattr(_logger, type)
    emit(message.rstrip(), *args, **kwargs)

225 

226 

@typing.overload
def _dt_as_utc(dt: None) -> None:
    ...


@typing.overload
def _dt_as_utc(dt: datetime) -> datetime:
    ...


def _dt_as_utc(dt: t.Optional[datetime]) -> t.Optional[datetime]:
    """Return ``dt`` as a timezone-aware UTC datetime.

    :param dt: Datetime to normalise, or ``None``.
    :return: ``None`` for ``None``. A naive value gets UTC attached without
        shifting its clock fields. An aware value in another zone is
        converted to UTC. A value already using ``timezone.utc`` is
        returned unchanged.
    """
    if dt is None:
        return None

    zone = dt.tzinfo

    if zone is None:
        # Naive values are treated as already being expressed in UTC.
        return dt.replace(tzinfo=timezone.utc)

    if zone != timezone.utc:
        return dt.astimezone(timezone.utc)

    return dt

247 

248 

_TAccessorValue = t.TypeVar("_TAccessorValue")


class _DictAccessorProperty(t.Generic[_TAccessorValue]):
    """Baseclass for `environ_property` and `header_property`.

    A descriptor that reads, writes and deletes one key of a mapping
    obtained from the owning instance through :meth:`lookup`, which
    subclasses must implement.
    """

    # Class-wide default; ``__init__`` can override it per instance.
    read_only = False

    def __init__(
        self,
        name: str,
        default: t.Optional[_TAccessorValue] = None,
        load_func: t.Optional[t.Callable[[str], _TAccessorValue]] = None,
        dump_func: t.Optional[t.Callable[[_TAccessorValue], str]] = None,
        read_only: t.Optional[bool] = None,
        doc: t.Optional[str] = None,
    ) -> None:
        """
        :param name: Key looked up in the storage mapping.
        :param default: Value returned when the key is missing or when
            ``load_func`` raises ``ValueError``/``TypeError``.
        :param load_func: Optional converter applied to the stored value
            on read.
        :param dump_func: Optional converter applied to the value on write.
        :param read_only: When given, overrides the class attribute.
        :param doc: Text stored as this descriptor's ``__doc__``.
        """
        self.__doc__ = doc
        self.name = name
        self.default = default
        self.load_func = load_func
        self.dump_func = dump_func

        if read_only is not None:
            self.read_only = read_only

    def lookup(self, instance: t.Any) -> t.MutableMapping[str, t.Any]:
        """Return the mapping that backs this property for ``instance``."""
        raise NotImplementedError

    @typing.overload
    def __get__(
        self, instance: None, owner: type
    ) -> "_DictAccessorProperty[_TAccessorValue]":
        ...

    @typing.overload
    def __get__(self, instance: t.Any, owner: type) -> _TAccessorValue:
        ...

    def __get__(
        self, instance: t.Optional[t.Any], owner: type
    ) -> t.Union[_TAccessorValue, "_DictAccessorProperty[_TAccessorValue]"]:
        # Access through the class returns the descriptor itself.
        if instance is None:
            return self

        storage = self.lookup(instance)

        if self.name not in storage:
            return self.default  # type: ignore

        raw = storage[self.name]

        if self.load_func is None:
            return raw  # type: ignore

        try:
            return self.load_func(raw)
        except (ValueError, TypeError):
            # A value that cannot be converted is reported as the default.
            return self.default  # type: ignore

    def __set__(self, instance: t.Any, value: _TAccessorValue) -> None:
        if self.read_only:
            raise AttributeError("read only property")

        # Convert first, then fetch the storage.
        stored = value if self.dump_func is None else self.dump_func(value)
        self.lookup(instance)[self.name] = stored

    def __delete__(self, instance: t.Any) -> None:
        if self.read_only:
            raise AttributeError("read only property")

        # Deleting a key that is not present is not an error.
        self.lookup(instance).pop(self.name, None)

    def __repr__(self) -> str:
        return f"<{type(self).__name__} {self.name}>"

325 

326 

def _cookie_quote(b: bytes) -> bytes:
    """Quote a cookie value.

    Bytes outside ``_legal_cookie_chars`` are replaced through
    ``_cookie_quoting_map`` (or kept when the map has no entry). If any such
    byte was seen, the result is wrapped in double quotes.

    :param b: Raw cookie value.
    :return: The value unchanged when every byte is legal, otherwise the
        escaped value wrapped in double quotes.
    """
    quoted = bytearray()
    needs_quotes = False

    for code in b:
        piece = bytes((code,))

        if piece not in _legal_cookie_chars:
            needs_quotes = True
            piece = _cookie_quoting_map.get(piece, piece)

        quoted += piece

    if needs_quotes:
        return bytes(b'"' + quoted + b'"')

    return bytes(quoted)

343 

344 

def _cookie_unquote(b: bytes) -> bytes:
    """Undo the quoting of a double-quoted cookie value.

    :param b: Cookie value as found in the header.
    :return: ``b`` unchanged unless it is at least two bytes long and both
        starts and ends with a double quote. Otherwise the content between
        the quotes, with ``\\NNN`` octal escapes turned into the byte they
        name and every other ``\\x`` pair reduced to ``x``.
    """
    if len(b) < 2 or not (b[:1] == b'"' and b[-1:] == b'"'):
        return b

    inner = b[1:-1]
    out = bytearray()
    pos = 0
    end = len(inner)

    while 0 <= pos < end:
        octal = _octal_re.search(inner, pos)
        escaped = _quote_re.search(inner, pos)

        if octal is None and escaped is None:
            # Nothing left to unescape; copy the remainder verbatim.
            out += inner[pos:]
            break

        octal_at = octal.start(0) if octal else -1
        escaped_at = escaped.start(0) if escaped else -1

        # A plain escape is handled first only when it starts strictly
        # before the next octal escape. On a tie the octal form wins.
        if escaped and (not octal or escaped_at < octal_at):
            out += inner[pos:escaped_at]
            out += inner[escaped_at + 1 : escaped_at + 2]
            pos = escaped_at + 2
        else:
            out += inner[pos:octal_at]
            out.append(int(inner[octal_at + 1 : octal_at + 4], 8))
            pos = octal_at + 4

    return bytes(out)

379 

380 

def _cookie_parse_impl(b: bytes) -> t.Iterator[t.Tuple[bytes, bytes]]:
    """Lowlevel cookie parsing facility that operates on bytes.

    :param b: Raw cookie header value.
    :return: Iterator of ``(key, value)`` pairs. Keys are stripped of
        surrounding whitespace and values are passed through
        ``_cookie_unquote``. Items with an empty key are skipped.
    """
    # The length is measured before the terminator is added so that the
    # loop stops at the end of the caller's data.
    end = len(b)
    b += b";"
    pos = 0

    while pos < end:
        item = _cookie_re.match(b, pos)

        if item is None:
            return

        pos = item.end(0)
        name = item.group("key").strip()

        if name:
            yield name, _cookie_unquote(item.group("val") or b"")

401 

402 

def _encode_idna(domain: str) -> bytes:
    """Encode a domain name to bytes, using IDNA where necessary.

    :param domain: Domain name. ``bytes`` input is also accepted.
    :return: ``bytes`` input unchanged (after checking that it is ASCII),
        the ASCII encoding of an all-ASCII ``str``, or otherwise each
        dot-separated label encoded with the ``idna`` codec and rejoined
        with dots.
    :raises UnicodeError: If ``bytes`` input is not ASCII or a label cannot
        be encoded.
    """
    if isinstance(domain, bytes):
        # Decoded purely as a check; the result is discarded.
        domain.decode("ascii")
        return domain

    try:
        plain = domain.encode("ascii")
    except UnicodeError:
        plain = None

    if plain is not None:
        # Already pure ASCII, no IDNA step needed.
        return plain

    labels = [label.encode("idna") for label in domain.split(".")]
    return b".".join(labels)

417 

418 

def _decode_idna(domain: t.Union[str, bytes]) -> str:
    """Decode an IDNA-encoded domain name to text.

    :param domain: Domain as ``str`` or ``bytes``.
    :return: A ``str`` that cannot be encoded as ASCII is returned as is,
        since it is taken to be decoded already. Otherwise each
        dot-separated label is decoded with the ``idna`` codec and the
        labels are rejoined with dots.
    """
    if isinstance(domain, str):
        try:
            domain = domain.encode("ascii")
        except UnicodeError:
            return domain  # type: ignore

    labels = []

    for raw in domain.split(b"."):
        try:
            label = raw.decode("idna")
        except UnicodeError:
            # The idna codec has no error handling of its own, so fall
            # back to ASCII and drop whatever cannot be decoded.
            label = raw.decode("ascii", "ignore")

        labels.append(label)

    return ".".join(labels)

439 

440 

@typing.overload
def _make_cookie_domain(domain: None) -> None:
    ...


@typing.overload
def _make_cookie_domain(domain: str) -> bytes:
    ...


def _make_cookie_domain(domain: t.Optional[str]) -> t.Optional[bytes]:
    """Prepare a value for a cookie's ``domain`` attribute.

    :param domain: Domain name, optionally followed by ``:port``, or
        ``None``.
    :return: ``None`` for ``None``; otherwise the domain passed through
        ``_encode_idna`` with everything from the first colon removed.
    :raises ValueError: If the resulting name does not contain a dot.
    """
    if domain is None:
        return None

    encoded = _encode_idna(domain)

    if b":" in encoded:
        # Keep only the part before the first colon.
        encoded, _ = encoded.split(b":", 1)

    if b"." not in encoded:
        raise ValueError(
            "Setting 'domain' for a cookie on a server running locally (ex: "
            "localhost) is not supported by complying browsers. You should "
            "have something like: '127.0.0.1 localhost dev.localhost' on "
            "your hosts file and then point your server to run on "
            "'dev.localhost' and also set 'domain' for 'dev.localhost'"
        )

    return encoded

466 

467 

def _easteregg(app: t.Optional["WSGIApplication"] = None) -> "WSGIApplication":
    """Like the name says. But who knows how it works?

    :param app: Optional WSGI application to wrap.
    :return: A WSGI application. Every response started through it gets an
        ``X-Powered-By: Werkzeug`` header appended. Requests are forwarded
        to ``app`` unless ``app`` is ``None`` or the query string equals
        ``macgybarchakku``, in which case an HTML page embedding the
        decoded payload is returned.
    """
    import base64
    import zlib

    # Base64 of zlib-compressed ASCII text.
    packed = b"""
eJyFlzuOJDkMRP06xRjymKgDJCDQStBYT8BCgK4gTwfQ2fcFs2a2FzvZk+hvlcRvRJD148efHt9m
9Xz94dRY5hGt1nrYcXx7us9qlcP9HHNh28rz8dZj+q4rynVFFPdlY4zH873NKCexrDM6zxxRymzz
4QIxzK4bth1PV7+uHn6WXZ5C4ka/+prFzx3zWLMHAVZb8RRUxtFXI5DTQ2n3Hi2sNI+HK43AOWSY
jmEzE4naFp58PdzhPMdslLVWHTGUVpSxImw+pS/D+JhzLfdS1j7PzUMxij+mc2U0I9zcbZ/HcZxc
q1QjvvcThMYFnp93agEx392ZdLJWXbi/Ca4Oivl4h/Y1ErEqP+lrg7Xa4qnUKu5UE9UUA4xeqLJ5
jWlPKJvR2yhRI7xFPdzPuc6adXu6ovwXwRPXXnZHxlPtkSkqWHilsOrGrvcVWXgGP3daXomCj317
8P2UOw/NnA0OOikZyFf3zZ76eN9QXNwYdD8f8/LdBRFg0BO3bB+Pe/+G8er8tDJv83XTkj7WeMBJ
v/rnAfdO51d6sFglfi8U7zbnr0u9tyJHhFZNXYfH8Iafv2Oa+DT6l8u9UYlajV/hcEgk1x8E8L/r
XJXl2SK+GJCxtnyhVKv6GFCEB1OO3f9YWAIEbwcRWv/6RPpsEzOkXURMN37J0PoCSYeBnJQd9Giu
LxYQJNlYPSo/iTQwgaihbART7Fcyem2tTSCcwNCs85MOOpJtXhXDe0E7zgZJkcxWTar/zEjdIVCk
iXy87FW6j5aGZhttDBoAZ3vnmlkx4q4mMmCdLtnHkBXFMCReqthSGkQ+MDXLLCpXwBs0t+sIhsDI
tjBB8MwqYQpLygZ56rRHHpw+OAVyGgaGRHWy2QfXez+ZQQTTBkmRXdV/A9LwH6XGZpEAZU8rs4pE
1R4FQ3Uwt8RKEtRc0/CrANUoes3EzM6WYcFyskGZ6UTHJWenBDS7h163Eo2bpzqxNE9aVgEM2CqI
GAJe9Yra4P5qKmta27VjzYdR04Vc7KHeY4vs61C0nbywFmcSXYjzBHdiEjraS7PGG2jHHTpJUMxN
Jlxr3pUuFvlBWLJGE3GcA1/1xxLcHmlO+LAXbhrXah1tD6Ze+uqFGdZa5FM+3eHcKNaEarutAQ0A
QMAZHV+ve6LxAwWnXbbSXEG2DmCX5ijeLCKj5lhVFBrMm+ryOttCAeFpUdZyQLAQkA06RLs56rzG
8MID55vqr/g64Qr/wqwlE0TVxgoiZhHrbY2h1iuuyUVg1nlkpDrQ7Vm1xIkI5XRKLedN9EjzVchu
jQhXcVkjVdgP2O99QShpdvXWoSwkp5uMwyjt3jiWCqWGSiaaPAzohjPanXVLbM3x0dNskJsaCEyz
DTKIs+7WKJD4ZcJGfMhLFBf6hlbnNkLEePF8Cx2o2kwmYF4+MzAxa6i+6xIQkswOqGO+3x9NaZX8
MrZRaFZpLeVTYI9F/djY6DDVVs340nZGmwrDqTCiiqD5luj3OzwpmQCiQhdRYowUYEA3i1WWGwL4
GCtSoO4XbIPFeKGU13XPkDf5IdimLpAvi2kVDVQbzOOa4KAXMFlpi/hV8F6IDe0Y2reg3PuNKT3i
RYhZqtkQZqSB2Qm0SGtjAw7RDwaM1roESC8HWiPxkoOy0lLTRFG39kvbLZbU9gFKFRvixDZBJmpi
Xyq3RE5lW00EJjaqwp/v3EByMSpVZYsEIJ4APaHmVtpGSieV5CALOtNUAzTBiw81GLgC0quyzf6c
NlWknzJeCsJ5fup2R4d8CYGN77mu5vnO1UqbfElZ9E6cR6zbHjgsr9ly18fXjZoPeDjPuzlWbFwS
pdvPkhntFvkc13qb9094LL5NrA3NIq3r9eNnop9DizWOqCEbyRBFJTHn6Tt3CG1o8a4HevYh0XiJ
sR0AVVHuGuMOIfbuQ/OKBkGRC6NJ4u7sbPX8bG/n5sNIOQ6/Y/BX3IwRlTSabtZpYLB85lYtkkgm
p1qXK3Du2mnr5INXmT/78KI12n11EFBkJHHp0wJyLe9MvPNUGYsf+170maayRoy2lURGHAIapSpQ
krEDuNoJCHNlZYhKpvw4mspVWxqo415n8cD62N9+EfHrAvqQnINStetek7RY2Urv8nxsnGaZfRr/
nhXbJ6m/yl1LzYqscDZA9QHLNbdaSTTr+kFg3bC0iYbX/eQy0Bv3h4B50/SGYzKAXkCeOLI3bcAt
mj2Z/FM1vQWgDynsRwNvrWnJHlespkrp8+vO1jNaibm+PhqXPPv30YwDZ6jApe3wUjFQobghvW9p
7f2zLkGNv8b191cD/3vs9Q833z8t"""

    art = zlib.decompress(base64.b64decode(packed)).decode("ascii")
    # Pad every row with spaces up to 77 columns; longer rows stay as is.
    gyver = "\n".join([row + " " * (77 - len(row)) for row in art.splitlines()])

    def easteregged(
        environ: "WSGIEnvironment", start_response: "StartResponse"
    ) -> t.Iterable[bytes]:
        def injecting_start_response(
            status: str, headers: t.List[t.Tuple[str, str]], exc_info: t.Any = None
        ) -> t.Callable[[bytes], t.Any]:
            # The caller's header list is extended in place.
            headers.append(("X-Powered-By", "Werkzeug"))
            return start_response(status, headers, exc_info)

        show_page = app is None or environ.get("QUERY_STRING") == "macgybarchakku"

        if not show_page:
            return app(environ, injecting_start_response)

        injecting_start_response("200 OK", [("Content-Type", "text/html")])
        page = f"""\
<!doctype html>
<html lang=en>
<head>
<title>About Werkzeug</title>
<style type="text/css">
  body {{ font: 15px Georgia, serif; text-align: center; }}
  a {{ color: #333; text-decoration: none; }}
  h1 {{ font-size: 30px; margin: 20px 0 10px 0; }}
  p {{ margin: 0 0 30px 0; }}
  pre {{ font: 11px 'Consolas', 'Monaco', monospace; line-height: 0.95; }}
</style>
</head>
<body>
<h1><a href="http://werkzeug.pocoo.org/">Werkzeug</a></h1>
<p>the Swiss Army knife of Python web development.</p>
<pre>{gyver}\n\n\n</pre>
</body>
</html>"""
        return [page.encode("latin1")]

    return easteregged