Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/local.py: 64%
274 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1import copy
2import math
3import operator
4import typing as t
5from contextvars import ContextVar
6from functools import partial
7from functools import update_wrapper
8from operator import attrgetter
10from .wsgi import ClosingIterator
12if t.TYPE_CHECKING:
13 from _typeshed.wsgi import StartResponse
14 from _typeshed.wsgi import WSGIApplication
15 from _typeshed.wsgi import WSGIEnvironment
# Generic type of the object held by a LocalStack or returned by a LocalProxy.
T = t.TypeVar("T")
# Any callable; lets _l_to_r_op declare that it returns the type it was given.
F = t.TypeVar("F", bound=t.Callable[..., t.Any])
def release_local(local: t.Union["Local", "LocalStack"]) -> None:
    """Drop whatever a :class:`Local` or :class:`LocalStack` holds for
    the current context, without going through a :class:`LocalManager`.

    Modern code should rarely need this, and it may be removed in a
    future release.

    :param local: The context-local object to reset. Its
        ``__release_local__`` hook is invoked.

    .. versionadded:: 0.6.1
    """
    release = local.__release_local__
    release()
class Local:
    """A namespace whose attributes are local to the current context.
    Internally this is a :class:`ContextVar` holding a :class:`dict`.

    Every write copies the dict so that nested contexts never see each
    other's mutations, which can be slower than using one context var
    per value.

    :param context_var: The :class:`~contextvars.ContextVar` used as
        storage for this local. One is created if not given. Context
        vars not created at the global scope may interfere with garbage
        collection.

    .. versionchanged:: 2.0
        Uses ``ContextVar`` instead of a custom storage implementation.
    """

    __slots__ = ("__storage",)

    def __init__(
        self, context_var: t.Optional[ContextVar[t.Dict[str, t.Any]]] = None
    ) -> None:
        storage = context_var

        if storage is None:
            # Creating a ContextVar outside the global scope interferes
            # with garbage collection, but a local is expected to live
            # at the global scope too, so that is not a practical issue.
            storage = ContextVar(f"werkzeug.Local<{id(self)}>.storage")

        # __setattr__ is overridden to write into the storage dict, so
        # the slot itself has to be set through object.__setattr__.
        object.__setattr__(self, "_Local__storage", storage)

    def __iter__(self) -> t.Iterator[t.Tuple[str, t.Any]]:
        """Iterate over ``(name, value)`` pairs set in this context."""
        snapshot = self.__storage.get({})
        return iter(snapshot.items())

    def __call__(
        self, name: str, *, unbound_message: t.Optional[str] = None
    ) -> "LocalProxy":
        """Return a :class:`LocalProxy` that accesses one attribute of
        this namespace.

        :param name: The attribute to proxy.
        :param unbound_message: The error message the proxy shows when
            the attribute isn't set.
        """
        return LocalProxy(self, name, unbound_message=unbound_message)

    def __release_local__(self) -> None:
        """Replace the current context's data with an empty dict."""
        self.__storage.set({})

    def __getattr__(self, name: str) -> t.Any:
        """Look up ``name`` in the current context's data.

        :raises AttributeError: If ``name`` has not been set.
        """
        data = self.__storage.get({})

        if name not in data:
            raise AttributeError(name)

        return data[name]

    def __setattr__(self, name: str, value: t.Any) -> None:
        """Store ``value`` under ``name`` in a fresh copy of the data."""
        updated = self.__storage.get({}).copy()
        updated[name] = value
        self.__storage.set(updated)

    def __delattr__(self, name: str) -> None:
        """Remove ``name`` from a fresh copy of the data.

        :raises AttributeError: If ``name`` has not been set.
        """
        data = self.__storage.get({})

        if name not in data:
            raise AttributeError(name)

        remaining = data.copy()
        del remaining[name]
        self.__storage.set(remaining)
