Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/_internal.py: 31%
273 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
1import logging
2import operator
3import re
4import string
5import sys
6import typing
7import typing as t
8from datetime import date
9from datetime import datetime
10from datetime import timezone
11from itertools import chain
12from weakref import WeakKeyDictionary
14if t.TYPE_CHECKING:
15 from _typeshed.wsgi import StartResponse
16 from _typeshed.wsgi import WSGIApplication
17 from _typeshed.wsgi import WSGIEnvironment
18 from .wrappers.request import Request # noqa: F401
20_logger: t.Optional[logging.Logger] = None
21_signature_cache = WeakKeyDictionary() # type: ignore
22_epoch_ord = date(1970, 1, 1).toordinal()
23_legal_cookie_chars = frozenset(
24 c.encode("ascii")
25 for c in f"{string.ascii_letters}{string.digits}/=!#$%&'*+-.^_`|~:"
26)
28_cookie_quoting_map = {b",": b"\\054", b";": b"\\073", b'"': b'\\"', b"\\": b"\\\\"}
29for _i in chain(range(32), range(127, 256)):
30 _cookie_quoting_map[_i.to_bytes(1, sys.byteorder)] = f"\\{_i:03o}".encode("latin1")
32_octal_re = re.compile(rb"\\[0-3][0-7][0-7]")
33_quote_re = re.compile(rb"[\\].")
34_legal_cookie_chars_re = rb"[\w\d!#%&\'~_`><@,:/\$\*\+\-\.\^\|\)\(\?\}\{\=]"
35_cookie_re = re.compile(
36 rb"""
37 (?P<key>[^=;]+)
38 (?:\s*=\s*
39 (?P<val>
40 "(?:[^\\"]|\\.)*" |
41 (?:.*?)
42 )
43 )?
44 \s*;
45""",
46 flags=re.VERBOSE,
47)
50class _Missing:
51 def __repr__(self) -> str:
52 return "no value"
54 def __reduce__(self) -> str:
55 return "_missing"
58_missing = _Missing()
61@typing.overload
62def _make_encode_wrapper(reference: str) -> t.Callable[[str], str]:
63 ...
66@typing.overload
67def _make_encode_wrapper(reference: bytes) -> t.Callable[[str], bytes]:
68 ...
71def _make_encode_wrapper(reference: t.AnyStr) -> t.Callable[[str], t.AnyStr]:
72 """Create a function that will be called with a string argument. If
73 the reference is bytes, values will be encoded to bytes.
74 """
75 if isinstance(reference, str):
76 return lambda x: x
78 return operator.methodcaller("encode", "latin1")
81def _check_str_tuple(value: t.Tuple[t.AnyStr, ...]) -> None:
82 """Ensure tuple items are all strings or all bytes."""
83 if not value:
84 return
86 item_type = str if isinstance(value[0], str) else bytes
88 if any(not isinstance(item, item_type) for item in value):
89 raise TypeError(f"Cannot mix str and bytes arguments (got {value!r})")
92_default_encoding = sys.getdefaultencoding()
95def _to_bytes(
96 x: t.Union[str, bytes], charset: str = _default_encoding, errors: str = "strict"
97) -> bytes:
98 if x is None or isinstance(x, bytes):
99 return x
101 if isinstance(x, (bytearray, memoryview)):
102 return bytes(x)
104 if isinstance(x, str):
105 return x.encode(charset, errors)
107 raise TypeError("Expected bytes")
110@typing.overload
111def _to_str( # type: ignore
112 x: None,
113 charset: t.Optional[str] = ...,
114 errors: str = ...,
115 allow_none_charset: bool = ...,
116) -> None:
117 ...
120@typing.overload
121def _to_str(
122 x: t.Any,
123 charset: t.Optional[str] = ...,
124 errors: str = ...,
125 allow_none_charset: bool = ...,
126) -> str:
127 ...
130def _to_str(
131 x: t.Optional[t.Any],
132 charset: t.Optional[str] = _default_encoding,
133 errors: str = "strict",
134 allow_none_charset: bool = False,
135) -> t.Optional[t.Union[str, bytes]]:
136 if x is None or isinstance(x, str):
137 return x
139 if not isinstance(x, (bytes, bytearray)):
140 return str(x)
142 if charset is None:
143 if allow_none_charset:
144 return x
146 return x.decode(charset, errors) # type: ignore
149def _wsgi_decoding_dance(
150 s: str, charset: str = "utf-8", errors: str = "replace"
151) -> str:
152 return s.encode("latin1").decode(charset, errors)
155def _wsgi_encoding_dance(
156 s: str, charset: str = "utf-8", errors: str = "replace"
157) -> str:
158 if isinstance(s, bytes):
159 return s.decode("latin1", errors)
161 return s.encode(charset).decode("latin1", errors)
164def _get_environ(obj: t.Union["WSGIEnvironment", "Request"]) -> "WSGIEnvironment":
165 env = getattr(obj, "environ", obj)
166 assert isinstance(
167 env, dict
168 ), f"{type(obj).__name__!r} is not a WSGI environment (has to be a dict)"
169 return env
172def _has_level_handler(logger: logging.Logger) -> bool:
173 """Check if there is a handler in the logging chain that will handle
174 the given logger's effective level.
175 """
176 level = logger.getEffectiveLevel()
177 current = logger
179 while current:
180 if any(handler.level <= level for handler in current.handlers):
181 return True
183 if not current.propagate:
184 break
186 current = current.parent # type: ignore
188 return False
191class _ColorStreamHandler(logging.StreamHandler):
192 """On Windows, wrap stream with Colorama for ANSI style support."""
194 def __init__(self) -> None:
195 try:
196 import colorama
197 except ImportError:
198 stream = None
199 else:
200 stream = colorama.AnsiToWin32(sys.stderr)
202 super().__init__(stream)
205def _log(type: str, message: str, *args: t.Any, **kwargs: t.Any) -> None:
206 """Log a message to the 'werkzeug' logger.
208 The logger is created the first time it is needed. If there is no
209 level set, it is set to :data:`logging.INFO`. If there is no handler
210 for the logger's effective level, a :class:`logging.StreamHandler`
211 is added.
212 """
213 global _logger
215 if _logger is None:
216 _logger = logging.getLogger("werkzeug")
218 if _logger.level == logging.NOTSET:
219 _logger.setLevel(logging.INFO)
221 if not _has_level_handler(_logger):
222 _logger.addHandler(_ColorStreamHandler())
224 getattr(_logger, type)(message.rstrip(), *args, **kwargs)
227@typing.overload
228def _dt_as_utc(dt: None) -> None:
229 ...
232@typing.overload
233def _dt_as_utc(dt: datetime) -> datetime:
234 ...
237def _dt_as_utc(dt: t.Optional[datetime]) -> t.Optional[datetime]:
238 if dt is None:
239 return dt
241 if dt.tzinfo is None:
242 return dt.replace(tzinfo=timezone.utc)
243 elif dt.tzinfo != timezone.utc:
244 return dt.astimezone(timezone.utc)
246 return dt
249_TAccessorValue = t.TypeVar("_TAccessorValue")
252class _DictAccessorProperty(t.Generic[_TAccessorValue]):
253 """Baseclass for `environ_property` and `header_property`."""
255 read_only = False
257 def __init__(
258 self,
259 name: str,
260 default: t.Optional[_TAccessorValue] = None,
261 load_func: t.Optional[t.Callable[[str], _TAccessorValue]] = None,
262 dump_func: t.Optional[t.Callable[[_TAccessorValue], str]] = None,
263 read_only: t.Optional[bool] = None,
264 doc: t.Optional[str] = None,
265 ) -> None:
266 self.name = name
267 self.default = default
268 self.load_func = load_func
269 self.dump_func = dump_func
270 if read_only is not None:
271 self.read_only = read_only
272 self.__doc__ = doc
274 def lookup(self, instance: t.Any) -> t.MutableMapping[str, t.Any]:
275 raise NotImplementedError
277 @typing.overload
278 def __get__(
279 self, instance: None, owner: type
280 ) -> "_DictAccessorProperty[_TAccessorValue]":
281 ...
283 @typing.overload
284 def __get__(self, instance: t.Any, owner: type) -> _TAccessorValue:
285 ...
287 def __get__(
288 self, instance: t.Optional[t.Any], owner: type
289 ) -> t.Union[_TAccessorValue, "_DictAccessorProperty[_TAccessorValue]"]:
290 if instance is None:
291 return self
293 storage = self.lookup(instance)
295 if self.name not in storage:
296 return self.default # type: ignore
298 value = storage[self.name]
300 if self.load_func is not None:
301 try:
302 return self.load_func(value)
303 except (ValueError, TypeError):
304 return self.default # type: ignore
306 return value # type: ignore
308 def __set__(self, instance: t.Any, value: _TAccessorValue) -> None:
309 if self.read_only:
310 raise AttributeError("read only property")
312 if self.dump_func is not None:
313 self.lookup(instance)[self.name] = self.dump_func(value)
314 else:
315 self.lookup(instance)[self.name] = value
317 def __delete__(self, instance: t.Any) -> None:
318 if self.read_only:
319 raise AttributeError("read only property")
321 self.lookup(instance).pop(self.name, None)
323 def __repr__(self) -> str:
324 return f"<{type(self).__name__} {self.name}>"
327def _cookie_quote(b: bytes) -> bytes:
328 buf = bytearray()
329 all_legal = True
330 _lookup = _cookie_quoting_map.get
331 _push = buf.extend
333 for char_int in b:
334 char = char_int.to_bytes(1, sys.byteorder)
335 if char not in _legal_cookie_chars:
336 all_legal = False
337 char = _lookup(char, char)
338 _push(char)
340 if all_legal:
341 return bytes(buf)
342 return bytes(b'"' + buf + b'"')
345def _cookie_unquote(b: bytes) -> bytes:
346 if len(b) < 2:
347 return b
348 if b[:1] != b'"' or b[-1:] != b'"':
349 return b
351 b = b[1:-1]
353 i = 0
354 n = len(b)
355 rv = bytearray()
356 _push = rv.extend
358 while 0 <= i < n:
359 o_match = _octal_re.search(b, i)
360 q_match = _quote_re.search(b, i)
361 if not o_match and not q_match:
362 rv.extend(b[i:])
363 break
364 j = k = -1
365 if o_match:
366 j = o_match.start(0)
367 if q_match:
368 k = q_match.start(0)
369 if q_match and (not o_match or k < j):
370 _push(b[i:k])
371 _push(b[k + 1 : k + 2])
372 i = k + 2
373 else:
374 _push(b[i:j])
375 rv.append(int(b[j + 1 : j + 4], 8))
376 i = j + 4
378 return bytes(rv)
381def _cookie_parse_impl(b: bytes) -> t.Iterator[t.Tuple[bytes, bytes]]:
382 """Lowlevel cookie parsing facility that operates on bytes."""
383 i = 0
384 n = len(b)
386 while i < n:
387 match = _cookie_re.search(b + b";", i)
388 if not match:
389 break
391 key = match.group("key").strip()
392 value = match.group("val") or b""
393 i = match.end(0)
395 yield key, _cookie_unquote(value)
398def _encode_idna(domain: str) -> bytes:
399 # If we're given bytes, make sure they fit into ASCII
400 if isinstance(domain, bytes):
401 domain.decode("ascii")
402 return domain
404 # Otherwise check if it's already ascii, then return
405 try:
406 return domain.encode("ascii")
407 except UnicodeError:
408 pass
410 # Otherwise encode each part separately
411 return b".".join(p.encode("idna") for p in domain.split("."))
414def _decode_idna(domain: t.Union[str, bytes]) -> str:
415 # If the input is a string try to encode it to ascii to do the idna
416 # decoding. If that fails because of a unicode error, then we
417 # already have a decoded idna domain.
418 if isinstance(domain, str):
419 try:
420 domain = domain.encode("ascii")
421 except UnicodeError:
422 return domain # type: ignore
424 # Decode each part separately. If a part fails, try to decode it
425 # with ascii and silently ignore errors. This makes sense because
426 # the idna codec does not have error handling.
427 def decode_part(part: bytes) -> str:
428 try:
429 return part.decode("idna")
430 except UnicodeError:
431 return part.decode("ascii", "ignore")
433 return ".".join(decode_part(p) for p in domain.split(b"."))
436@typing.overload
437def _make_cookie_domain(domain: None) -> None:
438 ...
441@typing.overload
442def _make_cookie_domain(domain: str) -> bytes:
443 ...
446def _make_cookie_domain(domain: t.Optional[str]) -> t.Optional[bytes]:
447 if domain is None:
448 return None
449 domain = _encode_idna(domain)
450 if b":" in domain:
451 domain = domain.split(b":", 1)[0]
452 if b"." in domain:
453 return domain
454 raise ValueError(
455 "Setting 'domain' for a cookie on a server running locally (ex: "
456 "localhost) is not supported by complying browsers. You should "
457 "have something like: '127.0.0.1 localhost dev.localhost' on "
458 "your hosts file and then point your server to run on "
459 "'dev.localhost' and also set 'domain' for 'dev.localhost'"
460 )
463def _easteregg(app: t.Optional["WSGIApplication"] = None) -> "WSGIApplication":
464 """Like the name says. But who knows how it works?"""
466 def bzzzzzzz(gyver: bytes) -> str:
467 import base64
468 import zlib
470 return zlib.decompress(base64.b64decode(gyver)).decode("ascii")
472 gyver = "\n".join(
473 [
474 x + (77 - len(x)) * " "
475 for x in bzzzzzzz(
476 b"""
477eJyFlzuOJDkMRP06xRjymKgDJCDQStBYT8BCgK4gTwfQ2fcFs2a2FzvZk+hvlcRvRJD148efHt9m
4789Xz94dRY5hGt1nrYcXx7us9qlcP9HHNh28rz8dZj+q4rynVFFPdlY4zH873NKCexrDM6zxxRymzz
4794QIxzK4bth1PV7+uHn6WXZ5C4ka/+prFzx3zWLMHAVZb8RRUxtFXI5DTQ2n3Hi2sNI+HK43AOWSY
480jmEzE4naFp58PdzhPMdslLVWHTGUVpSxImw+pS/D+JhzLfdS1j7PzUMxij+mc2U0I9zcbZ/HcZxc
481q1QjvvcThMYFnp93agEx392ZdLJWXbi/Ca4Oivl4h/Y1ErEqP+lrg7Xa4qnUKu5UE9UUA4xeqLJ5
482jWlPKJvR2yhRI7xFPdzPuc6adXu6ovwXwRPXXnZHxlPtkSkqWHilsOrGrvcVWXgGP3daXomCj317
4838P2UOw/NnA0OOikZyFf3zZ76eN9QXNwYdD8f8/LdBRFg0BO3bB+Pe/+G8er8tDJv83XTkj7WeMBJ
484v/rnAfdO51d6sFglfi8U7zbnr0u9tyJHhFZNXYfH8Iafv2Oa+DT6l8u9UYlajV/hcEgk1x8E8L/r
485XJXl2SK+GJCxtnyhVKv6GFCEB1OO3f9YWAIEbwcRWv/6RPpsEzOkXURMN37J0PoCSYeBnJQd9Giu
486LxYQJNlYPSo/iTQwgaihbART7Fcyem2tTSCcwNCs85MOOpJtXhXDe0E7zgZJkcxWTar/zEjdIVCk
487iXy87FW6j5aGZhttDBoAZ3vnmlkx4q4mMmCdLtnHkBXFMCReqthSGkQ+MDXLLCpXwBs0t+sIhsDI
488tjBB8MwqYQpLygZ56rRHHpw+OAVyGgaGRHWy2QfXez+ZQQTTBkmRXdV/A9LwH6XGZpEAZU8rs4pE
4891R4FQ3Uwt8RKEtRc0/CrANUoes3EzM6WYcFyskGZ6UTHJWenBDS7h163Eo2bpzqxNE9aVgEM2CqI
490GAJe9Yra4P5qKmta27VjzYdR04Vc7KHeY4vs61C0nbywFmcSXYjzBHdiEjraS7PGG2jHHTpJUMxN
491Jlxr3pUuFvlBWLJGE3GcA1/1xxLcHmlO+LAXbhrXah1tD6Ze+uqFGdZa5FM+3eHcKNaEarutAQ0A
492QMAZHV+ve6LxAwWnXbbSXEG2DmCX5ijeLCKj5lhVFBrMm+ryOttCAeFpUdZyQLAQkA06RLs56rzG
4938MID55vqr/g64Qr/wqwlE0TVxgoiZhHrbY2h1iuuyUVg1nlkpDrQ7Vm1xIkI5XRKLedN9EjzVchu
494jQhXcVkjVdgP2O99QShpdvXWoSwkp5uMwyjt3jiWCqWGSiaaPAzohjPanXVLbM3x0dNskJsaCEyz
495DTKIs+7WKJD4ZcJGfMhLFBf6hlbnNkLEePF8Cx2o2kwmYF4+MzAxa6i+6xIQkswOqGO+3x9NaZX8
496MrZRaFZpLeVTYI9F/djY6DDVVs340nZGmwrDqTCiiqD5luj3OzwpmQCiQhdRYowUYEA3i1WWGwL4
497GCtSoO4XbIPFeKGU13XPkDf5IdimLpAvi2kVDVQbzOOa4KAXMFlpi/hV8F6IDe0Y2reg3PuNKT3i
498RYhZqtkQZqSB2Qm0SGtjAw7RDwaM1roESC8HWiPxkoOy0lLTRFG39kvbLZbU9gFKFRvixDZBJmpi
499Xyq3RE5lW00EJjaqwp/v3EByMSpVZYsEIJ4APaHmVtpGSieV5CALOtNUAzTBiw81GLgC0quyzf6c
500NlWknzJeCsJ5fup2R4d8CYGN77mu5vnO1UqbfElZ9E6cR6zbHjgsr9ly18fXjZoPeDjPuzlWbFwS
501pdvPkhntFvkc13qb9094LL5NrA3NIq3r9eNnop9DizWOqCEbyRBFJTHn6Tt3CG1o8a4HevYh0XiJ
502sR0AVVHuGuMOIfbuQ/OKBkGRC6NJ4u7sbPX8bG/n5sNIOQ6/Y/BX3IwRlTSabtZpYLB85lYtkkgm
503p1qXK3Du2mnr5INXmT/78KI12n11EFBkJHHp0wJyLe9MvPNUGYsf+170maayRoy2lURGHAIapSpQ
504krEDuNoJCHNlZYhKpvw4mspVWxqo415n8cD62N9+EfHrAvqQnINStetek7RY2Urv8nxsnGaZfRr/
505nhXbJ6m/yl1LzYqscDZA9QHLNbdaSTTr+kFg3bC0iYbX/eQy0Bv3h4B50/SGYzKAXkCeOLI3bcAt
506mj2Z/FM1vQWgDynsRwNvrWnJHlespkrp8+vO1jNaibm+PhqXPPv30YwDZ6jApe3wUjFQobghvW9p
5077f2zLkGNv8b191cD/3vs9Q833z8t"""
508 ).splitlines()
509 ]
510 )
512 def easteregged(
513 environ: "WSGIEnvironment", start_response: "StartResponse"
514 ) -> t.Iterable[bytes]:
515 def injecting_start_response(
516 status: str, headers: t.List[t.Tuple[str, str]], exc_info: t.Any = None
517 ) -> t.Callable[[bytes], t.Any]:
518 headers.append(("X-Powered-By", "Werkzeug"))
519 return start_response(status, headers, exc_info)
521 if app is not None and environ.get("QUERY_STRING") != "macgybarchakku":
522 return app(environ, injecting_start_response)
523 injecting_start_response("200 OK", [("Content-Type", "text/html")])
524 return [
525 f"""\
526<!doctype html>
527<html lang=en>
528<head>
529<title>About Werkzeug</title>
530<style type="text/css">
531 body {{ font: 15px Georgia, serif; text-align: center; }}
532 a {{ color: #333; text-decoration: none; }}
533 h1 {{ font-size: 30px; margin: 20px 0 10px 0; }}
534 p {{ margin: 0 0 30px 0; }}
535 pre {{ font: 11px 'Consolas', 'Monaco', monospace; line-height: 0.95; }}
536</style>
537</head>
538<body>
539<h1><a href="http://werkzeug.pocoo.org/">Werkzeug</a></h1>
540<p>the Swiss Army knife of Python web development.</p>
541<pre>{gyver}\n\n\n</pre>
542</body>
543</html>""".encode(
544 "latin1"
545 )
546 ]
548 return easteregged