Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/_internal.py: 42%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import logging
4import re
5import sys
6import typing as t
7from datetime import datetime
8from datetime import timezone
10if t.TYPE_CHECKING:
11 from _typeshed.wsgi import WSGIEnvironment
13 from .wrappers.request import Request
# Module-level cache for the logger used by ``_log``; it stays ``None``
# until ``_log`` assigns the "werkzeug" logger on its first call.
_logger: logging.Logger | None = None
class _Missing:
    """Type of the module-level ``_missing`` sentinel object."""

    def __reduce__(self) -> str:
        # Returning a string makes pickle store a reference to the global
        # of that name instead of serialising the instance state.
        return "_missing"

    def __repr__(self) -> str:
        return "no value"


# The sentinel instance; its name matches what ``__reduce__`` returns.
_missing = _Missing()
def _wsgi_decoding_dance(s: str) -> str:
    """Re-decode a Latin-1 decoded string as UTF-8.

    The string is encoded back to bytes with Latin-1 and those bytes are
    then decoded as UTF-8; undecodable bytes become the replacement
    character instead of raising.
    """
    raw_bytes = s.encode("latin1")
    return raw_bytes.decode("utf-8", errors="replace")
def _wsgi_encoding_dance(s: str) -> str:
    """Encode a string as UTF-8 and decode the bytes as Latin-1.

    The result contains one character per UTF-8 byte of the input.
    """
    utf8_bytes = s.encode("utf-8")
    return utf8_bytes.decode("latin1")
def _get_environ(obj: WSGIEnvironment | Request) -> WSGIEnvironment:
    """Return the WSGI environ dict for ``obj``.

    If ``obj`` has an ``environ`` attribute that attribute is used,
    otherwise ``obj`` itself is taken to be the environ. The result is
    asserted to be a ``dict``.
    """
    environ = getattr(obj, "environ", obj)
    assert isinstance(environ, dict), (
        f"{type(obj).__name__!r} is not a WSGI environment (has to be a dict)"
    )
    return environ
def _has_level_handler(logger: logging.Logger) -> bool:
    """Report whether some handler in the logger's chain handles its
    effective level.

    Starting at ``logger`` and following ``parent`` links, returns
    ``True`` as soon as a handler whose level is at or below the
    logger's effective level is found. The walk ends at the first logger
    with ``propagate`` disabled, or when there is no parent left.
    """
    threshold = logger.getEffectiveLevel()
    node = logger

    while node:
        for handler in node.handlers:
            if handler.level <= threshold:
                return True

        if not node.propagate:
            # Records do not travel further up, so neither do we.
            return False

        node = node.parent  # type: ignore

    return False
class _ColorStreamHandler(logging.StreamHandler):  # type: ignore[type-arg]
    """Stream handler that wraps ``sys.stderr`` with Colorama when available.

    If ``colorama`` can be imported, the handler writes to
    ``colorama.AnsiToWin32(sys.stderr)``; otherwise ``None`` is passed to
    :class:`logging.StreamHandler`, which then picks its default stream.
    """

    def __init__(self) -> None:
        wrapped = None

        try:
            import colorama
        except ImportError:
            # Optional dependency: fall back to the handler's default stream.
            pass
        else:
            wrapped = colorama.AnsiToWin32(sys.stderr)

        super().__init__(wrapped)
def _log(type: str, message: str, *args: t.Any, **kwargs: t.Any) -> None:
    """Send a message to the 'werkzeug' logger.

    On first use the logger is fetched and cached in the module-level
    ``_logger``. At that point, a logger without an explicit level is
    set to :data:`logging.INFO`, and if no handler covers its effective
    level a :class:`_ColorStreamHandler` is attached.

    :param type: Name of the logger method to call, looked up with
        ``getattr`` (for example ``"info"``).
    :param message: Message to log; trailing whitespace is stripped.
    :param args: Extra positional arguments forwarded to the logger method.
    :param kwargs: Extra keyword arguments forwarded to the logger method.
    """
    global _logger

    if _logger is None:
        # Bind the global first, then configure through a local alias.
        logger = _logger = logging.getLogger("werkzeug")

        if logger.level == logging.NOTSET:
            logger.setLevel(logging.INFO)

        if not _has_level_handler(logger):
            logger.addHandler(_ColorStreamHandler())

    emit = getattr(_logger, type)
    emit(message.rstrip(), *args, **kwargs)
@t.overload
def _dt_as_utc(dt: None) -> None: ...


