Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/_internal.py: 47%
176 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-09 06:08 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-09 06:08 +0000
1from __future__ import annotations
3import logging
4import operator
5import re
6import sys
7import typing as t
8from datetime import datetime
9from datetime import timezone
11if t.TYPE_CHECKING:
12 from _typeshed.wsgi import WSGIEnvironment
13 from .wrappers.request import Request
# Module-level cache for the "werkzeug" logger. It stays ``None`` until
# ``_log`` is called for the first time and stores the configured logger here.
_logger: logging.Logger | None = None
class _Missing:
    """Type of the ``_missing`` sentinel defined right below the class."""

    def __reduce__(self) -> str:
        # Returning a string tells pickle/copy to treat the object as the
        # module-level global with that name instead of building a new one.
        return "_missing"

    def __repr__(self) -> str:
        return "no value"


# The single shared sentinel instance.
_missing = _Missing()
@t.overload
def _make_encode_wrapper(reference: str) -> t.Callable[[str], str]:
    ...


@t.overload
def _make_encode_wrapper(reference: bytes) -> t.Callable[[str], bytes]:
    ...


def _make_encode_wrapper(reference: t.AnyStr) -> t.Callable[[str], t.AnyStr]:
    """Return a one-argument callable matched to the type of ``reference``.

    When ``reference`` is a ``str`` the callable hands its argument back
    untouched. For any other reference the callable invokes
    ``.encode("latin1")`` on its argument.
    """
    if not isinstance(reference, str):
        # Non-str reference: produce latin1-encoded values.
        return operator.methodcaller("encode", "latin1")

    # str reference: identity function.
    return lambda x: x
def _check_str_tuple(value: tuple[t.AnyStr, ...]) -> None:
    """Verify that every item of ``value`` has the same string type.

    The expected type is ``str`` when the first item is a ``str`` and
    ``bytes`` otherwise. An empty tuple is accepted as-is.

    :raises TypeError: if any item is not an instance of the expected type.
    """
    if value:
        expected = bytes
        if isinstance(value[0], str):
            expected = str

        if not all(isinstance(entry, expected) for entry in value):
            raise TypeError(f"Cannot mix str and bytes arguments (got {value!r})")
# Interpreter default string encoding, used as the default ``charset`` below.
_default_encoding = sys.getdefaultencoding()


def _to_bytes(
    x: str | bytes, charset: str = _default_encoding, errors: str = "strict"
) -> bytes:
    """Convert ``x`` to ``bytes``.

    ``None`` and ``bytes`` values are returned unchanged, ``bytearray`` and
    ``memoryview`` are copied into ``bytes``, and ``str`` is encoded with
    ``charset`` / ``errors``.

    :raises TypeError: for any other input type.
    """
    if isinstance(x, str):
        return x.encode(charset, errors)

    if isinstance(x, (bytearray, memoryview)):
        return bytes(x)

    if x is None or isinstance(x, bytes):
        # Passed through untouched (this includes ``None``).
        return x

    raise TypeError("Expected bytes")
@t.overload
def _to_str(  # type: ignore
    x: None,
    charset: str | None = ...,
    errors: str = ...,
    allow_none_charset: bool = ...,
) -> None:
    ...


@t.overload
def _to_str(
    x: t.Any,
    charset: str | None = ...,
    errors: str = ...,
    allow_none_charset: bool = ...,
) -> str:
    ...


def _to_str(
    x: t.Any | None,
    charset: str | None = _default_encoding,
    errors: str = "strict",
    allow_none_charset: bool = False,
) -> str | bytes | None:
    """Convert ``x`` to ``str``.

    ``None`` and ``str`` are returned unchanged. Anything that is not
    ``bytes`` or ``bytearray`` goes through ``str(x)``. Binary input is
    decoded with ``charset`` / ``errors``, except that it is returned
    undecoded when ``charset`` is ``None`` and ``allow_none_charset`` is true.
    """
    if x is None or isinstance(x, str):
        return x

    is_binary = isinstance(x, (bytes, bytearray))

    if not is_binary:
        return str(x)

    if charset is None and allow_none_charset:
        # Caller explicitly asked for the raw binary value to be kept.
        return x

    return x.decode(charset, errors)  # type: ignore
def _wsgi_decoding_dance(
    s: str, charset: str = "utf-8", errors: str = "replace"
) -> str:
    """Re-interpret a latin1-decoded string as ``charset``.

    The string is encoded back to bytes as latin1 and those bytes are then
    decoded with ``charset`` using the ``errors`` handler.
    """
    raw = s.encode("latin1")
    return raw.decode(charset, errors)
