Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/_internal.py: 45%
115 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-09 07:17 +0000
1from __future__ import annotations
3import logging
4import re
5import sys
6import typing as t
7from datetime import datetime
8from datetime import timezone
10if t.TYPE_CHECKING:
11 from _typeshed.wsgi import WSGIEnvironment
12 from .wrappers.request import Request
# Module-level cache for the "werkzeug" logger; stays ``None`` until the
# logging helper in this module creates and configures it on first use.
_logger: logging.Logger | None = None
class _Missing:
    """Type of the module-level ``_missing`` placeholder object."""

    def __repr__(self) -> str:
        return "no value"

    def __reduce__(self) -> str:
        # Returning a string tells pickle to store a reference to the
        # global of that name instead of serializing the instance state.
        return "_missing"


_missing = _Missing()
def _wsgi_decoding_dance(s: str) -> str:
    """Re-interpret a latin-1 "byte string" as UTF-8 text.

    The string is encoded to bytes with latin1 and those bytes are then
    decoded with the default codec (UTF-8). Byte sequences that are not
    valid UTF-8 become the replacement character instead of raising.

    :param s: text whose code points each stand for one byte (<= U+00FF).
    :raises UnicodeEncodeError: if ``s`` contains a character that latin1
        cannot encode.
    """
    return s.encode("latin1").decode(errors="replace")
def _wsgi_encoding_dance(s: str) -> str:
    """Turn text into a latin-1 "byte string" carrying its UTF-8 bytes.

    The string is encoded with the default codec (UTF-8) and every
    resulting byte is mapped to the code point of the same value by
    decoding as latin1, so the result only contains characters <= U+00FF.

    :param s: text to convert.
    """
    return s.encode().decode("latin1")
def _get_environ(obj: WSGIEnvironment | Request) -> WSGIEnvironment:
    """Return the WSGI environ for ``obj``.

    If ``obj`` has an ``environ`` attribute that attribute is used,
    otherwise ``obj`` itself is treated as the environ.

    :param obj: an environ dict or an object exposing ``.environ``.
    :raises AssertionError: if the resolved value is not a ``dict``. This
        is an ``assert`` and is therefore skipped when Python runs with
        ``-O``.
    """
    env = getattr(obj, "environ", obj)
    assert isinstance(
        env, dict
    ), f"{type(obj).__name__!r} is not a WSGI environment (has to be a dict)"
    return env
def _has_level_handler(logger: logging.Logger) -> bool:
    """Check if there is a handler in the logging chain that will handle
    the given logger's effective level.

    Walks from ``logger`` up through its parents and stops early at the
    first logger whose ``propagate`` flag is false, since records would
    not travel past it.

    :param logger: the logger whose chain is inspected.
    :return: ``True`` if any visited handler has a level at or below the
        logger's effective level, ``False`` otherwise.
    """
    level = logger.getEffectiveLevel()
    current = logger

    while current:
        if any(handler.level <= level for handler in current.handlers):
            return True

        if not current.propagate:
            break

        current = current.parent  # type: ignore

    return False
class _ColorStreamHandler(logging.StreamHandler):
    """On Windows, wrap stream with Colorama for ANSI style support."""

    def __init__(self) -> None:
        try:
            import colorama
        except ImportError:
            # Colorama is optional; ``None`` lets ``StreamHandler`` pick
            # its own default stream.
            stream = None
        else:
            stream = colorama.AnsiToWin32(sys.stderr)

        super().__init__(stream)
def _log(type: str, message: str, *args: t.Any, **kwargs: t.Any) -> None:
    """Log a message to the 'werkzeug' logger.

    The logger is created the first time it is needed. If there is no
    level set, it is set to :data:`logging.INFO`. If there is no handler
    for the logger's effective level, a :class:`logging.StreamHandler`
    is added.

    :param type: name of the logger method to call, e.g. ``"info"``.
    :param message: log message; trailing whitespace is stripped.
    :param args: positional arguments forwarded to the logger method.
    :param kwargs: keyword arguments forwarded to the logger method.
    """
    global _logger

    if _logger is None:
        _logger = logging.getLogger("werkzeug")

        if _logger.level == logging.NOTSET:
            _logger.setLevel(logging.INFO)

        if not _has_level_handler(_logger):
            _logger.addHandler(_ColorStreamHandler())

    # ``type`` selects the logging method dynamically (debug/info/...).
    getattr(_logger, type)(message.rstrip(), *args, **kwargs)
