1# util/langhelpers.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""Routines to help with the creation, loading and introspection of
10modules, classes, hierarchies, attributes, functions, and methods.
11
12"""
13from __future__ import annotations
14
15import collections
16import enum
17from functools import update_wrapper
18import inspect
19import itertools
20import operator
21import re
22import sys
23import textwrap
24import threading
25import types
26from types import CodeType
27from typing import Any
28from typing import Callable
29from typing import cast
30from typing import Dict
31from typing import FrozenSet
32from typing import Generic
33from typing import Iterator
34from typing import List
35from typing import Mapping
36from typing import NoReturn
37from typing import Optional
38from typing import overload
39from typing import Sequence
40from typing import Set
41from typing import Tuple
42from typing import Type
43from typing import TYPE_CHECKING
44from typing import TypeVar
45from typing import Union
46import warnings
47
48from . import _collections
49from . import compat
50from ._has_cy import HAS_CYEXTENSION
51from .typing import Literal
52from .. import exc
53
54_T = TypeVar("_T")
55_T_co = TypeVar("_T_co", covariant=True)
56_F = TypeVar("_F", bound=Callable[..., Any])
57_MP = TypeVar("_MP", bound="memoized_property[Any]")
58_MA = TypeVar("_MA", bound="HasMemoized.memoized_attribute[Any]")
59_HP = TypeVar("_HP", bound="hybridproperty[Any]")
60_HM = TypeVar("_HM", bound="hybridmethod[Any]")
61
62
63if compat.py314:
64 # vendor a minimal form of get_annotations per
65 # https://github.com/python/cpython/issues/133684#issuecomment-2863841891
66
67 from annotationlib import call_annotate_function # type: ignore
68 from annotationlib import Format
69
70 def _get_and_call_annotate(obj, format): # noqa: A002
71 annotate = getattr(obj, "__annotate__", None)
72 if annotate is not None:
73 ann = call_annotate_function(annotate, format, owner=obj)
74 if not isinstance(ann, dict):
75 raise ValueError(f"{obj!r}.__annotate__ returned a non-dict")
76 return ann
77 return None
78
79 # this is ported from py3.13.0a7
80 _BASE_GET_ANNOTATIONS = type.__dict__["__annotations__"].__get__ # type: ignore # noqa: E501
81
82 def _get_dunder_annotations(obj):
83 if isinstance(obj, type):
84 try:
85 ann = _BASE_GET_ANNOTATIONS(obj)
86 except AttributeError:
87 # For static types, the descriptor raises AttributeError.
88 return {}
89 else:
90 ann = getattr(obj, "__annotations__", None)
91 if ann is None:
92 return {}
93
94 if not isinstance(ann, dict):
95 raise ValueError(
96 f"{obj!r}.__annotations__ is neither a dict nor None"
97 )
98 return dict(ann)
99
100 def _vendored_get_annotations(
101 obj: Any, *, format: Format # noqa: A002
102 ) -> Mapping[str, Any]:
103 """A sparse implementation of annotationlib.get_annotations()"""
104
105 try:
106 ann = _get_dunder_annotations(obj)
107 except Exception:
108 pass
109 else:
110 if ann is not None:
111 return dict(ann)
112
113 # But if __annotations__ threw a NameError, we try calling __annotate__
114 ann = _get_and_call_annotate(obj, format)
115 if ann is None:
            # If that didn't work either, we have a very weird object:
            # evaluating __annotations__ threw NameError and there is no
            # __annotate__.  In that case, we fall back to trying
            # __annotations__ again.
121 ann = _get_dunder_annotations(obj)
122
123 if ann is None:
124 if isinstance(obj, type) or callable(obj):
125 return {}
126 raise TypeError(f"{obj!r} does not have annotations")
127
128 if not ann:
129 return {}
130
131 return dict(ann)
132
133 def get_annotations(obj: Any) -> Mapping[str, Any]:
134 # FORWARDREF has the effect of giving us ForwardRefs and not
135 # actually trying to evaluate the annotations. We need this so
136 # that the annotations act as much like
137 # "from __future__ import annotations" as possible, which is going
138 # away in future python as a separate mode
139 return _vendored_get_annotations(obj, format=Format.FORWARDREF)
140
141elif compat.py310:
142
143 def get_annotations(obj: Any) -> Mapping[str, Any]:
144 return inspect.get_annotations(obj)
145
146else:
147
148 def get_annotations(obj: Any) -> Mapping[str, Any]:
        # it's been observed that cls.__annotations__ can be absent.
        # it's not clear what causes this; running under tox py37/38 it
        # happens, running straight pytest it doesn't
152
153 # https://docs.python.org/3/howto/annotations.html#annotations-howto
154 if isinstance(obj, type):
155 ann = obj.__dict__.get("__annotations__", None)
156 else:
157 ann = getattr(obj, "__annotations__", None)
158
159 if ann is None:
160 return _collections.EMPTY_DICT
161 else:
162 return cast("Mapping[str, Any]", ann)
163
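# Illustrative sketch (hypothetical class): across interpreter versions,
# get_annotations() returns the raw, unevaluated annotations (strings or
# ForwardRefs) rather than resolved types.
#
#     class Node:
#         parent: "Node"
#
#     get_annotations(Node)    # -> mapping with key "parent"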
164
165def md5_hex(x: Any) -> str:
166 x = x.encode("utf-8")
167 m = compat.md5_not_for_security()
168 m.update(x)
169 return cast(str, m.hexdigest())
170
171
172class safe_reraise:
173 """Reraise an exception after invoking some
174 handler code.
175
176 Stores the existing exception info before
177 invoking so that it is maintained across a potential
178 coroutine context switch.
179
180 e.g.::
181
182 try:
183 sess.commit()
184 except:
185 with safe_reraise():
186 sess.rollback()
187
188 TODO: we should at some point evaluate current behaviors in this regard
189 based on current greenlet, gevent/eventlet implementations in Python 3, and
190 also see the degree to which our own asyncio (based on greenlet also) is
191 impacted by this. .rollback() will cause IO / context switch to occur in
192 all these scenarios; what happens to the exception context from an
193 "except:" block if we don't explicitly store it? Original issue was #2703.
194
195 """
196
197 __slots__ = ("_exc_info",)
198
199 _exc_info: Union[
200 None,
201 Tuple[
202 Type[BaseException],
203 BaseException,
204 types.TracebackType,
205 ],
206 Tuple[None, None, None],
207 ]
208
209 def __enter__(self) -> None:
210 self._exc_info = sys.exc_info()
211
212 def __exit__(
213 self,
214 type_: Optional[Type[BaseException]],
215 value: Optional[BaseException],
216 traceback: Optional[types.TracebackType],
217 ) -> NoReturn:
218 assert self._exc_info is not None
219 # see #2703 for notes
220 if type_ is None:
221 exc_type, exc_value, exc_tb = self._exc_info
222 assert exc_value is not None
223 self._exc_info = None # remove potential circular references
224 raise exc_value.with_traceback(exc_tb)
225 else:
226 self._exc_info = None # remove potential circular references
227 assert value is not None
228 raise value.with_traceback(traceback)
229
230
231def walk_subclasses(cls: Type[_T]) -> Iterator[Type[_T]]:
232 seen: Set[Any] = set()
233
234 stack = [cls]
235 while stack:
236 cls = stack.pop()
237 if cls in seen:
238 continue
239 else:
240 seen.add(cls)
241 stack.extend(cls.__subclasses__())
242 yield cls
243
244
245def string_or_unprintable(element: Any) -> str:
246 if isinstance(element, str):
247 return element
248 else:
249 try:
250 return str(element)
251 except Exception:
252 return "unprintable element %r" % element
253
254
255def clsname_as_plain_name(
256 cls: Type[Any], use_name: Optional[str] = None
257) -> str:
258 name = use_name or cls.__name__
259 return " ".join(n.lower() for n in re.findall(r"([A-Z][a-z]+|SQL)", name))
260
261
262def method_is_overridden(
263 instance_or_cls: Union[Type[Any], object],
264 against_method: Callable[..., Any],
265) -> bool:
    """Return True if the method on ``instance_or_cls`` does not match
    ``against_method``, i.e. the method has been overridden."""
267
268 if not isinstance(instance_or_cls, type):
269 current_cls = instance_or_cls.__class__
270 else:
271 current_cls = instance_or_cls
272
273 method_name = against_method.__name__
274
275 current_method: types.MethodType = getattr(current_cls, method_name)
276
277 return current_method != against_method
278
279
280def decode_slice(slc: slice) -> Tuple[Any, ...]:
281 """decode a slice object as sent to __getitem__.
282
    takes into account the Python 2.5 __index__() method, basically.
284
285 """
286 ret: List[Any] = []
287 for x in slc.start, slc.stop, slc.step:
288 if hasattr(x, "__index__"):
289 x = x.__index__()
290 ret.append(x)
291 return tuple(ret)
292
293
294def _unique_symbols(used: Sequence[str], *bases: str) -> Iterator[str]:
295 used_set = set(used)
296 for base in bases:
297 pool = itertools.chain(
298 (base,),
299 map(lambda i: base + str(i), range(1000)),
300 )
301 for sym in pool:
302 if sym not in used_set:
303 used_set.add(sym)
304 yield sym
305 break
306 else:
307 raise NameError("exhausted namespace for symbol base %s" % base)
308
309
310def map_bits(fn: Callable[[int], Any], n: int) -> Iterator[Any]:
    """Call the given function with each nonzero bit from n."""
312
313 while n:
314 b = n & (~n + 1)
315 yield fn(b)
316 n ^= b
317
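# Minimal usage sketch: each set bit of ``n`` is isolated (lowest bit first)
# and passed to ``fn``.
#
#     list(map_bits(lambda b: b, 0b1010))    # -> [2, 8]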
318
319_Fn = TypeVar("_Fn", bound="Callable[..., Any]")
320
321# this seems to be in flux in recent mypy versions
322
323
324def decorator(target: Callable[..., Any]) -> Callable[[_Fn], _Fn]:
325 """A signature-matching decorator factory."""
326
327 def decorate(fn: _Fn) -> _Fn:
328 if not inspect.isfunction(fn) and not inspect.ismethod(fn):
329 raise Exception("not a decoratable function")
330
        # Python 3.14 defers creating __annotations__ until it's used.
332 # We do not want to create __annotations__ now.
333 annofunc = getattr(fn, "__annotate__", None)
334 if annofunc is not None:
335 fn.__annotate__ = None # type: ignore[union-attr]
336 try:
337 spec = compat.inspect_getfullargspec(fn)
338 finally:
339 fn.__annotate__ = annofunc # type: ignore[union-attr]
340 else:
341 spec = compat.inspect_getfullargspec(fn)
342
343 # Do not generate code for annotations.
        # update_wrapper() copies the annotations from fn to decorated.
        # We use dummy defaults for code generation to avoid keeping
        # copies of large globals around for compiling.
347 # We copy __defaults__ and __kwdefaults__ from fn to decorated.
348 empty_defaults = (None,) * len(spec.defaults or ())
349 empty_kwdefaults = dict.fromkeys(spec.kwonlydefaults or ())
350 spec = spec._replace(
351 annotations={},
352 defaults=empty_defaults,
353 kwonlydefaults=empty_kwdefaults,
354 )
355
356 names = (
357 tuple(cast("Tuple[str, ...]", spec[0]))
358 + cast("Tuple[str, ...]", spec[1:3])
359 + (fn.__name__,)
360 )
361 targ_name, fn_name = _unique_symbols(names, "target", "fn")
362
363 metadata: Dict[str, Optional[str]] = dict(target=targ_name, fn=fn_name)
364 metadata.update(format_argspec_plus(spec, grouped=False))
365 metadata["name"] = fn.__name__
366
367 if inspect.iscoroutinefunction(fn):
368 metadata["prefix"] = "async "
369 metadata["target_prefix"] = "await "
370 else:
371 metadata["prefix"] = ""
372 metadata["target_prefix"] = ""
373
374 # look for __ positional arguments. This is a convention in
375 # SQLAlchemy that arguments should be passed positionally
376 # rather than as keyword
377 # arguments. note that apply_pos doesn't currently work in all cases
378 # such as when a kw-only indicator "*" is present, which is why
379 # we limit the use of this to just that case we can detect. As we add
380 # more kinds of methods that use @decorator, things may have to
381 # be further improved in this area
382 if "__" in repr(spec[0]):
383 code = (
384 """\
385%(prefix)sdef %(name)s%(grouped_args)s:
386 return %(target_prefix)s%(target)s(%(fn)s, %(apply_pos)s)
387"""
388 % metadata
389 )
390 else:
391 code = (
392 """\
393%(prefix)sdef %(name)s%(grouped_args)s:
394 return %(target_prefix)s%(target)s(%(fn)s, %(apply_kw)s)
395"""
396 % metadata
397 )
398
399 env: Dict[str, Any] = {
400 targ_name: target,
401 fn_name: fn,
402 "__name__": fn.__module__,
403 }
404
405 decorated = cast(
406 types.FunctionType,
407 _exec_code_in_env(code, env, fn.__name__),
408 )
409 decorated.__defaults__ = fn.__defaults__
410 decorated.__kwdefaults__ = fn.__kwdefaults__ # type: ignore
411 return update_wrapper(decorated, fn) # type: ignore[return-value]
412
413 return update_wrapper(decorate, target) # type: ignore[return-value]
414
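# Hypothetical usage sketch: ``decorator`` builds wrappers that preserve the
# wrapped function's exact signature, so argument introspection keeps working.
#
#     @decorator
#     def log_calls(fn, *args, **kw):
#         print("calling", fn.__name__)
#         return fn(*args, **kw)
#
#     @log_calls
#     def add(x, y=1):
#         return x + y
#
#     # add(2) prints "calling add" and returns 3; inspect still reports (x, y=1)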
415
416def _exec_code_in_env(
417 code: Union[str, types.CodeType], env: Dict[str, Any], fn_name: str
418) -> Callable[..., Any]:
419 exec(code, env)
420 return env[fn_name] # type: ignore[no-any-return]
421
422
423_PF = TypeVar("_PF")
424_TE = TypeVar("_TE")
425
426
427class PluginLoader:
428 def __init__(
429 self, group: str, auto_fn: Optional[Callable[..., Any]] = None
430 ):
431 self.group = group
432 self.impls: Dict[str, Any] = {}
433 self.auto_fn = auto_fn
434
435 def clear(self):
436 self.impls.clear()
437
438 def load(self, name: str) -> Any:
439 if name in self.impls:
440 return self.impls[name]()
441
442 if self.auto_fn:
443 loader = self.auto_fn(name)
444 if loader:
445 self.impls[name] = loader
446 return loader()
447
448 for impl in compat.importlib_metadata_get(self.group):
449 if impl.name == name:
450 self.impls[name] = impl.load
451 return impl.load()
452
453 raise exc.NoSuchModuleError(
454 "Can't load plugin: %s:%s" % (self.group, name)
455 )
456
457 def register(self, name: str, modulepath: str, objname: str) -> None:
458 def load():
459 mod = __import__(modulepath)
460 for token in modulepath.split(".")[1:]:
461 mod = getattr(mod, token)
462 return getattr(mod, objname)
463
464 self.impls[name] = load
465
466 def deregister(self, name: str) -> None:
467 del self.impls[name]
468
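# Illustrative sketch with hypothetical names: implementations come either
# from register() or from entry points published under ``self.group``.
#
#     loader = PluginLoader("sqlalchemy.dialects")
#     loader.register("mydialect", "mypkg.dialect", "MyDialect")
#     cls = loader.load("mydialect")    # imports mypkg.dialect, returns MyDialect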
469
470def _inspect_func_args(fn):
471 try:
472 co_varkeywords = inspect.CO_VARKEYWORDS
473 except AttributeError:
474 # https://docs.python.org/3/library/inspect.html
475 # The flags are specific to CPython, and may not be defined in other
476 # Python implementations. Furthermore, the flags are an implementation
477 # detail, and can be removed or deprecated in future Python releases.
478 spec = compat.inspect_getfullargspec(fn)
479 return spec[0], bool(spec[2])
480 else:
481 # use fn.__code__ plus flags to reduce method call overhead
482 co = fn.__code__
483 nargs = co.co_argcount
484 return (
485 list(co.co_varnames[:nargs]),
486 bool(co.co_flags & co_varkeywords),
487 )
488
489
490@overload
491def get_cls_kwargs(
492 cls: type,
493 *,
494 _set: Optional[Set[str]] = None,
495 raiseerr: Literal[True] = ...,
496) -> Set[str]: ...
497
498
499@overload
500def get_cls_kwargs(
501 cls: type, *, _set: Optional[Set[str]] = None, raiseerr: bool = False
502) -> Optional[Set[str]]: ...
503
504
505def get_cls_kwargs(
506 cls: type, *, _set: Optional[Set[str]] = None, raiseerr: bool = False
507) -> Optional[Set[str]]:
508 r"""Return the full set of inherited kwargs for the given `cls`.
509
510 Probes a class's __init__ method, collecting all named arguments. If the
511 __init__ defines a \**kwargs catch-all, then the constructor is presumed
512 to pass along unrecognized keywords to its base classes, and the
513 collection process is repeated recursively on each of the bases.
514
515 Uses a subset of inspect.getfullargspec() to cut down on method overhead,
516 as this is used within the Core typing system to create copies of type
517 objects which is a performance-sensitive operation.
518
    No anonymous tuple arguments, please!
520
521 """
522 toplevel = _set is None
523 if toplevel:
524 _set = set()
525 assert _set is not None
526
527 ctr = cls.__dict__.get("__init__", False)
528
529 has_init = (
530 ctr
531 and isinstance(ctr, types.FunctionType)
532 and isinstance(ctr.__code__, types.CodeType)
533 )
534
535 if has_init:
536 names, has_kw = _inspect_func_args(ctr)
537 _set.update(names)
538
539 if not has_kw and not toplevel:
540 if raiseerr:
541 raise TypeError(
542 f"given cls {cls} doesn't have an __init__ method"
543 )
544 else:
545 return None
546 else:
547 has_kw = False
548
549 if not has_init or has_kw:
550 for c in cls.__bases__:
551 if get_cls_kwargs(c, _set=_set) is None:
552 break
553
554 _set.discard("self")
555 return _set
556
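# Minimal sketch of the traversal: since B.__init__ accepts **kw, the
# arguments of its base are collected too; "self" is discarded.
#
#     class A:
#         def __init__(self, a=1): ...
#
#     class B(A):
#         def __init__(self, b=2, **kw):
#             super().__init__(**kw)
#
#     get_cls_kwargs(B)    # -> {"a", "b"}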
557
558def get_func_kwargs(func: Callable[..., Any]) -> List[str]:
559 """Return the set of legal kwargs for the given `func`.
560
    Uses getfullargspec so it is safe to call for methods, functions,
    etc.
563
564 """
565
566 return compat.inspect_getfullargspec(func)[0]
567
568
569def get_callable_argspec(
570 fn: Callable[..., Any], no_self: bool = False, _is_init: bool = False
571) -> compat.FullArgSpec:
572 """Return the argument signature for any callable.
573
574 All pure-Python callables are accepted, including
575 functions, methods, classes, objects with __call__;
576 builtins and other edge cases like functools.partial() objects
577 raise a TypeError.
578
579 """
580 if inspect.isbuiltin(fn):
581 raise TypeError("Can't inspect builtin: %s" % fn)
582 elif inspect.isfunction(fn):
583 if _is_init and no_self:
584 spec = compat.inspect_getfullargspec(fn)
585 return compat.FullArgSpec(
586 spec.args[1:],
587 spec.varargs,
588 spec.varkw,
589 spec.defaults,
590 spec.kwonlyargs,
591 spec.kwonlydefaults,
592 spec.annotations,
593 )
594 else:
595 return compat.inspect_getfullargspec(fn)
596 elif inspect.ismethod(fn):
597 if no_self and (_is_init or fn.__self__):
598 spec = compat.inspect_getfullargspec(fn.__func__)
599 return compat.FullArgSpec(
600 spec.args[1:],
601 spec.varargs,
602 spec.varkw,
603 spec.defaults,
604 spec.kwonlyargs,
605 spec.kwonlydefaults,
606 spec.annotations,
607 )
608 else:
609 return compat.inspect_getfullargspec(fn.__func__)
610 elif inspect.isclass(fn):
611 return get_callable_argspec(
612 fn.__init__, no_self=no_self, _is_init=True
613 )
614 elif hasattr(fn, "__func__"):
615 return compat.inspect_getfullargspec(fn.__func__)
616 elif hasattr(fn, "__call__"):
617 if inspect.ismethod(fn.__call__):
618 return get_callable_argspec(fn.__call__, no_self=no_self)
619 else:
620 raise TypeError("Can't inspect callable: %s" % fn)
621 else:
622 raise TypeError("Can't inspect callable: %s" % fn)
623
624
625def format_argspec_plus(
626 fn: Union[Callable[..., Any], compat.FullArgSpec], grouped: bool = True
627) -> Dict[str, Optional[str]]:
628 """Returns a dictionary of formatted, introspected function arguments.
629
    An enhanced variant of inspect.formatargspec to support code generation.
631
632 fn
633 An inspectable callable or tuple of inspect getargspec() results.
634 grouped
635 Defaults to True; include (parens, around, argument) lists
636
637 Returns:
638
639 args
640 Full inspect.formatargspec for fn
641 self_arg
642 The name of the first positional argument, varargs[0], or None
643 if the function defines no positional arguments.
644 apply_pos
645 args, re-written in calling rather than receiving syntax. Arguments are
646 passed positionally.
647 apply_kw
648 Like apply_pos, except keyword-ish args are passed as keywords.
649 apply_pos_proxied
650 Like apply_pos but omits the self/cls argument
651
652 Example::
653
654 >>> format_argspec_plus(lambda self, a, b, c=3, **d: 123)
655 {'grouped_args': '(self, a, b, c=3, **d)',
656 'self_arg': 'self',
657 'apply_kw': '(self, a, b, c=c, **d)',
658 'apply_pos': '(self, a, b, c, **d)'}
659
660 """
661 if callable(fn):
662 spec = compat.inspect_getfullargspec(fn)
663 else:
664 spec = fn
665
666 args = compat.inspect_formatargspec(*spec)
667
668 apply_pos = compat.inspect_formatargspec(
669 spec[0], spec[1], spec[2], None, spec[4]
670 )
671
672 if spec[0]:
673 self_arg = spec[0][0]
674
675 apply_pos_proxied = compat.inspect_formatargspec(
676 spec[0][1:], spec[1], spec[2], None, spec[4]
677 )
678
679 elif spec[1]:
680 # I'm not sure what this is
681 self_arg = "%s[0]" % spec[1]
682
683 apply_pos_proxied = apply_pos
684 else:
685 self_arg = None
686 apply_pos_proxied = apply_pos
687
688 num_defaults = 0
689 if spec[3]:
690 num_defaults += len(cast(Tuple[Any], spec[3]))
691 if spec[4]:
692 num_defaults += len(spec[4])
693
694 name_args = spec[0] + spec[4]
695
696 defaulted_vals: Union[List[str], Tuple[()]]
697
698 if num_defaults:
699 defaulted_vals = name_args[0 - num_defaults :]
700 else:
701 defaulted_vals = ()
702
703 apply_kw = compat.inspect_formatargspec(
704 name_args,
705 spec[1],
706 spec[2],
707 defaulted_vals,
708 formatvalue=lambda x: "=" + str(x),
709 )
710
711 if spec[0]:
712 apply_kw_proxied = compat.inspect_formatargspec(
713 name_args[1:],
714 spec[1],
715 spec[2],
716 defaulted_vals,
717 formatvalue=lambda x: "=" + str(x),
718 )
719 else:
720 apply_kw_proxied = apply_kw
721
722 if grouped:
723 return dict(
724 grouped_args=args,
725 self_arg=self_arg,
726 apply_pos=apply_pos,
727 apply_kw=apply_kw,
728 apply_pos_proxied=apply_pos_proxied,
729 apply_kw_proxied=apply_kw_proxied,
730 )
731 else:
732 return dict(
733 grouped_args=args,
734 self_arg=self_arg,
735 apply_pos=apply_pos[1:-1],
736 apply_kw=apply_kw[1:-1],
737 apply_pos_proxied=apply_pos_proxied[1:-1],
738 apply_kw_proxied=apply_kw_proxied[1:-1],
739 )
740
741
742def format_argspec_init(method, grouped=True):
743 """format_argspec_plus with considerations for typical __init__ methods
744
745 Wraps format_argspec_plus with error handling strategies for typical
746 __init__ cases:
747
748 .. sourcecode:: text
749
750 object.__init__ -> (self)
751 other unreflectable (usually C) -> (self, *args, **kwargs)
752
753 """
754 if method is object.__init__:
755 grouped_args = "(self)"
756 args = "(self)" if grouped else "self"
757 proxied = "()" if grouped else ""
758 else:
759 try:
760 return format_argspec_plus(method, grouped=grouped)
761 except TypeError:
762 grouped_args = "(self, *args, **kwargs)"
763 args = grouped_args if grouped else "self, *args, **kwargs"
764 proxied = "(*args, **kwargs)" if grouped else "*args, **kwargs"
765 return dict(
766 self_arg="self",
767 grouped_args=grouped_args,
768 apply_pos=args,
769 apply_kw=args,
770 apply_pos_proxied=proxied,
771 apply_kw_proxied=proxied,
772 )
773
774
775def create_proxy_methods(
776 target_cls: Type[Any],
777 target_cls_sphinx_name: str,
778 proxy_cls_sphinx_name: str,
779 classmethods: Sequence[str] = (),
780 methods: Sequence[str] = (),
781 attributes: Sequence[str] = (),
782 use_intermediate_variable: Sequence[str] = (),
783) -> Callable[[_T], _T]:
784 """A class decorator indicating attributes should refer to a proxy
785 class.
786
787 This decorator is now a "marker" that does nothing at runtime. Instead,
788 it is consumed by the tools/generate_proxy_methods.py script to
789 statically generate proxy methods and attributes that are fully
790 recognized by typing tools such as mypy.
791
792 """
793
794 def decorate(cls):
795 return cls
796
797 return decorate
798
799
800def getargspec_init(method):
801 """inspect.getargspec with considerations for typical __init__ methods
802
803 Wraps inspect.getargspec with error handling for typical __init__ cases:
804
805 .. sourcecode:: text
806
807 object.__init__ -> (self)
808 other unreflectable (usually C) -> (self, *args, **kwargs)
809
810 """
811 try:
812 return compat.inspect_getfullargspec(method)
813 except TypeError:
814 if method is object.__init__:
815 return (["self"], None, None, None)
816 else:
817 return (["self"], "args", "kwargs", None)
818
819
820def unbound_method_to_callable(func_or_cls):
821 """Adjust the incoming callable such that a 'self' argument is not
822 required.
823
824 """
825
826 if isinstance(func_or_cls, types.MethodType) and not func_or_cls.__self__:
827 return func_or_cls.__func__
828 else:
829 return func_or_cls
830
831
832def generic_repr(
833 obj: Any,
834 additional_kw: Sequence[Tuple[str, Any]] = (),
835 to_inspect: Optional[Union[object, List[object]]] = None,
836 omit_kwarg: Sequence[str] = (),
837) -> str:
838 """Produce a __repr__() based on direct association of the __init__()
839 specification vs. same-named attributes present.
840
841 """
842 if to_inspect is None:
843 to_inspect = [obj]
844 else:
845 to_inspect = _collections.to_list(to_inspect)
846
847 missing = object()
848
849 pos_args = []
850 kw_args: _collections.OrderedDict[str, Any] = _collections.OrderedDict()
851 vargs = None
852 for i, insp in enumerate(to_inspect):
853 try:
854 spec = compat.inspect_getfullargspec(insp.__init__)
855 except TypeError:
856 continue
857 else:
858 default_len = len(spec.defaults) if spec.defaults else 0
859 if i == 0:
860 if spec.varargs:
861 vargs = spec.varargs
862 if default_len:
863 pos_args.extend(spec.args[1:-default_len])
864 else:
865 pos_args.extend(spec.args[1:])
866 else:
867 kw_args.update(
868 [(arg, missing) for arg in spec.args[1:-default_len]]
869 )
870
871 if default_len:
872 assert spec.defaults
873 kw_args.update(
874 [
875 (arg, default)
876 for arg, default in zip(
877 spec.args[-default_len:], spec.defaults
878 )
879 ]
880 )
881 output: List[str] = []
882
883 output.extend(repr(getattr(obj, arg, None)) for arg in pos_args)
884
885 if vargs is not None and hasattr(obj, vargs):
886 output.extend([repr(val) for val in getattr(obj, vargs)])
887
888 for arg, defval in kw_args.items():
889 if arg in omit_kwarg:
890 continue
891 try:
892 val = getattr(obj, arg, missing)
893 if val is not missing and val != defval:
894 output.append("%s=%r" % (arg, val))
895 except Exception:
896 pass
897
898 if additional_kw:
899 for arg, defval in additional_kw:
900 try:
901 val = getattr(obj, arg, missing)
902 if val is not missing and val != defval:
903 output.append("%s=%r" % (arg, val))
904 except Exception:
905 pass
906
907 return "%s(%s)" % (obj.__class__.__name__, ", ".join(output))
908
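# Illustrative sketch (hypothetical class): attributes sharing a name with
# __init__ parameters are rendered; keyword arguments still at their default
# are omitted.
#
#     class Widget:
#         def __init__(self, name, size=5):
#             self.name = name
#             self.size = size
#
#     generic_repr(Widget("w", size=9))    # -> "Widget('w', size=9)"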
909
910class portable_instancemethod:
911 """Turn an instancemethod into a (parent, name) pair
912 to produce a serializable callable.
913
914 """
915
916 __slots__ = "target", "name", "kwargs", "__weakref__"
917
918 def __getstate__(self):
919 return {
920 "target": self.target,
921 "name": self.name,
922 "kwargs": self.kwargs,
923 }
924
925 def __setstate__(self, state):
926 self.target = state["target"]
927 self.name = state["name"]
928 self.kwargs = state.get("kwargs", ())
929
930 def __init__(self, meth, kwargs=()):
931 self.target = meth.__self__
932 self.name = meth.__name__
933 self.kwargs = kwargs
934
935 def __call__(self, *arg, **kw):
936 kw.update(self.kwargs)
937 return getattr(self.target, self.name)(*arg, **kw)
938
939
940def class_hierarchy(cls):
941 """Return an unordered sequence of all classes related to cls.
942
943 Traverses diamond hierarchies.
944
945 Fibs slightly: subclasses of builtin types are not returned. Thus
946 class_hierarchy(class A(object)) returns (A, object), not A plus every
947 class systemwide that derives from object.
948
949 """
950
951 hier = {cls}
952 process = list(cls.__mro__)
953 while process:
954 c = process.pop()
955 bases = (_ for _ in c.__bases__ if _ not in hier)
956
957 for b in bases:
958 process.append(b)
959 hier.add(b)
960
961 if c.__module__ == "builtins" or not hasattr(c, "__subclasses__"):
962 continue
963
964 for s in [
965 _
966 for _ in (
967 c.__subclasses__()
968 if not issubclass(c, type)
969 else c.__subclasses__(c)
970 )
971 if _ not in hier
972 ]:
973 process.append(s)
974 hier.add(s)
975 return list(hier)
976
977
978def iterate_attributes(cls):
979 """iterate all the keys and attributes associated
980 with a class, without using getattr().
981
982 Does not use getattr() so that class-sensitive
983 descriptors (i.e. property.__get__()) are not called.
984
985 """
986 keys = dir(cls)
987 for key in keys:
988 for c in cls.__mro__:
989 if key in c.__dict__:
990 yield (key, c.__dict__[key])
991 break
992
993
994def monkeypatch_proxied_specials(
995 into_cls,
996 from_cls,
997 skip=None,
998 only=None,
999 name="self.proxy",
1000 from_instance=None,
1001):
1002 """Automates delegation of __specials__ for a proxying type."""
1003
1004 if only:
1005 dunders = only
1006 else:
1007 if skip is None:
1008 skip = (
1009 "__slots__",
1010 "__del__",
1011 "__getattribute__",
1012 "__metaclass__",
1013 "__getstate__",
1014 "__setstate__",
1015 )
1016 dunders = [
1017 m
1018 for m in dir(from_cls)
1019 if (
1020 m.startswith("__")
1021 and m.endswith("__")
1022 and not hasattr(into_cls, m)
1023 and m not in skip
1024 )
1025 ]
1026
1027 for method in dunders:
1028 try:
1029 maybe_fn = getattr(from_cls, method)
1030 if not hasattr(maybe_fn, "__call__"):
1031 continue
1032 maybe_fn = getattr(maybe_fn, "__func__", maybe_fn)
1033 fn = cast(types.FunctionType, maybe_fn)
1034
1035 except AttributeError:
1036 continue
1037 try:
1038 spec = compat.inspect_getfullargspec(fn)
1039 fn_args = compat.inspect_formatargspec(spec[0])
1040 d_args = compat.inspect_formatargspec(spec[0][1:])
1041 except TypeError:
1042 fn_args = "(self, *args, **kw)"
1043 d_args = "(*args, **kw)"
1044
1045 py = (
1046 "def %(method)s%(fn_args)s: "
1047 "return %(name)s.%(method)s%(d_args)s" % locals()
1048 )
1049
1050 env: Dict[str, types.FunctionType] = (
1051 from_instance is not None and {name: from_instance} or {}
1052 )
1053 exec(py, env)
1054 try:
1055 env[method].__defaults__ = fn.__defaults__
1056 except AttributeError:
1057 pass
1058 setattr(into_cls, method, env[method])
1059
1060
1061def methods_equivalent(meth1, meth2):
1062 """Return True if the two methods are the same implementation."""
1063
1064 return getattr(meth1, "__func__", meth1) is getattr(
1065 meth2, "__func__", meth2
1066 )
1067
1068
1069def as_interface(obj, cls=None, methods=None, required=None):
1070 """Ensure basic interface compliance for an instance or dict of callables.
1071
1072 Checks that ``obj`` implements public methods of ``cls`` or has members
1073 listed in ``methods``. If ``required`` is not supplied, implementing at
1074 least one interface method is sufficient. Methods present on ``obj`` that
1075 are not in the interface are ignored.
1076
1077 If ``obj`` is a dict and ``dict`` does not meet the interface
1078 requirements, the keys of the dictionary are inspected. Keys present in
1079 ``obj`` that are not in the interface will raise TypeErrors.
1080
1081 Raises TypeError if ``obj`` does not meet the interface criteria.
1082
1083 In all passing cases, an object with callable members is returned. In the
1084 simple case, ``obj`` is returned as-is; if dict processing kicks in then
1085 an anonymous class is returned.
1086
1087 obj
1088 A type, instance, or dictionary of callables.
1089 cls
1090 Optional, a type. All public methods of cls are considered the
1091 interface. An ``obj`` instance of cls will always pass, ignoring
      ``required``.
1093 methods
1094 Optional, a sequence of method names to consider as the interface.
1095 required
1096 Optional, a sequence of mandatory implementations. If omitted, an
1097 ``obj`` that provides at least one interface method is considered
1098 sufficient. As a convenience, required may be a type, in which case
1099 all public methods of the type are required.
1100
1101 """
1102 if not cls and not methods:
        raise TypeError("a class or collection of method names is required")
1104
1105 if isinstance(cls, type) and isinstance(obj, cls):
1106 return obj
1107
1108 interface = set(methods or [m for m in dir(cls) if not m.startswith("_")])
1109 implemented = set(dir(obj))
1110
1111 complies = operator.ge
1112 if isinstance(required, type):
1113 required = interface
1114 elif not required:
1115 required = set()
1116 complies = operator.gt
1117 else:
1118 required = set(required)
1119
1120 if complies(implemented.intersection(interface), required):
1121 return obj
1122
1123 # No dict duck typing here.
1124 if not isinstance(obj, dict):
1125 qualifier = complies is operator.gt and "any of" or "all of"
1126 raise TypeError(
1127 "%r does not implement %s: %s"
1128 % (obj, qualifier, ", ".join(interface))
1129 )
1130
1131 class AnonymousInterface:
1132 """A callable-holding shell."""
1133
1134 if cls:
1135 AnonymousInterface.__name__ = "Anonymous" + cls.__name__
1136 found = set()
1137
1138 for method, impl in dictlike_iteritems(obj):
1139 if method not in interface:
1140 raise TypeError("%r: unknown in this interface" % method)
1141 if not callable(impl):
1142 raise TypeError("%r=%r is not callable" % (method, impl))
1143 setattr(AnonymousInterface, method, staticmethod(impl))
1144 found.add(method)
1145
1146 if complies(found, required):
1147 return AnonymousInterface
1148
1149 raise TypeError(
1150 "dictionary does not contain required keys %s"
1151 % ", ".join(required - found)
1152 )
1153
1154
1155_GFD = TypeVar("_GFD", bound="generic_fn_descriptor[Any]")
1156
1157
1158class generic_fn_descriptor(Generic[_T_co]):
    """Descriptor which proxies a function when the attribute is not
    present in the instance's __dict__.
1161
1162 This superclass is organized in a particular way with "memoized" and
1163 "non-memoized" implementation classes that are hidden from type checkers,
1164 as Mypy seems to not be able to handle seeing multiple kinds of descriptor
1165 classes used for the same attribute.
1166
1167 """
1168
1169 fget: Callable[..., _T_co]
1170 __doc__: Optional[str]
1171 __name__: str
1172
1173 def __init__(self, fget: Callable[..., _T_co], doc: Optional[str] = None):
1174 self.fget = fget
1175 self.__doc__ = doc or fget.__doc__
1176 self.__name__ = fget.__name__
1177
1178 @overload
1179 def __get__(self: _GFD, obj: None, cls: Any) -> _GFD: ...
1180
1181 @overload
1182 def __get__(self, obj: object, cls: Any) -> _T_co: ...
1183
1184 def __get__(self: _GFD, obj: Any, cls: Any) -> Union[_GFD, _T_co]:
1185 raise NotImplementedError()
1186
1187 if TYPE_CHECKING:
1188
1189 def __set__(self, instance: Any, value: Any) -> None: ...
1190
1191 def __delete__(self, instance: Any) -> None: ...
1192
1193 def _reset(self, obj: Any) -> None:
1194 raise NotImplementedError()
1195
1196 @classmethod
1197 def reset(cls, obj: Any, name: str) -> None:
1198 raise NotImplementedError()
1199
1200
1201class _non_memoized_property(generic_fn_descriptor[_T_co]):
1202 """a plain descriptor that proxies a function.
1203
1204 primary rationale is to provide a plain attribute that's
1205 compatible with memoized_property which is also recognized as equivalent
1206 by mypy.
1207
1208 """
1209
1210 if not TYPE_CHECKING:
1211
1212 def __get__(self, obj, cls):
1213 if obj is None:
1214 return self
1215 return self.fget(obj)
1216
1217
1218class _memoized_property(generic_fn_descriptor[_T_co]):
1219 """A read-only @property that is only evaluated once."""
1220
1221 if not TYPE_CHECKING:
1222
1223 def __get__(self, obj, cls):
1224 if obj is None:
1225 return self
1226 obj.__dict__[self.__name__] = result = self.fget(obj)
1227 return result
1228
1229 def _reset(self, obj):
1230 _memoized_property.reset(obj, self.__name__)
1231
1232 @classmethod
1233 def reset(cls, obj, name):
1234 obj.__dict__.pop(name, None)
1235
1236
1237# despite many attempts to get Mypy to recognize an overridden descriptor
1238# where one is memoized and the other isn't, there seems to be no reliable
1239# way other than completely deceiving the type checker into thinking there
1240# is just one single descriptor type everywhere. Otherwise, if a superclass
1241# has non-memoized and subclass has memoized, that requires
1242# "class memoized(non_memoized)". but then if a superclass has memoized and
1243# superclass has non-memoized, the class hierarchy of the descriptors
1244# would need to be reversed; "class non_memoized(memoized)". so there's no
1245# way to achieve this.
1246# additional issues, RO properties:
1247# https://github.com/python/mypy/issues/12440
1248if TYPE_CHECKING:
1249 # allow memoized and non-memoized to be freely mixed by having them
1250 # be the same class
1251 memoized_property = generic_fn_descriptor
1252 non_memoized_property = generic_fn_descriptor
1253
1254 # for read only situations, mypy only sees @property as read only.
1255 # read only is needed when a subtype specializes the return type
1256 # of a property, meaning assignment needs to be disallowed
1257 ro_memoized_property = property
1258 ro_non_memoized_property = property
1259
1260else:
1261 memoized_property = ro_memoized_property = _memoized_property
1262 non_memoized_property = ro_non_memoized_property = _non_memoized_property
1263
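# Usage sketch: the decorated function runs once per instance; the computed
# value then replaces the descriptor in the instance __dict__.
#
#     class Point:
#         @memoized_property
#         def length(self):
#             print("computing")
#             return 42
#
#     p = Point()
#     p.length    # prints "computing", returns 42
#     p.length    # returns 42 with no recomputation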
1264
1265def memoized_instancemethod(fn: _F) -> _F:
    """Decorate a method to memoize its return value.
1267
1268 Best applied to no-arg methods: memoization is not sensitive to
1269 argument values, and will always return the same value even when
1270 called with different arguments.
1271
1272 """
1273
1274 def oneshot(self, *args, **kw):
1275 result = fn(self, *args, **kw)
1276
1277 def memo(*a, **kw):
1278 return result
1279
1280 memo.__name__ = fn.__name__
1281 memo.__doc__ = fn.__doc__
1282 self.__dict__[fn.__name__] = memo
1283 return result
1284
1285 return update_wrapper(oneshot, fn) # type: ignore
1286
1287
1288class HasMemoized:
1289 """A mixin class that maintains the names of memoized elements in a
1290 collection for easy cache clearing, generative, etc.
1291
1292 """
1293
1294 if not TYPE_CHECKING:
1295 # support classes that want to have __slots__ with an explicit
1296 # slot for __dict__. not sure if that requires base __slots__ here.
1297 __slots__ = ()
1298
1299 _memoized_keys: FrozenSet[str] = frozenset()
1300
1301 def _reset_memoizations(self) -> None:
1302 for elem in self._memoized_keys:
1303 self.__dict__.pop(elem, None)
1304
1305 def _assert_no_memoizations(self) -> None:
1306 for elem in self._memoized_keys:
1307 assert elem not in self.__dict__
1308
1309 def _set_memoized_attribute(self, key: str, value: Any) -> None:
1310 self.__dict__[key] = value
1311 self._memoized_keys |= {key}
1312
1313 class memoized_attribute(memoized_property[_T]):
1314 """A read-only @property that is only evaluated once.
1315
1316 :meta private:
1317
1318 """
1319
1320 fget: Callable[..., _T]
1321 __doc__: Optional[str]
1322 __name__: str
1323
1324 def __init__(self, fget: Callable[..., _T], doc: Optional[str] = None):
1325 self.fget = fget
1326 self.__doc__ = doc or fget.__doc__
1327 self.__name__ = fget.__name__
1328
1329 @overload
1330 def __get__(self: _MA, obj: None, cls: Any) -> _MA: ...
1331
1332 @overload
1333 def __get__(self, obj: Any, cls: Any) -> _T: ...
1334
1335 def __get__(self, obj, cls):
1336 if obj is None:
1337 return self
1338 obj.__dict__[self.__name__] = result = self.fget(obj)
1339 obj._memoized_keys |= {self.__name__}
1340 return result
1341
1342 @classmethod
1343 def memoized_instancemethod(cls, fn: _F) -> _F:
        """Decorate a method to memoize its return value.
1345
1346 :meta private:
1347
1348 """
1349
1350 def oneshot(self: Any, *args: Any, **kw: Any) -> Any:
1351 result = fn(self, *args, **kw)
1352
1353 def memo(*a, **kw):
1354 return result
1355
1356 memo.__name__ = fn.__name__
1357 memo.__doc__ = fn.__doc__
1358 self.__dict__[fn.__name__] = memo
1359 self._memoized_keys |= {fn.__name__}
1360 return result
1361
1362 return update_wrapper(oneshot, fn) # type: ignore
1363
1364
1365if TYPE_CHECKING:
1366 HasMemoized_ro_memoized_attribute = property
1367else:
1368 HasMemoized_ro_memoized_attribute = HasMemoized.memoized_attribute
1369
1370
1371class MemoizedSlots:
1372 """Apply memoized items to an object using a __getattr__ scheme.
1373
1374 This allows the functionality of memoized_property and
1375 memoized_instancemethod to be available to a class using __slots__.
1376
1377 """
1378
1379 __slots__ = ()
1380
1381 def _fallback_getattr(self, key):
1382 raise AttributeError(key)
1383
1384 def __getattr__(self, key: str) -> Any:
1385 if key.startswith("_memoized_attr_") or key.startswith(
1386 "_memoized_method_"
1387 ):
1388 raise AttributeError(key)
1389 # to avoid recursion errors when interacting with other __getattr__
1390 # schemes that refer to this one, when testing for memoized method
1391 # look at __class__ only rather than going into __getattr__ again.
1392 elif hasattr(self.__class__, f"_memoized_attr_{key}"):
1393 value = getattr(self, f"_memoized_attr_{key}")()
1394 setattr(self, key, value)
1395 return value
1396 elif hasattr(self.__class__, f"_memoized_method_{key}"):
1397 fn = getattr(self, f"_memoized_method_{key}")
1398
1399 def oneshot(*args, **kw):
1400 result = fn(*args, **kw)
1401
1402 def memo(*a, **kw):
1403 return result
1404
1405 memo.__name__ = fn.__name__
1406 memo.__doc__ = fn.__doc__
1407 setattr(self, key, memo)
1408 return result
1409
1410 oneshot.__doc__ = fn.__doc__
1411 return oneshot
1412 else:
1413 return self._fallback_getattr(key)
1414
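# Usage sketch: a slotted subclass declares a slot per memoized name and a
# matching ``_memoized_attr_<name>`` (or ``_memoized_method_<name>``) hook.
#
#     class Thing(MemoizedSlots):
#         __slots__ = ("total",)
#
#         def _memoized_attr_total(self):
#             return compute_total()    # hypothetical expensive helper
#
#     # Thing().total computes once, then is stored directly in the slot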
1415
1416# from paste.deploy.converters
1417def asbool(obj: Any) -> bool:
1418 if isinstance(obj, str):
1419 obj = obj.strip().lower()
1420 if obj in ["true", "yes", "on", "y", "t", "1"]:
1421 return True
1422 elif obj in ["false", "no", "off", "n", "f", "0"]:
1423 return False
1424 else:
1425 raise ValueError("String is not true/false: %r" % obj)
1426 return bool(obj)
1427
1428
1429def bool_or_str(*text: str) -> Callable[[str], Union[str, bool]]:
1430 """Return a callable that will evaluate a string as
1431 boolean, or one of a set of "alternate" string values.
1432
1433 """
1434
1435 def bool_or_value(obj: str) -> Union[str, bool]:
1436 if obj in text:
1437 return obj
1438 else:
1439 return asbool(obj)
1440
1441 return bool_or_value
1442
1443
1444def asint(value: Any) -> Optional[int]:
1445 """Coerce to integer."""
1446
1447 if value is None:
1448 return value
1449 return int(value)
1450
1451
1452def coerce_kw_type(
1453 kw: Dict[str, Any],
1454 key: str,
1455 type_: Type[Any],
1456 flexi_bool: bool = True,
1457 dest: Optional[Dict[str, Any]] = None,
1458) -> None:
1459 r"""If 'key' is present in dict 'kw', coerce its value to type 'type\_' if
1460 necessary. If 'flexi_bool' is True, the string '0' is considered false
1461 when coercing to boolean.
1462 """
1463
1464 if dest is None:
1465 dest = kw
1466
1467 if (
1468 key in kw
1469 and (not isinstance(type_, type) or not isinstance(kw[key], type_))
1470 and kw[key] is not None
1471 ):
1472 if type_ is bool and flexi_bool:
1473 dest[key] = asbool(kw[key])
1474 else:
1475 dest[key] = type_(kw[key])
1476
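# Usage sketch: string values, e.g. parsed from a URL query string, are
# coerced in place.
#
#     kw = {"timeout": "30", "echo": "0"}
#     coerce_kw_type(kw, "timeout", int)
#     coerce_kw_type(kw, "echo", bool)
#     # kw is now {"timeout": 30, "echo": False}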
1477
1478def constructor_key(obj: Any, cls: Type[Any]) -> Tuple[Any, ...]:
1479 """Produce a tuple structure that is cacheable using the __dict__ of
1480 obj to retrieve values
1481
1482 """
1483 names = get_cls_kwargs(cls)
1484 return (cls,) + tuple(
1485 (k, obj.__dict__[k]) for k in names if k in obj.__dict__
1486 )
1487
1488
1489def constructor_copy(obj: _T, cls: Type[_T], *args: Any, **kw: Any) -> _T:
1490 """Instantiate cls using the __dict__ of obj as constructor arguments.
1491
1492 Uses inspect to match the named arguments of ``cls``.
1493
1494 """
1495
1496 names = get_cls_kwargs(cls)
1497 kw.update(
1498 (k, obj.__dict__[k]) for k in names.difference(kw) if k in obj.__dict__
1499 )
1500 return cls(*args, **kw)
1501
1502
1503def counter() -> Callable[[], int]:
1504 """Return a threadsafe counter function."""
1505
1506 lock = threading.Lock()
1507 counter = itertools.count(1)
1508
1509 # avoid the 2to3 "next" transformation...
1510 def _next():
1511 with lock:
1512 return next(counter)
1513
1514 return _next
1515
1516
1517def duck_type_collection(
1518 specimen: Any, default: Optional[Type[Any]] = None
1519) -> Optional[Type[Any]]:
1520 """Given an instance or class, guess if it is or is acting as one of
1521 the basic collection types: list, set and dict. If the __emulates__
1522 property is present, return that preferentially.
1523 """
1524
1525 if hasattr(specimen, "__emulates__"):
1526 # canonicalize set vs sets.Set to a standard: the builtin set
1527 if specimen.__emulates__ is not None and issubclass(
1528 specimen.__emulates__, set
1529 ):
1530 return set
1531 else:
1532 return specimen.__emulates__ # type: ignore
1533
1534 isa = issubclass if isinstance(specimen, type) else isinstance
1535 if isa(specimen, list):
1536 return list
1537 elif isa(specimen, set):
1538 return set
1539 elif isa(specimen, dict):
1540 return dict
1541
1542 if hasattr(specimen, "append"):
1543 return list
1544 elif hasattr(specimen, "add"):
1545 return set
1546 elif hasattr(specimen, "set"):
1547 return dict
1548 else:
1549 return default
1550
1551
1552def assert_arg_type(
1553 arg: Any, argtype: Union[Tuple[Type[Any], ...], Type[Any]], name: str
1554) -> Any:
1555 if isinstance(arg, argtype):
1556 return arg
1557 else:
1558 if isinstance(argtype, tuple):
1559 raise exc.ArgumentError(
1560 "Argument '%s' is expected to be one of type %s, got '%s'"
1561 % (name, " or ".join("'%s'" % a for a in argtype), type(arg))
1562 )
1563 else:
1564 raise exc.ArgumentError(
1565 "Argument '%s' is expected to be of type '%s', got '%s'"
1566 % (name, argtype, type(arg))
1567 )
1568
1569
1570def dictlike_iteritems(dictlike):
1571 """Return a (key, value) iterator for almost any dict-like object."""
1572
1573 if hasattr(dictlike, "items"):
1574 return list(dictlike.items())
1575
1576 getter = getattr(dictlike, "__getitem__", getattr(dictlike, "get", None))
1577 if getter is None:
1578 raise TypeError("Object '%r' is not dict-like" % dictlike)
1579
1580 if hasattr(dictlike, "iterkeys"):
1581
1582 def iterator():
1583 for key in dictlike.iterkeys():
1584 assert getter is not None
1585 yield key, getter(key)
1586
1587 return iterator()
1588 elif hasattr(dictlike, "keys"):
1589 return iter((key, getter(key)) for key in dictlike.keys())
1590 else:
1591 raise TypeError("Object '%r' is not dict-like" % dictlike)
1592
1593
1594class classproperty(property):
    """A decorator that behaves like @property except that it operates
    on classes rather than instances.
1597
1598 The decorator is currently special when using the declarative
1599 module, but note that the
1600 :class:`~.sqlalchemy.ext.declarative.declared_attr`
1601 decorator should be used for this purpose with declarative.
1602
1603 """
1604
1605 fget: Callable[[Any], Any]
1606
1607 def __init__(self, fget: Callable[[Any], Any], *arg: Any, **kw: Any):
1608 super().__init__(fget, *arg, **kw)
1609 self.__doc__ = fget.__doc__
1610
1611 def __get__(self, obj: Any, cls: Optional[type] = None) -> Any:
1612 return self.fget(cls)
1613
1614
1615class hybridproperty(Generic[_T]):
1616 def __init__(self, func: Callable[..., _T]):
1617 self.func = func
1618 self.clslevel = func
1619
1620 def __get__(self, instance: Any, owner: Any) -> _T:
1621 if instance is None:
1622 clsval = self.clslevel(owner)
1623 return clsval
1624 else:
1625 return self.func(instance)
1626
1627 def classlevel(self, func: Callable[..., Any]) -> hybridproperty[_T]:
1628 self.clslevel = func
1629 return self
1630
1631
1632class rw_hybridproperty(Generic[_T]):
1633 def __init__(self, func: Callable[..., _T]):
1634 self.func = func
1635 self.clslevel = func
1636 self.setfn: Optional[Callable[..., Any]] = None
1637
1638 def __get__(self, instance: Any, owner: Any) -> _T:
1639 if instance is None:
1640 clsval = self.clslevel(owner)
1641 return clsval
1642 else:
1643 return self.func(instance)
1644
1645 def __set__(self, instance: Any, value: Any) -> None:
1646 assert self.setfn is not None
1647 self.setfn(instance, value)
1648
1649 def setter(self, func: Callable[..., Any]) -> rw_hybridproperty[_T]:
1650 self.setfn = func
1651 return self
1652
1653 def classlevel(self, func: Callable[..., Any]) -> rw_hybridproperty[_T]:
1654 self.clslevel = func
1655 return self
1656
1657
1658class hybridmethod(Generic[_T]):
1659 """Decorate a function as cls- or instance- level."""
1660
1661 def __init__(self, func: Callable[..., _T]):
1662 self.func = self.__func__ = func
1663 self.clslevel = func
1664
1665 def __get__(self, instance: Any, owner: Any) -> Callable[..., _T]:
1666 if instance is None:
1667 return self.clslevel.__get__(owner, owner.__class__) # type:ignore
1668 else:
1669 return self.func.__get__(instance, owner) # type:ignore
1670
1671 def classlevel(self, func: Callable[..., Any]) -> hybridmethod[_T]:
1672 self.clslevel = func
1673 return self
1674
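# Usage sketch for hybridproperty: one attribute name answers at both the
# class and the instance level, optionally with a distinct class-level
# implementation registered via classlevel().
#
#     class Widget:
#         @hybridproperty
#         def ident(self):
#             return "instance"
#
#         @ident.classlevel
#         def ident(cls):
#             return "class"
#
#     # Widget.ident -> "class"; Widget().ident -> "instance"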
1675
1676class symbol(int):
1677 """A constant symbol.
1678
1679 >>> symbol("foo") is symbol("foo")
1680 True
1681 >>> symbol("foo")
    symbol('foo')
1683
1684 A slight refinement of the MAGICCOOKIE=object() pattern. The primary
1685 advantage of symbol() is its repr(). They are also singletons.
1686
1687 Repeated calls of symbol('name') will all return the same instance.
1688
1689 """
1690
1691 name: str
1692
1693 symbols: Dict[str, symbol] = {}
1694 _lock = threading.Lock()
1695
1696 def __new__(
1697 cls,
1698 name: str,
1699 doc: Optional[str] = None,
1700 canonical: Optional[int] = None,
1701 ) -> symbol:
1702 with cls._lock:
1703 sym = cls.symbols.get(name)
1704 if sym is None:
1705 assert isinstance(name, str)
1706 if canonical is None:
1707 canonical = hash(name)
1708 sym = int.__new__(symbol, canonical)
1709 sym.name = name
1710 if doc:
1711 sym.__doc__ = doc
1712
1713 # NOTE: we should ultimately get rid of this global thing,
1714 # however, currently it is to support pickling. The best
1715 # change would be when we are on py3.11 at a minimum, we
1716 # switch to stdlib enum.IntFlag.
1717 cls.symbols[name] = sym
1718 else:
1719 if canonical and canonical != sym:
1720 raise TypeError(
1721 f"Can't replace canonical symbol for {name!r} "
1722 f"with new int value {canonical}"
1723 )
1724 return sym
1725
1726 def __reduce__(self):
1727 return symbol, (self.name, "x", int(self))
1728
1729 def __str__(self):
1730 return repr(self)
1731
1732 def __repr__(self):
1733 return f"symbol({self.name!r})"
1734
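# Usage sketch: symbols are interned ints with a readable repr; repeated
# construction with the same name returns the same object.
#
#     CANCEL = symbol("CANCEL", canonical=1)
#     symbol("CANCEL") is CANCEL    # True
#     int(CANCEL), repr(CANCEL)     # (1, "symbol('CANCEL')")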
1735
1736class _IntFlagMeta(type):
1737 def __init__(
1738 cls,
1739 classname: str,
1740 bases: Tuple[Type[Any], ...],
1741 dict_: Dict[str, Any],
1742 **kw: Any,
1743 ) -> None:
1744 items: List[symbol]
1745 cls._items = items = []
1746 for k, v in dict_.items():
1747 if re.match(r"^__.*__$", k):
1748 continue
1749 if isinstance(v, int):
1750 sym = symbol(k, canonical=v)
1751 elif not k.startswith("_"):
1752 raise TypeError("Expected integer values for IntFlag")
1753 else:
1754 continue
1755 setattr(cls, k, sym)
1756 items.append(sym)
1757
1758 cls.__members__ = _collections.immutabledict(
1759 {sym.name: sym for sym in items}
1760 )
1761
1762 def __iter__(self) -> Iterator[symbol]:
1763 raise NotImplementedError(
1764 "iter not implemented to ensure compatibility with "
1765 "Python 3.11 IntFlag. Please use __members__. See "
1766 "https://github.com/python/cpython/issues/99304"
1767 )
1768
1769
1770class _FastIntFlag(metaclass=_IntFlagMeta):
1771 """An 'IntFlag' copycat that isn't slow when performing bitwise
1772 operations.
1773
    The ``FastIntFlag`` name resolves to ``enum.IntFlag`` under TYPE_CHECKING
    and to ``_FastIntFlag`` otherwise.
1776
1777 """
1778
1779
1780if TYPE_CHECKING:
1781 from enum import IntFlag
1782
1783 FastIntFlag = IntFlag
1784else:
1785 FastIntFlag = _FastIntFlag
1786
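# Usage sketch (hypothetical flags): members are plain ``symbol`` integers at
# runtime, so bitwise math stays cheap, while type checkers see enum.IntFlag.
#
#     class Permission(FastIntFlag):
#         READ = 1
#         WRITE = 2
#
#     # Permission.READ | Permission.WRITE == 3
#     # membership test: bool(value & Permission.WRITE)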
1787
1788_E = TypeVar("_E", bound=enum.Enum)
1789
1790
1791def parse_user_argument_for_enum(
1792 arg: Any,
1793 choices: Dict[_E, List[Any]],
1794 name: str,
1795 resolve_symbol_names: bool = False,
1796) -> Optional[_E]:
1797 """Given a user parameter, parse the parameter into a chosen value
1798 from a list of choice objects, typically Enum values.
1799
1800 The user argument can be a string name that matches the name of a
1801 symbol, or the symbol object itself, or any number of alternate choices
    such as True/False/None, etc.
1803
1804 :param arg: the user argument.
1805 :param choices: dictionary of enum values to lists of possible
1806 entries for each.
1807 :param name: name of the argument. Used in an :class:`.ArgumentError`
1808 that is raised if the parameter doesn't match any available argument.
1809
1810 """
1811 for enum_value, choice in choices.items():
1812 if arg is enum_value:
1813 return enum_value
1814 elif resolve_symbol_names and arg == enum_value.name:
1815 return enum_value
1816 elif arg in choice:
1817 return enum_value
1818
1819 if arg is None:
1820 return None
1821
1822 raise exc.ArgumentError(f"Invalid value for '{name}': {arg!r}")
1823
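# Illustrative sketch with a hypothetical enum: each enum member maps to the
# user-facing spellings it accepts.
#
#     class Mode(enum.Enum):
#         ON = 1
#         OFF = 2
#
#     choices = {Mode.ON: [True, "on"], Mode.OFF: [False, "off"]}
#     parse_user_argument_for_enum("on", choices, "mode")      # -> Mode.ON
#     parse_user_argument_for_enum(Mode.OFF, choices, "mode")  # -> Mode.OFF
#     parse_user_argument_for_enum(None, choices, "mode")      # -> None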
1824
1825_creation_order = 1
1826
1827
1828def set_creation_order(instance: Any) -> None:
1829 """Assign a '_creation_order' sequence to the given instance.
1830
1831 This allows multiple instances to be sorted in order of creation
1832 (typically within a single thread; the counter is not particularly
1833 threadsafe).
1834
1835 """
1836 global _creation_order
1837 instance._creation_order = _creation_order
1838 _creation_order += 1
1839
1840
1841def warn_exception(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
    """Execute the given function, catching all exceptions and converting
    them to a warning.
1844
1845 """
1846 try:
1847 return func(*args, **kwargs)
1848 except Exception:
1849 warn("%s('%s') ignored" % sys.exc_info()[0:2])
1850
1851
1852def ellipses_string(value, len_=25):
1853 try:
1854 if len(value) > len_:
1855 return "%s..." % value[0:len_]
1856 else:
1857 return value
1858 except TypeError:
1859 return value
1860
1861
1862class _hash_limit_string(str):
1863 """A string subclass that can only be hashed on a maximum amount
1864 of unique values.
1865
    This is used for warnings so that we can send out parameterized warnings
    without either the __warningregistry__ of the module or the
    non-overridable "once" registry within warnings.py overloading memory.
1869
1870
1871 """
1872
1873 _hash: int
1874
1875 def __new__(
1876 cls, value: str, num: int, args: Sequence[Any]
1877 ) -> _hash_limit_string:
1878 interpolated = (value % args) + (
1879 " (this warning may be suppressed after %d occurrences)" % num
1880 )
1881 self = super().__new__(cls, interpolated)
1882 self._hash = hash("%s_%d" % (value, hash(interpolated) % num))
1883 return self
1884
1885 def __hash__(self) -> int:
1886 return self._hash
1887
1888 def __eq__(self, other: Any) -> bool:
1889 return hash(self) == hash(other)
1890
1891
1892def warn(msg: str, code: Optional[str] = None) -> None:
1893 """Issue a warning.
1894
1895 If msg is a string, :class:`.exc.SAWarning` is used as
1896 the category.
1897
1898 """
1899 if code:
1900 _warnings_warn(exc.SAWarning(msg, code=code))
1901 else:
1902 _warnings_warn(msg, exc.SAWarning)
1903
1904
1905def warn_limited(msg: str, args: Sequence[Any]) -> None:
1906 """Issue a warning with a parameterized string, limiting the number
1907 of registrations.
1908
1909 """
1910 if args:
1911 msg = _hash_limit_string(msg, 10, args)
1912 _warnings_warn(msg, exc.SAWarning)
1913
1914
1915_warning_tags: Dict[CodeType, Tuple[str, Type[Warning]]] = {}
1916
1917
1918def tag_method_for_warnings(
1919 message: str, category: Type[Warning]
1920) -> Callable[[_F], _F]:
1921 def go(fn):
1922 _warning_tags[fn.__code__] = (message, category)
1923 return fn
1924
1925 return go
1926
1927
1928_not_sa_pattern = re.compile(r"^(?:sqlalchemy\.(?!testing)|alembic\.)")
1929
1930
1931def _warnings_warn(
1932 message: Union[str, Warning],
1933 category: Optional[Type[Warning]] = None,
1934 stacklevel: int = 2,
1935) -> None:
1936 # adjust the given stacklevel to be outside of SQLAlchemy
1937 try:
1938 frame = sys._getframe(stacklevel)
1939 except ValueError:
1940 # being called from less than 3 (or given) stacklevels, weird,
1941 # but don't crash
1942 stacklevel = 0
1943 except:
        # _getframe() doesn't work, weird interpreter issue;
        # ok, but don't crash
1946 stacklevel = 0
1947 else:
1948 stacklevel_found = warning_tag_found = False
1949 while frame is not None:
1950 # using __name__ here requires that we have __name__ in the
1951 # __globals__ of the decorated string functions we make also.
1952 # we generate this using {"__name__": fn.__module__}
1953 if not stacklevel_found and not re.match(
1954 _not_sa_pattern, frame.f_globals.get("__name__", "")
1955 ):
                # stop incrementing stack level if an out-of-SQLA line
                # was found.
1958 stacklevel_found = True
1959
1960 # however, for the warning tag thing, we have to keep
1961 # scanning up the whole traceback
1962
1963 if frame.f_code in _warning_tags:
1964 warning_tag_found = True
1965 (_suffix, _category) = _warning_tags[frame.f_code]
1966 category = category or _category
1967 message = f"{message} ({_suffix})"
1968
1969 frame = frame.f_back # type: ignore[assignment]
1970
1971 if not stacklevel_found:
1972 stacklevel += 1
1973 elif stacklevel_found and warning_tag_found:
1974 break
1975
1976 if category is not None:
1977 warnings.warn(message, category, stacklevel=stacklevel + 1)
1978 else:
1979 warnings.warn(message, stacklevel=stacklevel + 1)
1980
1981
1982def only_once(
1983 fn: Callable[..., _T], retry_on_exception: bool
1984) -> Callable[..., Optional[_T]]:
1985 """Decorate the given function to be a no-op after it is called exactly
1986 once."""
1987
1988 once = [fn]
1989
1990 def go(*arg: Any, **kw: Any) -> Optional[_T]:
1991 # strong reference fn so that it isn't garbage collected,
1992 # which interferes with the event system's expectations
1993 strong_fn = fn # noqa
1994 if once:
1995 once_fn = once.pop()
1996 try:
1997 return once_fn(*arg, **kw)
1998 except:
1999 if retry_on_exception:
2000 once.insert(0, once_fn)
2001 raise
2002
2003 return None
2004
2005 return go
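
# Illustrative (hypothetical) usage of only_once; ``configure`` is a
# placeholder name, not part of this module:
#
#     def configure():
#         print("configuring")
#
#     configure_once = only_once(configure, retry_on_exception=False)
#     configure_once()   # invokes configure()
#     configure_once()   # no-op; returns None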
2006
2007
2008_SQLA_RE = re.compile(r"sqlalchemy/([a-z_]+/){0,2}[a-z_]+\.py")
2009_UNITTEST_RE = re.compile(r"unit(?:2|test2?/)")
2010
2011
2012def chop_traceback(
2013 tb: List[str],
2014 exclude_prefix: re.Pattern[str] = _UNITTEST_RE,
2015 exclude_suffix: re.Pattern[str] = _SQLA_RE,
2016) -> List[str]:
2017 """Chop extraneous lines off beginning and end of a traceback.
2018
2019 :param tb:
2020 a list of traceback lines as returned by ``traceback.format_stack()``
2021
2022 :param exclude_prefix:
2023 a regular expression object matching lines to skip at beginning of
2024 ``tb``
2025
2026 :param exclude_suffix:
2027 a regular expression object matching lines to skip at end of ``tb``
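
    E.g. (illustrative; the lines below are shortened placeholders)::

        >>> tb = [
        ...     '  File "unittest/case.py", line 1, in run',
        ...     '  File "myapp/module.py", line 2, in go',
        ...     '  File "sqlalchemy/orm/session.py", line 3, in flush',
        ... ]
        >>> chop_traceback(tb)
        ['  File "myapp/module.py", line 2, in go']
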
2028 """
2029 start = 0
2030 end = len(tb) - 1
2031 while start <= end and exclude_prefix.search(tb[start]):
2032 start += 1
2033 while start <= end and exclude_suffix.search(tb[end]):
2034 end -= 1
2035 return tb[start : end + 1]
2036
2037
2038NoneType = type(None)
2039
2040
2041def attrsetter(attrname):
2042 code = "def set(obj, value): obj.%s = value" % attrname
2043 env = locals().copy()
2044 exec(code, env)
2045 return env["set"]
2046
2047
2048_dunders = re.compile("^__.+__$")
2049
2050
2051class TypingOnly:
2052 """A mixin class that marks a class as 'typing only', meaning it has
2053 absolutely no methods, attributes, or runtime functionality whatsoever.
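
    E.g. (illustrative; ``OnlyForTyping`` is a placeholder name)::

        class OnlyForTyping(TypingOnly):
            __slots__ = ()

            if TYPE_CHECKING:

                def method_visible_only_to_type_checkers(self) -> int: ...

        # a subclass that defines runtime (non-dunder) attributes raises
        # AssertionError at class creation time.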
2054
2055 """
2056
2057 __slots__ = ()
2058
2059 def __init_subclass__(cls) -> None:
2060 if TypingOnly in cls.__bases__:
2061 remaining = {
2062 name for name in cls.__dict__ if not _dunders.match(name)
2063 }
2064 if remaining:
2065 raise AssertionError(
2066 f"Class {cls} directly inherits TypingOnly but has "
2067 f"additional attributes {remaining}."
2068 )
2069 super().__init_subclass__()
2070
2071
2072class EnsureKWArg:
2073 r"""Apply translation of functions to accept \**kw arguments if they
2074 don't already.
2075
2076 Used to ensure cross-compatibility with third party legacy code, for things
2077 like compiler visit methods that need to accept ``**kw`` arguments,
2078 but may have been copied from old code that didn't accept them.
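
    E.g. (illustrative; ``MyVisitor`` and ``visit_widget`` are placeholder
    names)::

        class MyVisitor(EnsureKWArg):
            ensure_kwarg = "visit_"

            def visit_widget(self, element):
                # rewrapped at class creation time so callers may pass **kw
                ...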
2079
2080 """
2081
2082 ensure_kwarg: str
2083 """a regular expression that indicates method names for which the method
2084 should accept ``**kw`` arguments.
2085
2086 The class will scan for methods matching the name template and decorate
2087 them if necessary to ensure ``**kw`` parameters are accepted.
2088
2089 """
2090
2091 def __init_subclass__(cls) -> None:
2092 fn_reg = cls.ensure_kwarg
2093 clsdict = cls.__dict__
2094 if fn_reg:
2095 for key in clsdict:
2096 m = re.match(fn_reg, key)
2097 if m:
2098 fn = clsdict[key]
2099 spec = compat.inspect_getfullargspec(fn)
2100 if not spec.varkw:
2101 wrapped = cls._wrap_w_kw(fn)
2102 setattr(cls, key, wrapped)
2103 super().__init_subclass__()
2104
2105 @classmethod
2106 def _wrap_w_kw(cls, fn: Callable[..., Any]) -> Callable[..., Any]:
2107 def wrap(*arg: Any, **kw: Any) -> Any:
2108 return fn(*arg)
2109
2110 return update_wrapper(wrap, fn)
2111
2112
2113def wrap_callable(wrapper, fn):
2114 """Augment functools.update_wrapper() to work with objects with
2115 a ``__call__()`` method.
2116
2117 :param fn:
2118 object with __call__ method
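
    E.g. (illustrative; ``MyCallable`` is a placeholder name)::

        class MyCallable:
            def __call__(self):
                return "result"

        def wrapper(*arg, **kw):
            return MyCallable()(*arg, **kw)

        wrapper = wrap_callable(wrapper, MyCallable())
        # wrapper.__name__ is now "MyCallable"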
2119
2120 """
2121 if hasattr(fn, "__name__"):
2122 return update_wrapper(wrapper, fn)
2123 else:
2124 _f = wrapper
2125 _f.__name__ = fn.__class__.__name__
2126 if hasattr(fn, "__module__"):
2127 _f.__module__ = fn.__module__
2128
2129 if hasattr(fn.__call__, "__doc__") and fn.__call__.__doc__:
2130 _f.__doc__ = fn.__call__.__doc__
2131 elif fn.__doc__:
2132 _f.__doc__ = fn.__doc__
2133
2134 return _f
2135
2136
2137def quoted_token_parser(value):
2138 """Parse a dotted identifier with accommodation for quoted names.
2139
2140 Includes support for SQL-style double quotes as a literal character.
2141
2142 E.g.::
2143
        >>> quoted_token_parser("name")
        ['name']
        >>> quoted_token_parser("schema.name")
        ['schema', 'name']
        >>> quoted_token_parser('"Schema"."Name"')
        ['Schema', 'Name']
        >>> quoted_token_parser('"Schema"."Name""Foo"')
        ['Schema', 'Name"Foo']
2152
2153 """
2154
2155 if '"' not in value:
2156 return value.split(".")
2157
2158 # 0 = outside of quotes
2159 # 1 = inside of quotes
2160 state = 0
2161 result: List[List[str]] = [[]]
2162 idx = 0
2163 lv = len(value)
2164 while idx < lv:
2165 char = value[idx]
2166 if char == '"':
2167 if state == 1 and idx < lv - 1 and value[idx + 1] == '"':
2168 result[-1].append('"')
2169 idx += 1
2170 else:
2171 state ^= 1
2172 elif char == "." and state == 0:
2173 result.append([])
2174 else:
2175 result[-1].append(char)
2176 idx += 1
2177
2178 return ["".join(token) for token in result]
2179
2180
2181def add_parameter_text(params: Any, text: str) -> Callable[[_F], _F]:
2182 params = _collections.to_list(params)
2183
2184 def decorate(fn):
        doc = fn.__doc__ if fn.__doc__ is not None else ""
2186 if doc:
2187 doc = inject_param_text(doc, {param: text for param in params})
2188 fn.__doc__ = doc
2189 return fn
2190
2191 return decorate
2192
2193
2194def _dedent_docstring(text: str) -> str:
2195 split_text = text.split("\n", 1)
2196 if len(split_text) == 1:
2197 return text
2198 else:
2199 firstline, remaining = split_text
2200 if not firstline.startswith(" "):
2201 return firstline + "\n" + textwrap.dedent(remaining)
2202 else:
2203 return textwrap.dedent(text)
2204
2205
2206def inject_docstring_text(
2207 given_doctext: Optional[str], injecttext: str, pos: int
2208) -> str:
2209 doctext: str = _dedent_docstring(given_doctext or "")
2210 lines = doctext.split("\n")
2211 if len(lines) == 1:
2212 lines.append("")
2213 injectlines = textwrap.dedent(injecttext).split("\n")
2214 if injectlines[0]:
2215 injectlines.insert(0, "")
2216
2217 blanks = [num for num, line in enumerate(lines) if not line.strip()]
2218 blanks.insert(0, 0)
2219
2220 inject_pos = blanks[min(pos, len(blanks) - 1)]
2221
2222 lines = lines[0:inject_pos] + injectlines + lines[inject_pos:]
2223 return "\n".join(lines)
2224
2225
2226_param_reg = re.compile(r"(\s+):param (.+?):")
2227
2228
2229def inject_param_text(doctext: str, inject_params: Dict[str, str]) -> str:
2230 doclines = collections.deque(doctext.splitlines())
2231 lines = []
2232
2233 # TODO: this is not working for params like ":param case_sensitive=True:"
2234
2235 to_inject = None
2236 while doclines:
2237 line = doclines.popleft()
2238
2239 m = _param_reg.match(line)
2240
2241 if to_inject is None:
2242 if m:
2243 param = m.group(2).lstrip("*")
2244 if param in inject_params:
2245 # default indent to that of :param: plus one
2246 indent = " " * len(m.group(1)) + " "
2247
2248 # but if the next line has text, use that line's
2249 # indentation
2250 if doclines:
2251 m2 = re.match(r"(\s+)\S", doclines[0])
2252 if m2:
2253 indent = " " * len(m2.group(1))
2254
2255 to_inject = indent + inject_params[param]
2256 elif m:
2257 lines.extend(["\n", to_inject, "\n"])
2258 to_inject = None
2259 elif not line.rstrip():
2260 lines.extend([line, to_inject, "\n"])
2261 to_inject = None
2262 elif line.endswith("::"):
            # TODO: this still won't cover the case where the code
            # example itself has blank lines in it; those need to be
            # detected via indentation.
2265 lines.extend([line, doclines.popleft()])
2266 continue
2267 lines.append(line)
2268
2269 return "\n".join(lines)
2270
2271
2272def repr_tuple_names(names: List[str]) -> Optional[str]:
2273 """Trims a list of strings from the middle and return a string of up to
2274 four elements. Strings greater than 11 characters will be truncated"""
2275 if len(names) == 0:
2276 return None
2277 flag = len(names) <= 4
2278 names = names[0:4] if flag else names[0:3] + names[-1:]
2279 res = ["%s.." % name[:11] if len(name) > 11 else name for name in names]
2280 if flag:
2281 return ", ".join(res)
2282 else:
2283 return "%s, ..., %s" % (", ".join(res[0:3]), res[-1])
2284
2285
2286def has_compiled_ext(raise_=False):
2287 if HAS_CYEXTENSION:
2288 return True
2289 elif raise_:
2290 raise ImportError(
2291 "cython extensions were expected to be installed, "
2292 "but are not present"
2293 )
2294 else:
2295 return False
2296
2297
2298class _Missing(enum.Enum):
2299 Missing = enum.auto()
2300
2301
2302Missing = _Missing.Missing
2303MissingOr = Union[_T, Literal[_Missing.Missing]]
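

# Illustrative (hypothetical) use of the Missing sentinel, where
# ``find_value`` is a placeholder name, not part of this module:
#
#     def find_value(key: str) -> MissingOr[Optional[int]]:
#         ...
#
#     result = find_value("x")
#     if result is Missing:
#         ...  # "not present" is distinguishable from a legitimate None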