def _wsgi_encoding_dance(s: str, charset: str = "utf-8", errors: str = "strict") -> str:
    """Encode ``s`` with ``charset`` and return those bytes decoded as latin1.

    Note that ``errors`` is handed to the latin1 decoding step only; the
    ``charset`` encoding step always uses the codec's default error handling.
    """
    raw = s.encode(charset)
    return raw.decode("latin1", errors)
def _get_environ(obj: WSGIEnvironment | Request) -> WSGIEnvironment:
    """Return the WSGI environ for ``obj``.

    If ``obj`` has an ``environ`` attribute that attribute is used, otherwise
    ``obj`` itself is taken to be the environ. The result is asserted to be a
    ``dict``.
    """
    environ = getattr(obj, "environ", obj)
    is_mapping = isinstance(environ, dict)
    assert (
        is_mapping
    ), f"{type(obj).__name__!r} is not a WSGI environment (has to be a dict)"
    return environ
def _has_level_handler(logger: logging.Logger) -> bool:
    """Report whether some handler reachable from ``logger`` has a level at
    or below the logger's effective level.

    The search starts at ``logger`` and follows ``parent`` links, stopping at
    the first logger whose ``propagate`` flag is false.
    """
    threshold = logger.getEffectiveLevel()
    node = logger

    while node:
        for handler in node.handlers:
            if handler.level <= threshold:
                return True

        if not node.propagate:
            # Records would not travel any further up the chain.
            return False

        node = node.parent  # type: ignore

    return False
class _ColorStreamHandler(logging.StreamHandler):
    """Stream handler that routes output through Colorama when available.

    If ``colorama`` can be imported, ``sys.stderr`` is wrapped in
    ``colorama.AnsiToWin32`` and used as the stream. Otherwise ``None`` is
    passed on, so ``logging.StreamHandler`` picks its default stream.
    """

    def __init__(self) -> None:
        stream = None

        try:
            import colorama
        except ImportError:
            pass
        else:
            stream = colorama.AnsiToWin32(sys.stderr)

        super().__init__(stream)
def _log(type: str, message: str, *args: t.Any, **kwargs: t.Any) -> None:
    """Emit ``message`` through the 'werkzeug' logger.

    ``type`` is the name of the logger method to call (looked up with
    ``getattr``). Trailing whitespace is stripped from ``message``; ``args``
    and ``kwargs`` are forwarded to the logger method.

    On the first call the logger is fetched and cached in the module-level
    ``_logger``. If it has no level set it is set to :data:`logging.INFO`,
    and if no handler covers its effective level a
    :class:`_ColorStreamHandler` is attached.
    """
    global _logger

    if _logger is None:
        # Cache first so the one-time setup below is never repeated.
        _logger = logger = logging.getLogger("werkzeug")

        if logger.level == logging.NOTSET:
            logger.setLevel(logging.INFO)

        if not _has_level_handler(logger):
            logger.addHandler(_ColorStreamHandler())

    emit = getattr(_logger, type)
    emit(message.rstrip(), *args, **kwargs)
@t.overload
def _dt_as_utc(dt: None) -> None:
    ...


@t.overload
def _dt_as_utc(dt: datetime) -> datetime:
    ...


def _dt_as_utc(dt: datetime | None) -> datetime | None:
    """Return ``dt`` expressed in UTC.

    ``None`` is passed through. A naive datetime gets ``timezone.utc``
    attached without changing its fields. An aware datetime whose tzinfo is
    not equal to ``timezone.utc`` is converted with ``astimezone``; one that
    is already UTC is returned as-is.
    """
    if dt is None:
        return None

    zone = dt.tzinfo

    if zone is None:
        # Naive value: label it as UTC, no clock adjustment.
        return dt.replace(tzinfo=timezone.utc)

    if zone != timezone.utc:
        return dt.astimezone(timezone.utc)

    return dt
_TAccessorValue = t.TypeVar("_TAccessorValue")