@t.overload
def _dt_as_utc(dt: None) -> None:
    ...


@t.overload
def _dt_as_utc(dt: datetime) -> datetime:
    ...


def _dt_as_utc(dt: datetime | None) -> datetime | None:
    """Return ``dt`` expressed in UTC.

    * ``None`` is passed through unchanged.
    * A naive datetime is tagged with ``timezone.utc`` without shifting
      its clock fields.
    * An aware datetime in another zone is converted with
      ``astimezone``.
    * A datetime already using ``timezone.utc`` is returned as is.
    """
    if dt is None:
        return dt

    if dt.tzinfo is None:
        return dt.replace(tzinfo=timezone.utc)
    elif dt.tzinfo != timezone.utc:
        return dt.astimezone(timezone.utc)

    return dt
_TAccessorValue = t.TypeVar("_TAccessorValue")


class _DictAccessorProperty(t.Generic[_TAccessorValue]):
    """Baseclass for `environ_property` and `header_property`.

    A descriptor that exposes a single key of a mapping as an attribute.
    Subclasses implement :meth:`lookup` to say which mapping belongs to
    a given instance.

    :param name: key to read and write in the mapping.
    :param default: value returned when the key is absent or when
        ``load_func`` raises ``ValueError`` or ``TypeError``.
    :param load_func: optional converter applied to the stored value on
        read.
    :param dump_func: optional converter applied to the assigned value
        before it is stored.
    :param read_only: if given, overrides the class-level ``read_only``
        flag for this property.
    :param doc: docstring assigned to the property object.
    """

    # Class-wide default; ``__init__`` only shadows it per instance when
    # an explicit ``read_only`` argument is passed.
    read_only = False

    def __init__(
        self,
        name: str,
        default: _TAccessorValue | None = None,
        load_func: t.Callable[[str], _TAccessorValue] | None = None,
        dump_func: t.Callable[[_TAccessorValue], str] | None = None,
        read_only: bool | None = None,
        doc: str | None = None,
    ) -> None:
        self.name = name
        self.default = default
        self.load_func = load_func
        self.dump_func = dump_func
        if read_only is not None:
            self.read_only = read_only
        self.__doc__ = doc

    def lookup(self, instance: t.Any) -> t.MutableMapping[str, t.Any]:
        """Return the mapping that backs this property for ``instance``."""
        raise NotImplementedError

    @t.overload
    def __get__(
        self, instance: None, owner: type
    ) -> _DictAccessorProperty[_TAccessorValue]:
        ...

    @t.overload
    def __get__(self, instance: t.Any, owner: type) -> _TAccessorValue:
        ...

    def __get__(
        self, instance: t.Any | None, owner: type
    ) -> _TAccessorValue | _DictAccessorProperty[_TAccessorValue]:
        # Accessed on the class rather than an instance: hand back the
        # descriptor itself.
        if instance is None:
            return self

        storage = self.lookup(instance)

        if self.name not in storage:
            return self.default  # type: ignore

        value = storage[self.name]

        if self.load_func is not None:
            try:
                return self.load_func(value)
            except (ValueError, TypeError):
                # An unparsable stored value is treated like a missing one.
                return self.default  # type: ignore

        return value  # type: ignore

    def __set__(self, instance: t.Any, value: _TAccessorValue) -> None:
        if self.read_only:
            raise AttributeError("read only property")

        if self.dump_func is not None:
            self.lookup(instance)[self.name] = self.dump_func(value)
        else:
            self.lookup(instance)[self.name] = value

    def __delete__(self, instance: t.Any) -> None:
        if self.read_only:
            raise AttributeError("read only property")

        # Deleting a key that is not present is silently ignored.
        self.lookup(instance).pop(self.name, None)

    def __repr__(self) -> str:
        return f"<{type(self).__name__} {self.name}>"
# Optional leading minus followed by one or more ASCII digits.
_plain_int_re = re.compile(r"-?\d+", re.ASCII)


def _plain_int(value: str) -> int:
    """Parse an int only if it is only ASCII digits and ``-``.

    This disallows ``+``, ``_``, and non-ASCII digits, which are accepted by ``int`` but
    are not allowed in HTTP header values.

    Any leading or trailing whitespace is stripped

    :param value: text to parse.
    :raises ValueError: if the stripped text is not an optional ``-``
        followed by ASCII digits.
    """
    value = value.strip()
    if _plain_int_re.fullmatch(value) is None:
        raise ValueError

    return int(value)