1from __future__ import annotations
2
3import logging
4import re
5import sys
6import typing as t
7from datetime import datetime
8from datetime import timezone
9
10if t.TYPE_CHECKING:
11 from _typeshed.wsgi import WSGIEnvironment
12
13 from .wrappers.request import Request
14
# Module-level cache for the "werkzeug" logger. It stays ``None`` until
# ``_log`` is called for the first time, which looks the logger up and
# stores it here.
_logger: logging.Logger | None = None
16
17
class _Missing:
    """Type of the module-level ``_missing`` sentinel object."""

    def __reduce__(self) -> str:
        # A string return value tells pickle/copy to treat the object as the
        # global named ``_missing`` instead of building a new instance.
        return "_missing"

    def __repr__(self) -> str:
        return "no value"


# Shared sentinel instance; ``_Missing.__reduce__`` refers to it by this name.
_missing = _Missing()
27
28
def _wsgi_decoding_dance(s: str) -> str:
    """Reinterpret a latin1-decoded string as UTF-8 text.

    The string is converted back to bytes using latin1 and those bytes are
    then decoded as UTF-8. Byte sequences that are not valid UTF-8 are
    replaced with U+FFFD instead of raising.

    :param s: String whose characters all fit in latin1.
    :return: The UTF-8 interpretation of the same bytes.
    :raises UnicodeEncodeError: If ``s`` contains a character outside latin1.
    """
    raw_bytes = s.encode("latin1")
    return raw_bytes.decode("utf-8", "replace")
31
32
def _wsgi_encoding_dance(s: str) -> str:
    """Encode text as UTF-8 and present the bytes as a latin1 string.

    This is the inverse direction of ``_wsgi_decoding_dance``: each UTF-8
    byte of ``s`` becomes one character in the range U+0000..U+00FF.

    :param s: Text to convert.
    :return: A string with one latin1 character per UTF-8 byte of ``s``.
    """
    utf8_bytes = s.encode("utf-8")
    return utf8_bytes.decode("latin1")
35
36
def _get_environ(obj: WSGIEnvironment | Request) -> WSGIEnvironment:
    """Return the WSGI environ dict for ``obj``.

    If ``obj`` has an ``environ`` attribute, that attribute is used;
    otherwise ``obj`` itself is taken to be the environ.

    :param obj: An environ dict, or an object exposing one as ``.environ``.
    :return: The environ dict.
    :raises AssertionError: If the resolved value is not a ``dict``.
    """
    candidate = getattr(obj, "environ", obj)
    assert isinstance(candidate, dict), (
        f"{type(obj).__name__!r} is not a WSGI environment (has to be a dict)"
    )
    return candidate
43
44
def _has_level_handler(logger: logging.Logger) -> bool:
    """Report whether some handler in the logging chain will handle
    records at the given logger's effective level.

    The search starts at ``logger`` and follows ``parent`` links for as long
    as each logger propagates. A handler qualifies when its own level is at
    or below the effective level of ``logger``.

    :param logger: Logger at which the search starts.
    :return: ``True`` if a qualifying handler is found, else ``False``.
    """
    threshold = logger.getEffectiveLevel()
    node = logger

    while node:
        for handler in node.handlers:
            if handler.level <= threshold:
                return True

        # A non-propagating logger ends the chain: records never reach
        # the handlers of its ancestors.
        if not node.propagate:
            return False

        node = node.parent  # type: ignore

    return False
62
63
class _ColorStreamHandler(logging.StreamHandler):  # type: ignore[type-arg]
    """Stream handler that wraps ``sys.stderr`` with Colorama's
    ``AnsiToWin32`` when Colorama can be imported.

    Without Colorama the handler is created with no explicit stream, so
    :class:`logging.StreamHandler` picks its own default.
    """

    def __init__(self) -> None:
        stream = None

        try:
            import colorama
        except ImportError:
            # Colorama is optional; keep the handler's default stream.
            pass
        else:
            stream = colorama.AnsiToWin32(sys.stderr)

        super().__init__(stream)
76
77
def _log(type: str, message: str, *args: t.Any, **kwargs: t.Any) -> None:
    """Send a message to the 'werkzeug' logger.

    On first use the logger is looked up and cached in the module-level
    ``_logger``. If it has no level set, the level becomes
    :data:`logging.INFO`. If no handler in its chain handles the logger's
    effective level, a ``_ColorStreamHandler`` is attached.

    :param type: Name of the logger method to call, e.g. ``"info"``.
    :param message: Log message; trailing whitespace is stripped.
    :param args: Positional arguments forwarded to the logger method.
    :param kwargs: Keyword arguments forwarded to the logger method.
    """
    global _logger

    if _logger is None:
        werkzeug_logger = logging.getLogger("werkzeug")
        _logger = werkzeug_logger

        if werkzeug_logger.level == logging.NOTSET:
            werkzeug_logger.setLevel(logging.INFO)

        if not _has_level_handler(werkzeug_logger):
            werkzeug_logger.addHandler(_ColorStreamHandler())

    emit = getattr(_logger, type)
    emit(message.rstrip(), *args, **kwargs)
