1# util/langhelpers.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""Routines to help with the creation, loading and introspection of
10modules, classes, hierarchies, attributes, functions, and methods.
11
12"""
13
14from __future__ import annotations
15
16import collections
17import enum
18from functools import update_wrapper
19import importlib.util
20import inspect
21import itertools
22import operator
23import re
24import sys
25import textwrap
26import threading
27import types
28from types import CodeType
29from types import ModuleType
30from typing import Any
31from typing import Callable
32from typing import cast
33from typing import Dict
34from typing import FrozenSet
35from typing import Generic
36from typing import Iterator
37from typing import List
38from typing import Literal
39from typing import NoReturn
40from typing import Optional
41from typing import overload
42from typing import Sequence
43from typing import Set
44from typing import Tuple
45from typing import Type
46from typing import TYPE_CHECKING
47from typing import TypeVar
48from typing import Union
49import warnings
50
51from . import _collections
52from . import compat
53from .. import exc
54
# Module-level type variables used by annotations in this module.
_T = TypeVar("_T")
_T_co = TypeVar("_T_co", covariant=True)
# any callable
_F = TypeVar("_F", bound=Callable[..., Any])
# the bound is a forward reference given as a string; the referenced class
# is not defined at this point in the module
_MA = TypeVar("_MA", bound="HasMemoized.memoized_attribute[Any]")
# any module object
_M = TypeVar("_M", bound=ModuleType)
60
61
def restore_annotations(
    cls: type, new_annotations: dict[str, Any]
) -> Callable[[], None]:
    """Install alternate annotations on ``cls`` and return an "undo" callable.

    ``cls.__annotations__`` is replaced with ``new_annotations``.  The
    returned zero-argument callable puts the class back into its former
    state.

    This is used strictly to provide dataclasses on a mapped class, where
    in some cases we are making dataclass fields based on an attribute
    that is actually a Python descriptor on a superclass which we called
    to get a value.

    If dataclasses were to give us a way to achieve this without swapping
    ``__annotations__``, that would be much better.

    :param cls: the class whose annotations are temporarily replaced
    :param new_annotations: the annotations dictionary to install
    :return: a callable that restores the previous state when invoked
    """
    # sentinel meaning "the attribute was not present; delete it on restore"
    absent = object()

    # pep-649 means classes have "__annotate__", and it's a callable.  if
    # it's there and is None, we're in "legacy future mode", where it's
    # python 3.14 or higher and "from __future__ import annotations" is
    # set.  in "legacy future mode" we have to do the same steps we do for
    # older pythons, and __annotate__ can be ignored.
    if getattr(cls, "__annotate__", None) is not None:
        saved_name = "__annotate__"
    else:
        saved_name = "__annotations__"
    saved_value = getattr(cls, saved_name, absent)

    cls.__annotations__ = new_annotations

    def restore():
        if saved_value is absent:
            delattr(cls, saved_name)
        else:
            setattr(cls, saved_name, saved_value)

    return restore
102
103
def md5_hex(x: Any) -> str:
    """Return the hexadecimal MD5 digest of ``x`` encoded as UTF-8.

    The hash object comes from ``compat.md5_not_for_security()``; as that
    name indicates, the digest is not meant for security purposes.

    :param x: an object providing ``.encode("utf-8")``, i.e. a string
    :return: the digest as a hex string
    """
    payload = x.encode("utf-8")
    hasher = compat.md5_not_for_security()
    hasher.update(payload)
    hex_digest = hasher.hexdigest()
    return cast(str, hex_digest)
109
110
class safe_reraise:
    """Context manager which re-raises an exception after some handler
    code has been invoked.

    The exception info that is current upon entering the block is stored,
    so that it is maintained across a potential coroutine context switch
    taking place inside the block.  When the block completes without
    error, the stored exception is raised; when the block itself raises,
    that newer exception propagates instead.

    e.g.::

        try:
            sess.commit()
        except:
            with safe_reraise():
                sess.rollback()

    TODO: we should at some point evaluate current behaviors in this regard
    based on current greenlet, gevent/eventlet implementations in Python 3,
    and also see the degree to which our own asyncio (based on greenlet
    also) is impacted by this.  .rollback() will cause IO / context switch
    to occur in all these scenarios; what happens to the exception context
    from an "except:" block if we don't explicitly store it?  Original
    issue was #2703.

    """

    __slots__ = ("_exc_info",)

    # the result of sys.exc_info() captured in __enter__; reset to None
    # once __exit__ has consumed it
    _exc_info: Union[
        None,
        Tuple[
            Type[BaseException],
            BaseException,
            types.TracebackType,
        ],
        Tuple[None, None, None],
    ]

    def __enter__(self) -> None:
        self._exc_info = sys.exc_info()

    def __exit__(
        self,
        type_: Optional[Type[BaseException]],
        value: Optional[BaseException],
        traceback: Optional[types.TracebackType],
    ) -> NoReturn:
        assert self._exc_info is not None
        # see #2703 for notes
        if type_ is not None:
            # the block raised on its own; let that exception win
            self._exc_info = None  # remove potential circular references
            assert value is not None
            raise value.with_traceback(traceback)

        # the block completed normally; raise what was stored on entry
        _, stored_exc, stored_tb = self._exc_info
        assert stored_exc is not None
        self._exc_info = None  # remove potential circular references
        raise stored_exc.with_traceback(stored_tb)
168
169
def walk_subclasses(cls: Type[_T]) -> Iterator[Type[_T]]:
    """Yield ``cls`` followed by each of its direct and indirect subclasses.

    Every class is yielded at most once, even when it is reachable along
    several inheritance paths.  Traversal uses a stack, so the most
    recently discovered subclass is visited first.
    """
    visited: Set[Any] = set()
    pending = [cls]
    while pending:
        current = pending.pop()
        if current not in visited:
            visited.add(current)
            pending.extend(current.__subclasses__())
            yield current
182
183
def string_or_unprintable(element: Any) -> str:
    """Return a string for ``element`` without failing if ``str()`` raises.

    Strings are returned unchanged.  For any other object ``str(element)``
    is returned; if that raises, a placeholder message which includes the
    ``repr()`` of the element is returned instead.

    :param element: any object
    :return: the string form of the element, or the placeholder message
    """
    if isinstance(element, str):
        return element

    try:
        return str(element)
    except Exception:
        # the element is wrapped in a 1-tuple so that a tuple instance is
        # formatted as a single value, rather than being unpacked as the
        # list of arguments to the "%" operator
        return "unprintable element %r" % (element,)
192
193
def clsname_as_plain_name(
    cls: Type[Any], use_name: Optional[str] = None
) -> str:
    """Convert a CamelCase class name into lower case, space separated
    words.

    Only capitalized words (an upper case letter followed by lower case
    letters) and the token ``SQL`` are picked up; all other characters of
    the name are ignored.

    :param cls: class whose ``__name__`` is used
    :param use_name: optional name to use in place of ``cls.__name__``
    :return: the words found, lower cased and joined with single spaces
    """
    source_name = use_name or cls.__name__
    words = re.findall(r"([A-Z][a-z]+|SQL)", source_name)
    return " ".join(map(str.lower, words))
199
200
def method_is_overridden(
    instance_or_cls: Union[Type[Any], object],
    against_method: Callable[..., Any],
) -> bool:
    """Return True if the two class methods don't match.

    The attribute named ``against_method.__name__`` is looked up on the
    given class, or on the class of the given instance, and compared with
    ``against_method``.

    :param instance_or_cls: a class, or an instance of a class
    :param against_method: the reference method to compare against
    :return: True when the class-level attribute differs from the
     reference method
    """
    if isinstance(instance_or_cls, type):
        owner = instance_or_cls
    else:
        owner = instance_or_cls.__class__

    found: types.MethodType = getattr(owner, against_method.__name__)
    return found != against_method
217
218
def decode_slice(slc: slice) -> Tuple[Any, ...]:
    """Decode a slice object as sent to ``__getitem__``.

    Each of start, stop and step that provides an ``__index__()`` method
    is replaced by the result of calling it; other values, such as
    ``None``, are passed through unchanged.

    :param slc: the slice to decode
    :return: a 3-tuple of ``(start, stop, step)``
    """
    return tuple(
        part.__index__() if hasattr(part, "__index__") else part
        for part in (slc.start, slc.stop, slc.step)
    )
231
232
def _unique_symbols(used: Sequence[str], *bases: str) -> Iterator[str]:
    """Yield, for each base name, a symbol that is not yet in use.

    The candidates for a base are the base itself, then the base with the
    suffixes ``0`` through ``999`` appended.  Each symbol yielded is also
    treated as used for the remaining bases.

    :param used: names which must not be produced
    :param bases: base names to derive the unique symbols from
    :raises NameError: if all candidates for a base are taken
    """
    taken = set(used)
    for base in bases:
        candidates = itertools.chain(
            [base], (base + str(suffix) for suffix in range(1000))
        )
        chosen = next(
            (name for name in candidates if name not in taken), None
        )
        if chosen is None:
            raise NameError("exhausted namespace for symbol base %s" % base)
        taken.add(chosen)
        yield chosen
247
248
def map_bits(fn: Callable[[int], Any], n: int) -> Iterator[Any]:
    """Call the given function given each nonzero bit from n.

    Bits are visited from least significant to most significant; the
    function receives the value of the bit (1, 2, 4, ...), not its
    position, and its return value is yielded.
    """
    remaining = n
    while remaining:
        # two's complement trick which isolates the lowest set bit
        lowest = remaining & -remaining
        yield fn(lowest)
        remaining ^= lowest
256
257
258_Fn = TypeVar("_Fn", bound="Callable[..., Any]")
259
260# this seems to be in flux in recent mypy versions
261
262
def decorator(target: Callable[..., Any]) -> Callable[[_Fn], _Fn]:
    """A signature-matching decorator factory.

    The returned decorator replaces a function ``fn`` with a generated
    function that has the same name and argument signature as ``fn`` and
    whose body is ``return target(fn, <arguments>)``.  For coroutine
    functions the generated function is ``async def`` and awaits the call
    to ``target``.

    :param target: callable that receives the decorated function followed
     by the arguments of the call
    :return: a decorator; applying it to an object which is neither a
     function nor a method raises ``Exception``
    """

    def decorate(fn: _Fn) -> _Fn:
        if not inspect.isfunction(fn) and not inspect.ismethod(fn):
            raise Exception("not a decoratable function")

        # Python 3.14 defers creating __annotations__ until it is used.
        # We do not want to create __annotations__ now, so __annotate__ is
        # temporarily set to None while the argument spec is gathered and
        # is always restored afterwards.
        annofunc = getattr(fn, "__annotate__", None)
        if annofunc is not None:
            fn.__annotate__ = None  # type: ignore[union-attr]
            try:
                spec = compat.inspect_getfullargspec(fn)
            finally:
                fn.__annotate__ = annofunc  # type: ignore[union-attr]
        else:
            spec = compat.inspect_getfullargspec(fn)

        # Do not generate code for annotations.
        # update_wrapper() copies the annotation from fn to decorated.
        # We use dummy defaults for code generation to avoid having
        # copy of large globals for compiling.
        # We copy __defaults__ and __kwdefaults__ from fn to decorated.
        empty_defaults = (None,) * len(spec.defaults or ())
        empty_kwdefaults = dict.fromkeys(spec.kwonlydefaults or ())
        spec = spec._replace(
            annotations={},
            defaults=empty_defaults,
            kwonlydefaults=empty_kwdefaults,
        )

        # names already taken inside the generated function: its
        # positional argument names, the *args / **kw names and the
        # function's own name
        names = (
            tuple(cast("Tuple[str, ...]", spec[0]))
            + cast("Tuple[str, ...]", spec[1:3])
            + (fn.__name__,)
        )
        # pick names for "target" and "fn" which don't collide with those
        targ_name, fn_name = _unique_symbols(names, "target", "fn")

        # values interpolated into the source templates below
        metadata: Dict[str, Optional[str]] = dict(target=targ_name, fn=fn_name)
        metadata.update(format_argspec_plus(spec, grouped=False))
        metadata["name"] = fn.__name__

        if inspect.iscoroutinefunction(fn):
            metadata["prefix"] = "async "
            metadata["target_prefix"] = "await "
        else:
            metadata["prefix"] = ""
            metadata["target_prefix"] = ""

        # look for __ positional arguments.  This is a convention in
        # SQLAlchemy that arguments should be passed positionally
        # rather than as keyword
        # arguments.  note that apply_pos doesn't currently work in all cases
        # such as when a kw-only indicator "*" is present, which is why
        # we limit the use of this to just that case we can detect.  As we add
        # more kinds of methods that use @decorator, things may have to
        # be further improved in this area
        if "__" in repr(spec[0]):
            code = """\