@t.overload
def _dt_as_utc(dt: datetime) -> datetime: ...


def _dt_as_utc(dt: datetime | None) -> datetime | None:
    """Return ``dt`` as a UTC-aware datetime.

    ``None`` is passed through. A naive datetime gets ``timezone.utc``
    attached without changing its fields; an aware datetime in another
    zone is converted to UTC; a datetime already in UTC is returned as is.
    """
    if dt is None:
        return None

    zone = dt.tzinfo

    if zone is None:
        return dt.replace(tzinfo=timezone.utc)

    if zone != timezone.utc:
        return dt.astimezone(timezone.utc)

    return dt
_TAccessorValue = t.TypeVar("_TAccessorValue")


class _DictAccessorProperty(t.Generic[_TAccessorValue]):
    """Baseclass for `environ_property` and `header_property`.

    A descriptor that exposes a single key of a mapping as an attribute.
    Subclasses implement :meth:`lookup` to return the mapping that
    belongs to a given instance.
    """

    # Class-level default; ``__init__`` shadows it on the descriptor only
    # when an explicit ``read_only`` value is passed.
    read_only = False

    def __init__(
        self,
        name: str,
        default: _TAccessorValue | None = None,
        load_func: t.Callable[[str], _TAccessorValue] | None = None,
        dump_func: t.Callable[[_TAccessorValue], str] | None = None,
        read_only: bool | None = None,
        doc: str | None = None,
    ) -> None:
        """Configure the accessor.

        :param name: Key to read and write in the mapping.
        :param default: Value returned when the key is absent or
            ``load_func`` raises ``ValueError``/``TypeError``.
        :param load_func: Optional converter applied to the stored value
            on read.
        :param dump_func: Optional converter applied to the value on write.
        :param read_only: When not ``None``, overrides the class-level
            ``read_only`` flag.
        :param doc: Stored as the descriptor's ``__doc__``.
        """
        if read_only is not None:
            self.read_only = read_only

        self.__doc__ = doc
        self.name = name
        self.default = default
        self.load_func = load_func
        self.dump_func = dump_func

    def lookup(self, instance: t.Any) -> t.MutableMapping[str, t.Any]:
        """Return the mapping for ``instance``; subclasses must override."""
        raise NotImplementedError

    @t.overload
    def __get__(
        self, instance: None, owner: type
    ) -> _DictAccessorProperty[_TAccessorValue]: ...

    @t.overload
    def __get__(self, instance: t.Any, owner: type) -> _TAccessorValue: ...

    def __get__(
        self, instance: t.Any | None, owner: type
    ) -> _TAccessorValue | _DictAccessorProperty[_TAccessorValue]:
        """Read the value, or return the descriptor on class access."""
        if instance is None:
            return self

        mapping = self.lookup(instance)
        key = self.name

        if key not in mapping:
            return self.default  # type: ignore

        raw = mapping[key]
        loader = self.load_func

        if loader is None:
            return raw  # type: ignore

        try:
            return loader(raw)
        except (ValueError, TypeError):
            # An unparsable stored value is reported as the default.
            return self.default  # type: ignore

    def __set__(self, instance: t.Any, value: _TAccessorValue) -> None:
        """Store ``value`` under the key, passing it through ``dump_func``."""
        if self.read_only:
            raise AttributeError("read only property")

        # Convert before looking up the mapping, so a failing ``dump_func``
        # never touches the storage.
        dumper = self.dump_func
        stored = value if dumper is None else dumper(value)
        self.lookup(instance)[self.name] = stored

    def __delete__(self, instance: t.Any) -> None:
        """Remove the key from the mapping; a missing key is ignored."""
        if self.read_only:
            raise AttributeError("read only property")

        mapping = self.lookup(instance)
        mapping.pop(self.name, None)

    def __repr__(self) -> str:
        cls_name = type(self).__name__
        return f"<{cls_name} {self.name}>"
_plain_int_re = re.compile(r"-?\d+", re.ASCII)


def _plain_int(value: str) -> int:
    """Parse an int made only of ASCII digits with an optional leading ``-``.

    Unlike ``int``, this rejects ``+``, ``_``, and non-ASCII digits, none
    of which are allowed in HTTP header values.

    Leading and trailing whitespace is stripped before matching.

    :raises ValueError: If the stripped value is not a plain integer.
    """
    stripped = value.strip()

    if not _plain_int_re.fullmatch(stripped):
        raise ValueError

    return int(stripped)