98
99
@t.overload
def _dt_as_utc(dt: None) -> None: ...


@t.overload
def _dt_as_utc(dt: datetime) -> datetime: ...


def _dt_as_utc(dt: datetime | None) -> datetime | None:
    """Return ``dt`` expressed in UTC.

    * ``None`` is passed through unchanged.
    * A naive datetime gets ``timezone.utc`` attached without shifting its
      fields.
    * An aware datetime whose tzinfo differs from ``timezone.utc`` is
      converted with ``astimezone``.
    * A datetime already in UTC is returned as is.
    """
    if dt is None:
        return None

    utc = timezone.utc
    zone = dt.tzinfo

    if zone is None:
        return dt.replace(tzinfo=utc)

    if zone != utc:
        return dt.astimezone(utc)

    return dt
118
119
_TAccessorValue = t.TypeVar("_TAccessorValue")


class _DictAccessorProperty(t.Generic[_TAccessorValue]):
    """Baseclass for `environ_property` and `header_property`.

    A descriptor that reads, writes and deletes one key of a mapping
    obtained from the owning instance through :meth:`lookup`, which
    subclasses must implement.
    """

    # Class-wide default; an instance overrides it only when ``read_only``
    # is passed explicitly to ``__init__``.
    read_only = False

    def __init__(
        self,
        name: str,
        default: _TAccessorValue | None = None,
        load_func: t.Callable[[str], _TAccessorValue] | None = None,
        dump_func: t.Callable[[_TAccessorValue], str] | None = None,
        read_only: bool | None = None,
        doc: str | None = None,
    ) -> None:
        """
        :param name: Key used in the mapping returned by :meth:`lookup`.
        :param default: Value returned when the key is absent or when
            ``load_func`` raises ``ValueError`` or ``TypeError``.
        :param load_func: Optional converter applied to the stored value
            on read.
        :param dump_func: Optional converter applied to the value on write.
        :param read_only: When not ``None``, overrides the class-level
            ``read_only`` flag for this descriptor.
        :param doc: Stored as the descriptor's ``__doc__``.
        """
        self.name = name
        self.default = default
        self.load_func = load_func
        self.dump_func = dump_func
        self.__doc__ = doc

        if read_only is not None:
            self.read_only = read_only

    def lookup(self, instance: t.Any) -> t.MutableMapping[str, t.Any]:
        """Return the mapping for ``instance``; must be overridden."""
        raise NotImplementedError

    @t.overload
    def __get__(
        self, instance: None, owner: type
    ) -> _DictAccessorProperty[_TAccessorValue]: ...

    @t.overload
    def __get__(self, instance: t.Any, owner: type) -> _TAccessorValue: ...

    def __get__(
        self, instance: t.Any | None, owner: type
    ) -> _TAccessorValue | _DictAccessorProperty[_TAccessorValue]:
        # Class-level access yields the descriptor itself.
        if instance is None:
            return self

        mapping = self.lookup(instance)
        key = self.name

        if key not in mapping:
            return self.default  # type: ignore

        raw = mapping[key]
        loader = self.load_func

        if loader is None:
            return raw  # type: ignore

        try:
            return loader(raw)
        except (ValueError, TypeError):
            # A stored value that cannot be converted reads as the default.
            return self.default  # type: ignore

    def __set__(self, instance: t.Any, value: _TAccessorValue) -> None:
        if self.read_only:
            raise AttributeError("read only property")

        dumper = self.dump_func
        # Convert before looking up the mapping, so a failing dump_func
        # leaves the mapping untouched.
        stored = value if dumper is None else dumper(value)
        self.lookup(instance)[self.name] = stored

    def __delete__(self, instance: t.Any) -> None:
        if self.read_only:
            raise AttributeError("read only property")

        # Deleting an absent key is a silent no-op.
        self.lookup(instance).pop(self.name, None)

    def __repr__(self) -> str:
        cls_name = type(self).__name__
        return f"<{cls_name} {self.name}>"
194
195
# An optional leading minus sign followed by one or more ASCII digits.
_plain_int_re = re.compile(r"-?\d+", re.ASCII)


def _plain_int(value: str) -> int:
    """Parse an int made up solely of ASCII digits and an optional ``-``.

    ``int`` itself also accepts ``+``, ``_``, and non-ASCII digits; those are
    rejected here because they are not allowed in HTTP header values.

    Leading and trailing whitespace is stripped before the check.

    :param value: Text to parse.
    :return: The parsed integer.
    :raises ValueError: If the stripped text is not a plain integer.
    """
    candidate = value.strip()

    if not _plain_int_re.fullmatch(candidate):
        raise ValueError

    return int(candidate)