class LocalStack(t.Generic[T]):
    """A stack whose contents are local to the current context.
    Internally this is a :class:`ContextVar` holding a :class:`list`.

    Every push and pop stores a new list so that nested contexts never
    see each other's mutations, which can be slower than using
    individual context vars.

    :param context_var: The :class:`~contextvars.ContextVar` used as
        storage for this local. One is created if not given. Context
        vars not created at the global scope may interfere with garbage
        collection.

    .. versionchanged:: 2.0
        Uses ``ContextVar`` instead of a custom storage implementation.

    .. versionadded:: 0.6.1
    """

    __slots__ = ("_storage",)

    def __init__(self, context_var: t.Optional[ContextVar[t.List[T]]] = None) -> None:
        storage = context_var

        if storage is None:
            # Creating a ContextVar outside the global scope interferes
            # with garbage collection, but a local is expected to live
            # at the global scope too, so that is not a practical issue.
            storage = ContextVar(f"werkzeug.LocalStack<{id(self)}>.storage")

        self._storage = storage

    def __release_local__(self) -> None:
        """Replace the current context's stack with an empty list."""
        self._storage.set([])

    def push(self, obj: T) -> t.List[T]:
        """Put ``obj`` on top of the stack and return the new list."""
        extended = self._storage.get([]).copy()
        extended.append(obj)
        self._storage.set(extended)
        return extended

    def pop(self) -> t.Optional[T]:
        """Take the top item off the stack and return it, or return
        ``None`` when the stack is empty.
        """
        items = self._storage.get([])

        if len(items) > 0:
            popped = items[-1]
            # Store a shortened copy instead of mutating in place.
            self._storage.set(items[:-1])
            return popped

        return None

    @property
    def top(self) -> t.Optional[T]:
        """The item on top of the stack, or ``None`` when the stack is
        empty.
        """
        items = self._storage.get([])
        return items[-1] if len(items) > 0 else None

    def __call__(
        self, name: t.Optional[str] = None, *, unbound_message: t.Optional[str] = None
    ) -> "LocalProxy":
        """Return a :class:`LocalProxy` that accesses the top of this
        stack.

        :param name: If given, the proxy accesses this attribute of the
            top item instead of the item itself.
        :param unbound_message: The error message the proxy shows when
            the stack is empty.
        """
        return LocalProxy(self, name, unbound_message=unbound_message)
class LocalManager:
    """Manage releasing the data for the current context in one or more
    :class:`Local` and :class:`LocalStack` objects.

    This should not be needed for modern use cases, and may be removed
    in the future.

    :param locals: A single :class:`Local` or :class:`LocalStack`, or an
        iterable of them, to manage. ``None`` manages nothing.

    .. versionchanged:: 2.0
        ``ident_func`` is deprecated and will be removed in Werkzeug
         2.1.

    .. versionchanged:: 0.7
        The ``ident_func`` parameter was added.

    .. versionchanged:: 0.6.1
        The :func:`release_local` function can be used instead of a
        manager.
    """

    __slots__ = ("locals",)

    def __init__(
        self,
        locals: t.Optional[
            t.Union[Local, LocalStack, t.Iterable[t.Union[Local, LocalStack]]]
        ] = None,
    ) -> None:
        if locals is None:
            self.locals = []
        elif isinstance(locals, (Local, LocalStack)):
            # A single local of either kind is wrapped in a list. A
            # LocalStack is not iterable, so it must not fall through
            # to ``list(locals)`` below.
            self.locals = [locals]
        else:
            self.locals = list(locals)

    def cleanup(self) -> None:
        """Release the data in the locals for this context. Call this at
        the end of each request or use :meth:`make_middleware`.
        """
        for local in self.locals:
            release_local(local)

    def make_middleware(self, app: "WSGIApplication") -> "WSGIApplication":
        """Wrap a WSGI application so that local data is released
        automatically after the response has been sent for a request.

        :param app: The WSGI application to wrap.
        :return: A WSGI callable that passes the response iterable and
            :meth:`cleanup` to ``ClosingIterator``.
        """

        def application(
            environ: "WSGIEnvironment", start_response: "StartResponse"
        ) -> t.Iterable[bytes]:
            return ClosingIterator(app(environ, start_response), self.cleanup)

        return application

    def middleware(self, func: "WSGIApplication") -> "WSGIApplication":
        """Like :meth:`make_middleware` but used as a decorator on the
        WSGI application function.

        .. code-block:: python

            @manager.middleware
            def application(environ, start_response):
                ...
        """
        # Copy the wrapped function's metadata onto the middleware.
        return update_wrapper(self.make_middleware(func), func)

    def __repr__(self) -> str:
        return f"<{type(self).__name__} storages: {len(self.locals)}>"