%(prefix)sdef %(name)s%(grouped_args)s:
    return %(target_prefix)s%(target)s(%(fn)s, %(apply_pos)s)
""" % metadata
        else:
            code = """\
%(prefix)sdef %(name)s%(grouped_args)s:
    return %(target_prefix)s%(target)s(%(fn)s, %(apply_kw)s)
""" % metadata

        # globals for the generated function: the two callables under
        # their collision-free names, plus the module name of fn
        env: Dict[str, Any] = {
            targ_name: target,
            fn_name: fn,
            "__name__": fn.__module__,
        }

        decorated = cast(
            types.FunctionType,
            _exec_code_in_env(code, env, fn.__name__),
        )
        # swap the dummy defaults used for code generation for the real ones
        decorated.__defaults__ = fn.__defaults__
        decorated.__kwdefaults__ = fn.__kwdefaults__  # type: ignore
        return update_wrapper(decorated, fn)  # type: ignore[return-value]

    return update_wrapper(decorate, target)  # type: ignore[return-value]
347
348
def _exec_code_in_env(
    code: Union[str, types.CodeType], env: Dict[str, Any], fn_name: str
) -> Callable[..., Any]:
    """Execute ``code`` using ``env`` as its globals and return the object
    that is bound to ``fn_name`` in ``env`` afterwards.

    :param code: source text or a code object to execute
    :param env: namespace the code runs in; it is modified in place
    :param fn_name: name to look up in ``env`` once the code has run
    :raises KeyError: if ``fn_name`` is not present in ``env`` afterwards
    """
    exec(code, env)
    produced = env[fn_name]
    return produced  # type: ignore[no-any-return]
354
355
356_PF = TypeVar("_PF")
357_TE = TypeVar("_TE")
358
359
class PluginLoader:
    """Locate and load named plugins which belong to a group.

    A plugin name is resolved, in this order, from: loaders already
    registered or cached in ``impls``; the optional ``auto_fn`` hook; the
    entries which ``compat.importlib_metadata_get()`` returns for the
    group.  Whichever loader callable is found gets cached in ``impls``.
    """

    def __init__(
        self, group: str, auto_fn: Optional[Callable[..., Any]] = None
    ):
        """
        :param group: name of the group handed to
         ``compat.importlib_metadata_get()``
        :param auto_fn: optional callable which receives a plugin name and
         returns a zero-argument loader callable, or a false value
        """
        self.group = group
        # plugin name -> zero-argument callable returning the plugin
        self.impls: Dict[str, Any] = {}
        self.auto_fn = auto_fn

    def clear(self):
        """Forget all registered and cached loaders."""
        self.impls.clear()

    def load(self, name: str) -> Any:
        """Return the plugin registered under ``name``.

        :raises NoSuchModuleError: if no loader for the name can be found
        """
        try:
            known = self.impls[name]
        except KeyError:
            pass
        else:
            return known()

        if self.auto_fn:
            auto_loader = self.auto_fn(name)
            if auto_loader:
                self.impls[name] = auto_loader
                return auto_loader()

        for entry in compat.importlib_metadata_get(self.group):
            if entry.name != name:
                continue
            self.impls[name] = entry.load
            return entry.load()

        raise exc.NoSuchModuleError(
            "Can't load plugin: %s:%s" % (self.group, name)
        )

    def register(self, name: str, modulepath: str, objname: str) -> None:
        """Register a lazy loader for ``name``.

        Nothing is imported until the plugin is loaded; at that point the
        module at the dotted ``modulepath`` is imported and its attribute
        ``objname`` is returned.
        """

        def load():
            # __import__() returns the top-level package, so the remaining
            # tokens of the dotted path are traversed as attributes
            module = __import__(modulepath)
            for part in modulepath.split(".")[1:]:
                module = getattr(module, part)
            return getattr(module, objname)

        self.impls[name] = load

    def deregister(self, name: str) -> None:
        """Remove the loader for ``name``; raises ``KeyError`` if there is
        none."""
        del self.impls[name]
401
402
def _inspect_func_args(fn):
    """Return ``(positional argument names, accepts **kwargs)`` for ``fn``.

    Where ``inspect.CO_VARKEYWORDS`` is available, the values are read
    directly from ``fn.__code__`` to reduce method call overhead;
    otherwise ``compat.inspect_getfullargspec()`` is used.
    """
    varkw_flag = getattr(inspect, "CO_VARKEYWORDS", None)
    if varkw_flag is None:
        # https://docs.python.org/3/library/inspect.html
        # The flags are specific to CPython, and may not be defined in other
        # Python implementations.  Furthermore, the flags are an
        # implementation detail, and can be removed or deprecated in future
        # Python releases.
        spec = compat.inspect_getfullargspec(fn)
        return spec[0], bool(spec[2])

    code = fn.__code__
    arg_names = list(code.co_varnames[: code.co_argcount])
    return arg_names, bool(code.co_flags & varkw_flag)
421
422
@overload
def get_cls_kwargs(
    cls: type,
    *,
    _set: Optional[Set[str]] = None,
    raiseerr: Literal[True] = ...,
) -> Set[str]: ...


@overload
def get_cls_kwargs(
    cls: type, *, _set: Optional[Set[str]] = None, raiseerr: bool = False
) -> Optional[Set[str]]: ...


def get_cls_kwargs(
    cls: type, *, _set: Optional[Set[str]] = None, raiseerr: bool = False
) -> Optional[Set[str]]:
    r"""Return the full set of inherited kwargs for the given `cls`.

    Probes a class's __init__ method, collecting all named arguments.  If
    the __init__ defines a \**kwargs catch-all, then the constructor is
    presumed to pass along unrecognized keywords to its base classes, and
    the collection process is repeated recursively on each of the bases.
    A class which does not define its own Python-level __init__ is
    handled by looking at its bases as well.

    Uses a subset of inspect.getfullargspec() to cut down on method
    overhead, as this is used within the Core typing system to create
    copies of type objects which is a performance-sensitive operation.

    No anonymous tuple arguments please !

    :param cls: the class to probe
    :param _set: internal; the accumulating set used by recursive calls
    :param raiseerr: raise ``TypeError`` rather than returning ``None``
     when a recursive (non top-level) call finds an ``__init__`` without
     a ``**kwargs`` catch-all
    :return: the set of argument names with ``self`` removed, or ``None``
     as described under ``raiseerr``

    """
    is_toplevel = _set is None
    if _set is None:
        _set = set()

    # only an __init__ defined directly on this class, as a Python
    # function, is inspected
    init = cls.__dict__.get("__init__", False)
    has_init = bool(
        init
        and isinstance(init, types.FunctionType)
        and isinstance(init.__code__, types.CodeType)
    )

    accepts_kw = False
    if has_init:
        arg_names, accepts_kw = _inspect_func_args(init)
        _set.update(arg_names)

        if not (accepts_kw or is_toplevel):
            if raiseerr:
                raise TypeError(
                    f"given cls {cls} doesn't have an __init__ method"
                )
            return None

    if accepts_kw or not has_init:
        for base in cls.__bases__:
            # a None result stops the traversal of the remaining bases
            if get_cls_kwargs(base, _set=_set) is None:
                break

    _set.discard("self")
    return _set
489
490
def get_func_kwargs(func: Callable[..., Any]) -> List[str]:
    """Return the list of argument names for the given `func`.

    This is the first element of the result of
    ``compat.inspect_getfullargspec()``, so it is safe to call for
    methods, functions, etc.

    """
    argspec = compat.inspect_getfullargspec(func)
    return argspec[0]
500
501
def get_callable_argspec(
    fn: Callable[..., Any], no_self: bool = False, _is_init: bool = False
) -> compat.FullArgSpec:
    """Return the argument signature for any callable.

    All pure-Python callables are accepted, including
    functions, methods, classes, objects with __call__;
    builtins and other edge cases like functools.partial() objects
    raise a TypeError.

    :param fn: the callable to inspect
    :param no_self: when True, the leading ``self`` argument is left out
     for bound methods and for ``__init__`` methods reached via a class
    :param _is_init: internal; set when recursing into ``fn.__init__``
    :raises TypeError: if the callable can't be inspected

    """

    def _spec_minus_first_arg(func):
        # the spec of ``func`` with its first positional argument removed
        spec = compat.inspect_getfullargspec(func)
        return compat.FullArgSpec(
            spec.args[1:],
            spec.varargs,
            spec.varkw,
            spec.defaults,
            spec.kwonlyargs,
            spec.kwonlydefaults,
            spec.annotations,
        )

    if inspect.isbuiltin(fn):
        raise TypeError("Can't inspect builtin: %s" % fn)

    # plain functions, plus function-like objects which carry __code__
    function_like = inspect.isfunction(fn) or (
        hasattr(fn, "__code__")
        and not inspect.isclass(fn)
        and not inspect.ismethod(fn)
    )
    if function_like:
        if _is_init and no_self:
            return _spec_minus_first_arg(fn)
        return compat.inspect_getfullargspec(fn)

    if inspect.ismethod(fn):
        if no_self and (_is_init or fn.__self__):
            return _spec_minus_first_arg(fn.__func__)
        return compat.inspect_getfullargspec(fn.__func__)

    if inspect.isclass(fn):
        return get_callable_argspec(
            fn.__init__, no_self=no_self, _is_init=True
        )

    if hasattr(fn, "__func__"):
        return compat.inspect_getfullargspec(fn.__func__)

    if hasattr(fn, "__call__") and inspect.ismethod(fn.__call__):
        return get_callable_argspec(fn.__call__, no_self=no_self)

    raise TypeError("Can't inspect callable: %s" % fn)
560
561
def format_argspec_plus(
    fn: Union[Callable[..., Any], compat.FullArgSpec], grouped: bool = True
) -> Dict[str, Optional[str]]:
    """Returns a dictionary of formatted, introspected function arguments.

    An enhanced variant of inspect.formatargspec to support code generation.

    fn
       An inspectable callable or tuple of inspect getargspec() results.
    grouped
      Defaults to True; include (parens, around, argument) lists.  When
      False, the surrounding parenthesis are stripped from the ``apply_*``
      values; ``grouped_args`` keeps them in either case.

    Returns:

    grouped_args
      Full inspect.formatargspec for fn
    self_arg
      The name of the first positional argument, varargs[0], or None
      if the function defines no positional arguments.
    apply_pos
      args, re-written in calling rather than receiving syntax.  Arguments are
      passed positionally.
    apply_kw
      Like apply_pos, except keyword-ish args are passed as keywords.
    apply_pos_proxied
      Like apply_pos but omits the self/cls argument
    apply_kw_proxied
      Like apply_kw but omits the self/cls argument

    Example (the two ``*_proxied`` keys are left out for brevity)::

      >>> format_argspec_plus(lambda self, a, b, c=3, **d: 123)
      {'grouped_args': '(self, a, b, c=3, **d)',
       'self_arg': 'self',
       'apply_kw': '(self, a, b, c=c, **d)',
       'apply_pos': '(self, a, b, c, **d)'}

    """
    if callable(fn):
        spec = compat.inspect_getfullargspec(fn)
    else:
        spec = fn

    # spec is indexed positionally below; as used by this function the
    # layout is: [0] positional arg names, [1] *varargs name, [2] **kw
    # name, [3] default values, [4] keyword-only arg names
    args = compat.inspect_formatargspec(*spec)

    # calling syntax with no defaults rendered
    apply_pos = compat.inspect_formatargspec(
        spec[0], spec[1], spec[2], None, spec[4]
    )

    if spec[0]:
        self_arg = spec[0][0]

        # same as apply_pos, with the first positional argument left out
        apply_pos_proxied = compat.inspect_formatargspec(
            spec[0][1:], spec[1], spec[2], None, spec[4]
        )

    elif spec[1]:
        # I'm not sure what this is
        self_arg = "%s[0]" % spec[1]

        apply_pos_proxied = apply_pos
    else:
        self_arg = None
        apply_pos_proxied = apply_pos

    # number of trailing names in name_args which are passed as name=name
    num_defaults = 0
    if spec[3]:
        num_defaults += len(cast(Tuple[Any], spec[3]))
    if spec[4]:
        num_defaults += len(spec[4])

    name_args = spec[0] + spec[4]

    defaulted_vals: Union[List[str], Tuple[()]]

    if num_defaults:
        defaulted_vals = name_args[0 - num_defaults :]
    else:
        defaulted_vals = ()

    # the "default value" rendered for each defaulted argument is its own
    # name, which produces the "c=c" calling form
    apply_kw = compat.inspect_formatargspec(
        name_args,
        spec[1],
        spec[2],
        defaulted_vals,
        formatvalue=lambda x: "=" + str(x),
    )

    if spec[0]:
        apply_kw_proxied = compat.inspect_formatargspec(
            name_args[1:],
            spec[1],
            spec[2],
            defaulted_vals,
            formatvalue=lambda x: "=" + str(x),
        )
    else:
        apply_kw_proxied = apply_kw

    if grouped:
        return dict(
            grouped_args=args,
            self_arg=self_arg,
            apply_pos=apply_pos,
            apply_kw=apply_kw,
            apply_pos_proxied=apply_pos_proxied,
            apply_kw_proxied=apply_kw_proxied,
        )
    else:
        # [1:-1] strips the enclosing parenthesis
        return dict(
            grouped_args=args,
            self_arg=self_arg,
            apply_pos=apply_pos[1:-1],
            apply_kw=apply_kw[1:-1],
            apply_pos_proxied=apply_pos_proxied[1:-1],
            apply_kw_proxied=apply_kw_proxied[1:-1],
        )
677
678
def format_argspec_init(method, grouped=True):
    """format_argspec_plus with considerations for typical __init__ methods

    Wraps format_argspec_plus with error handling strategies for typical
    __init__ cases:

    .. sourcecode:: text

      object.__init__ -> (self)
      other unreflectable (usually C) -> (self, *args, **kwargs)

    ``object.__init__`` is never passed to format_argspec_plus; for any
    other method the fallback applies when format_argspec_plus raises
    ``TypeError``.

    """
    if method is object.__init__:
        bare_args, bare_proxied = "self", ""
    else:
        try:
            return format_argspec_plus(method, grouped=grouped)
        except TypeError:
            bare_args = "self, *args, **kwargs"
            bare_proxied = "*args, **kwargs"

    grouped_args = "(%s)" % bare_args
    if grouped:
        args, proxied = grouped_args, "(%s)" % bare_proxied
    else:
        args, proxied = bare_args, bare_proxied

    return {
        "self_arg": "self",
        "grouped_args": grouped_args,
        "apply_pos": args,
        "apply_kw": args,
        "apply_pos_proxied": proxied,
        "apply_kw_proxied": proxied,
    }
710
711
def create_proxy_methods(
    target_cls: Type[Any],
    target_cls_sphinx_name: str,
    proxy_cls_sphinx_name: str,
    classmethods: Sequence[str] = (),
    methods: Sequence[str] = (),
    attributes: Sequence[str] = (),
    use_intermediate_variable: Sequence[str] = (),
) -> Callable[[_T], _T]:
    """A class decorator indicating attributes should refer to a proxy
    class.

    This decorator is now a "marker" that does nothing at runtime: none of
    the arguments are used here and the decorated class is returned as is.
    Instead, it is consumed by the tools/generate_proxy_methods.py script
    to statically generate proxy methods and attributes that are fully
    recognized by typing tools such as mypy.

    """

    def decorate(cls):
        # intentionally a no-op; see the docstring of the enclosing function
        return cls

    return decorate
735
736
def getargspec_init(method):
    """inspect.getargspec with considerations for typical __init__ methods

    Wraps ``compat.inspect_getfullargspec()`` with error handling for
    typical __init__ cases; when that call raises ``TypeError``, a plain
    4-tuple is returned instead:

    .. sourcecode:: text

      object.__init__ -> (self)
      other unreflectable (usually C) -> (self, *args, **kwargs)

    """
    try:
        return compat.inspect_getfullargspec(method)
    except TypeError:
        if method is object.__init__:
            varargs, varkw = None, None
        else:
            varargs, varkw = "args", "kwargs"
        return (["self"], varargs, varkw, None)
755
756
def unbound_method_to_callable(func_or_cls):
    """Adjust the incoming callable such that a 'self' argument is not
    required.

    A method object whose ``__self__`` is false is replaced by its
    underlying function; anything else is returned unchanged.

    """
    lacks_self = (
        isinstance(func_or_cls, types.MethodType)
        and not func_or_cls.__self__
    )
    return func_or_cls.__func__ if lacks_self else func_or_cls
767
768
class GenericRepr:
    """Encapsulates the logic for creating a generic __repr__() string.

    This class allows for the repr structure to be created, then modified
    (e.g., changing the class name), before being rendered as a string.

    .. versionadded:: 2.1
    """

    __slots__ = (
        "_obj",
        "_additional_kw",
        "_to_inspect",
        "_omit_kwarg",
        "_class_name",
    )

    _obj: Any
    _additional_kw: Sequence[Tuple[str, Any]]
    _to_inspect: List[object]
    _omit_kwarg: Sequence[str]
    _class_name: Optional[str]

    def __init__(
        self,
        obj: Any,
        additional_kw: Sequence[Tuple[str, Any]] = (),
        to_inspect: Optional[Union[object, List[object]]] = None,
        omit_kwarg: Sequence[str] = (),
    ):
        """Create a GenericRepr object.

        :param obj: The object being repr'd
        :param additional_kw: Additional keyword arguments to check for in
         the repr, as a sequence of 2-tuples of (name, default_value)
        :param to_inspect: One or more objects whose __init__ signature
         should be inspected.  If not provided, defaults to [obj].
        :param omit_kwarg: Sequence of keyword argument names to omit from
         the repr output
        """
        self._obj = obj
        self._additional_kw = additional_kw
        self._to_inspect = (
            [obj] if to_inspect is None else _collections.to_list(to_inspect)
        )
        self._omit_kwarg = omit_kwarg
        self._class_name = None

    def set_class_name(self, class_name: str) -> GenericRepr:
        """Set the class name to be used in the repr.

        By default, the class name is taken from obj.__class__.__name__.
        This method allows it to be overridden.

        :param class_name: The class name to use
        :return: self, for method chaining
        """
        self._class_name = class_name
        return self

    def __str__(self) -> str:
        """Produce the __repr__() string based on the configured parameters.

        Arguments of the first inspected ``__init__`` which have no default
        are rendered positionally, using the same-named attribute of the
        object.  Arguments without a default of any further inspected
        ``__init__``, as well as all arguments with defaults, are rendered
        as ``name=value``, and only when the attribute is present and
        differs from the default.
        """
        obj = self._obj
        omit_kwarg = self._omit_kwarg

        # stands in for "no default" / "attribute not present"
        missing = object()

        pos_args: List[str] = []
        kw_args: _collections.OrderedDict[str, Any] = (
            _collections.OrderedDict()
        )
        vargs = None
        for i, insp in enumerate(self._to_inspect):
            try:
                spec = compat.inspect_getfullargspec(insp.__init__)  # type: ignore[misc]  # noqa: E501
            except TypeError:
                continue

            default_len = len(spec.defaults) if spec.defaults else 0
            # index of the first argument which has a default.  an explicit
            # index is used because slicing with "-default_len" yields an
            # empty list when default_len is zero ("-0" is just 0), which
            # would drop every argument that has no default.
            first_default = len(spec.args) - default_len
            required = spec.args[1:first_default]

            if i == 0:
                if spec.varargs:
                    vargs = spec.varargs
                pos_args.extend(required)
            else:
                kw_args.update([(arg, missing) for arg in required])

            if default_len:
                assert spec.defaults
                kw_args.update(
                    list(zip(spec.args[first_default:], spec.defaults))
                )

        output: List[str] = []

        def append_if_changed(arg: str, defval: Any) -> None:
            # best effort by design: reading the attribute or comparing it
            # to the default may raise for arbitrary objects, and building
            # a repr should not fail for that reason
            try:
                val = getattr(obj, arg, missing)
                if val is not missing and val != defval:
                    output.append("%s=%r" % (arg, val))
            except Exception:
                pass

        output.extend(repr(getattr(obj, arg, None)) for arg in pos_args)

        if vargs is not None and hasattr(obj, vargs):
            output.extend([repr(val) for val in getattr(obj, vargs)])

        for arg, defval in kw_args.items():
            if arg not in omit_kwarg:
                append_if_changed(arg, defval)

        # omit_kwarg is not applied to additional_kw
        for arg, defval in self._additional_kw or ():
            append_if_changed(arg, defval)

        class_name = (
            self._class_name
            if self._class_name is not None
            else obj.__class__.__name__
        )
        return "%s(%s)" % (class_name, ", ".join(output))
904
905
def generic_repr(
    obj: Any,
    additional_kw: Sequence[Tuple[str, Any]] = (),
    to_inspect: Optional[Union[object, List[object]]] = None,
    omit_kwarg: Sequence[str] = (),
) -> str:
    """Produce a __repr__() based on direct association of the __init__()
    specification vs. same-named attributes present.

    This is a shortcut for constructing a :class:`.GenericRepr` with the
    same arguments and converting it to a string right away.

    """
    rendering = GenericRepr(
        obj,
        additional_kw=additional_kw,
        to_inspect=to_inspect,
        omit_kwarg=omit_kwarg,
    )
    return str(rendering)
924
925
def class_hierarchy(cls):
    """Return an unordered sequence of all classes related to cls.

    Traverses diamond hierarchies.

    Fibs slightly: subclasses of builtin types are not returned.  Thus
    class_hierarchy(class A(object)) returns (A, object), not A plus every
    class systemwide that derives from object.

    """
    related = {cls}
    pending = list(cls.__mro__)
    while pending:
        current = pending.pop()

        # walk upwards
        for base in current.__bases__:
            if base not in related:
                pending.append(base)
                related.add(base)

        if current.__module__ == "builtins" or not hasattr(
            current, "__subclasses__"
        ):
            continue

        # walk downwards; for a metaclass, __subclasses__ is reached
        # through the class and has to be given the class explicitly
        if issubclass(current, type):
            subclasses = current.__subclasses__(current)
        else:
            subclasses = current.__subclasses__()

        for sub in subclasses:
            if sub not in related:
                pending.append(sub)
                related.add(sub)

    return list(related)
962
963
def iterate_attributes(cls):
    """Iterate all the keys and attributes associated with a class,
    without using getattr().

    Each name reported by ``dir(cls)`` is looked up in the ``__dict__`` of
    the classes along the MRO, and the first match is yielded as a
    ``(key, value)`` tuple.  getattr() is avoided so that class-sensitive
    descriptors (i.e. property.__get__()) are not called.  Names which are
    in no ``__dict__`` along the MRO are skipped.

    """
    for key in dir(cls):
        owner = next(
            (klass for klass in cls.__mro__ if key in klass.__dict__), None
        )
        if owner is not None:
            yield (key, owner.__dict__[key])
978
979
def monkeypatch_proxied_specials(
    into_cls,
    from_cls,
    skip=None,
    only=None,
    name="self.proxy",
    from_instance=None,
):
    """Automates delegation of __specials__ for a proxying type.

    For each selected double-underscore method of ``from_cls``, a function
    of the same name is generated which forwards the call to the same
    method on the ``name`` expression, and is set on ``into_cls``.

    :param into_cls: class which receives the generated methods
    :param from_cls: class whose double-underscore methods are proxied
    :param skip: names to leave out; when None, a default list is used.
     Only consulted when ``only`` is not given.
    :param only: when given, exactly these names are processed
    :param name: source expression the generated methods delegate to
    :param from_instance: when not None, made available to the generated
     code as a global named ``name``
    """

    if only:
        dunders = only
    else:
        if skip is None:
            skip = (
                "__slots__",
                "__del__",
                "__getattribute__",
                "__metaclass__",
                "__getstate__",
                "__setstate__",
            )
        # all __dunder__ names of from_cls which into_cls doesn't have yet
        dunders = [
            m
            for m in dir(from_cls)
            if (
                m.startswith("__")
                and m.endswith("__")
                and not hasattr(into_cls, m)
                and m not in skip
            )
        ]

    for method in dunders:
        try:
            maybe_fn = getattr(from_cls, method)
            # non-callable attributes are not proxied
            if not hasattr(maybe_fn, "__call__"):
                continue
            maybe_fn = getattr(maybe_fn, "__func__", maybe_fn)
            fn = cast(types.FunctionType, maybe_fn)

        except AttributeError:
            continue
        try:
            spec = compat.inspect_getfullargspec(fn)
            fn_args = compat.inspect_formatargspec(spec[0])
            d_args = compat.inspect_formatargspec(spec[0][1:])
        except TypeError:
            # can't be introspected; fall back to a catch-all signature
            fn_args = "(self, *args, **kw)"
            d_args = "(*args, **kw)"

        # NOTE: the template is filled in from locals(), so the local
        # names "method", "fn_args", "name" and "d_args" are significant
        py = (
            "def %(method)s%(fn_args)s: "
            "return %(name)s.%(method)s%(d_args)s" % locals()
        )

        env: Dict[str, types.FunctionType] = (
            from_instance is not None and {name: from_instance} or {}
        )
        exec(py, env)
        try:
            env[method].__defaults__ = fn.__defaults__
        except AttributeError:
            # fn has no __defaults__ attribute; nothing to carry over
            pass
        setattr(into_cls, method, env[method])
1045
1046
def methods_equivalent(meth1, meth2):
    """Return True if the two methods are the same implementation.

    Each argument is reduced to its ``__func__`` where it has one, and the
    results are compared by identity.
    """
    impl1 = getattr(meth1, "__func__", meth1)
    impl2 = getattr(meth2, "__func__", meth2)
    return impl1 is impl2
1053
1054
def as_interface(obj, cls=None, methods=None, required=None):
    """Ensure basic interface compliance for an instance or dict of callables.

    Checks that ``obj`` implements public methods of ``cls`` or has members
    listed in ``methods``.  If ``required`` is not supplied, implementing at
    least one interface method is sufficient.  Methods present on ``obj``
    that are not in the interface are ignored.

    If ``obj`` is a dict and ``dict`` does not meet the interface
    requirements, the keys of the dictionary are inspected.  Keys present in
    ``obj`` that are not in the interface will raise TypeErrors.

    Raises TypeError if ``obj`` does not meet the interface criteria.

    In all passing cases, an object with callable members is returned.  In
    the simple case, ``obj`` is returned as-is; if dict processing kicks in
    then an anonymous class is returned.

    obj
      A type, instance, or dictionary of callables.
    cls
      Optional, a type.  All public methods of cls are considered the
      interface.  An ``obj`` instance of cls will always pass, ignoring
      ``required``.
    methods
      Optional, a sequence of method names to consider as the interface.
    required
      Optional, a sequence of mandatory implementations.  If omitted, an
      ``obj`` that provides at least one interface method is considered
      sufficient.  As a convenience, required may be a type, in which case
      all public methods of the type are required.

    """
    if not (cls or methods):
        raise TypeError("a class or collection of method names are required")

    if isinstance(cls, type) and isinstance(obj, cls):
        return obj

    interface = set(methods or [m for m in dir(cls) if not m.startswith("_")])
    implemented = set(dir(obj))

    # with an explicit "required", the implemented names must be a
    # superset of it (>=); without one, a strict superset of the empty
    # set (>) means that at least one interface method is needed
    if isinstance(required, type):
        required, complies = interface, operator.ge
    elif required:
        required, complies = set(required), operator.ge
    else:
        required, complies = set(), operator.gt

    if complies(implemented.intersection(interface), required):
        return obj

    # No dict duck typing here.
    if not isinstance(obj, dict):
        qualifier = "any of" if complies is operator.gt else "all of"
        raise TypeError(
            "%r does not implement %s: %s"
            % (obj, qualifier, ", ".join(interface))
        )

    class AnonymousInterface:
        """A callable-holding shell."""

    if cls:
        AnonymousInterface.__name__ = "Anonymous" + cls.__name__

    provided = set()
    for method, impl in dictlike_iteritems(obj):
        if method not in interface:
            raise TypeError("%r: unknown in this interface" % method)
        if not callable(impl):
            raise TypeError("%r=%r is not callable" % (method, impl))
        setattr(AnonymousInterface, method, staticmethod(impl))
        provided.add(method)

    if not complies(provided, required):
        raise TypeError(
            "dictionary does not contain required keys %s"
            % ", ".join(required - provided)
        )

    return AnonymousInterface
1139
1140
# self type used by generic_fn_descriptor.__get__ so that class-level access
# is typed as returning the descriptor (sub)class itself
_GFD = TypeVar("_GFD", bound="generic_fn_descriptor[Any]")


class generic_fn_descriptor(Generic[_T_co]):
    """Descriptor which proxies a function when the attribute is not
    present in dict

    This superclass is organized in a particular way with "memoized" and
    "non-memoized" implementation classes that are hidden from type checkers,
    as Mypy seems to not be able to handle seeing multiple kinds of descriptor
    classes used for the same attribute.

    The base class only records the wrapped function together with its name
    and docstring; ``__get__()``, ``_reset()`` and ``reset()`` raise
    ``NotImplementedError`` here.

    """

    fget: Callable[..., _T_co]
    __doc__: Optional[str]
    __name__: str

    def __init__(self, fget: Callable[..., _T_co], doc: Optional[str] = None):
        """Wrap ``fget``.

        :param fget: the function being proxied.
        :param doc: optional docstring; when empty or omitted, the docstring
         of ``fget`` is used.

        """
        self.fget = fget
        self.__doc__ = doc or fget.__doc__
        self.__name__ = fget.__name__

    # class-level access (obj is None) is typed as the descriptor itself
    @overload
    def __get__(self: _GFD, obj: None, cls: Any) -> _GFD: ...

    # instance-level access is typed as the return type of ``fget``
    @overload
    def __get__(self, obj: object, cls: Any) -> _T_co: ...

    def __get__(self: _GFD, obj: Any, cls: Any) -> Union[_GFD, _T_co]:
        raise NotImplementedError()

    if TYPE_CHECKING:
        # declared for type checkers only; no runtime __set__ / __delete__
        # is defined on this class

        def __set__(self, instance: Any, value: Any) -> None: ...

        def __delete__(self, instance: Any) -> None: ...

    def _reset(self, obj: Any) -> None:
        raise NotImplementedError()

    @classmethod
    def reset(cls, obj: Any, name: str) -> None:
        raise NotImplementedError()
1185
1186
class _non_memoized_property(generic_fn_descriptor[_T_co]):
    """Plain, non-caching descriptor that proxies a function.

    Exists mainly so that there is an ordinary computed attribute that is
    interchangeable with memoized_property, and which mypy treats as the
    same kind of descriptor.

    """

    if not TYPE_CHECKING:

        def __get__(self, obj, cls):
            # class-level access returns the descriptor; instance access
            # calls the function every time, nothing is stored
            return self if obj is None else self.fget(obj)
1202
1203
class _memoized_property(generic_fn_descriptor[_T_co]):
    """Read-only property whose function runs once per instance.

    The computed value is written into the instance ``__dict__`` under the
    property's name.

    """

    if not TYPE_CHECKING:

        def __get__(self, obj, cls):
            if obj is None:
                return self
            value = self.fget(obj)
            obj.__dict__[self.__name__] = value
            return value

    def _reset(self, obj):
        """Discard the memoized value of this property on ``obj``."""
        _memoized_property.reset(obj, self.__name__)

    @classmethod
    def reset(cls, obj, name):
        """Discard the value memoized under ``name`` on ``obj``, if any."""
        obj.__dict__.pop(name, None)
1221
1222
# despite many attempts to get Mypy to recognize an overridden descriptor
# where one is memoized and the other isn't, there seems to be no reliable
# way other than completely deceiving the type checker into thinking there
# is just one single descriptor type everywhere.  Otherwise, if a superclass
# has non-memoized and subclass has memoized, that requires
# "class memoized(non_memoized)".  but then if a superclass has memoized and
# subclass has non-memoized, the class hierarchy of the descriptors
# would need to be reversed; "class non_memoized(memoized)".  so there's no
# way to achieve this.
# additional issues, RO properties:
# https://github.com/python/mypy/issues/12440
if TYPE_CHECKING:
    # allow memoized and non-memoized to be freely mixed by having them
    # be the same class
    memoized_property = generic_fn_descriptor
    non_memoized_property = generic_fn_descriptor

    # for read only situations, mypy only sees @property as read only.
    # read only is needed when a subtype specializes the return type
    # of a property, meaning assignment needs to be disallowed
    ro_memoized_property = property
    ro_non_memoized_property = property

else:
    # at runtime the real implementations are used; the "ro_" names are
    # plain aliases of the same two classes
    memoized_property = ro_memoized_property = _memoized_property
    non_memoized_property = ro_non_memoized_property = _non_memoized_property
1249
1250
def memoized_instancemethod(fn: _F) -> _F:
    """Decorate a method so that its return value is memoized per instance.

    The first call runs ``fn`` and then places a stand-in function into the
    instance ``__dict__`` under the method's name; later calls on that
    instance reach the stand-in, which returns the stored value.

    Best applied to no-arg methods: memoization is not sensitive to
    argument values, and will always return the same value even when
    called with different arguments.

    """

    def oneshot(self, *args, **kw):
        value = fn(self, *args, **kw)

        def memo(*a, **kw):
            return value

        memo.__name__, memo.__doc__ = fn.__name__, fn.__doc__
        # shadow the method on this instance only
        self.__dict__[fn.__name__] = memo
        return value

    return update_wrapper(oneshot, fn)  # type: ignore
1272
1273
class HasMemoized:
    """A mixin class that maintains the names of memoized elements in a
    collection for easy cache clearing, generative, etc.

    The names are kept in ``_memoized_keys``; the memoized values themselves
    are stored in the instance ``__dict__``.

    """

    if not TYPE_CHECKING:
        # support classes that want to have __slots__ with an explicit
        # slot for __dict__.  not sure if that requires base __slots__ here.
        __slots__ = ()

    # class-level default; the methods below rebind this name on the
    # instance using ``|=`` rather than mutating it in place
    _memoized_keys: FrozenSet[str] = frozenset()

    def _reset_memoizations(self) -> None:
        """Remove every recorded memoized value from ``__dict__``."""
        for elem in self._memoized_keys:
            self.__dict__.pop(elem, None)

    def _assert_no_memoizations(self) -> None:
        """Assert that none of the recorded keys is present in
        ``__dict__``."""
        for elem in self._memoized_keys:
            assert elem not in self.__dict__

    def _set_memoized_attribute(self, key: str, value: Any) -> None:
        """Store ``value`` in ``__dict__`` under ``key`` and record the key
        in ``_memoized_keys``."""
        self.__dict__[key] = value
        self._memoized_keys |= {key}

    class memoized_attribute(memoized_property[_T]):
        """A read-only @property that is only evaluated once.

        :meta private:

        """

        fget: Callable[..., _T]
        __doc__: Optional[str]
        __name__: str

        def __init__(self, fget: Callable[..., _T], doc: Optional[str] = None):
            self.fget = fget
            self.__doc__ = doc or fget.__doc__
            self.__name__ = fget.__name__

        @overload
        def __get__(self: _MA, obj: None, cls: Any) -> _MA: ...

        @overload
        def __get__(self, obj: Any, cls: Any) -> _T: ...

        def __get__(self, obj, cls):
            if obj is None:
                return self
            # store the computed value on the instance and record its name
            obj.__dict__[self.__name__] = result = self.fget(obj)
            obj._memoized_keys |= {self.__name__}
            return result

    @classmethod
    def memoized_instancemethod(cls, fn: _F) -> _F:
        """Decorate a method so that its return value is memoized.

        The first call stores a stand-in function in the instance
        ``__dict__`` under the method's name and records that name in
        ``_memoized_keys``.

        :meta private:

        """

        def oneshot(self: Any, *args: Any, **kw: Any) -> Any:
            result = fn(self, *args, **kw)

            def memo(*a, **kw):
                return result

            memo.__name__ = fn.__name__
            memo.__doc__ = fn.__doc__
            self.__dict__[fn.__name__] = memo
            self._memoized_keys |= {fn.__name__}
            return result

        return update_wrapper(oneshot, fn)  # type: ignore


# type checkers are shown a plain read-only ``property``; at runtime the
# name refers to HasMemoized.memoized_attribute
if TYPE_CHECKING:
    HasMemoized_ro_memoized_attribute = property
else:
    HasMemoized_ro_memoized_attribute = HasMemoized.memoized_attribute
1355
1356
class MemoizedSlots:
    """Apply memoized items to an object using a __getattr__ scheme.

    This allows the functionality of memoized_property and
    memoized_instancemethod to be available to a class using __slots__.

    A subclass provides ``_memoized_attr_<name>()`` to memoize an attribute
    ``<name>``, or ``_memoized_method_<name>()`` to memoize a method
    ``<name>()``; the result is stored with ``setattr()`` under ``<name>``.

    The memoized get is not threadsafe under freethreading and the
    creator method may in extremely rare cases be called more than once.

    """

    __slots__ = ()

    def _fallback_getattr(self, key):
        raise AttributeError(key)

    def __getattr__(self, key: str) -> Any:
        if key.startswith(("_memoized_attr_", "_memoized_method_")):
            raise AttributeError(key)

        # to avoid recursion errors when interacting with other __getattr__
        # schemes that refer to this one, when testing for memoized method
        # look at __class__ only rather than going into __getattr__ again.
        owner = self.__class__

        attr_creator = f"_memoized_attr_{key}"
        if hasattr(owner, attr_creator):
            value = getattr(self, attr_creator)()
            setattr(self, key, value)
            return value

        method_creator = f"_memoized_method_{key}"
        if not hasattr(owner, method_creator):
            return self._fallback_getattr(key)

        meth = getattr(self, method_creator)

        def oneshot(*args, **kw):
            result = meth(*args, **kw)

            def memo(*a, **kw):
                return result

            memo.__name__ = meth.__name__
            memo.__doc__ = meth.__doc__
            # later lookups of ``key`` find ``memo`` directly
            setattr(self, key, memo)
            return result

        oneshot.__doc__ = meth.__doc__
        return oneshot
1403
1404
# from paste.deploy.converters
def asbool(obj: Any) -> bool:
    """Coerce ``obj`` to a boolean.

    Strings are stripped and lower-cased, then matched against a fixed set
    of true / false words; any other string raises ``ValueError``.
    Non-string objects are passed to ``bool()``.

    """
    if not isinstance(obj, str):
        return bool(obj)

    token = obj.strip().lower()
    if token in ("true", "yes", "on", "y", "t", "1"):
        return True
    if token in ("false", "no", "off", "n", "f", "0"):
        return False
    raise ValueError("String is not true/false: %r" % token)
1416
1417
def bool_or_str(*text: str) -> Callable[[str], Union[str, bool]]:
    """Return a callable that passes through any of the given "alternate"
    string values unchanged, and evaluates anything else as a boolean
    using ``asbool()``.

    """

    def bool_or_value(obj: str) -> Union[str, bool]:
        return obj if obj in text else asbool(obj)

    return bool_or_value
1431
1432
def asint(value: Any) -> Optional[int]:
    """Coerce to integer, passing ``None`` through unchanged."""

    return None if value is None else int(value)
1439
1440
def coerce_kw_type(
    kw: Dict[str, Any],
    key: str,
    type_: Type[Any],
    flexi_bool: bool = True,
    dest: Optional[Dict[str, Any]] = None,
) -> None:
    r"""Coerce the value of 'key' in dict 'kw' to type 'type\_', if the key
    is present, the value is not None, and the value is not already an
    instance of 'type\_'.

    The coerced value is written to 'dest', which defaults to 'kw' itself.
    If 'flexi_bool' is True, coercion to bool goes through ``asbool()`` so
    that the string '0' is considered false.
    """

    target = kw if dest is None else dest

    if key not in kw:
        return

    current = kw[key]
    already_typed = isinstance(type_, type) and isinstance(current, type_)
    if already_typed or current is None:
        return

    if type_ is bool and flexi_bool:
        target[key] = asbool(current)
    else:
        target[key] = type_(current)
1465
1466
def constructor_key(obj: Any, cls: Type[Any]) -> Tuple[Any, ...]:
    """Produce a cacheable tuple of ``cls`` followed by ``(name, value)``
    pairs, one for each constructor argument name of ``cls`` that is
    present in the ``__dict__`` of ``obj``.

    """
    arg_names = get_cls_kwargs(cls)
    pairs = [
        (name, obj.__dict__[name])
        for name in arg_names
        if name in obj.__dict__
    ]
    return (cls, *pairs)
1476
1477
def constructor_copy(obj: _T, cls: Type[_T], *args: Any, **kw: Any) -> _T:
    """Instantiate cls using the __dict__ of obj as constructor arguments.

    Constructor argument names of ``cls`` that were not passed explicitly
    in ``kw`` are filled in from the ``__dict__`` of ``obj`` when present
    there.

    """

    ctor_names = get_cls_kwargs(cls)
    for name in ctor_names.difference(kw):
        if name in obj.__dict__:
            kw[name] = obj.__dict__[name]
    return cls(*args, **kw)
1490
1491
def counter() -> Callable[[], int]:
    """Return a threadsafe counter function.

    Each call of the returned function yields the next integer, starting
    at 1; advancing the sequence is guarded by a lock.

    """

    guard = threading.Lock()
    sequence = itertools.count(1)

    # avoid the 2to3 "next" transformation...
    def _next():
        with guard:
            return next(sequence)

    return _next
1504
1505
def duck_type_collection(
    specimen: Any, default: Optional[Type[Any]] = None
) -> Optional[Type[Any]]:
    """Given an instance or class, guess if it is or is acting as one of
    the basic collection types: list, set and dict.  If the __emulates__
    property is present, return that preferentially.

    Otherwise the decision is made by subclass / instance checks, then by
    the presence of an ``append``, ``add`` or ``set`` attribute; ``default``
    is returned when nothing matches.
    """

    if hasattr(specimen, "__emulates__"):
        # canonicalize set vs sets.Set to a standard: the builtin set
        if specimen.__emulates__ is not None and issubclass(
            specimen.__emulates__, set
        ):
            return set
        return specimen.__emulates__  # type: ignore

    isa = issubclass if isinstance(specimen, type) else isinstance
    for builtin_type in (list, set, dict):
        if isa(specimen, builtin_type):
            return builtin_type

    duck_markers = (("append", list), ("add", set), ("set", dict))
    for marker, builtin_type in duck_markers:
        if hasattr(specimen, marker):
            return builtin_type
    return default
1539
1540
def assert_arg_type(
    arg: Any, argtype: Union[Tuple[Type[Any], ...], Type[Any]], name: str
) -> Any:
    """Return ``arg`` if it is an instance of ``argtype``, else raise
    :class:`.ArgumentError` naming the argument and the expected type(s).

    """
    if isinstance(arg, argtype):
        return arg

    if isinstance(argtype, tuple):
        expected = " or ".join("'%s'" % a for a in argtype)
        raise exc.ArgumentError(
            "Argument '%s' is expected to be one of type %s, got '%s'"
            % (name, expected, type(arg))
        )

    raise exc.ArgumentError(
        "Argument '%s' is expected to be of type '%s', got '%s'"
        % (name, argtype, type(arg))
    )
1557
1558
def dictlike_iteritems(dictlike):
    """Return the (key, value) pairs of almost any dict-like object.

    Objects that have ``items()`` produce a list of pairs.  Otherwise the
    keys are taken from ``iterkeys()`` or ``keys()`` and each value is
    looked up with ``__getitem__`` (or ``get`` when there is no
    ``__getitem__``); in that case an iterator of pairs is returned.

    Raises ``TypeError`` if the object supports none of these.

    """

    if hasattr(dictlike, "items"):
        return list(dictlike.items())

    fallback = getattr(dictlike, "get", None)
    getter = getattr(dictlike, "__getitem__", fallback)
    if getter is None:
        raise TypeError("Object '%r' is not dict-like" % dictlike)

    if hasattr(dictlike, "iterkeys"):

        def iterator():
            for key in dictlike.iterkeys():
                assert getter is not None
                yield key, getter(key)

        return iterator()

    if hasattr(dictlike, "keys"):
        return ((key, getter(key)) for key in dictlike.keys())

    raise TypeError("Object '%r' is not dict-like" % dictlike)
1581
1582
class classproperty(property):
    """A decorator that behaves like @property except that operates
    on classes rather than instances.

    The decorator is currently special when using the declarative
    module, but note that the
    :class:`~.sqlalchemy.ext.declarative.declared_attr`
    decorator should be used for this purpose with declarative.

    """

    fget: Callable[[Any], Any]

    def __init__(self, fget: Callable[[Any], Any], *arg: Any, **kw: Any):
        super().__init__(fget, *arg, **kw)
        # expose the docstring of the wrapped function on the descriptor
        self.__doc__ = fget.__doc__

    def __get__(self, obj: Any, cls: Optional[type] = None) -> Any:
        # the getter always receives the class, whether the attribute is
        # accessed on the class or on an instance; ``obj`` is not used
        return self.fget(cls)
1602
1603
class hybridproperty(Generic[_T]):
    """Read-only property that can return a different value at the class
    level than at the instance level.

    The decorated function serves both levels until a separate class-level
    function is assigned with :meth:`.classlevel`.

    """

    def __init__(self, func: Callable[..., _T]):
        self.func = self.clslevel = func

    def __get__(self, instance: Any, owner: Any) -> _T:
        if instance is not None:
            return self.func(instance)
        return self.clslevel(owner)

    def classlevel(self, func: Callable[..., Any]) -> hybridproperty[_T]:
        """Set ``func`` as the class-level getter; returns the property."""
        self.clslevel = func
        return self
1619
1620
class rw_hybridproperty(Generic[_T]):
    """Variant of a class / instance level property that additionally
    supports instance-level assignment through a setter function.

    """

    def __init__(self, func: Callable[..., _T]):
        self.func = self.clslevel = func
        self.setfn: Optional[Callable[..., Any]] = None

    def __get__(self, instance: Any, owner: Any) -> _T:
        if instance is not None:
            return self.func(instance)
        return self.clslevel(owner)

    def __set__(self, instance: Any, value: Any) -> None:
        # a setter is expected to have been established via setter()
        assert self.setfn is not None
        self.setfn(instance, value)

    def setter(self, func: Callable[..., Any]) -> rw_hybridproperty[_T]:
        """Set ``func`` as the instance-level setter; returns the
        property."""
        self.setfn = func
        return self

    def classlevel(self, func: Callable[..., Any]) -> rw_hybridproperty[_T]:
        """Set ``func`` as the class-level getter; returns the property."""
        self.clslevel = func
        return self
1645
1646
class hybridmethod(Generic[_T]):
    """Decorate a function as cls- or instance- level.

    Accessed on an instance, the function is bound to the instance;
    accessed on the class, the class-level function (the same function
    unless replaced via :meth:`.classlevel`) is bound to the class.

    """

    def __init__(self, func: Callable[..., _T]):
        self.func = func
        self.__func__ = func
        self.clslevel = func

    def __get__(self, instance: Any, owner: Any) -> Callable[..., _T]:
        if instance is not None:
            bound = self.func.__get__(instance, owner)
        else:
            bound = self.clslevel.__get__(owner, owner.__class__)
        return bound  # type: ignore[no-any-return]

    def classlevel(self, func: Callable[..., Any]) -> hybridmethod[_T]:
        """Set ``func`` as the class-level function; returns the
        descriptor."""
        self.clslevel = func
        return self
1667
1668
class symbol(int):
    """A constant symbol.

    >>> symbol("foo") is symbol("foo")
    True
    >>> symbol("foo")
    symbol('foo')

    A slight refinement of the MAGICCOOKIE=object() pattern.  The primary
    advantage of symbol() is its repr().  They are also singletons.

    Repeated calls of symbol('name') will all return the same instance.

    The integer value is ``canonical`` when given, otherwise the hash of
    the name.  Asking for an existing name with a different, non-zero
    ``canonical`` raises ``TypeError``.

    """

    name: str

    symbols: Dict[str, symbol] = {}
    _lock = threading.Lock()

    def __new__(
        cls,
        name: str,
        doc: Optional[str] = None,
        canonical: Optional[int] = None,
    ) -> symbol:
        with cls._lock:
            existing = cls.symbols.get(name)
            if existing is not None:
                if canonical and canonical != existing:
                    raise TypeError(
                        f"Can't replace canonical symbol for {name!r} "
                        f"with new int value {canonical}"
                    )
                return existing

            assert isinstance(name, str)
            value = hash(name) if canonical is None else canonical
            created = int.__new__(symbol, value)
            created.name = name
            if doc:
                created.__doc__ = doc

            # NOTE: we should ultimately get rid of this global thing,
            # however, currently it is to support pickling.  The best
            # change would be when we are on py3.11 at a minimum, we
            # switch to stdlib enum.IntFlag.
            cls.symbols[name] = created
            return created

    def __reduce__(self):
        # unpickling calls symbol(name, "x", int_value), which resolves to
        # the registered instance when the name already exists
        return symbol, (self.name, "x", int(self))

    def __str__(self):
        return repr(self)

    def __repr__(self):
        return f"symbol({self.name!r})"
1727
1728
class _IntFlagMeta(type):
    """Metaclass that converts the integer attributes of a class body into
    :class:`.symbol` objects and publishes them as ``__members__``.

    """

    def __init__(
        cls,
        classname: str,
        bases: Tuple[Type[Any], ...],
        dict_: Dict[str, Any],
        **kw: Any,
    ) -> None:
        items: List[symbol]
        cls._items = items = []
        for attr_name, attr_value in dict_.items():
            if re.match(r"^__.*__$", attr_name):
                continue
            if not isinstance(attr_value, int):
                # non-integer values are tolerated for underscored names
                # only
                if attr_name.startswith("_"):
                    continue
                raise TypeError("Expected integer values for IntFlag")
            flag = symbol(attr_name, canonical=attr_value)
            setattr(cls, attr_name, flag)
            items.append(flag)

        cls.__members__ = _collections.immutabledict(
            {flag.name: flag for flag in items}
        )

    def __iter__(self) -> Iterator[symbol]:
        raise NotImplementedError(
            "iter not implemented to ensure compatibility with "
            "Python 3.11 IntFlag.  Please use __members__.  See "
            "https://github.com/python/cpython/issues/99304"
        )
1761
1762
class _FastIntFlag(metaclass=_IntFlagMeta):
    """An 'IntFlag' copycat that isn't slow when performing bitwise
    operations.

    the ``FastIntFlag`` class will return ``enum.IntFlag`` under TYPE_CHECKING
    and ``_FastIntFlag`` otherwise.

    """


# type checkers see the stdlib IntFlag; at runtime the name refers to the
# symbol-based _FastIntFlag above
if TYPE_CHECKING:
    from enum import IntFlag

    FastIntFlag = IntFlag
else:
    FastIntFlag = _FastIntFlag
1779
1780
_E = TypeVar("_E", bound=enum.Enum)


def parse_user_argument_for_enum(
    arg: Any,
    choices: Dict[_E, List[Any]],
    name: str,
    resolve_symbol_names: bool = False,
) -> Optional[_E]:
    """Given a user parameter, parse the parameter into a chosen value
    from a list of choice objects, typically Enum values.

    The user argument can be a string name that matches the name of a
    symbol, or the symbol object itself, or any number of alternate choices
    such as True/False/ None etc.

    :param arg: the user argument.
    :param choices: dictionary of enum values to lists of possible
     entries for each.
    :param name: name of the argument.   Used in an :class:`.ArgumentError`
     that is raised if the parameter doesn't match any available argument.
    :param resolve_symbol_names: when True, ``arg`` also matches an enum
     value if it compares equal to that value's ``.name``.

    :return: the matching enum value, or ``None`` if nothing matched and
     ``arg`` is ``None``.

    """
    for enum_value, choice in choices.items():
        matched = (
            arg is enum_value
            or (resolve_symbol_names and arg == enum_value.name)
            or arg in choice
        )
        if matched:
            return enum_value

    if arg is not None:
        raise exc.ArgumentError(f"Invalid value for '{name}': {arg!r}")
    return None
1816
1817
# module-level counter handed out by set_creation_order(); starts at 1 and
# is incremented on every call
_creation_order = 1


def set_creation_order(instance: Any) -> None:
    """Assign a '_creation_order' sequence to the given instance.

    This allows multiple instances to be sorted in order of creation
    (typically within a single thread; the counter is not particularly
    threadsafe).

    :param instance: object that receives the ``_creation_order``
     attribute.

    """
    global _creation_order
    # the instance gets the current value, then the counter advances
    instance._creation_order = _creation_order
    _creation_order += 1
1832
1833
def warn_exception(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
    """Call ``func(*args, **kwargs)`` and return its result; if it raises
    an ``Exception``, emit a warning describing the exception instead of
    propagating it.

    """
    try:
        return func(*args, **kwargs)
    except Exception:
        exc_type, exc_value = sys.exc_info()[0:2]
        warn("%s('%s') ignored" % (exc_type, exc_value))
1843
1844
def ellipses_string(value, len_=25):
    """Return ``value`` truncated to ``len_`` characters followed by
    ``...`` when it is longer than that.

    ``value`` is returned unchanged if it is short enough, or if measuring
    or formatting it raises ``TypeError``.

    """
    try:
        too_long = len(value) > len_
        return "%s..." % value[0:len_] if too_long else value
    except TypeError:
        return value
1853
1854
class _hash_limit_string(str):
    """A string subclass that can only be hashed on a maximum amount
    of unique values.

    This is used for warnings so that we can send out parameterized warnings
    without the __warningregistry__ of the module, or the non-overridable
    "once" registry within warnings.py, overloading memory,

    The hash is derived from the un-interpolated template plus the hash of
    the interpolated text modulo ``num``; equality compares hashes only.

    """

    _hash: int

    def __new__(
        cls, value: str, num: int, args: Sequence[Any]
    ) -> _hash_limit_string:
        message = value % args
        message += (
            " (this warning may be suppressed after %d occurrences)" % num
        )
        instance = super().__new__(cls, message)
        bucket = hash(message) % num
        instance._hash = hash("%s_%d" % (value, bucket))
        return instance

    def __hash__(self) -> int:
        return self._hash

    def __eq__(self, other: Any) -> bool:
        return hash(self) == hash(other)
1883
1884
def warn(msg: str, code: Optional[str] = None) -> None:
    """Issue a warning.

    Without ``code``, ``msg`` is emitted with :class:`.exc.SAWarning` as
    the category.  With ``code``, an :class:`.exc.SAWarning` instance
    carrying that code is constructed and emitted.

    """
    if not code:
        _warnings_warn(msg, exc.SAWarning)
        return
    _warnings_warn(exc.SAWarning(msg, code=code))
1896
1897
def warn_limited(msg: str, args: Sequence[Any]) -> None:
    """Issue a warning with a parameterized string, limiting the number
    of registrations.

    When ``args`` is non-empty the message is interpolated through
    ``_hash_limit_string`` with a limit of 10; otherwise ``msg`` is emitted
    as is.

    """
    message = _hash_limit_string(msg, 10, args) if args else msg
    _warnings_warn(message, exc.SAWarning)
1906
1907
# maps the code object of a tagged function to the (message, category)
# pair registered for it
_warning_tags: Dict[CodeType, Tuple[str, Type[Warning]]] = {}


def tag_method_for_warnings(
    message: str, category: Type[Warning]
) -> Callable[[_F], _F]:
    """Return a decorator that registers ``(message, category)`` under the
    decorated function's code object in ``_warning_tags``.

    The decorated function itself is returned unchanged.

    """

    def register(fn):
        _warning_tags[fn.__code__] = (message, category)
        return fn

    return register
1919
1920
# matches module names that begin with "sqlalchemy." (unless followed by
# "testing") or with "alembic."; frames from such modules are skipped when
# computing the stacklevel below
_not_sa_pattern = re.compile(r"^(?:sqlalchemy\.(?!testing)|alembic\.)")


def _warnings_warn(
    message: Union[str, Warning],
    category: Optional[Type[Warning]] = None,
    stacklevel: int = 2,
) -> None:
    """Emit a warning via ``warnings.warn()``, adjusting the stacklevel.

    Starting at the frame ``stacklevel`` levels up, frames are walked
    outwards and ``stacklevel`` is incremented until a frame is found whose
    module ``__name__`` does not match ``_not_sa_pattern``.  Any frame whose
    code object is registered in ``_warning_tags`` appends its suffix to the
    message and supplies the category if none was given.

    :param message: warning text or ``Warning`` instance.
    :param category: warning category; defaults to the type of ``message``
     when that is a ``Warning`` instance.
    :param stacklevel: initial number of frames to skip.

    """

    if category is None and isinstance(message, Warning):
        category = type(message)

    # adjust the given stacklevel to be outside of SQLAlchemy
    try:
        frame = sys._getframe(stacklevel)
    except ValueError:
        # being called from less than 3 (or given) stacklevels, weird,
        # but don't crash
        stacklevel = 0
    except:
        # _getframe() doesn't work, weird interpreter issue, weird,
        # ok, but don't crash
        stacklevel = 0
    else:
        stacklevel_found = warning_tag_found = False
        while frame is not None:
            # using __name__ here requires that we have __name__ in the
            # __globals__ of the decorated string functions we make also.
            # we generate this using {"__name__": fn.__module__}
            if not stacklevel_found and not re.match(
                _not_sa_pattern, frame.f_globals.get("__name__", "")
            ):
                # stop incrementing stack level if an out-of-SQLA line
                # were found.
                stacklevel_found = True

                # however, for the warning tag thing, we have to keep
                # scanning up the whole traceback

            if frame.f_code in _warning_tags:
                warning_tag_found = True
                _suffix, _category = _warning_tags[frame.f_code]
                # an explicitly given category wins over the tagged one
                category = category or _category
                message = f"{message} ({_suffix})"

            frame = frame.f_back  # type: ignore[assignment]

            if not stacklevel_found:
                stacklevel += 1
            elif stacklevel_found and warning_tag_found:
                break

    # + 1 accounts for this function's own frame
    if category is not None:
        warnings.warn(message, category, stacklevel=stacklevel + 1)
    else:
        warnings.warn(message, stacklevel=stacklevel + 1)
1977
1978
def only_once(
    fn: Callable[..., _T], retry_on_exception: bool
) -> Callable[..., Optional[_T]]:
    """Decorate the given function to be a no-op after it is called exactly
    once.

    The first call returns the result of ``fn``; later calls return
    ``None``.  If the call raises and ``retry_on_exception`` is true, the
    function is re-armed so that the next call runs it again; the exception
    propagates either way.

    """

    pending = [fn]

    def go(*arg: Any, **kw: Any) -> Optional[_T]:
        # strong reference fn so that it isn't garbage collected,
        # which interferes with the event system's expectations
        strong_fn = fn  # noqa
        if not pending:
            return None

        armed_fn = pending.pop()
        try:
            return armed_fn(*arg, **kw)
        except:
            if retry_on_exception:
                pending.insert(0, armed_fn)
            raise

    return go
2003
2004
_SQLA_RE = re.compile(r"sqlalchemy/([a-z_]+/){0,2}[a-z_]+\.py")
_UNITTEST_RE = re.compile(r"unit(?:2|test2?/)")


def chop_traceback(
    tb: List[str],
    exclude_prefix: re.Pattern[str] = _UNITTEST_RE,
    exclude_suffix: re.Pattern[str] = _SQLA_RE,
) -> List[str]:
    """Chop extraneous lines off beginning and end of a traceback.

    Leading lines are dropped while they match ``exclude_prefix``, then
    trailing lines are dropped while they match ``exclude_suffix``; the
    lines in between are returned as a new list.

    :param tb:
      a list of traceback lines as returned by ``traceback.format_stack()``

    :param exclude_prefix:
      a regular expression object matching lines to skip at beginning of
      ``tb``

    :param exclude_suffix:
      a regular expression object matching lines to skip at end of ``tb``
    """
    first, last = 0, len(tb) - 1

    while first <= last and exclude_prefix.search(tb[first]):
        first += 1

    while first <= last and exclude_suffix.search(tb[last]):
        last -= 1

    return tb[first : last + 1]
2033
2034
def attrsetter(attrname):
    """Return a function ``set(obj, value)`` that assigns ``value`` to the
    attribute path ``attrname`` of ``obj``.

    The function is generated with ``exec()`` from a source string into
    which ``attrname`` is interpolated verbatim.  NOTE(review):
    ``attrname`` must therefore be a trusted attribute name and never come
    from external input.

    """
    code = "def set(obj, value): obj.%s = value" % attrname
    # a copy of the local namespace serves as the globals of the generated
    # function
    env = locals().copy()
    exec(code, env)
    return env["set"]
2040
2041
dunders_re = re.compile("^__.+__$")


class TypingOnly:
    """A mixin class that marks a class as 'typing only', meaning it has
    absolutely no methods, attributes, or runtime functionality whatsoever.

    This is enforced when a class lists ``TypingOnly`` directly among its
    bases: any name in its class body that is not a dunder name raises
    ``AssertionError``.

    """

    __slots__ = ()

    def __init_subclass__(cls, **kw: Any) -> None:
        if TypingOnly in cls.__bases__:
            extras = set()
            for attr_name in cls.__dict__:
                if dunders_re.match(attr_name) is None:
                    extras.add(attr_name)
            if extras:
                raise AssertionError(
                    f"Class {cls} directly inherits TypingOnly but has "
                    f"additional attributes {extras}."
                )
        super().__init_subclass__(**kw)
2064
2065
class EnsureKWArg:
    r"""Apply translation of functions to accept \**kw arguments if they
    don't already.

    Used to ensure cross-compatibility with third party legacy code, for things
    like compiler visit methods that need to accept ``**kw`` arguments,
    but may have been copied from old code that didn't accept them.

    """

    ensure_kwarg: str
    """a regular expression that indicates method names for which the method
    should accept ``**kw`` arguments.

    The class will scan for methods matching the name template and decorate
    them if necessary to ensure ``**kw`` parameters are accepted.

    """

    def __init_subclass__(cls) -> None:
        pattern = cls.ensure_kwarg
        namespace = cls.__dict__
        if pattern:
            for attr_name in namespace:
                if not re.match(pattern, attr_name):
                    continue
                candidate = namespace[attr_name]
                argspec = compat.inspect_getfullargspec(candidate)
                if not argspec.varkw:
                    # replace with a wrapper that tolerates **kw
                    setattr(cls, attr_name, cls._wrap_w_kw(candidate))
        super().__init_subclass__()

    @classmethod
    def _wrap_w_kw(cls, fn: Callable[..., Any]) -> Callable[..., Any]:
        """Return a wrapper for ``fn`` that accepts keyword arguments and
        discards them, forwarding positional arguments only."""

        def wrap(*arg: Any, **kw: Any) -> Any:
            return fn(*arg)

        return update_wrapper(wrap, fn)
2105
2106
def wrap_callable(wrapper, fn):
    """Augment functools.update_wrapper() to work with objects with
    a ``__call__()`` method.

    When ``fn`` has a ``__name__``, this is plain ``update_wrapper()``.
    Otherwise the wrapper takes its name from the class of ``fn``, its
    module from ``fn`` if available, and its docstring from
    ``fn.__call__`` or else from ``fn``.

    :param wrapper:
      the function to be updated and returned

    :param fn:
      object with __call__ method

    """
    if hasattr(fn, "__name__"):
        return update_wrapper(wrapper, fn)

    wrapper.__name__ = fn.__class__.__name__
    if hasattr(fn, "__module__"):
        wrapper.__module__ = fn.__module__

    call_doc = getattr(fn.__call__, "__doc__", None)
    if call_doc:
        wrapper.__doc__ = call_doc
    elif fn.__doc__:
        wrapper.__doc__ = fn.__doc__

    return wrapper
2129
2130
def quoted_token_parser(value):
    """Parse a dotted identifier with accommodation for quoted names.

    Dots inside double quotes do not split tokens, the quotes themselves
    are removed, and a doubled double quote inside a quoted section stands
    for one literal double quote character.

    E.g.::

        >>> quoted_token_parser("name")
        ['name']
        >>> quoted_token_parser("schema.name")
        ['schema', 'name']
        >>> quoted_token_parser('"Schema"."Name"')
        ['Schema', 'Name']
        >>> quoted_token_parser('"Schema"."Name""Foo"')
        ['Schema', 'Name"Foo']

    """

    if '"' not in value:
        return value.split(".")

    tokens: List[List[str]] = [[]]
    in_quotes = False
    pos, end = 0, len(value)
    while pos < end:
        ch = value[pos]
        if ch == '"':
            if in_quotes and pos + 1 < end and value[pos + 1] == '"':
                # doubled quote: keep one, skip its partner
                tokens[-1].append('"')
                pos += 1
            else:
                in_quotes = not in_quotes
        elif ch == "." and not in_quotes:
            tokens.append([])
        else:
            tokens[-1].append(ch)
        pos += 1

    return ["".join(chars) for chars in tokens]
2173
2174
def add_parameter_text(params: Any, text: str) -> Callable[[_F], _F]:
    """Return a decorator that injects ``text`` beneath the ``:param:``
    entry of each of the given parameter names in the decorated function's
    docstring.

    A missing docstring is replaced with the empty string.

    """
    params = _collections.to_list(params)

    def decorate(fn):
        existing = fn.__doc__ or ""
        if existing:
            existing = inject_param_text(
                existing, dict.fromkeys(params, text)
            )
        fn.__doc__ = existing
        return fn

    return decorate
2186
2187
def _dedent_docstring(text: str) -> str:
    """Dedent a docstring whose first line may lack the indentation of the
    remaining lines."""
    firstline, newline, remaining = text.partition("\n")
    if not newline:
        # single-line text, nothing to dedent
        return text
    if firstline.startswith(" "):
        return textwrap.dedent(text)
    return firstline + "\n" + textwrap.dedent(remaining)


def inject_docstring_text(
    given_doctext: Optional[str], injecttext: str, pos: int
) -> str:
    """Insert ``injecttext`` into a docstring at a blank-line position.

    The docstring is dedented, and the dedented ``injecttext`` is inserted
    at the ``pos``-th candidate position, where position 0 is the start of
    the text and the following positions are its blank lines.  A ``pos``
    beyond the last candidate selects the last one.

    """
    lines = _dedent_docstring(given_doctext or "").split("\n")
    if len(lines) == 1:
        lines.append("")

    injectlines = textwrap.dedent(injecttext).split("\n")
    if injectlines[0]:
        # make sure the injected text begins with a blank line
        injectlines.insert(0, "")

    blanks = [0]
    blanks.extend(num for num, line in enumerate(lines) if not line.strip())

    inject_pos = blanks[min(pos, len(blanks) - 1)]
    lines[inject_pos:inject_pos] = injectlines
    return "\n".join(lines)
2218
2219
_param_reg = re.compile(r"(\s+):param (.+?):")


def inject_param_text(doctext: str, inject_params: Dict[str, str]) -> str:
    """Insert additional text beneath ``:param <name>:`` entries of a
    docstring.

    :param doctext: the docstring text to process.
    :param inject_params: mapping of parameter name (without leading
     ``*``) to the text to inject for that parameter.
    :return: the docstring with the text injected.

    Once a matching ``:param:`` line is seen, the text is emitted at the
    next blank line or the next ``:param:`` line, whichever comes first.
    If the docstring ends before either is found, nothing is injected for
    that parameter.

    """
    doclines = collections.deque(doctext.splitlines())
    lines = []

    # TODO: this is not working for params like ":param case_sensitive=True:"

    to_inject = None
    while doclines:
        line = doclines.popleft()

        m = _param_reg.match(line)

        if to_inject is None:
            if m:
                param = m.group(2).lstrip("*")
                if param in inject_params:
                    # default indent to that of :param: plus one
                    indent = " " * len(m.group(1)) + " "

                    # but if the next line has text, use that line's
                    # indentation
                    if doclines:
                        m2 = re.match(r"(\s+)\S", doclines[0])
                        if m2:
                            indent = " " * len(m2.group(1))

                    to_inject = indent + inject_params[param]
        elif m:
            lines.extend(["\n", to_inject, "\n"])
            to_inject = None
        elif not line.rstrip():
            lines.extend([line, to_inject, "\n"])
            to_inject = None
        elif line.endswith("::") and doclines:
            # TODO: this still won't cover if the code example itself has
            # blank lines in it, need to detect those via indentation.
            # the "doclines" guard keeps a trailing "::" on the very last
            # line from popping an empty deque
            lines.extend([line, doclines.popleft()])
            continue
        lines.append(line)

    return "\n".join(lines)
2264
2265
def repr_tuple_names(names: List[str]) -> Optional[str]:
    """Render a list of names as a short comma-separated string.

    At most four names are shown.  When more than four are given, the
    first three and the last are kept, with ``...`` standing in for the
    omitted middle.  Any name longer than 11 characters is cut to its
    first 11 characters followed by ``..``.

    :param names: the names to render.
    :return: the rendered string, or ``None`` if ``names`` is empty.
    """
    if not names:
        return None

    def _clip(name: str) -> str:
        # shorten over-long names, marking the cut with ".."
        return name if len(name) <= 11 else "%s.." % name[:11]

    if len(names) > 4:
        leading = ", ".join(_clip(name) for name in names[0:3])
        return "%s, ..., %s" % (leading, _clip(names[-1]))

    return ", ".join(_clip(name) for name in names)
2278
2279
def has_compiled_ext(raise_=False):
    """Report whether the cython extensions are present, based on the
    ``HAS_CYEXTENSION`` flag of the ``_has_cython`` module.

    :param raise_: when true, raise ``ImportError`` instead of returning
     ``False`` if the extensions are not present.
    :return: ``True`` if the extensions are present, ``False`` otherwise.
    """
    from ._has_cython import HAS_CYEXTENSION

    if HAS_CYEXTENSION:
        return True

    if raise_:
        raise ImportError(
            "cython extensions were expected to be installed, "
            "but are not present"
        )

    return False
2292
2293
def load_uncompiled_module(module: _M) -> _M:
    """Load the non-compiled version of a module that is also
    compiled with cython.

    The ``.py`` file is looked up alongside the ``__init__.py`` of the
    given module's parent package, which must already be present in
    ``sys.modules``.  A new module object is created from that file and
    executed under the same fully qualified name; this function itself
    does not place it into ``sys.modules``.

    :param module: the module whose ``.py`` counterpart should be loaded.
    :return: the newly executed module object.
    """
    qualified_name = module.__name__
    assert module.__spec__
    package_name = module.__spec__.parent
    assert package_name
    package = sys.modules[package_name]
    assert package.__spec__
    init_path = package.__spec__.origin
    assert init_path and init_path.endswith("__init__.py")

    # the source file is expected to sit next to the package __init__.py
    short_name = qualified_name.rpartition(".")[2]
    source_path = init_path.replace("__init__.py", f"{short_name}.py")

    source_spec = importlib.util.spec_from_file_location(
        qualified_name, source_path
    )
    assert source_spec
    source_module = importlib.util.module_from_spec(source_spec)
    assert source_spec.loader
    source_spec.loader.exec_module(source_module)
    return cast(_M, source_module)
2316
2317
class _Missing(enum.Enum):
    """Enum with a single member, ``Missing``.

    NOTE(review): presumably a sentinel meaning "no value supplied" for
    places where ``None`` is itself a valid value; confirm against callers.
    """

    Missing = enum.auto()


# module-level alias for the sole member of _Missing
Missing = _Missing.Missing
# type alias: either a value of type _T or the Missing member, expressed
# with Literal so the two arms of the Union stay distinct types
MissingOr = Union[_T, Literal[_Missing.Missing]]