1# util/langhelpers.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""Routines to help with the creation, loading and introspection of
10modules, classes, hierarchies, attributes, functions, and methods.
11
12"""
13from __future__ import annotations
14
15import collections
16import enum
17from functools import update_wrapper
18import inspect
19import itertools
20import operator
21import re
22import sys
23import textwrap
24import threading
25import types
26from types import CodeType
27from typing import Any
28from typing import Callable
29from typing import cast
30from typing import Dict
31from typing import FrozenSet
32from typing import Generic
33from typing import Iterator
34from typing import List
35from typing import Mapping
36from typing import NoReturn
37from typing import Optional
38from typing import overload
39from typing import Sequence
40from typing import Set
41from typing import Tuple
42from typing import Type
43from typing import TYPE_CHECKING
44from typing import TypeVar
45from typing import Union
46import warnings
47
48from . import _collections
49from . import compat
50from ._has_cy import HAS_CYEXTENSION
51from .typing import Literal
52from .. import exc
53
54_T = TypeVar("_T")
55_T_co = TypeVar("_T_co", covariant=True)
56_F = TypeVar("_F", bound=Callable[..., Any])
57_MP = TypeVar("_MP", bound="memoized_property[Any]")
58_MA = TypeVar("_MA", bound="HasMemoized.memoized_attribute[Any]")
59_HP = TypeVar("_HP", bound="hybridproperty[Any]")
60_HM = TypeVar("_HM", bound="hybridmethod[Any]")
61
62
63if compat.py314:
64 # vendor a minimal form of get_annotations per
65 # https://github.com/python/cpython/issues/133684#issuecomment-2863841891
66
67 from annotationlib import call_annotate_function # type: ignore
68 from annotationlib import Format
69
70 def _get_and_call_annotate(obj, format): # noqa: A002
71 annotate = getattr(obj, "__annotate__", None)
72 if annotate is not None:
73 ann = call_annotate_function(annotate, format, owner=obj)
74 if not isinstance(ann, dict):
75 raise ValueError(f"{obj!r}.__annotate__ returned a non-dict")
76 return ann
77 return None
78
79 # this is ported from py3.13.0a7
80 _BASE_GET_ANNOTATIONS = type.__dict__["__annotations__"].__get__ # type: ignore # noqa: E501
81
82 def _get_dunder_annotations(obj):
83 if isinstance(obj, type):
84 try:
85 ann = _BASE_GET_ANNOTATIONS(obj)
86 except AttributeError:
87 # For static types, the descriptor raises AttributeError.
88 return {}
89 else:
90 ann = getattr(obj, "__annotations__", None)
91 if ann is None:
92 return {}
93
94 if not isinstance(ann, dict):
95 raise ValueError(
96 f"{obj!r}.__annotations__ is neither a dict nor None"
97 )
98 return dict(ann)
99
100 def _vendored_get_annotations(
101 obj: Any, *, format: Format # noqa: A002
102 ) -> Mapping[str, Any]:
103 """A sparse implementation of annotationlib.get_annotations()"""
104
105 try:
106 ann = _get_dunder_annotations(obj)
107 except Exception:
108 pass
109 else:
110 if ann is not None:
111 return dict(ann)
112
113 # But if __annotations__ threw a NameError, we try calling __annotate__
114 ann = _get_and_call_annotate(obj, format)
115 if ann is None:
            # If that didn't work either, we have a very weird object:
            # evaluating __annotations__ threw NameError and there is no
            # __annotate__. In that case, we fall back to trying
            # __annotations__ again.
121 ann = _get_dunder_annotations(obj)
122
123 if ann is None:
124 if isinstance(obj, type) or callable(obj):
125 return {}
126 raise TypeError(f"{obj!r} does not have annotations")
127
128 if not ann:
129 return {}
130
131 return dict(ann)
132
133 def get_annotations(obj: Any) -> Mapping[str, Any]:
134 # FORWARDREF has the effect of giving us ForwardRefs and not
135 # actually trying to evaluate the annotations. We need this so
136 # that the annotations act as much like
137 # "from __future__ import annotations" as possible, which is going
138 # away in future python as a separate mode
139 return _vendored_get_annotations(obj, format=Format.FORWARDREF)
140
141elif compat.py310:
142
143 def get_annotations(obj: Any) -> Mapping[str, Any]:
144 return inspect.get_annotations(obj)
145
146else:
147
148 def get_annotations(obj: Any) -> Mapping[str, Any]:
        # it's been observed that cls.__annotations__ can be missing
        # entirely. it's not clear what causes this; running under tox
        # py37/38 it happens, running straight pytest it doesn't
152
153 # https://docs.python.org/3/howto/annotations.html#annotations-howto
154 if isinstance(obj, type):
155 ann = obj.__dict__.get("__annotations__", None)
156 else:
157 ann = getattr(obj, "__annotations__", None)
158
159 if ann is None:
160 return _collections.EMPTY_DICT
161 else:
162 return cast("Mapping[str, Any]", ann)
163
164
165def md5_hex(x: Any) -> str:
166 x = x.encode("utf-8")
167 m = compat.md5_not_for_security()
168 m.update(x)
169 return cast(str, m.hexdigest())
170
171
172class safe_reraise:
173 """Reraise an exception after invoking some
174 handler code.
175
176 Stores the existing exception info before
177 invoking so that it is maintained across a potential
178 coroutine context switch.
179
180 e.g.::
181
182 try:
183 sess.commit()
184 except:
185 with safe_reraise():
186 sess.rollback()
187
188 TODO: we should at some point evaluate current behaviors in this regard
189 based on current greenlet, gevent/eventlet implementations in Python 3, and
190 also see the degree to which our own asyncio (based on greenlet also) is
191 impacted by this. .rollback() will cause IO / context switch to occur in
192 all these scenarios; what happens to the exception context from an
193 "except:" block if we don't explicitly store it? Original issue was #2703.
194
195 """
196
197 __slots__ = ("_exc_info",)
198
199 _exc_info: Union[
200 None,
201 Tuple[
202 Type[BaseException],
203 BaseException,
204 types.TracebackType,
205 ],
206 Tuple[None, None, None],
207 ]
208
209 def __enter__(self) -> None:
210 self._exc_info = sys.exc_info()
211
212 def __exit__(
213 self,
214 type_: Optional[Type[BaseException]],
215 value: Optional[BaseException],
216 traceback: Optional[types.TracebackType],
217 ) -> NoReturn:
218 assert self._exc_info is not None
219 # see #2703 for notes
220 if type_ is None:
221 exc_type, exc_value, exc_tb = self._exc_info
222 assert exc_value is not None
223 self._exc_info = None # remove potential circular references
224 raise exc_value.with_traceback(exc_tb)
225 else:
226 self._exc_info = None # remove potential circular references
227 assert value is not None
228 raise value.with_traceback(traceback)
229
230
231def walk_subclasses(cls: Type[_T]) -> Iterator[Type[_T]]:
232 seen: Set[Any] = set()
233
234 stack = [cls]
235 while stack:
236 cls = stack.pop()
237 if cls in seen:
238 continue
239 else:
240 seen.add(cls)
241 stack.extend(cls.__subclasses__())
242 yield cls
243
244
245def string_or_unprintable(element: Any) -> str:
246 if isinstance(element, str):
247 return element
248 else:
249 try:
250 return str(element)
251 except Exception:
252 return "unprintable element %r" % element
253
254
255def clsname_as_plain_name(
256 cls: Type[Any], use_name: Optional[str] = None
257) -> str:
258 name = use_name or cls.__name__
259 return " ".join(n.lower() for n in re.findall(r"([A-Z][a-z]+|SQL)", name))
260
261
262def method_is_overridden(
263 instance_or_cls: Union[Type[Any], object],
264 against_method: Callable[..., Any],
265) -> bool:
266 """Return True if the two class methods don't match."""
267
268 if not isinstance(instance_or_cls, type):
269 current_cls = instance_or_cls.__class__
270 else:
271 current_cls = instance_or_cls
272
273 method_name = against_method.__name__
274
275 current_method: types.MethodType = getattr(current_cls, method_name)
276
277 return current_method != against_method
278
279
280def decode_slice(slc: slice) -> Tuple[Any, ...]:
281 """decode a slice object as sent to __getitem__.
282
    takes into account the __index__() method introduced in Python 2.5.
284
285 """
286 ret: List[Any] = []
287 for x in slc.start, slc.stop, slc.step:
288 if hasattr(x, "__index__"):
289 x = x.__index__()
290 ret.append(x)
291 return tuple(ret)
292
293
294def _unique_symbols(used: Sequence[str], *bases: str) -> Iterator[str]:
295 used_set = set(used)
296 for base in bases:
297 pool = itertools.chain(
298 (base,),
299 map(lambda i: base + str(i), range(1000)),
300 )
301 for sym in pool:
302 if sym not in used_set:
303 used_set.add(sym)
304 yield sym
305 break
306 else:
307 raise NameError("exhausted namespace for symbol base %s" % base)
308
309
310def map_bits(fn: Callable[[int], Any], n: int) -> Iterator[Any]:
311 """Call the given function given each nonzero bit from n."""
312
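    # e.g. (illustrative): list(map_bits(lambda b: b, 0b1010)) == [2, 8];
    # each iteration isolates and then clears the lowest remaining set bit.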
313 while n:
314 b = n & (~n + 1)
315 yield fn(b)
316 n ^= b
317
318
319_Fn = TypeVar("_Fn", bound="Callable[..., Any]")
320
321# this seems to be in flux in recent mypy versions
322
323
324def decorator(target: Callable[..., Any]) -> Callable[[_Fn], _Fn]:
325 """A signature-matching decorator factory."""
326
327 def decorate(fn: _Fn) -> _Fn:
328 if not inspect.isfunction(fn) and not inspect.ismethod(fn):
329 raise Exception("not a decoratable function")
330
        # Python 3.14 defers creating __annotations__ until it's used.
        # We do not want to create __annotations__ now.
333 annofunc = getattr(fn, "__annotate__", None)
334 if annofunc is not None:
335 fn.__annotate__ = None # type: ignore[union-attr]
336 try:
337 spec = compat.inspect_getfullargspec(fn)
338 finally:
339 fn.__annotate__ = annofunc # type: ignore[union-attr]
340 else:
341 spec = compat.inspect_getfullargspec(fn)
342
        # Do not generate code for annotations.
        # update_wrapper() copies the annotations from fn to decorated.
        # We use dummy defaults for code generation to avoid having
        # to copy large globals when compiling.
        # We copy __defaults__ and __kwdefaults__ from fn to decorated.
348 empty_defaults = (None,) * len(spec.defaults or ())
349 empty_kwdefaults = dict.fromkeys(spec.kwonlydefaults or ())
350 spec = spec._replace(
351 annotations={},
352 defaults=empty_defaults,
353 kwonlydefaults=empty_kwdefaults,
354 )
355
356 names = (
357 tuple(cast("Tuple[str, ...]", spec[0]))
358 + cast("Tuple[str, ...]", spec[1:3])
359 + (fn.__name__,)
360 )
361 targ_name, fn_name = _unique_symbols(names, "target", "fn")
362
363 metadata: Dict[str, Optional[str]] = dict(target=targ_name, fn=fn_name)
364 metadata.update(format_argspec_plus(spec, grouped=False))
365 metadata["name"] = fn.__name__
366
367 if inspect.iscoroutinefunction(fn):
368 metadata["prefix"] = "async "
369 metadata["target_prefix"] = "await "
370 else:
371 metadata["prefix"] = ""
372 metadata["target_prefix"] = ""
373
374 # look for __ positional arguments. This is a convention in
375 # SQLAlchemy that arguments should be passed positionally
376 # rather than as keyword
377 # arguments. note that apply_pos doesn't currently work in all cases
378 # such as when a kw-only indicator "*" is present, which is why
379 # we limit the use of this to just that case we can detect. As we add
380 # more kinds of methods that use @decorator, things may have to
381 # be further improved in this area
382 if "__" in repr(spec[0]):
383 code = (
384 """\
385%(prefix)sdef %(name)s%(grouped_args)s:
386 return %(target_prefix)s%(target)s(%(fn)s, %(apply_pos)s)
387"""
388 % metadata
389 )
390 else:
391 code = (
392 """\
393%(prefix)sdef %(name)s%(grouped_args)s:
394 return %(target_prefix)s%(target)s(%(fn)s, %(apply_kw)s)
395"""
396 % metadata
397 )
398
399 env: Dict[str, Any] = {
400 targ_name: target,
401 fn_name: fn,
402 "__name__": fn.__module__,
403 }
404
405 decorated = cast(
406 types.FunctionType,
407 _exec_code_in_env(code, env, fn.__name__),
408 )
409 decorated.__defaults__ = fn.__defaults__
410 decorated.__kwdefaults__ = fn.__kwdefaults__ # type: ignore
411 return update_wrapper(decorated, fn) # type: ignore[return-value]
412
413 return update_wrapper(decorate, target) # type: ignore[return-value]
414
415
416def _exec_code_in_env(
417 code: Union[str, types.CodeType], env: Dict[str, Any], fn_name: str
418) -> Callable[..., Any]:
419 exec(code, env)
420 return env[fn_name] # type: ignore[no-any-return]
421
422
423_PF = TypeVar("_PF")
424_TE = TypeVar("_TE")
425
426
427class PluginLoader:
428 def __init__(
429 self, group: str, auto_fn: Optional[Callable[..., Any]] = None
430 ):
431 self.group = group
432 self.impls: Dict[str, Any] = {}
433 self.auto_fn = auto_fn
434
435 def clear(self):
436 self.impls.clear()
437
438 def load(self, name: str) -> Any:
439 if name in self.impls:
440 return self.impls[name]()
441
442 if self.auto_fn:
443 loader = self.auto_fn(name)
444 if loader:
445 self.impls[name] = loader
446 return loader()
447
448 for impl in compat.importlib_metadata_get(self.group):
449 if impl.name == name:
450 self.impls[name] = impl.load
451 return impl.load()
452
453 raise exc.NoSuchModuleError(
454 "Can't load plugin: %s:%s" % (self.group, name)
455 )
456
457 def register(self, name: str, modulepath: str, objname: str) -> None:
458 def load():
459 mod = __import__(modulepath)
460 for token in modulepath.split(".")[1:]:
461 mod = getattr(mod, token)
462 return getattr(mod, objname)
463
464 self.impls[name] = load
465
466 def deregister(self, name: str) -> None:
467 del self.impls[name]
468
469
470def _inspect_func_args(fn):
471 try:
472 co_varkeywords = inspect.CO_VARKEYWORDS
473 except AttributeError:
474 # https://docs.python.org/3/library/inspect.html
475 # The flags are specific to CPython, and may not be defined in other
476 # Python implementations. Furthermore, the flags are an implementation
477 # detail, and can be removed or deprecated in future Python releases.
478 spec = compat.inspect_getfullargspec(fn)
479 return spec[0], bool(spec[2])
480 else:
481 # use fn.__code__ plus flags to reduce method call overhead
482 co = fn.__code__
483 nargs = co.co_argcount
484 return (
485 list(co.co_varnames[:nargs]),
486 bool(co.co_flags & co_varkeywords),
487 )
488
489
490@overload
491def get_cls_kwargs(
492 cls: type,
493 *,
494 _set: Optional[Set[str]] = None,
495 raiseerr: Literal[True] = ...,
496) -> Set[str]: ...
497
498
499@overload
500def get_cls_kwargs(
501 cls: type, *, _set: Optional[Set[str]] = None, raiseerr: bool = False
502) -> Optional[Set[str]]: ...
503
504
505def get_cls_kwargs(
506 cls: type, *, _set: Optional[Set[str]] = None, raiseerr: bool = False
507) -> Optional[Set[str]]:
508 r"""Return the full set of inherited kwargs for the given `cls`.
509
510 Probes a class's __init__ method, collecting all named arguments. If the
511 __init__ defines a \**kwargs catch-all, then the constructor is presumed
512 to pass along unrecognized keywords to its base classes, and the
513 collection process is repeated recursively on each of the bases.
514
515 Uses a subset of inspect.getfullargspec() to cut down on method overhead,
516 as this is used within the Core typing system to create copies of type
517 objects which is a performance-sensitive operation.
518
    No anonymous tuple arguments, please!
520
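    E.g., with illustrative classes::

        class A:
            def __init__(self, a): ...

        class B(A):
            def __init__(self, b, **kw):
                super().__init__(**kw)

        get_cls_kwargs(B)  # {'a', 'b'}
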
521 """
522 toplevel = _set is None
523 if toplevel:
524 _set = set()
525 assert _set is not None
526
527 ctr = cls.__dict__.get("__init__", False)
528
529 has_init = (
530 ctr
531 and isinstance(ctr, types.FunctionType)
532 and isinstance(ctr.__code__, types.CodeType)
533 )
534
535 if has_init:
536 names, has_kw = _inspect_func_args(ctr)
537 _set.update(names)
538
539 if not has_kw and not toplevel:
540 if raiseerr:
541 raise TypeError(
542 f"given cls {cls} doesn't have an __init__ method"
543 )
544 else:
545 return None
546 else:
547 has_kw = False
548
549 if not has_init or has_kw:
550 for c in cls.__bases__:
551 if get_cls_kwargs(c, _set=_set) is None:
552 break
553
554 _set.discard("self")
555 return _set
556
557
558def get_func_kwargs(func: Callable[..., Any]) -> List[str]:
559 """Return the set of legal kwargs for the given `func`.
560
561 Uses getargspec so is safe to call for methods, functions,
562 etc.
563
564 """
565
566 return compat.inspect_getfullargspec(func)[0]
567
568
569def get_callable_argspec(
570 fn: Callable[..., Any], no_self: bool = False, _is_init: bool = False
571) -> compat.FullArgSpec:
572 """Return the argument signature for any callable.
573
574 All pure-Python callables are accepted, including
575 functions, methods, classes, objects with __call__;
576 builtins and other edge cases like functools.partial() objects
577 raise a TypeError.
578
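    E.g.::

        get_callable_argspec(lambda a, b=1: None).args  # ['a', 'b']
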
579 """
580 if inspect.isbuiltin(fn):
581 raise TypeError("Can't inspect builtin: %s" % fn)
582 elif inspect.isfunction(fn):
583 if _is_init and no_self:
584 spec = compat.inspect_getfullargspec(fn)
585 return compat.FullArgSpec(
586 spec.args[1:],
587 spec.varargs,
588 spec.varkw,
589 spec.defaults,
590 spec.kwonlyargs,
591 spec.kwonlydefaults,
592 spec.annotations,
593 )
594 else:
595 return compat.inspect_getfullargspec(fn)
596 elif inspect.ismethod(fn):
597 if no_self and (_is_init or fn.__self__):
598 spec = compat.inspect_getfullargspec(fn.__func__)
599 return compat.FullArgSpec(
600 spec.args[1:],
601 spec.varargs,
602 spec.varkw,
603 spec.defaults,
604 spec.kwonlyargs,
605 spec.kwonlydefaults,
606 spec.annotations,
607 )
608 else:
609 return compat.inspect_getfullargspec(fn.__func__)
610 elif inspect.isclass(fn):
611 return get_callable_argspec(
612 fn.__init__, no_self=no_self, _is_init=True
613 )
614 elif hasattr(fn, "__func__"):
615 return compat.inspect_getfullargspec(fn.__func__)
616 elif hasattr(fn, "__call__"):
617 if inspect.ismethod(fn.__call__):
618 return get_callable_argspec(fn.__call__, no_self=no_self)
619 else:
620 raise TypeError("Can't inspect callable: %s" % fn)
621 else:
622 raise TypeError("Can't inspect callable: %s" % fn)
623
624
625def format_argspec_plus(
626 fn: Union[Callable[..., Any], compat.FullArgSpec], grouped: bool = True
627) -> Dict[str, Optional[str]]:
628 """Returns a dictionary of formatted, introspected function arguments.
629
    An enhanced variant of inspect.formatargspec to support code generation.
631
632 fn
633 An inspectable callable or tuple of inspect getargspec() results.
634 grouped
635 Defaults to True; include (parens, around, argument) lists
636
637 Returns:
638
639 args
640 Full inspect.formatargspec for fn
641 self_arg
642 The name of the first positional argument, varargs[0], or None
643 if the function defines no positional arguments.
644 apply_pos
645 args, re-written in calling rather than receiving syntax. Arguments are
646 passed positionally.
647 apply_kw
648 Like apply_pos, except keyword-ish args are passed as keywords.
649 apply_pos_proxied
650 Like apply_pos but omits the self/cls argument
651
652 Example::
653
654 >>> format_argspec_plus(lambda self, a, b, c=3, **d: 123)
655 {'grouped_args': '(self, a, b, c=3, **d)',
656 'self_arg': 'self',
657 'apply_kw': '(self, a, b, c=c, **d)',
658 'apply_pos': '(self, a, b, c, **d)'}
659
660 """
661 if callable(fn):
662 spec = compat.inspect_getfullargspec(fn)
663 else:
664 spec = fn
665
666 args = compat.inspect_formatargspec(*spec)
667
668 apply_pos = compat.inspect_formatargspec(
669 spec[0], spec[1], spec[2], None, spec[4]
670 )
671
672 if spec[0]:
673 self_arg = spec[0][0]
674
675 apply_pos_proxied = compat.inspect_formatargspec(
676 spec[0][1:], spec[1], spec[2], None, spec[4]
677 )
678
679 elif spec[1]:
        # no positional args; treat the first element of *args as the
        # "self" argument
681 self_arg = "%s[0]" % spec[1]
682
683 apply_pos_proxied = apply_pos
684 else:
685 self_arg = None
686 apply_pos_proxied = apply_pos
687
688 num_defaults = 0
689 if spec[3]:
690 num_defaults += len(cast(Tuple[Any], spec[3]))
691 if spec[4]:
692 num_defaults += len(spec[4])
693
694 name_args = spec[0] + spec[4]
695
696 defaulted_vals: Union[List[str], Tuple[()]]
697
698 if num_defaults:
699 defaulted_vals = name_args[0 - num_defaults :]
700 else:
701 defaulted_vals = ()
702
703 apply_kw = compat.inspect_formatargspec(
704 name_args,
705 spec[1],
706 spec[2],
707 defaulted_vals,
708 formatvalue=lambda x: "=" + str(x),
709 )
710
711 if spec[0]:
712 apply_kw_proxied = compat.inspect_formatargspec(
713 name_args[1:],
714 spec[1],
715 spec[2],
716 defaulted_vals,
717 formatvalue=lambda x: "=" + str(x),
718 )
719 else:
720 apply_kw_proxied = apply_kw
721
722 if grouped:
723 return dict(
724 grouped_args=args,
725 self_arg=self_arg,
726 apply_pos=apply_pos,
727 apply_kw=apply_kw,
728 apply_pos_proxied=apply_pos_proxied,
729 apply_kw_proxied=apply_kw_proxied,
730 )
731 else:
732 return dict(
733 grouped_args=args,
734 self_arg=self_arg,
735 apply_pos=apply_pos[1:-1],
736 apply_kw=apply_kw[1:-1],
737 apply_pos_proxied=apply_pos_proxied[1:-1],
738 apply_kw_proxied=apply_kw_proxied[1:-1],
739 )
740
741
742def format_argspec_init(method, grouped=True):
743 """format_argspec_plus with considerations for typical __init__ methods
744
745 Wraps format_argspec_plus with error handling strategies for typical
746 __init__ cases:
747
748 .. sourcecode:: text
749
750 object.__init__ -> (self)
751 other unreflectable (usually C) -> (self, *args, **kwargs)
752
753 """
754 if method is object.__init__:
755 grouped_args = "(self)"
756 args = "(self)" if grouped else "self"
757 proxied = "()" if grouped else ""
758 else:
759 try:
760 return format_argspec_plus(method, grouped=grouped)
761 except TypeError:
762 grouped_args = "(self, *args, **kwargs)"
763 args = grouped_args if grouped else "self, *args, **kwargs"
764 proxied = "(*args, **kwargs)" if grouped else "*args, **kwargs"
765 return dict(
766 self_arg="self",
767 grouped_args=grouped_args,
768 apply_pos=args,
769 apply_kw=args,
770 apply_pos_proxied=proxied,
771 apply_kw_proxied=proxied,
772 )
773
774
775def create_proxy_methods(
776 target_cls: Type[Any],
777 target_cls_sphinx_name: str,
778 proxy_cls_sphinx_name: str,
779 classmethods: Sequence[str] = (),
780 methods: Sequence[str] = (),
781 attributes: Sequence[str] = (),
782 use_intermediate_variable: Sequence[str] = (),
783) -> Callable[[_T], _T]:
784 """A class decorator indicating attributes should refer to a proxy
785 class.
786
787 This decorator is now a "marker" that does nothing at runtime. Instead,
788 it is consumed by the tools/generate_proxy_methods.py script to
789 statically generate proxy methods and attributes that are fully
790 recognized by typing tools such as mypy.
791
792 """
793
794 def decorate(cls):
795 return cls
796
797 return decorate
798
799
800def getargspec_init(method):
801 """inspect.getargspec with considerations for typical __init__ methods
802
803 Wraps inspect.getargspec with error handling for typical __init__ cases:
804
805 .. sourcecode:: text
806
807 object.__init__ -> (self)
808 other unreflectable (usually C) -> (self, *args, **kwargs)
809
810 """
811 try:
812 return compat.inspect_getfullargspec(method)
813 except TypeError:
814 if method is object.__init__:
815 return (["self"], None, None, None)
816 else:
817 return (["self"], "args", "kwargs", None)
818
819
820def unbound_method_to_callable(func_or_cls):
821 """Adjust the incoming callable such that a 'self' argument is not
822 required.
823
824 """
825
826 if isinstance(func_or_cls, types.MethodType) and not func_or_cls.__self__:
827 return func_or_cls.__func__
828 else:
829 return func_or_cls
830
831
832def generic_repr(
833 obj: Any,
834 additional_kw: Sequence[Tuple[str, Any]] = (),
835 to_inspect: Optional[Union[object, List[object]]] = None,
836 omit_kwarg: Sequence[str] = (),
837) -> str:
838 """Produce a __repr__() based on direct association of the __init__()
839 specification vs. same-named attributes present.
840
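    E.g., with an illustrative class::

        class Point:
            def __init__(self, x, y=0):
                self.x = x
                self.y = y

        generic_repr(Point(1, 2))  # "Point(1, y=2)"
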
841 """
842 if to_inspect is None:
843 to_inspect = [obj]
844 else:
845 to_inspect = _collections.to_list(to_inspect)
846
847 missing = object()
848
849 pos_args = []
850 kw_args: _collections.OrderedDict[str, Any] = _collections.OrderedDict()
851 vargs = None
852 for i, insp in enumerate(to_inspect):
853 try:
854 spec = compat.inspect_getfullargspec(insp.__init__)
855 except TypeError:
856 continue
857 else:
858 default_len = len(spec.defaults) if spec.defaults else 0
859 if i == 0:
860 if spec.varargs:
861 vargs = spec.varargs
862 if default_len:
863 pos_args.extend(spec.args[1:-default_len])
864 else:
865 pos_args.extend(spec.args[1:])
866 else:
867 kw_args.update(
868 [(arg, missing) for arg in spec.args[1:-default_len]]
869 )
870
871 if default_len:
872 assert spec.defaults
873 kw_args.update(
874 [
875 (arg, default)
876 for arg, default in zip(
877 spec.args[-default_len:], spec.defaults
878 )
879 ]
880 )
881 output: List[str] = []
882
883 output.extend(repr(getattr(obj, arg, None)) for arg in pos_args)
884
885 if vargs is not None and hasattr(obj, vargs):
886 output.extend([repr(val) for val in getattr(obj, vargs)])
887
888 for arg, defval in kw_args.items():
889 if arg in omit_kwarg:
890 continue
891 try:
892 val = getattr(obj, arg, missing)
893 if val is not missing and val != defval:
894 output.append("%s=%r" % (arg, val))
895 except Exception:
896 pass
897
898 if additional_kw:
899 for arg, defval in additional_kw:
900 try:
901 val = getattr(obj, arg, missing)
902 if val is not missing and val != defval:
903 output.append("%s=%r" % (arg, val))
904 except Exception:
905 pass
906
907 return "%s(%s)" % (obj.__class__.__name__, ", ".join(output))
908
909
910class portable_instancemethod:
911 """Turn an instancemethod into a (parent, name) pair
912 to produce a serializable callable.
913
914 """
915
916 __slots__ = "target", "name", "kwargs", "__weakref__"
917
918 def __getstate__(self):
919 return {
920 "target": self.target,
921 "name": self.name,
922 "kwargs": self.kwargs,
923 }
924
925 def __setstate__(self, state):
926 self.target = state["target"]
927 self.name = state["name"]
928 self.kwargs = state.get("kwargs", ())
929
930 def __init__(self, meth, kwargs=()):
931 self.target = meth.__self__
932 self.name = meth.__name__
933 self.kwargs = kwargs
934
935 def __call__(self, *arg, **kw):
936 kw.update(self.kwargs)
937 return getattr(self.target, self.name)(*arg, **kw)
938
939
940def class_hierarchy(cls):
941 """Return an unordered sequence of all classes related to cls.
942
943 Traverses diamond hierarchies.
944
945 Fibs slightly: subclasses of builtin types are not returned. Thus
946 class_hierarchy(class A(object)) returns (A, object), not A plus every
947 class systemwide that derives from object.
948
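    E.g., for an illustrative diamond hierarchy::

        class A: ...
        class B(A): ...
        class C(A): ...
        class D(B, C): ...

        set(class_hierarchy(B))  # {A, B, C, D, object}
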
949 """
950
951 hier = {cls}
952 process = list(cls.__mro__)
953 while process:
954 c = process.pop()
955 bases = (_ for _ in c.__bases__ if _ not in hier)
956
957 for b in bases:
958 process.append(b)
959 hier.add(b)
960
961 if c.__module__ == "builtins" or not hasattr(c, "__subclasses__"):
962 continue
963
964 for s in [
965 _
966 for _ in (
967 c.__subclasses__()
968 if not issubclass(c, type)
969 else c.__subclasses__(c)
970 )
971 if _ not in hier
972 ]:
973 process.append(s)
974 hier.add(s)
975 return list(hier)
976
977
978def iterate_attributes(cls):
979 """iterate all the keys and attributes associated
980 with a class, without using getattr().
981
982 Does not use getattr() so that class-sensitive
983 descriptors (i.e. property.__get__()) are not called.
984
985 """
986 keys = dir(cls)
987 for key in keys:
988 for c in cls.__mro__:
989 if key in c.__dict__:
990 yield (key, c.__dict__[key])
991 break
992
993
994def monkeypatch_proxied_specials(
995 into_cls,
996 from_cls,
997 skip=None,
998 only=None,
999 name="self.proxy",
1000 from_instance=None,
1001):
1002 """Automates delegation of __specials__ for a proxying type."""
1003
1004 if only:
1005 dunders = only
1006 else:
1007 if skip is None:
1008 skip = (
1009 "__slots__",
1010 "__del__",
1011 "__getattribute__",
1012 "__metaclass__",
1013 "__getstate__",
1014 "__setstate__",
1015 )
1016 dunders = [
1017 m
1018 for m in dir(from_cls)
1019 if (
1020 m.startswith("__")
1021 and m.endswith("__")
1022 and not hasattr(into_cls, m)
1023 and m not in skip
1024 )
1025 ]
1026
1027 for method in dunders:
1028 try:
1029 maybe_fn = getattr(from_cls, method)
1030 if not hasattr(maybe_fn, "__call__"):
1031 continue
1032 maybe_fn = getattr(maybe_fn, "__func__", maybe_fn)
1033 fn = cast(types.FunctionType, maybe_fn)
1034
1035 except AttributeError:
1036 continue
1037 try:
1038 spec = compat.inspect_getfullargspec(fn)
1039 fn_args = compat.inspect_formatargspec(spec[0])
1040 d_args = compat.inspect_formatargspec(spec[0][1:])
1041 except TypeError:
1042 fn_args = "(self, *args, **kw)"
1043 d_args = "(*args, **kw)"
1044
1045 py = (
1046 "def %(method)s%(fn_args)s: "
1047 "return %(name)s.%(method)s%(d_args)s" % locals()
1048 )
1049
        env: Dict[str, types.FunctionType] = (
            {name: from_instance} if from_instance is not None else {}
        )
1053 exec(py, env)
1054 try:
1055 env[method].__defaults__ = fn.__defaults__
1056 except AttributeError:
1057 pass
1058 setattr(into_cls, method, env[method])
1059
1060
1061def methods_equivalent(meth1, meth2):
1062 """Return True if the two methods are the same implementation."""
1063
1064 return getattr(meth1, "__func__", meth1) is getattr(
1065 meth2, "__func__", meth2
1066 )
1067
1068
1069def as_interface(obj, cls=None, methods=None, required=None):
1070 """Ensure basic interface compliance for an instance or dict of callables.
1071
1072 Checks that ``obj`` implements public methods of ``cls`` or has members
1073 listed in ``methods``. If ``required`` is not supplied, implementing at
1074 least one interface method is sufficient. Methods present on ``obj`` that
1075 are not in the interface are ignored.
1076
1077 If ``obj`` is a dict and ``dict`` does not meet the interface
1078 requirements, the keys of the dictionary are inspected. Keys present in
1079 ``obj`` that are not in the interface will raise TypeErrors.
1080
1081 Raises TypeError if ``obj`` does not meet the interface criteria.
1082
1083 In all passing cases, an object with callable members is returned. In the
1084 simple case, ``obj`` is returned as-is; if dict processing kicks in then
1085 an anonymous class is returned.
1086
1087 obj
1088 A type, instance, or dictionary of callables.
1089 cls
1090 Optional, a type. All public methods of cls are considered the
1091 interface. An ``obj`` instance of cls will always pass, ignoring
      ``required``.
1093 methods
1094 Optional, a sequence of method names to consider as the interface.
1095 required
1096 Optional, a sequence of mandatory implementations. If omitted, an
1097 ``obj`` that provides at least one interface method is considered
1098 sufficient. As a convenience, required may be a type, in which case
1099 all public methods of the type are required.
1100
1101 """
1102 if not cls and not methods:
1103 raise TypeError("a class or collection of method names are required")
1104
1105 if isinstance(cls, type) and isinstance(obj, cls):
1106 return obj
1107
1108 interface = set(methods or [m for m in dir(cls) if not m.startswith("_")])
1109 implemented = set(dir(obj))
1110
1111 complies = operator.ge
1112 if isinstance(required, type):
1113 required = interface
1114 elif not required:
1115 required = set()
1116 complies = operator.gt
1117 else:
1118 required = set(required)
1119
1120 if complies(implemented.intersection(interface), required):
1121 return obj
1122
1123 # No dict duck typing here.
1124 if not isinstance(obj, dict):
        qualifier = "any of" if complies is operator.gt else "all of"
1126 raise TypeError(
1127 "%r does not implement %s: %s"
1128 % (obj, qualifier, ", ".join(interface))
1129 )
1130
1131 class AnonymousInterface:
1132 """A callable-holding shell."""
1133
1134 if cls:
1135 AnonymousInterface.__name__ = "Anonymous" + cls.__name__
1136 found = set()
1137
1138 for method, impl in dictlike_iteritems(obj):
1139 if method not in interface:
1140 raise TypeError("%r: unknown in this interface" % method)
1141 if not callable(impl):
1142 raise TypeError("%r=%r is not callable" % (method, impl))
1143 setattr(AnonymousInterface, method, staticmethod(impl))
1144 found.add(method)
1145
1146 if complies(found, required):
1147 return AnonymousInterface
1148
1149 raise TypeError(
1150 "dictionary does not contain required keys %s"
1151 % ", ".join(required - found)
1152 )
1153
1154
1155_GFD = TypeVar("_GFD", bound="generic_fn_descriptor[Any]")
1156
1157
1158class generic_fn_descriptor(Generic[_T_co]):
1159 """Descriptor which proxies a function when the attribute is not
1160 present in dict
1161
1162 This superclass is organized in a particular way with "memoized" and
1163 "non-memoized" implementation classes that are hidden from type checkers,
1164 as Mypy seems to not be able to handle seeing multiple kinds of descriptor
1165 classes used for the same attribute.
1166
1167 """
1168
1169 fget: Callable[..., _T_co]
1170 __doc__: Optional[str]
1171 __name__: str
1172
1173 def __init__(self, fget: Callable[..., _T_co], doc: Optional[str] = None):
1174 self.fget = fget
1175 self.__doc__ = doc or fget.__doc__
1176 self.__name__ = fget.__name__
1177
1178 @overload
1179 def __get__(self: _GFD, obj: None, cls: Any) -> _GFD: ...
1180
1181 @overload
1182 def __get__(self, obj: object, cls: Any) -> _T_co: ...
1183
1184 def __get__(self: _GFD, obj: Any, cls: Any) -> Union[_GFD, _T_co]:
1185 raise NotImplementedError()
1186
1187 if TYPE_CHECKING:
1188
1189 def __set__(self, instance: Any, value: Any) -> None: ...
1190
1191 def __delete__(self, instance: Any) -> None: ...
1192
1193 def _reset(self, obj: Any) -> None:
1194 raise NotImplementedError()
1195
1196 @classmethod
1197 def reset(cls, obj: Any, name: str) -> None:
1198 raise NotImplementedError()
1199
1200
1201class _non_memoized_property(generic_fn_descriptor[_T_co]):
1202 """a plain descriptor that proxies a function.
1203
1204 primary rationale is to provide a plain attribute that's
1205 compatible with memoized_property which is also recognized as equivalent
1206 by mypy.
1207
1208 """
1209
1210 if not TYPE_CHECKING:
1211
1212 def __get__(self, obj, cls):
1213 if obj is None:
1214 return self
1215 return self.fget(obj)
1216
1217
1218class _memoized_property(generic_fn_descriptor[_T_co]):
1219 """A read-only @property that is only evaluated once."""
1220
1221 if not TYPE_CHECKING:
1222
1223 def __get__(self, obj, cls):
1224 if obj is None:
1225 return self
1226 obj.__dict__[self.__name__] = result = self.fget(obj)
1227 return result
1228
1229 def _reset(self, obj):
1230 _memoized_property.reset(obj, self.__name__)
1231
1232 @classmethod
1233 def reset(cls, obj, name):
1234 obj.__dict__.pop(name, None)
1235
1236
1237# despite many attempts to get Mypy to recognize an overridden descriptor
1238# where one is memoized and the other isn't, there seems to be no reliable
1239# way other than completely deceiving the type checker into thinking there
1240# is just one single descriptor type everywhere. Otherwise, if a superclass
1241# has non-memoized and subclass has memoized, that requires
1242# "class memoized(non_memoized)". but then if a superclass has memoized and
1243# superclass has non-memoized, the class hierarchy of the descriptors
1244# would need to be reversed; "class non_memoized(memoized)". so there's no
1245# way to achieve this.
1246# additional issues, RO properties:
1247# https://github.com/python/mypy/issues/12440
1248if TYPE_CHECKING:
1249 # allow memoized and non-memoized to be freely mixed by having them
1250 # be the same class
1251 memoized_property = generic_fn_descriptor
1252 non_memoized_property = generic_fn_descriptor
1253
1254 # for read only situations, mypy only sees @property as read only.
1255 # read only is needed when a subtype specializes the return type
1256 # of a property, meaning assignment needs to be disallowed
1257 ro_memoized_property = property
1258 ro_non_memoized_property = property
1259
1260else:
1261 memoized_property = ro_memoized_property = _memoized_property
1262 non_memoized_property = ro_non_memoized_property = _non_memoized_property
1263
1264
1265def memoized_instancemethod(fn: _F) -> _F:
1266 """Decorate a method memoize its return value.
1267
1268 Best applied to no-arg methods: memoization is not sensitive to
1269 argument values, and will always return the same value even when
1270 called with different arguments.
1271
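    E.g., a sketch with illustrative names::

        class Widget:
            @memoized_instancemethod
            def total(self):
                return compute_total(self)  # hypothetical helper

    The first call stores a ``memo()`` callable in
    ``widget.__dict__['total']``; subsequent calls return the stored result.
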
1272 """
1273
1274 def oneshot(self, *args, **kw):
1275 result = fn(self, *args, **kw)
1276
1277 def memo(*a, **kw):
1278 return result
1279
1280 memo.__name__ = fn.__name__
1281 memo.__doc__ = fn.__doc__
1282 self.__dict__[fn.__name__] = memo
1283 return result
1284
1285 return update_wrapper(oneshot, fn) # type: ignore
1286
1287
1288class HasMemoized:
1289 """A mixin class that maintains the names of memoized elements in a
1290 collection for easy cache clearing, generative, etc.
1291
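    E.g., a sketch with illustrative names::

        class Widget(HasMemoized):
            @HasMemoized.memoized_attribute
            def total(self):
                return compute_total(self)  # hypothetical helper

    The computed value is stored in ``widget.__dict__`` and its key is added
    to ``_memoized_keys``, so ``_reset_memoizations()`` can clear it later.
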
1292 """
1293
1294 if not TYPE_CHECKING:
1295 # support classes that want to have __slots__ with an explicit
1296 # slot for __dict__. not sure if that requires base __slots__ here.
1297 __slots__ = ()
1298
1299 _memoized_keys: FrozenSet[str] = frozenset()
1300
1301 def _reset_memoizations(self) -> None:
1302 for elem in self._memoized_keys:
1303 self.__dict__.pop(elem, None)
1304
1305 def _assert_no_memoizations(self) -> None:
1306 for elem in self._memoized_keys:
1307 assert elem not in self.__dict__
1308
1309 def _set_memoized_attribute(self, key: str, value: Any) -> None:
1310 self.__dict__[key] = value
1311 self._memoized_keys |= {key}
1312
1313 class memoized_attribute(memoized_property[_T]):
1314 """A read-only @property that is only evaluated once.
1315
1316 :meta private:
1317
1318 """
1319
1320 fget: Callable[..., _T]
1321 __doc__: Optional[str]
1322 __name__: str
1323
1324 def __init__(self, fget: Callable[..., _T], doc: Optional[str] = None):
1325 self.fget = fget
1326 self.__doc__ = doc or fget.__doc__
1327 self.__name__ = fget.__name__
1328
1329 @overload
1330 def __get__(self: _MA, obj: None, cls: Any) -> _MA: ...
1331
1332 @overload
1333 def __get__(self, obj: Any, cls: Any) -> _T: ...
1334
1335 def __get__(self, obj, cls):
1336 if obj is None:
1337 return self
1338 obj.__dict__[self.__name__] = result = self.fget(obj)
1339 obj._memoized_keys |= {self.__name__}
1340 return result
1341
1342 @classmethod
1343 def memoized_instancemethod(cls, fn: _F) -> _F:
1344 """Decorate a method memoize its return value.
1345
1346 :meta private:
1347
1348 """
1349
1350 def oneshot(self: Any, *args: Any, **kw: Any) -> Any:
1351 result = fn(self, *args, **kw)
1352
1353 def memo(*a, **kw):
1354 return result
1355
1356 memo.__name__ = fn.__name__
1357 memo.__doc__ = fn.__doc__
1358 self.__dict__[fn.__name__] = memo
1359 self._memoized_keys |= {fn.__name__}
1360 return result
1361
1362 return update_wrapper(oneshot, fn) # type: ignore
1363
1364
1365if TYPE_CHECKING:
1366 HasMemoized_ro_memoized_attribute = property
1367else:
1368 HasMemoized_ro_memoized_attribute = HasMemoized.memoized_attribute
1369
1370
1371class MemoizedSlots:
1372 """Apply memoized items to an object using a __getattr__ scheme.
1373
1374 This allows the functionality of memoized_property and
1375 memoized_instancemethod to be available to a class using __slots__.
1376
1377 The memoized get is not threadsafe under freethreading and the
1378 creator method may in extremely rare cases be called more than once.
1379
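    E.g., a sketch of the naming convention (names are illustrative)::

        class Thing(MemoizedSlots):
            __slots__ = ("name", "widget")

            def _memoized_attr_widget(self):
                return make_widget(self.name)  # hypothetical factory

    Accessing ``thing.widget`` invokes ``_memoized_attr_widget()`` once and
    stores the result in the ``widget`` slot; ``_memoized_method_<name>``
    works analogously for methods.
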
1380 """
1381
1382 __slots__ = ()
1383
1384 def _fallback_getattr(self, key):
1385 raise AttributeError(key)
1386
1387 def __getattr__(self, key: str) -> Any:
1388 if key.startswith("_memoized_attr_") or key.startswith(
1389 "_memoized_method_"
1390 ):
1391 raise AttributeError(key)
1392 # to avoid recursion errors when interacting with other __getattr__
1393 # schemes that refer to this one, when testing for memoized method
1394 # look at __class__ only rather than going into __getattr__ again.
1395 elif hasattr(self.__class__, f"_memoized_attr_{key}"):
1396 value = getattr(self, f"_memoized_attr_{key}")()
1397 setattr(self, key, value)
1398 return value
1399 elif hasattr(self.__class__, f"_memoized_method_{key}"):
1400 meth = getattr(self, f"_memoized_method_{key}")
1401
1402 def oneshot(*args, **kw):
1403 result = meth(*args, **kw)
1404
1405 def memo(*a, **kw):
1406 return result
1407
1408 memo.__name__ = meth.__name__
1409 memo.__doc__ = meth.__doc__
1410 setattr(self, key, memo)
1411 return result
1412
1413 oneshot.__doc__ = meth.__doc__
1414 return oneshot
1415 else:
1416 return self._fallback_getattr(key)
1417
1418
1419# from paste.deploy.converters
1420def asbool(obj: Any) -> bool:
1421 if isinstance(obj, str):
1422 obj = obj.strip().lower()
1423 if obj in ["true", "yes", "on", "y", "t", "1"]:
1424 return True
1425 elif obj in ["false", "no", "off", "n", "f", "0"]:
1426 return False
1427 else:
1428 raise ValueError("String is not true/false: %r" % obj)
1429 return bool(obj)
1430
1431
1432def bool_or_str(*text: str) -> Callable[[str], Union[str, bool]]:
1433 """Return a callable that will evaluate a string as
1434 boolean, or one of a set of "alternate" string values.
1435
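    E.g.::

        parse = bool_or_str("debug")
        parse("debug")  # "debug"
        parse("true")  # True
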
1436 """
1437
1438 def bool_or_value(obj: str) -> Union[str, bool]:
1439 if obj in text:
1440 return obj
1441 else:
1442 return asbool(obj)
1443
1444 return bool_or_value
1445
1446
1447def asint(value: Any) -> Optional[int]:
1448 """Coerce to integer."""
1449
1450 if value is None:
1451 return value
1452 return int(value)
1453
1454
1455def coerce_kw_type(
1456 kw: Dict[str, Any],
1457 key: str,
1458 type_: Type[Any],
1459 flexi_bool: bool = True,
1460 dest: Optional[Dict[str, Any]] = None,
1461) -> None:
1462 r"""If 'key' is present in dict 'kw', coerce its value to type 'type\_' if
1463 necessary. If 'flexi_bool' is True, the string '0' is considered false
1464 when coercing to boolean.
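
    E.g.::

        kw = {"timeout": "30", "echo": "0"}
        coerce_kw_type(kw, "timeout", int)
        coerce_kw_type(kw, "echo", bool)
        # kw is now {"timeout": 30, "echo": False}
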
1465 """
1466
1467 if dest is None:
1468 dest = kw
1469
1470 if (
1471 key in kw
1472 and (not isinstance(type_, type) or not isinstance(kw[key], type_))
1473 and kw[key] is not None
1474 ):
1475 if type_ is bool and flexi_bool:
1476 dest[key] = asbool(kw[key])
1477 else:
1478 dest[key] = type_(kw[key])
1479
1480
1481def constructor_key(obj: Any, cls: Type[Any]) -> Tuple[Any, ...]:
1482 """Produce a tuple structure that is cacheable using the __dict__ of
1483 obj to retrieve values
1484
1485 """
1486 names = get_cls_kwargs(cls)
1487 return (cls,) + tuple(
1488 (k, obj.__dict__[k]) for k in names if k in obj.__dict__
1489 )
1490
1491
1492def constructor_copy(obj: _T, cls: Type[_T], *args: Any, **kw: Any) -> _T:
1493 """Instantiate cls using the __dict__ of obj as constructor arguments.
1494
1495 Uses inspect to match the named arguments of ``cls``.
1496
1497 """
1498
1499 names = get_cls_kwargs(cls)
1500 kw.update(
1501 (k, obj.__dict__[k]) for k in names.difference(kw) if k in obj.__dict__
1502 )
1503 return cls(*args, **kw)
1504
1505
1506def counter() -> Callable[[], int]:
1507 """Return a threadsafe counter function."""
1508
1509 lock = threading.Lock()
1510 counter = itertools.count(1)
1511
1512 # avoid the 2to3 "next" transformation...
1513 def _next():
1514 with lock:
1515 return next(counter)
1516
1517 return _next
1518
1519
1520def duck_type_collection(
1521 specimen: Any, default: Optional[Type[Any]] = None
1522) -> Optional[Type[Any]]:
1523 """Given an instance or class, guess if it is or is acting as one of
1524 the basic collection types: list, set and dict. If the __emulates__
1525 property is present, return that preferentially.
1526 """
1527
1528 if hasattr(specimen, "__emulates__"):
1529 # canonicalize set vs sets.Set to a standard: the builtin set
1530 if specimen.__emulates__ is not None and issubclass(
1531 specimen.__emulates__, set
1532 ):
1533 return set
1534 else:
1535 return specimen.__emulates__ # type: ignore
1536
1537 isa = issubclass if isinstance(specimen, type) else isinstance
1538 if isa(specimen, list):
1539 return list
1540 elif isa(specimen, set):
1541 return set
1542 elif isa(specimen, dict):
1543 return dict
1544
1545 if hasattr(specimen, "append"):
1546 return list
1547 elif hasattr(specimen, "add"):
1548 return set
1549 elif hasattr(specimen, "set"):
1550 return dict
1551 else:
1552 return default
1553
1554
1555def assert_arg_type(
1556 arg: Any, argtype: Union[Tuple[Type[Any], ...], Type[Any]], name: str
1557) -> Any:
1558 if isinstance(arg, argtype):
1559 return arg
1560 else:
1561 if isinstance(argtype, tuple):
1562 raise exc.ArgumentError(
1563 "Argument '%s' is expected to be one of type %s, got '%s'"
1564 % (name, " or ".join("'%s'" % a for a in argtype), type(arg))
1565 )
1566 else:
1567 raise exc.ArgumentError(
1568 "Argument '%s' is expected to be of type '%s', got '%s'"
1569 % (name, argtype, type(arg))
1570 )
1571
1572
1573def dictlike_iteritems(dictlike):
1574 """Return a (key, value) iterator for almost any dict-like object."""
1575
1576 if hasattr(dictlike, "items"):
1577 return list(dictlike.items())
1578
1579 getter = getattr(dictlike, "__getitem__", getattr(dictlike, "get", None))
1580 if getter is None:
1581 raise TypeError("Object '%r' is not dict-like" % dictlike)
1582
1583 if hasattr(dictlike, "iterkeys"):
1584
1585 def iterator():
1586 for key in dictlike.iterkeys():
1587 assert getter is not None
1588 yield key, getter(key)
1589
1590 return iterator()
1591 elif hasattr(dictlike, "keys"):
1592 return iter((key, getter(key)) for key in dictlike.keys())
1593 else:
1594 raise TypeError("Object '%r' is not dict-like" % dictlike)
1595
1596
1597class classproperty(property):
1598 """A decorator that behaves like @property except that operates
1599 on classes rather than instances.
1600
1601 The decorator is currently special when using the declarative
1602 module, but note that the
1603 :class:`~.sqlalchemy.ext.declarative.declared_attr`
1604 decorator should be used for this purpose with declarative.
1605
1606 """
1607
1608 fget: Callable[[Any], Any]
1609
1610 def __init__(self, fget: Callable[[Any], Any], *arg: Any, **kw: Any):
1611 super().__init__(fget, *arg, **kw)
1612 self.__doc__ = fget.__doc__
1613
1614 def __get__(self, obj: Any, cls: Optional[type] = None) -> Any:
1615 return self.fget(cls)
1616
1617
1618class hybridproperty(Generic[_T]):
1619 def __init__(self, func: Callable[..., _T]):
1620 self.func = func
1621 self.clslevel = func
1622
1623 def __get__(self, instance: Any, owner: Any) -> _T:
1624 if instance is None:
1625 clsval = self.clslevel(owner)
1626 return clsval
1627 else:
1628 return self.func(instance)
1629
1630 def classlevel(self, func: Callable[..., Any]) -> hybridproperty[_T]:
1631 self.clslevel = func
1632 return self
1633
1634
1635class rw_hybridproperty(Generic[_T]):
1636 def __init__(self, func: Callable[..., _T]):
1637 self.func = func
1638 self.clslevel = func
1639 self.setfn: Optional[Callable[..., Any]] = None
1640
1641 def __get__(self, instance: Any, owner: Any) -> _T:
1642 if instance is None:
1643 clsval = self.clslevel(owner)
1644 return clsval
1645 else:
1646 return self.func(instance)
1647
1648 def __set__(self, instance: Any, value: Any) -> None:
1649 assert self.setfn is not None
1650 self.setfn(instance, value)
1651
1652 def setter(self, func: Callable[..., Any]) -> rw_hybridproperty[_T]:
1653 self.setfn = func
1654 return self
1655
1656 def classlevel(self, func: Callable[..., Any]) -> rw_hybridproperty[_T]:
1657 self.clslevel = func
1658 return self
1659
1660
1661class hybridmethod(Generic[_T]):
1662 """Decorate a function as cls- or instance- level."""
1663
1664 def __init__(self, func: Callable[..., _T]):
1665 self.func = self.__func__ = func
1666 self.clslevel = func
1667
1668 def __get__(self, instance: Any, owner: Any) -> Callable[..., _T]:
1669 if instance is None:
1670 return self.clslevel.__get__(owner, owner.__class__) # type:ignore
1671 else:
1672 return self.func.__get__(instance, owner) # type:ignore
1673
1674 def classlevel(self, func: Callable[..., Any]) -> hybridmethod[_T]:
1675 self.clslevel = func
1676 return self
1677
1678
1679class symbol(int):
1680 """A constant symbol.
1681
1682 >>> symbol("foo") is symbol("foo")
1683 True
1684 >>> symbol("foo")
    symbol('foo')
1686
1687 A slight refinement of the MAGICCOOKIE=object() pattern. The primary
1688 advantage of symbol() is its repr(). They are also singletons.
1689
1690 Repeated calls of symbol('name') will all return the same instance.
1691
1692 """
1693
1694 name: str
1695
1696 symbols: Dict[str, symbol] = {}
1697 _lock = threading.Lock()
1698
1699 def __new__(
1700 cls,
1701 name: str,
1702 doc: Optional[str] = None,
1703 canonical: Optional[int] = None,
1704 ) -> symbol:
1705 with cls._lock:
1706 sym = cls.symbols.get(name)
1707 if sym is None:
1708 assert isinstance(name, str)
1709 if canonical is None:
1710 canonical = hash(name)
1711 sym = int.__new__(symbol, canonical)
1712 sym.name = name
1713 if doc:
1714 sym.__doc__ = doc
1715
1716 # NOTE: we should ultimately get rid of this global thing,
1717 # however, currently it is to support pickling. The best
1718 # change would be when we are on py3.11 at a minimum, we
1719 # switch to stdlib enum.IntFlag.
1720 cls.symbols[name] = sym
1721 else:
1722 if canonical and canonical != sym:
1723 raise TypeError(
1724 f"Can't replace canonical symbol for {name!r} "
1725 f"with new int value {canonical}"
1726 )
1727 return sym
1728
1729 def __reduce__(self):
1730 return symbol, (self.name, "x", int(self))
1731
1732 def __str__(self):
1733 return repr(self)
1734
1735 def __repr__(self):
1736 return f"symbol({self.name!r})"
1737
1738
1739class _IntFlagMeta(type):
1740 def __init__(
1741 cls,
1742 classname: str,
1743 bases: Tuple[Type[Any], ...],
1744 dict_: Dict[str, Any],
1745 **kw: Any,
1746 ) -> None:
1747 items: List[symbol]
1748 cls._items = items = []
1749 for k, v in dict_.items():
1750 if re.match(r"^__.*__$", k):
1751 continue
1752 if isinstance(v, int):
1753 sym = symbol(k, canonical=v)
1754 elif not k.startswith("_"):
1755 raise TypeError("Expected integer values for IntFlag")
1756 else:
1757 continue
1758 setattr(cls, k, sym)
1759 items.append(sym)
1760
1761 cls.__members__ = _collections.immutabledict(
1762 {sym.name: sym for sym in items}
1763 )
1764
1765 def __iter__(self) -> Iterator[symbol]:
1766 raise NotImplementedError(
1767 "iter not implemented to ensure compatibility with "
1768 "Python 3.11 IntFlag. Please use __members__. See "
1769 "https://github.com/python/cpython/issues/99304"
1770 )
1771
1772
1773class _FastIntFlag(metaclass=_IntFlagMeta):
1774 """An 'IntFlag' copycat that isn't slow when performing bitwise
1775 operations.
1776
    The module-level ``FastIntFlag`` name refers to ``enum.IntFlag`` under
    TYPE_CHECKING and to ``_FastIntFlag`` otherwise.
1779
1780 """
1781
1782
1783if TYPE_CHECKING:
1784 from enum import IntFlag
1785
1786 FastIntFlag = IntFlag
1787else:
1788 FastIntFlag = _FastIntFlag
1789
1790
1791_E = TypeVar("_E", bound=enum.Enum)
1792
1793
1794def parse_user_argument_for_enum(
1795 arg: Any,
1796 choices: Dict[_E, List[Any]],
1797 name: str,
1798 resolve_symbol_names: bool = False,
1799) -> Optional[_E]:
1800 """Given a user parameter, parse the parameter into a chosen value
1801 from a list of choice objects, typically Enum values.
1802
1803 The user argument can be a string name that matches the name of a
1804 symbol, or the symbol object itself, or any number of alternate choices
    such as True/False/None, etc.
1806
1807 :param arg: the user argument.
1808 :param choices: dictionary of enum values to lists of possible
1809 entries for each.
1810 :param name: name of the argument. Used in an :class:`.ArgumentError`
1811 that is raised if the parameter doesn't match any available argument.
1812
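    E.g., with an illustrative enum::

        class Paint(enum.Enum):
            RED = "red"
            BLUE = "blue"

        parse_user_argument_for_enum(
            "crimson", {Paint.RED: ["crimson", "red"]}, "color"
        )  # returns Paint.RED
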
1813 """
1814 for enum_value, choice in choices.items():
1815 if arg is enum_value:
1816 return enum_value
1817 elif resolve_symbol_names and arg == enum_value.name:
1818 return enum_value
1819 elif arg in choice:
1820 return enum_value
1821
1822 if arg is None:
1823 return None
1824
1825 raise exc.ArgumentError(f"Invalid value for '{name}': {arg!r}")
1826
1827
1828_creation_order = 1
1829
1830
1831def set_creation_order(instance: Any) -> None:
1832 """Assign a '_creation_order' sequence to the given instance.
1833
1834 This allows multiple instances to be sorted in order of creation
1835 (typically within a single thread; the counter is not particularly
1836 threadsafe).
1837
1838 """
1839 global _creation_order
1840 instance._creation_order = _creation_order
1841 _creation_order += 1
1842
1843
1844def warn_exception(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
1845 """executes the given function, catches all exceptions and converts to
1846 a warning.
1847
1848 """
1849 try:
1850 return func(*args, **kwargs)
1851 except Exception:
1852 warn("%s('%s') ignored" % sys.exc_info()[0:2])
1853
1854
1855def ellipses_string(value, len_=25):
1856 try:
1857 if len(value) > len_:
1858 return "%s..." % value[0:len_]
1859 else:
1860 return value
1861 except TypeError:
1862 return value
1863
1864
1865class _hash_limit_string(str):
1866 """A string subclass that can only be hashed on a maximum amount
1867 of unique values.
1868
1869 This is used for warnings so that we can send out parameterized warnings
1870 without the __warningregistry__ of the module, or the non-overridable
1871 "once" registry within warnings.py, overloading memory,
1872
1873
1874 """
1875
1876 _hash: int
1877
1878 def __new__(
1879 cls, value: str, num: int, args: Sequence[Any]
1880 ) -> _hash_limit_string:
1881 interpolated = (value % args) + (
1882 " (this warning may be suppressed after %d occurrences)" % num
1883 )
1884 self = super().__new__(cls, interpolated)
1885 self._hash = hash("%s_%d" % (value, hash(interpolated) % num))
1886 return self
1887
1888 def __hash__(self) -> int:
1889 return self._hash
1890
1891 def __eq__(self, other: Any) -> bool:
1892 return hash(self) == hash(other)
1893
1894
1895def warn(msg: str, code: Optional[str] = None) -> None:
1896 """Issue a warning.
1897
1898 If msg is a string, :class:`.exc.SAWarning` is used as
1899 the category.
1900
1901 """
1902 if code:
1903 _warnings_warn(exc.SAWarning(msg, code=code))
1904 else:
1905 _warnings_warn(msg, exc.SAWarning)
1906
1907
1908def warn_limited(msg: str, args: Sequence[Any]) -> None:
1909 """Issue a warning with a parameterized string, limiting the number
1910 of registrations.
1911
1912 """
1913 if args:
1914 msg = _hash_limit_string(msg, 10, args)
1915 _warnings_warn(msg, exc.SAWarning)
1916
1917
1918_warning_tags: Dict[CodeType, Tuple[str, Type[Warning]]] = {}
1919
1920
1921def tag_method_for_warnings(
1922 message: str, category: Type[Warning]
1923) -> Callable[[_F], _F]:
1924 def go(fn):
1925 _warning_tags[fn.__code__] = (message, category)
1926 return fn
1927
1928 return go
1929
1930
1931_not_sa_pattern = re.compile(r"^(?:sqlalchemy\.(?!testing)|alembic\.)")
1932
1933
1934def _warnings_warn(
1935 message: Union[str, Warning],
1936 category: Optional[Type[Warning]] = None,
1937 stacklevel: int = 2,
1938) -> None:
1939 # adjust the given stacklevel to be outside of SQLAlchemy
1940 try:
1941 frame = sys._getframe(stacklevel)
1942 except ValueError:
        # being called from fewer than 3 (or the given) stack levels;
        # unusual, but don't crash
1945 stacklevel = 0
1946 except:
        # _getframe() doesn't work; weird interpreter issue,
        # but don't crash
1949 stacklevel = 0
1950 else:
1951 stacklevel_found = warning_tag_found = False
1952 while frame is not None:
1953 # using __name__ here requires that we have __name__ in the
1954 # __globals__ of the decorated string functions we make also.
1955 # we generate this using {"__name__": fn.__module__}
1956 if not stacklevel_found and not re.match(
1957 _not_sa_pattern, frame.f_globals.get("__name__", "")
1958 ):
                # stop incrementing the stack level once a line outside of
                # SQLAlchemy has been found.
1961 stacklevel_found = True
1962
1963 # however, for the warning tag thing, we have to keep
1964 # scanning up the whole traceback
1965
1966 if frame.f_code in _warning_tags:
1967 warning_tag_found = True
1968 (_suffix, _category) = _warning_tags[frame.f_code]
1969 category = category or _category
1970 message = f"{message} ({_suffix})"
1971
1972 frame = frame.f_back # type: ignore[assignment]
1973
1974 if not stacklevel_found:
1975 stacklevel += 1
1976 elif stacklevel_found and warning_tag_found:
1977 break
1978
1979 if category is not None:
1980 warnings.warn(message, category, stacklevel=stacklevel + 1)
1981 else:
1982 warnings.warn(message, stacklevel=stacklevel + 1)
1983
1984
1985def only_once(
1986 fn: Callable[..., _T], retry_on_exception: bool
1987) -> Callable[..., Optional[_T]]:
1988 """Decorate the given function to be a no-op after it is called exactly
1989 once."""
1990
1991 once = [fn]
1992
1993 def go(*arg: Any, **kw: Any) -> Optional[_T]:
1994 # strong reference fn so that it isn't garbage collected,
1995 # which interferes with the event system's expectations
1996 strong_fn = fn # noqa
1997 if once:
1998 once_fn = once.pop()
1999 try:
2000 return once_fn(*arg, **kw)
2001 except:
2002 if retry_on_exception:
2003 once.insert(0, once_fn)
2004 raise
2005
2006 return None
2007
2008 return go
2009
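# Minimal sketch (hypothetical names): the wrapped callable runs a single
# time; subsequent calls return None.  With retry_on_exception=True, a first
# call that raises does not consume the single invocation.
def _example_only_once() -> None:
    calls: List[int] = []

    def _record() -> None:
        calls.append(1)

    record_once = only_once(_record, retry_on_exception=False)
    record_once()
    record_once()
    assert len(calls) == 1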
2010
2011_SQLA_RE = re.compile(r"sqlalchemy/([a-z_]+/){0,2}[a-z_]+\.py")
2012_UNITTEST_RE = re.compile(r"unit(?:2|test2?)/")
2013
2014
2015def chop_traceback(
2016 tb: List[str],
2017 exclude_prefix: re.Pattern[str] = _UNITTEST_RE,
2018 exclude_suffix: re.Pattern[str] = _SQLA_RE,
2019) -> List[str]:
2020 """Chop extraneous lines off beginning and end of a traceback.
2021
2022 :param tb:
2023 a list of traceback lines as returned by ``traceback.format_stack()``
2024
2025 :param exclude_prefix:
2026 a regular expression object matching lines to skip at beginning of
2027 ``tb``
2028
2029 :param exclude_suffix:
2030 a regular expression object matching lines to skip at end of ``tb``
2031 """
2032 start = 0
2033 end = len(tb) - 1
2034 while start <= end and exclude_prefix.search(tb[start]):
2035 start += 1
2036 while start <= end and exclude_suffix.search(tb[end]):
2037 end -= 1
2038 return tb[start : end + 1]
2039
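# Illustrative sketch (hypothetical traceback lines): leading lines matching
# the prefix pattern (unittest internals) and trailing lines matching the
# suffix pattern (SQLAlchemy internals) are dropped, leaving the user's code.
def _example_chop_traceback() -> List[str]:
    tb = [
        '  File "unittest/case.py", line 59, in run',
        '  File "myapp/views.py", line 10, in index',
        '  File "sqlalchemy/orm/session.py", line 100, in execute',
    ]
    # only the "myapp/views.py" line remains
    return chop_traceback(tb)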
2040
2041NoneType = type(None)
2042
2043
2044def attrsetter(attrname):
2045 code = "def set(obj, value): obj.%s = value" % attrname
2046 env = locals().copy()
2047 exec(code, env)
2048 return env["set"]
2049
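# Minimal sketch (hypothetical class): attrsetter("name") builds a
# two-argument setter equivalent to
# ``lambda obj, value: setattr(obj, "name", value)``, with the attribute name
# baked into a compiled function.
def _example_attrsetter() -> None:
    class _Thing:
        name: Optional[str] = None

    set_name = attrsetter("name")
    thing = _Thing()
    set_name(thing, "widget")
    assert thing.name == "widget"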
2050
2051_dunders = re.compile("^__.+__$")
2052
2053
2054class TypingOnly:
2055 """A mixin class that marks a class as 'typing only', meaning it has
2056 absolutely no methods, attributes, or runtime functionality whatsoever.
2057
2058 """
2059
2060 __slots__ = ()
2061
2062 def __init_subclass__(cls) -> None:
2063 if TypingOnly in cls.__bases__:
2064 remaining = {
2065 name for name in cls.__dict__ if not _dunders.match(name)
2066 }
2067 if remaining:
2068 raise AssertionError(
2069 f"Class {cls} directly inherits TypingOnly but has "
2070 f"additional attributes {remaining}."
2071 )
2072 super().__init_subclass__()
2073
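# Illustrative sketch (hypothetical subclass): a class deriving directly from
# TypingOnly may carry only dunder attributes such as ``__slots__`` plus
# typing-only declarations; adding a real method or attribute here would
# raise AssertionError at class-creation time.
class _ExampleTypingOnly(TypingOnly):
    __slots__ = ()

    if TYPE_CHECKING:

        def hypothetical_method(self) -> None: ...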
2074
2075class EnsureKWArg:
2076 r"""Apply translation of functions to accept \**kw arguments if they
2077 don't already.
2078
2079    Used to ensure cross-compatibility with third-party legacy code, for
2080    things like compiler visit methods that need to accept ``**kw``
2081    arguments but may have been copied from older code that didn't.
2082
2083 """
2084
2085 ensure_kwarg: str
2086 """a regular expression that indicates method names for which the method
2087 should accept ``**kw`` arguments.
2088
2089 The class will scan for methods matching the name template and decorate
2090 them if necessary to ensure ``**kw`` parameters are accepted.
2091
2092 """
2093
2094 def __init_subclass__(cls) -> None:
2095 fn_reg = cls.ensure_kwarg
2096 clsdict = cls.__dict__
2097 if fn_reg:
2098 for key in clsdict:
2099 m = re.match(fn_reg, key)
2100 if m:
2101 fn = clsdict[key]
2102 spec = compat.inspect_getfullargspec(fn)
2103 if not spec.varkw:
2104 wrapped = cls._wrap_w_kw(fn)
2105 setattr(cls, key, wrapped)
2106 super().__init_subclass__()
2107
2108 @classmethod
2109 def _wrap_w_kw(cls, fn: Callable[..., Any]) -> Callable[..., Any]:
2110 def wrap(*arg: Any, **kw: Any) -> Any:
2111 return fn(*arg)
2112
2113 return update_wrapper(wrap, fn)
2114
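# Minimal sketch (hypothetical subclass): methods whose names match
# ``ensure_kwarg`` but which lack a ``**kw`` parameter are wrapped so that the
# wrapper accepts keyword arguments and forwards only the positional ones.
class _ExampleVisitor(EnsureKWArg):
    ensure_kwarg = r"visit_\w+"

    def visit_widget(self, element: Any) -> str:
        return f"widget: {element}"


# _ExampleVisitor().visit_widget("w", unexpected_flag=True) does not raise;
# the generated wrapper silently discards the keyword argument.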
2115
2116def wrap_callable(wrapper, fn):
2117 """Augment functools.update_wrapper() to work with objects with
2118 a ``__call__()`` method.
2119
2120 :param fn:
2121 object with __call__ method
2122
2123 """
2124 if hasattr(fn, "__name__"):
2125 return update_wrapper(wrapper, fn)
2126 else:
2127 _f = wrapper
2128 _f.__name__ = fn.__class__.__name__
2129 if hasattr(fn, "__module__"):
2130 _f.__module__ = fn.__module__
2131
2132 if hasattr(fn.__call__, "__doc__") and fn.__call__.__doc__:
2133 _f.__doc__ = fn.__call__.__doc__
2134 elif fn.__doc__:
2135 _f.__doc__ = fn.__doc__
2136
2137 return _f
2138
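# Illustrative sketch (hypothetical names): copying identity metadata from a
# callable object, which has no ``__name__`` of its own, onto a plain wrapper
# function.
def _example_wrap_callable() -> None:
    class _Adder:
        """Add two numbers."""

        def __call__(self, x: int, y: int) -> int:
            return x + y

    adder = _Adder()

    def _wrapper(*arg: Any, **kw: Any) -> Any:
        return adder(*arg, **kw)

    wrapped = wrap_callable(_wrapper, adder)
    assert wrapped.__name__ == "_Adder"
    assert wrapped(2, 3) == 5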
2139
2140def quoted_token_parser(value):
2141 """Parse a dotted identifier with accommodation for quoted names.
2142
2143    SQL-style doubled quotes within a quoted name denote a literal quote.
2144
2145 E.g.::
2146
2147 >>> quoted_token_parser("name")
2148 ["name"]
2149 >>> quoted_token_parser("schema.name")
2150 ["schema", "name"]
2151 >>> quoted_token_parser('"Schema"."Name"')
2152 ['Schema', 'Name']
2153 >>> quoted_token_parser('"Schema"."Name""Foo"')
2154        ['Schema', 'Name"Foo']
2155
2156 """
2157
2158 if '"' not in value:
2159 return value.split(".")
2160
2161 # 0 = outside of quotes
2162 # 1 = inside of quotes
2163 state = 0
2164 result: List[List[str]] = [[]]
2165 idx = 0
2166 lv = len(value)
2167 while idx < lv:
2168 char = value[idx]
2169 if char == '"':
2170 if state == 1 and idx < lv - 1 and value[idx + 1] == '"':
2171 result[-1].append('"')
2172 idx += 1
2173 else:
2174 state ^= 1
2175 elif char == "." and state == 0:
2176 result.append([])
2177 else:
2178 result[-1].append(char)
2179 idx += 1
2180
2181 return ["".join(token) for token in result]
2182
2183
2184def add_parameter_text(params: Any, text: str) -> Callable[[_F], _F]:
2185 params = _collections.to_list(params)
2186
2187 def decorate(fn):
2188        doc = fn.__doc__ or ""
2189 if doc:
2190 doc = inject_param_text(doc, {param: text for param in params})
2191 fn.__doc__ = doc
2192 return fn
2193
2194 return decorate
2195
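# Illustrative sketch (hypothetical function and text): add_parameter_text()
# appends additional text underneath the ``:param timeout:`` entry of an
# existing docstring at import time.
@add_parameter_text(
    "timeout", "This extra sentence is injected for illustration only."
)
def _example_documented(timeout: int = 0) -> None:
    """Do a hypothetical thing.

    :param timeout: number of seconds to wait.

    """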
2196
2197def _dedent_docstring(text: str) -> str:
2198 split_text = text.split("\n", 1)
2199 if len(split_text) == 1:
2200 return text
2201 else:
2202 firstline, remaining = split_text
2203 if not firstline.startswith(" "):
2204 return firstline + "\n" + textwrap.dedent(remaining)
2205 else:
2206 return textwrap.dedent(text)
2207
2208
2209def inject_docstring_text(
2210 given_doctext: Optional[str], injecttext: str, pos: int
2211) -> str:
2212 doctext: str = _dedent_docstring(given_doctext or "")
2213 lines = doctext.split("\n")
2214 if len(lines) == 1:
2215 lines.append("")
2216 injectlines = textwrap.dedent(injecttext).split("\n")
2217 if injectlines[0]:
2218 injectlines.insert(0, "")
2219
2220 blanks = [num for num, line in enumerate(lines) if not line.strip()]
2221 blanks.insert(0, 0)
2222
2223 inject_pos = blanks[min(pos, len(blanks) - 1)]
2224
2225 lines = lines[0:inject_pos] + injectlines + lines[inject_pos:]
2226 return "\n".join(lines)
2227
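# Minimal sketch (hypothetical docstring): ``pos`` counts blank-line
# boundaries, so 0 inserts before the first paragraph, 1 inserts after it,
# and so on.
def _example_inject_docstring_text() -> str:
    doc = """Return a widget.

    Widgets are returned fully frobnicated.
    """
    return inject_docstring_text(doc, "This paragraph is injected.", 1)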
2228
2229_param_reg = re.compile(r"(\s+):param (.+?):")
2230
2231
2232def inject_param_text(doctext: str, inject_params: Dict[str, str]) -> str:
2233 doclines = collections.deque(doctext.splitlines())
2234 lines = []
2235
2236 # TODO: this is not working for params like ":param case_sensitive=True:"
2237
2238 to_inject = None
2239 while doclines:
2240 line = doclines.popleft()
2241
2242 m = _param_reg.match(line)
2243
2244 if to_inject is None:
2245 if m:
2246 param = m.group(2).lstrip("*")
2247 if param in inject_params:
2248 # default indent to that of :param: plus one
2249 indent = " " * len(m.group(1)) + " "
2250
2251 # but if the next line has text, use that line's
2252 # indentation
2253 if doclines:
2254 m2 = re.match(r"(\s+)\S", doclines[0])
2255 if m2:
2256 indent = " " * len(m2.group(1))
2257
2258 to_inject = indent + inject_params[param]
2259 elif m:
2260 lines.extend(["\n", to_inject, "\n"])
2261 to_inject = None
2262 elif not line.rstrip():
2263 lines.extend([line, to_inject, "\n"])
2264 to_inject = None
2265 elif line.endswith("::"):
2266 # TODO: this still won't cover if the code example itself has
2267 # blank lines in it, need to detect those via indentation.
2268 lines.extend([line, doclines.popleft()])
2269 continue
2270 lines.append(line)
2271
2272 return "\n".join(lines)
2273
2274
2275def repr_tuple_names(names: List[str]) -> Optional[str]:
2276 """Trims a list of strings from the middle and return a string of up to
2277 four elements. Strings greater than 11 characters will be truncated"""
2278 if len(names) == 0:
2279 return None
2280 flag = len(names) <= 4
2281 names = names[0:4] if flag else names[0:3] + names[-1:]
2282 res = ["%s.." % name[:11] if len(name) > 11 else name for name in names]
2283 if flag:
2284 return ", ".join(res)
2285 else:
2286 return "%s, ..., %s" % (", ".join(res[0:3]), res[-1])
2287
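# Minimal sketch: lists longer than four elements are elided to
# "first three, ..., last", and names longer than 11 characters are truncated
# with a ".." suffix.
def _example_repr_tuple_names() -> None:
    assert repr_tuple_names(["a", "b"]) == "a, b"
    assert (
        repr_tuple_names(["one", "two", "three", "four", "five"])
        == "one, two, three, ..., five"
    )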
2288
2289def has_compiled_ext(raise_=False):
2290 if HAS_CYEXTENSION:
2291 return True
2292 elif raise_:
2293 raise ImportError(
2294 "cython extensions were expected to be installed, "
2295 "but are not present"
2296 )
2297 else:
2298 return False
2299
2300
2301class _Missing(enum.Enum):
2302 Missing = enum.auto()
2303
2304
2305Missing = _Missing.Missing
2306MissingOr = Union[_T, Literal[_Missing.Missing]]
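
# Illustrative sketch (hypothetical function): ``Missing`` is a sentinel
# distinct from ``None``, so "argument not passed" and "argument explicitly
# None" can be told apart.
def _example_missing(value: MissingOr[Optional[str]] = Missing) -> str:
    if value is Missing:
        return "no value supplied"
    return f"value is {value!r}"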