257class _ProxyLookup:
258 """Descriptor that handles proxied attribute lookup for
259 :class:`LocalProxy`.
261 :param f: The built-in function this attribute is accessed through.
262 Instead of looking up the special method, the function call
263 is redone on the object.
264 :param fallback: Return this function if the proxy is unbound
265 instead of raising a :exc:`RuntimeError`.
266 :param is_attr: This proxied name is an attribute, not a function.
267 Call the fallback immediately to get the value.
268 :param class_value: Value to return when accessed from the
269 ``LocalProxy`` class directly. Used for ``__doc__`` so building
270 docs still works.
271 """
273 __slots__ = ("bind_f", "fallback", "is_attr", "class_value", "name")
275 def __init__(
276 self,
277 f: t.Optional[t.Callable] = None,
278 fallback: t.Optional[t.Callable] = None,
279 class_value: t.Optional[t.Any] = None,
280 is_attr: bool = False,
281 ) -> None:
282 bind_f: t.Optional[t.Callable[["LocalProxy", t.Any], t.Callable]]
284 if hasattr(f, "__get__"):
285 # A Python function, can be turned into a bound method.
287 def bind_f(instance: "LocalProxy", obj: t.Any) -> t.Callable:
288 return f.__get__(obj, type(obj)) # type: ignore
290 elif f is not None:
291 # A C function, use partial to bind the first argument.
293 def bind_f(instance: "LocalProxy", obj: t.Any) -> t.Callable:
294 return partial(f, obj)
296 else:
297 # Use getattr, which will produce a bound method.
298 bind_f = None
300 self.bind_f = bind_f
301 self.fallback = fallback
302 self.class_value = class_value
303 self.is_attr = is_attr
305 def __set_name__(self, owner: "LocalProxy", name: str) -> None:
306 self.name = name
308 def __get__(self, instance: "LocalProxy", owner: t.Optional[type] = None) -> t.Any:
309 if instance is None:
310 if self.class_value is not None:
311 return self.class_value
313 return self
315 try:
316 obj = instance._get_current_object()
317 except RuntimeError:
318 if self.fallback is None:
319 raise
321 fallback = self.fallback.__get__(instance, owner)
323 if self.is_attr:
324 # __class__ and __doc__ are attributes, not methods.
325 # Call the fallback to get the value.
326 return fallback()
328 return fallback
330 if self.bind_f is not None:
331 return self.bind_f(instance, obj)
333 return getattr(obj, self.name)
335 def __repr__(self) -> str:
336 return f"proxy {self.name}"
338 def __call__(self, instance: "LocalProxy", *args: t.Any, **kwargs: t.Any) -> t.Any:
339 """Support calling unbound methods from the class. For example,
340 this happens with ``copy.copy``, which does
341 ``type(x).__copy__(x)``. ``type(x)`` can't be proxied, so it
342 returns the proxy type and descriptor.
343 """
344 return self.__get__(instance, type(instance))(*args, **kwargs)
class _ProxyIOp(_ProxyLookup):
    """Resolve an augmented assignment method on the proxied object.
    The method is wrapped so that the proxy, not the underlying object,
    is the result of the operation.
    """

    __slots__ = ()

    def __init__(
        self, f: t.Optional[t.Callable] = None, fallback: t.Optional[t.Callable] = None
    ) -> None:
        super().__init__(f, fallback)

        def bind_f(instance: "LocalProxy", obj: t.Any) -> t.Callable:
            def i_op(target: t.Any, operand: t.Any) -> "LocalProxy":
                # The in-place operator's own result is discarded; the
                # proxy is handed back instead.
                f(target, operand)  # type: ignore
                return instance

            bound = i_op.__get__(obj, type(obj))  # type: ignore
            return bound

        # Replace whatever binder the base class chose for ``f``.
        self.bind_f = bind_f
def _l_to_r_op(op: F) -> F:
    """Turn a left-hand operator into its reflected form by swapping
    the order of its two arguments.
    """

    def r_op(obj: t.Any, other: t.Any) -> t.Any:
        # Reflected call: the other operand goes on the left.
        return op(other, obj)

    swapped = t.cast(F, r_op)
    return swapped