class _DictAccessorProperty(t.Generic[_TAccessorValue]):
    """Baseclass for `environ_property` and `header_property`.

    A descriptor that reads and writes one key (``name``) of a mapping.
    Subclasses supply the mapping by implementing :meth:`lookup`.
    """

    # Class-wide default; ``__init__`` overrides it per instance when asked to.
    read_only = False

    def __init__(
        self,
        name: str,
        default: _TAccessorValue | None = None,
        load_func: t.Callable[[str], _TAccessorValue] | None = None,
        dump_func: t.Callable[[_TAccessorValue], str] | None = None,
        read_only: bool | None = None,
        doc: str | None = None,
    ) -> None:
        self.name = name
        self.default = default
        self.load_func = load_func
        self.dump_func = dump_func
        self.__doc__ = doc

        if read_only is not None:
            self.read_only = read_only

    def lookup(self, instance: t.Any) -> t.MutableMapping[str, t.Any]:
        """Return the mapping that backs this property for ``instance``."""
        raise NotImplementedError

    @t.overload
    def __get__(
        self, instance: None, owner: type
    ) -> _DictAccessorProperty[_TAccessorValue]:
        ...

    @t.overload
    def __get__(self, instance: t.Any, owner: type) -> _TAccessorValue:
        ...

    def __get__(
        self, instance: t.Any | None, owner: type
    ) -> _TAccessorValue | _DictAccessorProperty[_TAccessorValue]:
        """Read the value; class-level access returns the descriptor itself.

        ``default`` is returned when the key is absent, or when ``load_func``
        raises ``ValueError`` or ``TypeError``.
        """
        if instance is None:
            return self

        storage = self.lookup(instance)
        key = self.name

        if key not in storage:
            return self.default  # type: ignore

        raw = storage[key]
        loader = self.load_func

        if loader is None:
            return raw  # type: ignore

        try:
            return loader(raw)
        except (ValueError, TypeError):
            return self.default  # type: ignore

    def __set__(self, instance: t.Any, value: _TAccessorValue) -> None:
        """Store ``value``, passed through ``dump_func`` when one is set."""
        if self.read_only:
            raise AttributeError("read only property")

        dumper = self.dump_func
        stored = value if dumper is None else dumper(value)
        self.lookup(instance)[self.name] = stored

    def __delete__(self, instance: t.Any) -> None:
        """Remove the key from the mapping; a missing key is not an error."""
        if self.read_only:
            raise AttributeError("read only property")

        self.lookup(instance).pop(self.name, None)

    def __repr__(self) -> str:
        return "<{} {}>".format(type(self).__name__, self.name)
def _decode_idna(domain: str) -> str:
    """Decode the IDNA (punycode) labels of ``domain`` where possible.

    A domain containing non-ASCII characters is returned unchanged, since it
    is taken to be decoded already. Otherwise the whole domain is decoded in
    one go; if that fails, each dot-separated label is decoded on its own and
    labels that cannot be decoded are kept in their ASCII/punycode form.

    :param domain: the domain name to decode.
    :return: the domain with every decodable label converted to Unicode.
    """
    try:
        data = domain.encode("ascii")
    except UnicodeEncodeError:
        # If the domain is not ASCII, it's decoded already.
        return domain

    try:
        # Try decoding in one shot.
        return data.decode("idna")
    except UnicodeError:
        # Catch the base class: depending on the Python version, the idna and
        # punycode codecs report bad labels (e.g. "IDNA does not round-trip")
        # with a plain UnicodeError rather than UnicodeDecodeError.
        pass

    # Decode each part separately, leaving invalid parts as punycode.
    parts = []

    for part in data.split(b"."):
        try:
            parts.append(part.decode("idna"))
        except UnicodeError:
            parts.append(part.decode("ascii"))

    return ".".join(parts)
# ``re.ASCII`` limits ``\d`` to the characters 0-9.
_plain_int_re = re.compile(r"-?\d+", re.ASCII)
_plain_float_re = re.compile(r"-?\d+\.\d+", re.ASCII)


def _plain_int(value: str) -> int:
    """Convert ``value`` to ``int``, accepting only an optional leading ``-``
    followed by ASCII digits.

    ``int`` itself also accepts ``+``, ``_``, and non-ASCII digits; those are
    not allowed in HTTP header values and are rejected here.

    :raises ValueError: if ``value`` has any other form.
    """
    if _plain_int_re.fullmatch(value):
        return int(value)

    raise ValueError


def _plain_float(value: str) -> float:
    """Convert ``value`` to ``float``, accepting only an optional leading
    ``-``, ASCII digits, a ``.``, and more ASCII digits.

    ``float`` itself also accepts ``+``, ``_``, non-ASCII digits, and forms
    such as ``.123``; those are not allowed in HTTP header values and are
    rejected here.

    :raises ValueError: if ``value`` has any other form.
    """
    if _plain_float_re.fullmatch(value):
        return float(value)

    raise ValueError