378def _identity(o: T) -> T:
379 return o
class LocalProxy(t.Generic[T]):
    """A proxy to the object bound to a context-local object. All
    operations on the proxy are forwarded to the bound object. If no
    object is bound, a ``RuntimeError`` is raised.

    :param local: The context-local object that provides the proxied
        object.
    :param name: Proxy this attribute from the proxied object.
    :param unbound_message: The error message to show if the
        context-local object is unbound.

    Proxy a :class:`~contextvars.ContextVar` to make it easier to
    access. Pass a name to proxy that attribute.

    .. code-block:: python

        _request_var = ContextVar("request")
        request = LocalProxy(_request_var)
        session = LocalProxy(_request_var, "session")

    Proxy an attribute on a :class:`Local` namespace by calling the
    local with the attribute name:

    .. code-block:: python

        data = Local()
        user = data("user")

    Proxy the top item on a :class:`LocalStack` by calling the local.
    Pass a name to proxy that attribute.

    .. code-block::

        app_stack = LocalStack()
        current_app = app_stack()
        g = app_stack("g")

    Pass a function to proxy the return value from that function. This
    was previously used to access attributes of local objects before
    that was supported directly.

    .. code-block:: python

        session = LocalProxy(lambda: request.session)

    ``__repr__`` and ``__class__`` are proxied, so ``repr(x)`` and
    ``isinstance(x, cls)`` will look like the proxied object. Use
    ``issubclass(type(x), LocalProxy)`` to check if an object is a
    proxy.

    .. code-block:: python

        repr(user)  # <User admin>
        isinstance(user, User)  # True
        issubclass(type(user), LocalProxy)  # True

    .. versionchanged:: 2.2.2
        ``__wrapped__`` is set when wrapping an object, not only when
        wrapping a function, to prevent doctest from failing.

    .. versionchanged:: 2.2
        Can proxy a ``ContextVar`` or ``LocalStack`` directly.

    .. versionchanged:: 2.2
        The ``name`` parameter can be used with any proxied object, not
        only ``Local``.

    .. versionchanged:: 2.2
        Added the ``unbound_message`` parameter.

    .. versionchanged:: 2.0
        Updated proxied attributes and methods to reflect the current
        data model.

    .. versionchanged:: 0.6.1
        The class can be instantiated with a callable.
    """

    # "__wrapped" is name-mangled to "_LocalProxy__wrapped"; both slots
    # are filled in __init__ through object.__setattr__.
    __slots__ = ("__wrapped", "_get_current_object")

    _get_current_object: t.Callable[[], T]
    """Return the current object this proxy is bound to. If the proxy is
    unbound, this raises a ``RuntimeError``.

    This should be used if you need to pass the object to something that
    doesn't understand the proxy. It can also be useful for performance
    if you are accessing the object multiple times in a function, rather
    than going through the proxy multiple times.
    """

    def __init__(
        self,
        local: t.Union[ContextVar[T], Local, LocalStack[T], t.Callable[[], T]],
        name: t.Optional[str] = None,
        *,
        unbound_message: t.Optional[str] = None,
    ) -> None:
        """Build the ``_get_current_object`` closure that matches the
        kind of ``local`` given and store it on the proxy.

        :raises TypeError: If ``local`` is a :class:`Local` and no
            ``name`` is given, or if ``local`` is none of the supported
            kinds.
        """
        # Without a name the looked-up object is returned as-is;
        # otherwise the named attribute is fetched from it.
        if name is None:
            get_name = _identity
        else:
            get_name = attrgetter(name)  # type: ignore[assignment]

        if unbound_message is None:
            unbound_message = "object is not bound"

        if isinstance(local, Local):
            if name is None:
                raise TypeError("'name' is required when proxying a 'Local' object.")

            def _get_current_object() -> T:
                # A missing attribute on the Local means "unbound".
                try:
                    return get_name(local)  # type: ignore[return-value]
                except AttributeError:
                    raise RuntimeError(unbound_message) from None

        elif isinstance(local, LocalStack):

            def _get_current_object() -> T:
                obj = local.top  # type: ignore[union-attr]

                # ``top`` returns None for an empty stack.
                if obj is None:
                    raise RuntimeError(unbound_message)

                return get_name(obj)

        elif isinstance(local, ContextVar):

            def _get_current_object() -> T:
                # ``get()`` without a default raises LookupError when
                # the var has no value in this context.
                try:
                    obj = local.get()  # type: ignore[union-attr]
                except LookupError:
                    raise RuntimeError(unbound_message) from None

                return get_name(obj)

        elif callable(local):

            def _get_current_object() -> T:
                # The callable is invoked on every access; this branch
                # does not translate its errors to ``unbound_message``.
                return get_name(local())  # type: ignore

        else:
            raise TypeError(f"Don't know how to proxy '{type(local)}'.")

        # ``__setattr__`` is proxied to the bound object below, so the
        # proxy's own slots must be written via object.__setattr__.
        object.__setattr__(self, "_LocalProxy__wrapped", local)
        object.__setattr__(self, "_get_current_object", _get_current_object)

    # Each name below is a descriptor that forwards the operation to
    # the currently bound object. ``class_value=__doc__`` captures the
    # class docstring defined above so ``LocalProxy.__doc__`` still
    # returns it; an unbound instance falls back to it as well.
    __doc__ = _ProxyLookup(  # type: ignore
        class_value=__doc__, fallback=lambda self: type(self).__doc__, is_attr=True
    )
    # When unbound, ``__wrapped__`` falls back to the ``local`` that
    # was passed to ``__init__``.
    __wrapped__ = _ProxyLookup(
        fallback=lambda self: self._LocalProxy__wrapped, is_attr=True
    )
    # __del__ should only delete the proxy
    __repr__ = _ProxyLookup(  # type: ignore
        repr, fallback=lambda self: f"<{type(self).__name__} unbound>"
    )
    __str__ = _ProxyLookup(str)  # type: ignore
    __bytes__ = _ProxyLookup(bytes)
    __format__ = _ProxyLookup()  # type: ignore
    __lt__ = _ProxyLookup(operator.lt)
    __le__ = _ProxyLookup(operator.le)
    __eq__ = _ProxyLookup(operator.eq)  # type: ignore
    __ne__ = _ProxyLookup(operator.ne)  # type: ignore
    __gt__ = _ProxyLookup(operator.gt)
    __ge__ = _ProxyLookup(operator.ge)
    __hash__ = _ProxyLookup(hash)  # type: ignore
    # An unbound proxy is falsy instead of raising.
    __bool__ = _ProxyLookup(bool, fallback=lambda self: False)
    __getattr__ = _ProxyLookup(getattr)
    # __getattribute__ triggered through __getattr__
    __setattr__ = _ProxyLookup(setattr)  # type: ignore
    __delattr__ = _ProxyLookup(delattr)  # type: ignore
    # An unbound proxy lists no attributes instead of raising.
    __dir__ = _ProxyLookup(dir, fallback=lambda self: [])  # type: ignore
    # __get__ (proxying descriptor not supported)
    # __set__ (descriptor)
    # __delete__ (descriptor)
    # __set_name__ (descriptor)
    # __objclass__ (descriptor)
    # __slots__ used by proxy itself
    # __dict__ (__getattr__)
    # __weakref__ (__getattr__)
    # __init_subclass__ (proxying metaclass not supported)
    # __prepare__ (metaclass)
    # An unbound proxy reports its own type as ``__class__``.
    __class__ = _ProxyLookup(
        fallback=lambda self: type(self), is_attr=True
    )  # type: ignore
    __instancecheck__ = _ProxyLookup(lambda self, other: isinstance(other, self))
    __subclasscheck__ = _ProxyLookup(lambda self, other: issubclass(other, self))
    # __class_getitem__ triggered through __getitem__
    __call__ = _ProxyLookup(lambda self, *args, **kwargs: self(*args, **kwargs))
    __len__ = _ProxyLookup(len)
    __length_hint__ = _ProxyLookup(operator.length_hint)
    __getitem__ = _ProxyLookup(operator.getitem)
    __setitem__ = _ProxyLookup(operator.setitem)
    __delitem__ = _ProxyLookup(operator.delitem)
    # __missing__ triggered through __getitem__
    __iter__ = _ProxyLookup(iter)
    __next__ = _ProxyLookup(next)
    __reversed__ = _ProxyLookup(reversed)
    __contains__ = _ProxyLookup(operator.contains)
    __add__ = _ProxyLookup(operator.add)
    __sub__ = _ProxyLookup(operator.sub)
    __mul__ = _ProxyLookup(operator.mul)
    __matmul__ = _ProxyLookup(operator.matmul)
    __truediv__ = _ProxyLookup(operator.truediv)
    __floordiv__ = _ProxyLookup(operator.floordiv)
    __mod__ = _ProxyLookup(operator.mod)
    __divmod__ = _ProxyLookup(divmod)
    __pow__ = _ProxyLookup(pow)
    __lshift__ = _ProxyLookup(operator.lshift)
    __rshift__ = _ProxyLookup(operator.rshift)
    __and__ = _ProxyLookup(operator.and_)
    __xor__ = _ProxyLookup(operator.xor)
    __or__ = _ProxyLookup(operator.or_)
    # Reflected operators reuse the left-hand functions with the two
    # operands swapped by ``_l_to_r_op``.
    __radd__ = _ProxyLookup(_l_to_r_op(operator.add))
    __rsub__ = _ProxyLookup(_l_to_r_op(operator.sub))
    __rmul__ = _ProxyLookup(_l_to_r_op(operator.mul))
    __rmatmul__ = _ProxyLookup(_l_to_r_op(operator.matmul))
    __rtruediv__ = _ProxyLookup(_l_to_r_op(operator.truediv))
    __rfloordiv__ = _ProxyLookup(_l_to_r_op(operator.floordiv))
    __rmod__ = _ProxyLookup(_l_to_r_op(operator.mod))
    __rdivmod__ = _ProxyLookup(_l_to_r_op(divmod))
    __rpow__ = _ProxyLookup(_l_to_r_op(pow))
    __rlshift__ = _ProxyLookup(_l_to_r_op(operator.lshift))
    __rrshift__ = _ProxyLookup(_l_to_r_op(operator.rshift))
    __rand__ = _ProxyLookup(_l_to_r_op(operator.and_))
    __rxor__ = _ProxyLookup(_l_to_r_op(operator.xor))
    __ror__ = _ProxyLookup(_l_to_r_op(operator.or_))
    # In-place operators go through ``_ProxyIOp`` so the result of
    # ``proxy += x`` is the proxy itself.
    __iadd__ = _ProxyIOp(operator.iadd)
    __isub__ = _ProxyIOp(operator.isub)
    __imul__ = _ProxyIOp(operator.imul)
    __imatmul__ = _ProxyIOp(operator.imatmul)
    __itruediv__ = _ProxyIOp(operator.itruediv)
    __ifloordiv__ = _ProxyIOp(operator.ifloordiv)
    __imod__ = _ProxyIOp(operator.imod)
    __ipow__ = _ProxyIOp(operator.ipow)
    __ilshift__ = _ProxyIOp(operator.ilshift)
    __irshift__ = _ProxyIOp(operator.irshift)
    __iand__ = _ProxyIOp(operator.iand)
    __ixor__ = _ProxyIOp(operator.ixor)
    __ior__ = _ProxyIOp(operator.ior)
    __neg__ = _ProxyLookup(operator.neg)
    __pos__ = _ProxyLookup(operator.pos)
    __abs__ = _ProxyLookup(abs)
    __invert__ = _ProxyLookup(operator.invert)
    __complex__ = _ProxyLookup(complex)
    __int__ = _ProxyLookup(int)
    __float__ = _ProxyLookup(float)
    __index__ = _ProxyLookup(operator.index)
    __round__ = _ProxyLookup(round)
    __trunc__ = _ProxyLookup(math.trunc)
    __floor__ = _ProxyLookup(math.floor)
    __ceil__ = _ProxyLookup(math.ceil)
    # No function is given for the names below, so they are resolved
    # with ``getattr`` on the bound object.
    __enter__ = _ProxyLookup()
    __exit__ = _ProxyLookup()
    __await__ = _ProxyLookup()
    __aiter__ = _ProxyLookup()
    __anext__ = _ProxyLookup()
    __aenter__ = _ProxyLookup()
    __aexit__ = _ProxyLookup()
    __copy__ = _ProxyLookup(copy.copy)
    __deepcopy__ = _ProxyLookup(copy.deepcopy)
    # __getnewargs_ex__ (pickle through proxy not supported)
    # __getnewargs__ (pickle)
    # __getstate__ (pickle)
    # __setstate__ (pickle)
    # __reduce__ (pickle)
    # __reduce_ex__ (pickle)