1# util/langhelpers.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""Routines to help with the creation, loading and introspection of
10modules, classes, hierarchies, attributes, functions, and methods.
11
12"""
13from __future__ import annotations
14
15import collections
16import enum
17from functools import update_wrapper
18import importlib.util
19import inspect
20import itertools
21import operator
22import re
23import sys
24import textwrap
25import threading
26import types
27from types import CodeType
28from types import ModuleType
29from typing import Any
30from typing import Callable
31from typing import cast
32from typing import Dict
33from typing import FrozenSet
34from typing import Generic
35from typing import Iterator
36from typing import List
37from typing import Literal
38from typing import NoReturn
39from typing import Optional
40from typing import overload
41from typing import Sequence
42from typing import Set
43from typing import Tuple
44from typing import Type
45from typing import TYPE_CHECKING
46from typing import TypeVar
47from typing import Union
48import warnings
49
50from . import _collections
51from . import compat
52from .. import exc
53
# TypeVars shared across this module's helpers.
_T = TypeVar("_T")  # general-purpose, unconstrained
_T_co = TypeVar("_T_co", covariant=True)  # covariant, used by descriptors below
_F = TypeVar("_F", bound=Callable[..., Any])  # any callable
# forward reference; HasMemoized is presumably defined later in this file
_MA = TypeVar("_MA", bound="HasMemoized.memoized_attribute[Any]")
_M = TypeVar("_M", bound=ModuleType)  # module objects
59
60
def restore_annotations(
    cls: type, new_annotations: dict[str, Any]
) -> Callable[[], None]:
    """apply alternate annotations to a class, with a callable to restore
    the pristine state of the former.
    This is used strictly to provide dataclasses on a mapped class, where
    in some cases where are making dataclass fields based on an attribute
    that is actually a python descriptor on a superclass which we called
    to get a value.
    if dataclasses were to give us a way to achieve this without swapping
    __annotations__, that would be much better.
    """
    # sentinel indicating "attribute was absent; delete it on restore"
    _absent = object()

    # pep-649 means classes have "__annotate__", and it's a callable. if it's
    # there and is None, we're in "legacy future mode", where it's python 3.14
    # or higher and "from __future__ import annotations" is set. in "legacy
    # future mode" we have to do the same steps we do for older pythons,
    # __annotate__ can be ignored
    uses_pep649 = (
        hasattr(cls, "__annotate__") and cls.__annotate__ is not None
    )

    # on pep-649 pythons, saving/restoring __annotate__ is sufficient;
    # otherwise save/restore __annotations__ directly
    attr_name = "__annotate__" if uses_pep649 else "__annotations__"
    saved = {attr_name: getattr(cls, attr_name, _absent)}

    cls.__annotations__ = new_annotations

    def restore() -> None:
        """put the class back the way we found it."""
        for attr, prior_value in saved.items():
            if prior_value is _absent:
                delattr(cls, attr)
            else:
                setattr(cls, attr, prior_value)

    return restore
101
102
def md5_hex(x: Any) -> str:
    """Return the hex md5 digest of ``x`` after encoding it as utf-8.

    Uses the ``compat`` helper that constructs md5 flagged as not used
    for security purposes.
    """
    digester = compat.md5_not_for_security()
    digester.update(x.encode("utf-8"))
    return cast(str, digester.hexdigest())
108
109
class safe_reraise:
    """Reraise an exception after invoking some
    handler code.

    Stores the existing exception info before
    invoking so that it is maintained across a potential
    coroutine context switch.

    e.g.::

        try:
            sess.commit()
        except:
            with safe_reraise():
                sess.rollback()

    TODO: we should at some point evaluate current behaviors in this regard
    based on current greenlet, gevent/eventlet implementations in Python 3, and
    also see the degree to which our own asyncio (based on greenlet also) is
    impacted by this. .rollback() will cause IO / context switch to occur in
    all these scenarios; what happens to the exception context from an
    "except:" block if we don't explicitly store it? Original issue was #2703.

    """

    __slots__ = ("_exc_info",)

    # exception info captured by __enter__: either a live
    # (type, value, traceback) triple, (None, None, None) when no exception
    # is in flight, or None once __exit__ has cleared it
    _exc_info: Union[
        None,
        Tuple[
            Type[BaseException],
            BaseException,
            types.TracebackType,
        ],
        Tuple[None, None, None],
    ]

    def __enter__(self) -> None:
        # capture the exception currently being handled, if any, before
        # the handler body runs (it may context-switch and lose it)
        self._exc_info = sys.exc_info()

    def __exit__(
        self,
        type_: Optional[Type[BaseException]],
        value: Optional[BaseException],
        traceback: Optional[types.TracebackType],
    ) -> NoReturn:
        assert self._exc_info is not None
        # see #2703 for notes
        if type_ is None:
            # handler body completed without raising: re-raise the
            # exception captured at __enter__ time
            exc_type, exc_value, exc_tb = self._exc_info
            assert exc_value is not None
            self._exc_info = None  # remove potential circular references
            raise exc_value.with_traceback(exc_tb)
        else:
            # handler body itself raised: propagate that newer exception
            self._exc_info = None  # remove potential circular references
            assert value is not None
            raise value.with_traceback(traceback)
167
168
def walk_subclasses(cls: Type[_T]) -> Iterator[Type[_T]]:
    """Yield ``cls`` and every subclass of it, depth-first.

    Each class is yielded exactly once, even across diamond hierarchies.
    """
    visited: Set[Any] = set()
    pending = [cls]

    while pending:
        current = pending.pop()
        if current in visited:
            # already yielded via another inheritance path
            continue
        visited.add(current)
        pending.extend(current.__subclasses__())
        yield current
181
182
def string_or_unprintable(element: Any) -> str:
    """Return ``element`` as a string, degrading gracefully for objects
    that cannot be stringified.

    :param element: any object
    :return: ``element`` unchanged if it is already a ``str``; otherwise
     ``str(element)``.  If ``str()`` raises, a placeholder containing the
     element's ``repr()`` is returned; if even ``repr()`` raises, a
     placeholder naming only the element's type is returned, so this
     function never propagates an exception.
    """
    if isinstance(element, str):
        return element
    try:
        return str(element)
    except Exception:
        # str() failed; repr() may fail for the same reason (e.g. a
        # broken __repr__), so guard the fallback as well rather than
        # letting the error path itself raise.
        try:
            return "unprintable element %r" % element
        except Exception:
            return (
                "unprintable element of type %s" % type(element).__name__
            )
191
192
def clsname_as_plain_name(
    cls: Type[Any], use_name: Optional[str] = None
) -> str:
    """Convert a CamelCase class name to lowercase space-separated words.

    The token "SQL" is treated as a single word.  ``use_name``, when
    given, is converted instead of ``cls.__name__``.
    """
    source = use_name if use_name else cls.__name__
    words = re.findall(r"([A-Z][a-z]+|SQL)", source)
    return " ".join(word.lower() for word in words)
198
199
def method_is_overridden(
    instance_or_cls: Union[Type[Any], object],
    against_method: Callable[..., Any],
) -> bool:
    """Return True if the two class methods don't match."""

    # normalize to a class: accept either a class or an instance of one
    if isinstance(instance_or_cls, type):
        cls = instance_or_cls
    else:
        cls = instance_or_cls.__class__

    found: types.MethodType = getattr(cls, against_method.__name__)
    return found != against_method
216
217
def decode_slice(slc: slice) -> Tuple[Any, ...]:
    """decode a slice object as sent to __getitem__.

    takes into account the 2.5 __index__() method, basically.

    """
    return tuple(
        bound.__index__() if hasattr(bound, "__index__") else bound
        for bound in (slc.start, slc.stop, slc.step)
    )
230
231
232def _unique_symbols(used: Sequence[str], *bases: str) -> Iterator[str]:
233 used_set = set(used)
234 for base in bases:
235 pool = itertools.chain(
236 (base,),
237 map(lambda i: base + str(i), range(1000)),
238 )
239 for sym in pool:
240 if sym not in used_set:
241 used_set.add(sym)
242 yield sym
243 break
244 else:
245 raise NameError("exhausted namespace for symbol base %s" % base)
246
247
def map_bits(fn: Callable[[int], Any], n: int) -> Iterator[Any]:
    """Call the given function given each nonzero bit from n."""

    remaining = n
    while remaining:
        # isolate the lowest set bit via two's-complement identity
        lowest = remaining & -remaining
        yield fn(lowest)
        remaining &= ~lowest
255
256
# TypeVar bound to any callable; lets ``decorator`` below return the same
# callable type it was given.
_Fn = TypeVar("_Fn", bound="Callable[..., Any]")

# this seems to be in flux in recent mypy versions
261
def decorator(target: Callable[..., Any]) -> Callable[[_Fn], _Fn]:
    """A signature-matching decorator factory.

    Returns a decorator that, when applied to a function ``fn``, generates
    a wrapper with ``fn``'s exact signature; the wrapper calls ``target``
    with ``fn`` as the first argument followed by the original call's
    arguments.  Preserving the signature keeps introspection of decorated
    functions accurate.
    """

    def decorate(fn: _Fn) -> _Fn:
        if not inspect.isfunction(fn) and not inspect.ismethod(fn):
            raise Exception("not a decoratable function")

        # Python 3.14 defer creating __annotations__ until its used.
        # We do not want to create __annotations__ now.
        annofunc = getattr(fn, "__annotate__", None)
        if annofunc is not None:
            # temporarily disable __annotate__ so getfullargspec does not
            # force evaluation of the deferred annotations
            fn.__annotate__ = None  # type: ignore[union-attr]
            try:
                spec = compat.inspect_getfullargspec(fn)
            finally:
                fn.__annotate__ = annofunc  # type: ignore[union-attr]
        else:
            spec = compat.inspect_getfullargspec(fn)

        # Do not generate code for annotations.
        # update_wrapper() copies the annotation from fn to decorated.
        # We use dummy defaults for code generation to avoid having
        # copy of large globals for compiling.
        # We copy __defaults__ and __kwdefaults__ from fn to decorated.
        empty_defaults = (None,) * len(spec.defaults or ())
        empty_kwdefaults = dict.fromkeys(spec.kwonlydefaults or ())
        spec = spec._replace(
            annotations={},
            defaults=empty_defaults,
            kwonlydefaults=empty_kwdefaults,
        )

        # choose symbol names for the target/fn references that cannot
        # collide with the wrapped function's own argument names
        names = (
            tuple(cast("Tuple[str, ...]", spec[0]))
            + cast("Tuple[str, ...]", spec[1:3])
            + (fn.__name__,)
        )
        targ_name, fn_name = _unique_symbols(names, "target", "fn")

        metadata: Dict[str, Optional[str]] = dict(target=targ_name, fn=fn_name)
        metadata.update(format_argspec_plus(spec, grouped=False))
        metadata["name"] = fn.__name__

        # carry coroutine-ness of the wrapped function into the generated
        # wrapper so awaitables keep working
        if inspect.iscoroutinefunction(fn):
            metadata["prefix"] = "async "
            metadata["target_prefix"] = "await "
        else:
            metadata["prefix"] = ""
            metadata["target_prefix"] = ""

        # look for __ positional arguments. This is a convention in
        # SQLAlchemy that arguments should be passed positionally
        # rather than as keyword
        # arguments. note that apply_pos doesn't currently work in all cases
        # such as when a kw-only indicator "*" is present, which is why
        # we limit the use of this to just that case we can detect. As we add
        # more kinds of methods that use @decorator, things may have to
        # be further improved in this area
        if "__" in repr(spec[0]):
            code = (
                """\
%(prefix)sdef %(name)s%(grouped_args)s:
    return %(target_prefix)s%(target)s(%(fn)s, %(apply_pos)s)
"""
                % metadata
            )
        else:
            code = (
                """\
%(prefix)sdef %(name)s%(grouped_args)s:
    return %(target_prefix)s%(target)s(%(fn)s, %(apply_kw)s)
"""
                % metadata
            )

        # minimal globals for the generated wrapper; __name__ is set so
        # the wrapper appears to live in the wrapped function's module
        env: Dict[str, Any] = {
            targ_name: target,
            fn_name: fn,
            "__name__": fn.__module__,
        }

        decorated = cast(
            types.FunctionType,
            _exec_code_in_env(code, env, fn.__name__),
        )
        # restore the real defaults that were blanked out above
        decorated.__defaults__ = fn.__defaults__
        decorated.__kwdefaults__ = fn.__kwdefaults__  # type: ignore
        return update_wrapper(decorated, fn)  # type: ignore[return-value]

    return update_wrapper(decorate, target)  # type: ignore[return-value]
352
353
354def _exec_code_in_env(
355 code: Union[str, types.CodeType], env: Dict[str, Any], fn_name: str
356) -> Callable[..., Any]:
357 exec(code, env)
358 return env[fn_name] # type: ignore[no-any-return]
359
360
# TypeVars reserved for the plugin/descriptor helpers in this module.
_PF = TypeVar("_PF")
_TE = TypeVar("_TE")
363
364
class PluginLoader:
    """Cache and load named implementations, sourcing them from explicit
    registration, an optional automatic loader function, or setuptools
    entry points for the configured group.
    """

    def __init__(
        self, group: str, auto_fn: Optional[Callable[..., Any]] = None
    ):
        self.group = group
        self.impls: Dict[str, Any] = {}
        self.auto_fn = auto_fn

    def clear(self):
        """Discard all cached loaders."""
        self.impls.clear()

    def load(self, name: str) -> Any:
        """Return the implementation registered under ``name``.

        Falls back to ``auto_fn`` and then to entry points in
        ``self.group``; raises ``NoSuchModuleError`` if nothing matches.
        """
        if name in self.impls:
            return self.impls[name]()

        # automatic loader callback, when configured
        if self.auto_fn:
            auto_loader = self.auto_fn(name)
            if auto_loader:
                self.impls[name] = auto_loader
                return auto_loader()

        # entry-point lookup within our group
        for entry_point in compat.importlib_metadata_get(self.group):
            if entry_point.name == name:
                self.impls[name] = entry_point.load
                return entry_point.load()

        raise exc.NoSuchModuleError(
            "Can't load plugin: %s:%s" % (self.group, name)
        )

    def register(self, name: str, modulepath: str, objname: str) -> None:
        """Register ``objname`` inside module ``modulepath`` under ``name``."""

        def load():
            module = __import__(modulepath)
            # __import__ returns the top-level package; walk down to the
            # actual target module
            for component in modulepath.split(".")[1:]:
                module = getattr(module, component)
            return getattr(module, objname)

        self.impls[name] = load

    def deregister(self, name: str) -> None:
        """Remove the loader registered under ``name``."""
        del self.impls[name]
406
407
408def _inspect_func_args(fn):
409 try:
410 co_varkeywords = inspect.CO_VARKEYWORDS
411 except AttributeError:
412 # https://docs.python.org/3/library/inspect.html
413 # The flags are specific to CPython, and may not be defined in other
414 # Python implementations. Furthermore, the flags are an implementation
415 # detail, and can be removed or deprecated in future Python releases.
416 spec = compat.inspect_getfullargspec(fn)
417 return spec[0], bool(spec[2])
418 else:
419 # use fn.__code__ plus flags to reduce method call overhead
420 co = fn.__code__
421 nargs = co.co_argcount
422 return (
423 list(co.co_varnames[:nargs]),
424 bool(co.co_flags & co_varkeywords),
425 )
426
427
@overload
def get_cls_kwargs(
    cls: type,
    *,
    _set: Optional[Set[str]] = None,
    raiseerr: Literal[True] = ...,
) -> Set[str]: ...


@overload
def get_cls_kwargs(
    cls: type, *, _set: Optional[Set[str]] = None, raiseerr: bool = False
) -> Optional[Set[str]]: ...


def get_cls_kwargs(
    cls: type, *, _set: Optional[Set[str]] = None, raiseerr: bool = False
) -> Optional[Set[str]]:
    r"""Return the full set of inherited kwargs for the given `cls`.

    Probes a class's __init__ method, collecting all named arguments. If the
    __init__ defines a \**kwargs catch-all, then the constructor is presumed
    to pass along unrecognized keywords to its base classes, and the
    collection process is repeated recursively on each of the bases.

    Uses a subset of inspect.getfullargspec() to cut down on method overhead,
    as this is used within the Core typing system to create copies of type
    objects which is a performance-sensitive operation.

    No anonymous tuple arguments please !

    :param cls: the class to probe
    :param _set: internal accumulator shared with recursive calls; leave
     unset when calling from the outside
    :param raiseerr: when True, raise TypeError instead of returning None
     for a non-toplevel class whose __init__ can't pass keywords along
    """
    # _set is None only for the outermost invocation; recursive calls on
    # base classes share the same accumulator
    toplevel = _set is None
    if toplevel:
        _set = set()
    assert _set is not None

    # only consider an __init__ defined directly on this class (not an
    # inherited one), and only if it is a plain Python function
    ctr = cls.__dict__.get("__init__", False)

    has_init = (
        ctr
        and isinstance(ctr, types.FunctionType)
        and isinstance(ctr.__code__, types.CodeType)
    )

    if has_init:
        names, has_kw = _inspect_func_args(ctr)
        _set.update(names)

        # without a **kwargs catch-all, a non-toplevel class can't forward
        # keywords to its bases, so the probe stops here
        if not has_kw and not toplevel:
            if raiseerr:
                raise TypeError(
                    f"given cls {cls} doesn't have an __init__ method"
                )
            else:
                return None
    else:
        has_kw = False

    if not has_init or has_kw:
        # no local __init__, or one that accepts **kwargs: base classes
        # may consume keywords too, so probe each of them
        for c in cls.__bases__:
            if get_cls_kwargs(c, _set=_set) is None:
                break

    _set.discard("self")
    return _set
494
495
def get_func_kwargs(func: Callable[..., Any]) -> List[str]:
    """Return the set of legal kwargs for the given `func`.

    Uses getargspec so is safe to call for methods, functions,
    etc.

    """
    spec = compat.inspect_getfullargspec(func)
    return spec[0]
505
506
def get_callable_argspec(
    fn: Callable[..., Any], no_self: bool = False, _is_init: bool = False
) -> compat.FullArgSpec:
    """Return the argument signature for any callable.

    All pure-Python callables are accepted, including
    functions, methods, classes, objects with __call__;
    builtins and other edge cases like functools.partial() objects
    raise a TypeError.

    :param fn: the callable to inspect
    :param no_self: if True, omit the leading self/cls argument from the
     returned spec where applicable
    :param _is_init: internal flag; set when recursing into a class's
     ``__init__``
    """
    if inspect.isbuiltin(fn):
        raise TypeError("Can't inspect builtin: %s" % fn)
    elif inspect.isfunction(fn):
        if _is_init and no_self:
            # an __init__ reached via a class: strip the "self" argument
            spec = compat.inspect_getfullargspec(fn)
            return compat.FullArgSpec(
                spec.args[1:],
                spec.varargs,
                spec.varkw,
                spec.defaults,
                spec.kwonlyargs,
                spec.kwonlydefaults,
                spec.annotations,
            )
        else:
            return compat.inspect_getfullargspec(fn)
    elif inspect.ismethod(fn):
        if no_self and (_is_init or fn.__self__):
            # bound method: inspect the underlying function minus "self"
            spec = compat.inspect_getfullargspec(fn.__func__)
            return compat.FullArgSpec(
                spec.args[1:],
                spec.varargs,
                spec.varkw,
                spec.defaults,
                spec.kwonlyargs,
                spec.kwonlydefaults,
                spec.annotations,
            )
        else:
            return compat.inspect_getfullargspec(fn.__func__)
    elif inspect.isclass(fn):
        # a class: its signature is that of its constructor
        return get_callable_argspec(
            fn.__init__, no_self=no_self, _is_init=True
        )
    elif hasattr(fn, "__func__"):
        return compat.inspect_getfullargspec(fn.__func__)
    elif hasattr(fn, "__call__"):
        if inspect.ismethod(fn.__call__):
            # callable instance: recurse on its __call__ method
            return get_callable_argspec(fn.__call__, no_self=no_self)
        else:
            raise TypeError("Can't inspect callable: %s" % fn)
    else:
        raise TypeError("Can't inspect callable: %s" % fn)
561
562
def format_argspec_plus(
    fn: Union[Callable[..., Any], compat.FullArgSpec], grouped: bool = True
) -> Dict[str, Optional[str]]:
    """Returns a dictionary of formatted, introspected function arguments.

    A enhanced variant of inspect.formatargspec to support code generation.

    fn
       An inspectable callable or tuple of inspect getargspec() results.
    grouped
      Defaults to True; include (parens, around, argument) lists

    Returns:

    args
      Full inspect.formatargspec for fn
    self_arg
      The name of the first positional argument, varargs[0], or None
      if the function defines no positional arguments.
    apply_pos
      args, re-written in calling rather than receiving syntax. Arguments are
      passed positionally.
    apply_kw
      Like apply_pos, except keyword-ish args are passed as keywords.
    apply_pos_proxied
      Like apply_pos but omits the self/cls argument

    Example::

      >>> format_argspec_plus(lambda self, a, b, c=3, **d: 123)
      {'grouped_args': '(self, a, b, c=3, **d)',
       'self_arg': 'self',
       'apply_kw': '(self, a, b, c=c, **d)',
       'apply_pos': '(self, a, b, c, **d)'}

    """
    # accept either a live callable or a pre-computed FullArgSpec
    if callable(fn):
        spec = compat.inspect_getfullargspec(fn)
    else:
        spec = fn

    # the full receiving-side signature, defaults included
    args = compat.inspect_formatargspec(*spec)

    # calling-side form with no defaults: positional args passed through
    apply_pos = compat.inspect_formatargspec(
        spec[0], spec[1], spec[2], None, spec[4]
    )

    if spec[0]:
        # positional args present; the first is self/cls by convention
        self_arg = spec[0][0]

        apply_pos_proxied = compat.inspect_formatargspec(
            spec[0][1:], spec[1], spec[2], None, spec[4]
        )

    elif spec[1]:
        # I'm not sure what this is
        self_arg = "%s[0]" % spec[1]

        apply_pos_proxied = apply_pos
    else:
        self_arg = None
        apply_pos_proxied = apply_pos

    # count trailing arguments that carry defaults (positional defaults
    # plus keyword-only args)
    num_defaults = 0
    if spec[3]:
        num_defaults += len(cast(Tuple[Any], spec[3]))
    if spec[4]:
        num_defaults += len(spec[4])

    name_args = spec[0] + spec[4]

    defaulted_vals: Union[List[str], Tuple[()]]

    if num_defaults:
        # pass each defaulted arg as "name=name" in the generated call
        defaulted_vals = name_args[0 - num_defaults :]
    else:
        defaulted_vals = ()

    apply_kw = compat.inspect_formatargspec(
        name_args,
        spec[1],
        spec[2],
        defaulted_vals,
        formatvalue=lambda x: "=" + str(x),
    )

    if spec[0]:
        # same as apply_kw but with the self/cls argument omitted
        apply_kw_proxied = compat.inspect_formatargspec(
            name_args[1:],
            spec[1],
            spec[2],
            defaulted_vals,
            formatvalue=lambda x: "=" + str(x),
        )
    else:
        apply_kw_proxied = apply_kw

    if grouped:
        return dict(
            grouped_args=args,
            self_arg=self_arg,
            apply_pos=apply_pos,
            apply_kw=apply_kw,
            apply_pos_proxied=apply_pos_proxied,
            apply_kw_proxied=apply_kw_proxied,
        )
    else:
        # ungrouped: strip the surrounding parentheses from each form
        return dict(
            grouped_args=args,
            self_arg=self_arg,
            apply_pos=apply_pos[1:-1],
            apply_kw=apply_kw[1:-1],
            apply_pos_proxied=apply_pos_proxied[1:-1],
            apply_kw_proxied=apply_kw_proxied[1:-1],
        )
678
679
def format_argspec_init(method, grouped=True):
    """format_argspec_plus with considerations for typical __init__ methods

    Wraps format_argspec_plus with error handling strategies for typical
    __init__ cases:

    .. sourcecode:: text

        object.__init__ -> (self)
        other unreflectable (usually C) -> (self, *args, **kwargs)

    """
    if method is not object.__init__:
        try:
            return format_argspec_plus(method, grouped=grouped)
        except TypeError:
            # unreflectable (usually C-level) callable; assume the most
            # permissive signature
            grouped_args = "(self, *args, **kwargs)"
            args = grouped_args if grouped else "self, *args, **kwargs"
            proxied = "(*args, **kwargs)" if grouped else "*args, **kwargs"
    else:
        # object.__init__ takes only self
        grouped_args = "(self)"
        args = "(self)" if grouped else "self"
        proxied = "()" if grouped else ""
    return dict(
        self_arg="self",
        grouped_args=grouped_args,
        apply_pos=args,
        apply_kw=args,
        apply_pos_proxied=proxied,
        apply_kw_proxied=proxied,
    )
711
712
def create_proxy_methods(
    target_cls: Type[Any],
    target_cls_sphinx_name: str,
    proxy_cls_sphinx_name: str,
    classmethods: Sequence[str] = (),
    methods: Sequence[str] = (),
    attributes: Sequence[str] = (),
    use_intermediate_variable: Sequence[str] = (),
) -> Callable[[_T], _T]:
    """A class decorator indicating attributes should refer to a proxy
    class.

    At runtime this is purely a marker that leaves the decorated class
    untouched; the tools/generate_proxy_methods.py script consumes it to
    statically generate proxy methods and attributes that typing tools
    such as mypy fully recognize.

    """

    def decorate(cls):
        # marker only: return the class unchanged
        return cls

    return decorate
736
737
def getargspec_init(method):
    """inspect.getargspec with considerations for typical __init__ methods

    Wraps inspect.getargspec with error handling for typical __init__ cases:

    .. sourcecode:: text

        object.__init__ -> (self)
        other unreflectable (usually C) -> (self, *args, **kwargs)

    """
    try:
        return compat.inspect_getfullargspec(method)
    except TypeError:
        # unreflectable; fall back to a canned spec
        if method is not object.__init__:
            return (["self"], "args", "kwargs", None)
        return (["self"], None, None, None)
756
757
def unbound_method_to_callable(func_or_cls):
    """Adjust the incoming callable such that a 'self' argument is not
    required.

    """

    is_selfless_method = (
        isinstance(func_or_cls, types.MethodType)
        and not func_or_cls.__self__
    )
    return func_or_cls.__func__ if is_selfless_method else func_or_cls
768
769
class GenericRepr:
    """Encapsulates the logic for creating a generic __repr__() string.

    This class allows for the repr structure to be created, then modified
    (e.g., changing the class name), before being rendered as a string.

    .. versionadded:: 2.1
    """

    __slots__ = (
        "_obj",
        "_additional_kw",
        "_to_inspect",
        "_omit_kwarg",
        "_class_name",
    )

    # the object being repr'd
    _obj: Any
    # extra (name, default) pairs to render beyond the __init__ signature
    _additional_kw: Sequence[Tuple[str, Any]]
    # objects whose __init__ signatures drive the rendering
    _to_inspect: List[object]
    # keyword argument names excluded from the output
    _omit_kwarg: Sequence[str]
    # override for the rendered class name; None means use type(obj) name
    _class_name: Optional[str]

    def __init__(
        self,
        obj: Any,
        additional_kw: Sequence[Tuple[str, Any]] = (),
        to_inspect: Optional[Union[object, List[object]]] = None,
        omit_kwarg: Sequence[str] = (),
    ):
        """Create a GenericRepr object.

        :param obj: The object being repr'd
        :param additional_kw: Additional keyword arguments to check for in
         the repr, as a sequence of 2-tuples of (name, default_value)
        :param to_inspect: One or more objects whose __init__ signature
         should be inspected. If not provided, defaults to [obj].
        :param omit_kwarg: Sequence of keyword argument names to omit from
         the repr output
        """
        self._obj = obj
        self._additional_kw = additional_kw
        self._to_inspect = (
            [obj] if to_inspect is None else _collections.to_list(to_inspect)
        )
        self._omit_kwarg = omit_kwarg
        self._class_name = None

    def set_class_name(self, class_name: str) -> GenericRepr:
        """Set the class name to be used in the repr.

        By default, the class name is taken from obj.__class__.__name__.
        This method allows it to be overridden.

        :param class_name: The class name to use
        :return: self, for method chaining
        """
        self._class_name = class_name
        return self

    def __str__(self) -> str:
        """Produce the __repr__() string based on the configured parameters."""
        obj = self._obj
        to_inspect = self._to_inspect
        additional_kw = self._additional_kw
        omit_kwarg = self._omit_kwarg

        # sentinel distinguishing "attribute absent" from any real value
        missing = object()

        # collect argument names from each inspected __init__: the first
        # object contributes positional args (and varargs); the rest
        # contribute keyword args only
        pos_args = []
        kw_args: _collections.OrderedDict[str, Any] = (
            _collections.OrderedDict()
        )
        vargs = None
        for i, insp in enumerate(to_inspect):
            try:
                spec = compat.inspect_getfullargspec(insp.__init__)  # type: ignore[misc] # noqa: E501
            except TypeError:
                # unreflectable __init__ (usually C-level); skip it
                continue
            else:
                default_len = len(spec.defaults) if spec.defaults else 0
                if i == 0:
                    if spec.varargs:
                        vargs = spec.varargs
                    if default_len:
                        pos_args.extend(spec.args[1:-default_len])
                    else:
                        pos_args.extend(spec.args[1:])
                else:
                    # args without defaults from secondary objects are
                    # keyword-rendered with no default to compare against
                    kw_args.update(
                        [(arg, missing) for arg in spec.args[1:-default_len]]
                    )

                if default_len:
                    assert spec.defaults
                    kw_args.update(
                        [
                            (arg, default)
                            for arg, default in zip(
                                spec.args[-default_len:], spec.defaults
                            )
                        ]
                    )
        output: List[str] = []

        output.extend(repr(getattr(obj, arg, None)) for arg in pos_args)

        if vargs is not None and hasattr(obj, vargs):
            output.extend([repr(val) for val in getattr(obj, vargs)])

        # keyword args are rendered only when present and differing from
        # their declared default
        for arg, defval in kw_args.items():
            if arg in omit_kwarg:
                continue
            try:
                val = getattr(obj, arg, missing)
                if val is not missing and val != defval:
                    output.append("%s=%r" % (arg, val))
            except Exception:
                pass

        if additional_kw:
            for arg, defval in additional_kw:
                try:
                    val = getattr(obj, arg, missing)
                    if val is not missing and val != defval:
                        output.append("%s=%r" % (arg, val))
                except Exception:
                    pass

        class_name = (
            self._class_name
            if self._class_name is not None
            else obj.__class__.__name__
        )
        return "%s(%s)" % (class_name, ", ".join(output))
905
906
def generic_repr(
    obj: Any,
    additional_kw: Sequence[Tuple[str, Any]] = (),
    to_inspect: Optional[Union[object, List[object]]] = None,
    omit_kwarg: Sequence[str] = (),
) -> str:
    """Produce a __repr__() based on direct association of the __init__()
    specification vs. same-named attributes present.

    """
    renderer = GenericRepr(
        obj,
        additional_kw=additional_kw,
        to_inspect=to_inspect,
        omit_kwarg=omit_kwarg,
    )
    return str(renderer)
925
926
def class_hierarchy(cls):
    """Return an unordered sequence of all classes related to cls.

    Traverses diamond hierarchies.

    Fibs slightly: subclasses of builtin types are not returned. Thus
    class_hierarchy(class A(object)) returns (A, object), not A plus every
    class systemwide that derives from object.

    """

    seen = {cls}
    to_process = list(cls.__mro__)
    while to_process:
        current = to_process.pop()

        # walk upward through bases not yet seen
        for base in current.__bases__:
            if base not in seen:
                seen.add(base)
                to_process.append(base)

        # don't descend into subclasses of builtins (e.g. object would
        # pull in every class systemwide)
        if current.__module__ == "builtins" or not hasattr(
            current, "__subclasses__"
        ):
            continue

        # metaclasses need the explicit one-argument form
        if issubclass(current, type):
            subclasses = current.__subclasses__(current)
        else:
            subclasses = current.__subclasses__()

        for sub in subclasses:
            if sub not in seen:
                seen.add(sub)
                to_process.append(sub)
    return list(seen)
963
964
def iterate_attributes(cls):
    """iterate all the keys and attributes associated
    with a class, without using getattr().

    Does not use getattr() so that class-sensitive
    descriptors (i.e. property.__get__()) are not called.

    """
    for key in dir(cls):
        # take the value from the first class along the MRO that defines
        # this key, reading straight from __dict__ so descriptors are
        # not invoked
        for klass in cls.__mro__:
            if key in klass.__dict__:
                yield (key, klass.__dict__[key])
                break
979
980
def monkeypatch_proxied_specials(
    into_cls,
    from_cls,
    skip=None,
    only=None,
    name="self.proxy",
    from_instance=None,
):
    """Automates delegation of __specials__ for a proxying type.

    :param into_cls: class that receives the generated delegating methods
    :param from_cls: class whose dunder methods are proxied
    :param skip: dunder names to exclude; a default skip list is applied
     when ``only`` is not given
    :param only: explicit list of dunder names to proxy, bypassing
     discovery
    :param name: source-code expression used inside the generated methods
     to reach the proxied object
    :param from_instance: optional object bound to ``name`` in the
     generated methods' globals
    """

    if only:
        dunders = only
    else:
        if skip is None:
            skip = (
                "__slots__",
                "__del__",
                "__getattribute__",
                "__metaclass__",
                "__getstate__",
                "__setstate__",
            )
        # discover callable dunders on from_cls that into_cls doesn't
        # already provide
        dunders = [
            m
            for m in dir(from_cls)
            if (
                m.startswith("__")
                and m.endswith("__")
                and not hasattr(into_cls, m)
                and m not in skip
            )
        ]

    for method in dunders:
        try:
            maybe_fn = getattr(from_cls, method)
            if not hasattr(maybe_fn, "__call__"):
                continue
            maybe_fn = getattr(maybe_fn, "__func__", maybe_fn)
            fn = cast(types.FunctionType, maybe_fn)

        except AttributeError:
            continue
        try:
            spec = compat.inspect_getfullargspec(fn)
            fn_args = compat.inspect_formatargspec(spec[0])
            d_args = compat.inspect_formatargspec(spec[0][1:])
        except TypeError:
            # unreflectable (usually C-implemented) method; assume a
            # generic signature
            fn_args = "(self, *args, **kw)"
            d_args = "(*args, **kw)"

        # generate a one-line delegating method; locals() supplies
        # method/fn_args/name/d_args to the format string
        py = (
            "def %(method)s%(fn_args)s: "
            "return %(name)s.%(method)s%(d_args)s" % locals()
        )

        env: Dict[str, types.FunctionType] = (
            from_instance is not None and {name: from_instance} or {}
        )
        exec(py, env)
        try:
            # carry over default argument values where the source has them
            env[method].__defaults__ = fn.__defaults__
        except AttributeError:
            pass
        setattr(into_cls, method, env[method])
1046
1047
def methods_equivalent(meth1, meth2):
    """Return True if the two methods are the same implementation."""

    impl1 = getattr(meth1, "__func__", meth1)
    impl2 = getattr(meth2, "__func__", meth2)
    return impl1 is impl2
1054
1055
def as_interface(obj, cls=None, methods=None, required=None):
    """Ensure basic interface compliance for an instance or dict of callables.

    Checks that ``obj`` implements public methods of ``cls`` or has members
    listed in ``methods``. If ``required`` is not supplied, implementing at
    least one interface method is sufficient. Methods present on ``obj`` that
    are not in the interface are ignored.

    If ``obj`` is a dict and ``dict`` does not meet the interface
    requirements, the keys of the dictionary are inspected. Keys present in
    ``obj`` that are not in the interface will raise TypeErrors.

    Raises TypeError if ``obj`` does not meet the interface criteria.

    In all passing cases, an object with callable members is returned. In the
    simple case, ``obj`` is returned as-is; if dict processing kicks in then
    an anonymous class is returned.

    obj
      A type, instance, or dictionary of callables.
    cls
      Optional, a type. All public methods of cls are considered the
      interface. An ``obj`` instance of cls will always pass, ignoring
      ``required``..
    methods
      Optional, a sequence of method names to consider as the interface.
    required
      Optional, a sequence of mandatory implementations. If omitted, an
      ``obj`` that provides at least one interface method is considered
      sufficient. As a convenience, required may be a type, in which case
      all public methods of the type are required.

    """
    if not cls and not methods:
        raise TypeError("a class or collection of method names are required")

    # instances of cls always pass regardless of ``required``
    if isinstance(cls, type) and isinstance(obj, cls):
        return obj

    interface = set(
        methods or [m for m in dir(cls) if not m.startswith("_")]
    )
    implemented = set(dir(obj))

    # complies is superset-or-equal normally; strict superset when there
    # are no required names, meaning "at least one interface method"
    complies = operator.ge
    if isinstance(required, type):
        required = interface
    elif required:
        required = set(required)
    else:
        required = set()
        complies = operator.gt

    if complies(implemented.intersection(interface), required):
        return obj

    # No dict duck typing here.
    if not isinstance(obj, dict):
        qualifier = "any of" if complies is operator.gt else "all of"
        raise TypeError(
            "%r does not implement %s: %s"
            % (obj, qualifier, ", ".join(interface))
        )

    class AnonymousInterface:
        """A callable-holding shell."""

    if cls:
        AnonymousInterface.__name__ = "Anonymous" + cls.__name__
    found = set()

    # build the anonymous class from the dict's callables, validating
    # each key against the interface
    for method, impl in dictlike_iteritems(obj):
        if method not in interface:
            raise TypeError("%r: unknown in this interface" % method)
        if not callable(impl):
            raise TypeError("%r=%r is not callable" % (method, impl))
        setattr(AnonymousInterface, method, staticmethod(impl))
        found.add(method)

    if complies(found, required):
        return AnonymousInterface

    raise TypeError(
        "dictionary does not contain required keys %s"
        % ", ".join(required - found)
    )
1140
1141
# TypeVar for generic_fn_descriptor subclasses so that class-level __get__
# access returns the same descriptor subtype.
_GFD = TypeVar("_GFD", bound="generic_fn_descriptor[Any]")
1143
1144
class generic_fn_descriptor(Generic[_T_co]):
    """Descriptor which proxies a function when the attribute is not
    present in dict

    This superclass is organized in a particular way with "memoized" and
    "non-memoized" implementation classes that are hidden from type checkers,
    as Mypy seems to not be able to handle seeing multiple kinds of descriptor
    classes used for the same attribute.

    """

    # the proxied function
    fget: Callable[..., _T_co]
    # docstring carried over from the proxied function (or explicit doc)
    __doc__: Optional[str]
    # attribute name, taken from the proxied function's __name__
    __name__: str

    def __init__(self, fget: Callable[..., _T_co], doc: Optional[str] = None):
        self.fget = fget
        self.__doc__ = doc or fget.__doc__
        self.__name__ = fget.__name__

    @overload
    def __get__(self: _GFD, obj: None, cls: Any) -> _GFD: ...

    @overload
    def __get__(self, obj: object, cls: Any) -> _T_co: ...

    def __get__(self: _GFD, obj: Any, cls: Any) -> Union[_GFD, _T_co]:
        # concrete subclasses supply the actual access behavior
        raise NotImplementedError()

    if TYPE_CHECKING:
        # declared for typing only, so the attribute appears settable and
        # deletable to type checkers; no runtime implementation exists

        def __set__(self, instance: Any, value: Any) -> None: ...

        def __delete__(self, instance: Any) -> None: ...

    def _reset(self, obj: Any) -> None:
        # concrete subclasses supply cache-reset behavior where applicable
        raise NotImplementedError()

    @classmethod
    def reset(cls, obj: Any, name: str) -> None:
        raise NotImplementedError()
1186
1187
class _non_memoized_property(generic_fn_descriptor[_T_co]):
    """a plain descriptor that proxies a function.

    primary rationale is to provide a plain attribute that's
    compatible with memoized_property which is also recognized as equivalent
    by mypy.

    """

    if not TYPE_CHECKING:

        def __get__(self, obj, cls):
            # class access yields the descriptor itself; instance access
            # calls the wrapped function every time (no caching)
            return self if obj is None else self.fget(obj)
1203
1204
class _memoized_property(generic_fn_descriptor[_T_co]):
    """A read-only @property that is only evaluated once."""

    if not TYPE_CHECKING:

        def __get__(self, obj, cls):
            if obj is None:
                return self
            # compute once, then shadow this descriptor by placing the
            # value in the instance __dict__ under the same name
            value = self.fget(obj)
            obj.__dict__[self.__name__] = value
            return value

    def _reset(self, obj):
        _memoized_property.reset(obj, self.__name__)

    @classmethod
    def reset(cls, obj, name):
        # removing the instance-dict entry re-exposes the descriptor
        obj.__dict__.pop(name, None)
1222
1223
# despite many attempts to get Mypy to recognize an overridden descriptor
# where one is memoized and the other isn't, there seems to be no reliable
# way other than completely deceiving the type checker into thinking there
# is just one single descriptor type everywhere. Otherwise, if a superclass
# has non-memoized and subclass has memoized, that requires
# "class memoized(non_memoized)". but then if a superclass has memoized and
# superclass has non-memoized, the class hierarchy of the descriptors
# would need to be reversed; "class non_memoized(memoized)". so there's no
# way to achieve this.
# additional issues, RO properties:
# https://github.com/python/mypy/issues/12440
if TYPE_CHECKING:
    # allow memoized and non-memoized to be freely mixed by having them
    # be the same class
    memoized_property = generic_fn_descriptor
    non_memoized_property = generic_fn_descriptor

    # for read only situations, mypy only sees @property as read only.
    # read only is needed when a subtype specializes the return type
    # of a property, meaning assignment needs to be disallowed
    ro_memoized_property = property
    ro_non_memoized_property = property

else:
    # at runtime the "ro" variants are the very same objects as the plain
    # ones; the distinction exists purely for type checking
    memoized_property = ro_memoized_property = _memoized_property
    non_memoized_property = ro_non_memoized_property = _non_memoized_property
1250
1251
def memoized_instancemethod(fn: _F) -> _F:
    """Decorate a method to memoize its return value.

    Best applied to no-arg methods: memoization is not sensitive to
    argument values, and will always return the same value even when
    called with different arguments.

    """

    def oneshot(self, *args, **kw):
        value = fn(self, *args, **kw)

        def memo(*a, **kw):
            # arguments are ignored; the first computed value is returned
            return value

        memo.__name__ = fn.__name__
        memo.__doc__ = fn.__doc__
        # shadow the bound method on the instance so later lookups skip
        # this wrapper entirely
        self.__dict__[fn.__name__] = memo
        return value

    return update_wrapper(oneshot, fn)  # type: ignore
1273
1274
class HasMemoized:
    """A mixin class that maintains the names of memoized elements in a
    collection for easy cache clearing, generative, etc.

    """

    if not TYPE_CHECKING:
        # support classes that want to have __slots__ with an explicit
        # slot for __dict__. not sure if that requires base __slots__ here.
        __slots__ = ()

    # names of all attributes memoized on this instance; re-bound (never
    # mutated in place) so the class-level default stays empty
    _memoized_keys: FrozenSet[str] = frozenset()

    def _reset_memoizations(self) -> None:
        # drop every memoized value; the descriptors become visible again
        for elem in self._memoized_keys:
            self.__dict__.pop(elem, None)

    def _assert_no_memoizations(self) -> None:
        for elem in self._memoized_keys:
            assert elem not in self.__dict__

    def _set_memoized_attribute(self, key: str, value: Any) -> None:
        # install a value directly, registering it for later reset
        self.__dict__[key] = value
        self._memoized_keys |= {key}

    class memoized_attribute(memoized_property[_T]):
        """A read-only @property that is only evaluated once.

        :meta private:

        """

        fget: Callable[..., _T]
        __doc__: Optional[str]
        __name__: str

        def __init__(self, fget: Callable[..., _T], doc: Optional[str] = None):
            self.fget = fget
            self.__doc__ = doc or fget.__doc__
            self.__name__ = fget.__name__

        @overload
        def __get__(self: _MA, obj: None, cls: Any) -> _MA: ...

        @overload
        def __get__(self, obj: Any, cls: Any) -> _T: ...

        def __get__(self, obj, cls):
            if obj is None:
                return self
            # memoize in the instance dict and record the key for reset
            obj.__dict__[self.__name__] = result = self.fget(obj)
            obj._memoized_keys |= {self.__name__}
            return result

    @classmethod
    def memoized_instancemethod(cls, fn: _F) -> _F:
        """Decorate a method to memoize its return value.

        :meta private:

        """

        def oneshot(self: Any, *args: Any, **kw: Any) -> Any:
            result = fn(self, *args, **kw)

            def memo(*a, **kw):
                return result

            memo.__name__ = fn.__name__
            memo.__doc__ = fn.__doc__
            # replace the bound method with a constant-returning stub and
            # record the key for cache clearing
            self.__dict__[fn.__name__] = memo
            self._memoized_keys |= {fn.__name__}
            return result

        return update_wrapper(oneshot, fn)  # type: ignore
1350
1351
if TYPE_CHECKING:
    # presented to type checkers as a plain read-only property so that
    # subtypes may specialize the return type
    HasMemoized_ro_memoized_attribute = property
else:
    HasMemoized_ro_memoized_attribute = HasMemoized.memoized_attribute
1356
1357
class MemoizedSlots:
    """Apply memoized items to an object using a __getattr__ scheme.

    This allows the functionality of memoized_property and
    memoized_instancemethod to be available to a class using __slots__.

    The memoized get is not threadsafe under freethreading and the
    creator method may in extremely rare cases be called more than once.

    """

    __slots__ = ()

    def _fallback_getattr(self, key):
        # hook for subclasses that layer additional __getattr__ behavior
        raise AttributeError(key)

    def __getattr__(self, key: str) -> Any:
        if key.startswith(("_memoized_attr_", "_memoized_method_")):
            raise AttributeError(key)

        # to avoid recursion errors when interacting with other __getattr__
        # schemes that refer to this one, when testing for memoized method
        # look at __class__ only rather than going into __getattr__ again.
        cls = self.__class__

        if hasattr(cls, f"_memoized_attr_{key}"):
            value = getattr(self, f"_memoized_attr_{key}")()
            setattr(self, key, value)
            return value

        if hasattr(cls, f"_memoized_method_{key}"):
            creator = getattr(self, f"_memoized_method_{key}")

            def oneshot(*args, **kw):
                result = creator(*args, **kw)

                def memo(*a, **kw):
                    return result

                memo.__name__ = creator.__name__
                memo.__doc__ = creator.__doc__
                # replace this one-shot wrapper with the constant stub
                setattr(self, key, memo)
                return result

            oneshot.__doc__ = creator.__doc__
            return oneshot

        return self._fallback_getattr(key)
1404
1405
# from paste.deploy.converters
def asbool(obj: Any) -> bool:
    """Coerce a value to a boolean.

    Strings are normalized (stripped, lowercased) and matched against
    common true/false spellings; an unrecognized string raises
    ``ValueError``.  Non-string values go through ``bool()``.
    """
    if isinstance(obj, str):
        text = obj.strip().lower()
        if text in ("true", "yes", "on", "y", "t", "1"):
            return True
        if text in ("false", "no", "off", "n", "f", "0"):
            return False
        raise ValueError("String is not true/false: %r" % text)
    return bool(obj)
1417
1418
def bool_or_str(*text: str) -> Callable[[str], Union[str, bool]]:
    """Return a callable that will evaluate a string as
    boolean, or one of a set of "alternate" string values.

    """

    def bool_or_value(obj: str) -> Union[str, bool]:
        # sanctioned literal strings pass through unchanged; anything
        # else must parse as a boolean
        return obj if obj in text else asbool(obj)

    return bool_or_value
1432
1433
def asint(value: Any) -> Optional[int]:
    """Coerce to integer, passing ``None`` through unchanged."""

    return None if value is None else int(value)
1440
1441
def coerce_kw_type(
    kw: Dict[str, Any],
    key: str,
    type_: Type[Any],
    flexi_bool: bool = True,
    dest: Optional[Dict[str, Any]] = None,
) -> None:
    r"""If 'key' is present in dict 'kw', coerce its value to type 'type\_' if
    necessary. If 'flexi_bool' is True, the string '0' is considered false
    when coercing to boolean.
    """

    target = kw if dest is None else dest

    if key not in kw:
        return
    value = kw[key]
    if value is None:
        return
    # a non-class type_ (e.g. an arbitrary callable) can never satisfy
    # isinstance, so it always coerces
    if isinstance(type_, type) and isinstance(value, type_):
        return

    if type_ is bool and flexi_bool:
        target[key] = asbool(value)
    else:
        target[key] = type_(value)
1466
1467
def constructor_key(obj: Any, cls: Type[Any]) -> Tuple[Any, ...]:
    """Produce a tuple structure that is cacheable using the __dict__ of
    obj to retrieve values

    """
    kwarg_names = get_cls_kwargs(cls)
    present = [
        (name, obj.__dict__[name])
        for name in kwarg_names
        if name in obj.__dict__
    ]
    return (cls, *present)
1477
1478
def constructor_copy(obj: _T, cls: Type[_T], *args: Any, **kw: Any) -> _T:
    """Instantiate cls using the __dict__ of obj as constructor arguments.

    Uses inspect to match the named arguments of ``cls``.

    """

    names = get_cls_kwargs(cls)
    # fill in from obj only for names the caller did not supply
    for name in names.difference(kw):
        if name in obj.__dict__:
            kw[name] = obj.__dict__[name]
    return cls(*args, **kw)
1491
1492
def counter() -> Callable[[], int]:
    """Return a threadsafe counter function."""

    guard = threading.Lock()
    numbers = itertools.count(1)

    def increment() -> int:
        # serialize access so concurrent callers never skip or repeat
        with guard:
            return next(numbers)

    return increment
1505
1506
1507def duck_type_collection(
1508 specimen: Any, default: Optional[Type[Any]] = None
1509) -> Optional[Type[Any]]:
1510 """Given an instance or class, guess if it is or is acting as one of
1511 the basic collection types: list, set and dict. If the __emulates__
1512 property is present, return that preferentially.
1513 """
1514
1515 if hasattr(specimen, "__emulates__"):
1516 # canonicalize set vs sets.Set to a standard: the builtin set
1517 if specimen.__emulates__ is not None and issubclass(
1518 specimen.__emulates__, set
1519 ):
1520 return set
1521 else:
1522 return specimen.__emulates__ # type: ignore
1523
1524 isa = issubclass if isinstance(specimen, type) else isinstance
1525 if isa(specimen, list):
1526 return list
1527 elif isa(specimen, set):
1528 return set
1529 elif isa(specimen, dict):
1530 return dict
1531
1532 if hasattr(specimen, "append"):
1533 return list
1534 elif hasattr(specimen, "add"):
1535 return set
1536 elif hasattr(specimen, "set"):
1537 return dict
1538 else:
1539 return default
1540
1541
def assert_arg_type(
    arg: Any, argtype: Union[Tuple[Type[Any], ...], Type[Any]], name: str
) -> Any:
    """Return ``arg`` if it is an instance of ``argtype``; otherwise raise
    :class:`.ArgumentError` naming the offending argument."""
    if isinstance(arg, argtype):
        return arg

    if isinstance(argtype, tuple):
        raise exc.ArgumentError(
            "Argument '%s' is expected to be one of type %s, got '%s'"
            % (name, " or ".join("'%s'" % a for a in argtype), type(arg))
        )
    raise exc.ArgumentError(
        "Argument '%s' is expected to be of type '%s', got '%s'"
        % (name, argtype, type(arg))
    )
1558
1559
def dictlike_iteritems(dictlike):
    """Return a (key, value) iterator for almost any dict-like object."""

    # true mappings: just use items()
    if hasattr(dictlike, "items"):
        return list(dictlike.items())

    getter = getattr(dictlike, "__getitem__", getattr(dictlike, "get", None))
    if getter is None:
        raise TypeError("Object '%r' is not dict-like" % dictlike)

    if hasattr(dictlike, "iterkeys"):
        # Python-2-style mapping
        return ((key, getter(key)) for key in dictlike.iterkeys())
    if hasattr(dictlike, "keys"):
        return ((key, getter(key)) for key in dictlike.keys())
    raise TypeError("Object '%r' is not dict-like" % dictlike)
1582
1583
class classproperty(property):
    """A decorator that behaves like @property except that operates
    on classes rather than instances.

    The decorator is currently special when using the declarative
    module, but note that the
    :class:`~.sqlalchemy.ext.declarative.declared_attr`
    decorator should be used for this purpose with declarative.

    """

    fget: Callable[[Any], Any]

    def __init__(self, fget: Callable[[Any], Any], *arg: Any, **kw: Any):
        super().__init__(fget, *arg, **kw)
        self.__doc__ = fget.__doc__

    def __get__(self, obj: Any, cls: Optional[type] = None) -> Any:
        # always invoke against the owning class, even for instance access
        return self.fget(cls)
1603
1604
class hybridproperty(Generic[_T]):
    """Property-like descriptor dispatching to one function for instance
    access and, optionally, a different one for class-level access."""

    def __init__(self, func: Callable[..., _T]):
        self.func = func
        self.clslevel = func

    def __get__(self, instance: Any, owner: Any) -> _T:
        if instance is not None:
            return self.func(instance)
        return self.clslevel(owner)

    def classlevel(self, func: Callable[..., Any]) -> hybridproperty[_T]:
        """Decorator supplying the class-level implementation."""
        self.clslevel = func
        return self
1620
1621
class rw_hybridproperty(Generic[_T]):
    """Read/write variant of ``hybridproperty``; a setter function may be
    attached via the :meth:`.setter` decorator."""

    def __init__(self, func: Callable[..., _T]):
        self.func = func
        self.clslevel = func
        self.setfn: Optional[Callable[..., Any]] = None

    def __get__(self, instance: Any, owner: Any) -> _T:
        if instance is not None:
            return self.func(instance)
        return self.clslevel(owner)

    def __set__(self, instance: Any, value: Any) -> None:
        assert self.setfn is not None
        self.setfn(instance, value)

    def setter(self, func: Callable[..., Any]) -> rw_hybridproperty[_T]:
        """Decorator supplying the setter implementation."""
        self.setfn = func
        return self

    def classlevel(self, func: Callable[..., Any]) -> rw_hybridproperty[_T]:
        """Decorator supplying the class-level implementation."""
        self.clslevel = func
        return self
1646
1647
class hybridmethod(Generic[_T]):
    """Decorate a function as cls- or instance- level."""

    def __init__(self, func: Callable[..., _T]):
        self.func = self.__func__ = func
        self.clslevel = func

    def __get__(self, instance: Any, owner: Any) -> Callable[..., _T]:
        if instance is not None:
            return self.func.__get__(instance, owner)  # type:ignore
        # class-level access binds the class-level callable to the class
        return self.clslevel.__get__(owner, owner.__class__)  # type:ignore

    def classlevel(self, func: Callable[..., Any]) -> hybridmethod[_T]:
        """Decorator supplying the class-level implementation."""
        self.clslevel = func
        return self
1664
1665
class symbol(int):
    """A constant symbol.

    >>> symbol("foo") is symbol("foo")
    True
    >>> symbol("foo")
    symbol('foo')

    A slight refinement of the MAGICCOOKIE=object() pattern. The primary
    advantage of symbol() is its repr(). They are also singletons.

    Repeated calls of symbol('name') will all return the same instance.

    """

    name: str

    # process-wide registry of interned symbols; also what makes
    # pickling round-trip to the same instance
    symbols: Dict[str, symbol] = {}
    _lock = threading.Lock()

    def __new__(
        cls,
        name: str,
        doc: Optional[str] = None,
        canonical: Optional[int] = None,
    ) -> symbol:
        """Return the interned symbol for ``name``, creating it if needed.

        :param name: symbol name; the identity key.
        :param doc: optional docstring applied on first creation.
        :param canonical: optional explicit integer value; defaults to
         ``hash(name)``.  Conflicting values for an existing symbol raise
         ``TypeError``.
        """
        with cls._lock:
            sym = cls.symbols.get(name)
            if sym is None:
                assert isinstance(name, str)
                if canonical is None:
                    canonical = hash(name)
                sym = int.__new__(symbol, canonical)
                sym.name = name
                if doc:
                    sym.__doc__ = doc

                # NOTE: we should ultimately get rid of this global thing,
                # however, currently it is to support pickling. The best
                # change would be when we are on py3.11 at a minimum, we
                # switch to stdlib enum.IntFlag.
                cls.symbols[name] = sym
            else:
                if canonical and canonical != sym:
                    raise TypeError(
                        f"Can't replace canonical symbol for {name!r} "
                        f"with new int value {canonical}"
                    )
        return sym

    def __reduce__(self):
        # re-interns by name on unpickle; the "x" doc placeholder is
        # ignored for an already-registered symbol
        return symbol, (self.name, "x", int(self))

    def __str__(self):
        return repr(self)

    def __repr__(self):
        return f"symbol({self.name!r})"
1724
1725
class _IntFlagMeta(type):
    """Metaclass which converts integer class attributes into interned
    :class:`.symbol` instances and exposes an ``__members__`` mapping,
    mimicking the surface of ``enum.IntFlag``."""

    def __init__(
        cls,
        classname: str,
        bases: Tuple[Type[Any], ...],
        dict_: Dict[str, Any],
        **kw: Any,
    ) -> None:
        items: List[symbol]
        cls._items = items = []
        for k, v in dict_.items():
            if re.match(r"^__.*__$", k):
                # skip dunders (__module__, __qualname__, __doc__, ...)
                continue
            if isinstance(v, int):
                sym = symbol(k, canonical=v)
            elif not k.startswith("_"):
                raise TypeError("Expected integer values for IntFlag")
            else:
                # underscore-prefixed helpers are left untouched
                continue
            setattr(cls, k, sym)
            items.append(sym)

        # enum.IntFlag-compatible, read-only member mapping
        cls.__members__ = _collections.immutabledict(
            {sym.name: sym for sym in items}
        )

    def __iter__(self) -> Iterator[symbol]:
        raise NotImplementedError(
            "iter not implemented to ensure compatibility with "
            "Python 3.11 IntFlag. Please use __members__. See "
            "https://github.com/python/cpython/issues/99304"
        )
1758
1759
class _FastIntFlag(metaclass=_IntFlagMeta):
    """An 'IntFlag' copycat that isn't slow when performing bitwise
    operations.

    the ``FastIntFlag`` class will return ``enum.IntFlag`` under TYPE_CHECKING
    and ``_FastIntFlag`` otherwise.

    """


if TYPE_CHECKING:
    from enum import IntFlag

    # presented to type checkers as a genuine IntFlag
    FastIntFlag = IntFlag
else:
    FastIntFlag = _FastIntFlag
1776
1777
# TypeVar for enum members handled by parse_user_argument_for_enum
_E = TypeVar("_E", bound=enum.Enum)
1779
1780
def parse_user_argument_for_enum(
    arg: Any,
    choices: Dict[_E, List[Any]],
    name: str,
    resolve_symbol_names: bool = False,
) -> Optional[_E]:
    """Given a user parameter, parse the parameter into a chosen value
    from a list of choice objects, typically Enum values.

    The user argument can be a string name that matches the name of a
    symbol, or the symbol object itself, or any number of alternate choices
    such as True/False/ None etc.

    :param arg: the user argument.
    :param choices: dictionary of enum values to lists of possible
     entries for each.
    :param name: name of the argument.   Used in an :class:`.ArgumentError`
     that is raised if the parameter doesn't match any available argument.
    :param resolve_symbol_names: if True, a string equal to the ``.name``
     of an enum member is also accepted and resolved to that member.

    """
    for enum_value, choice in choices.items():
        if arg is enum_value:
            return enum_value
        elif resolve_symbol_names and arg == enum_value.name:
            return enum_value
        elif arg in choice:
            return enum_value

    # None is always accepted even if not present in any choice list
    if arg is None:
        return None

    raise exc.ArgumentError(f"Invalid value for '{name}': {arg!r}")
1813
1814
_creation_order = 1


def set_creation_order(instance: Any) -> None:
    """Assign a '_creation_order' sequence to the given instance.

    This allows multiple instances to be sorted in order of creation
    (typically within a single thread; the counter is not particularly
    threadsafe).

    """
    global _creation_order

    order = _creation_order
    _creation_order = order + 1
    instance._creation_order = order
1829
1830
def warn_exception(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
    """executes the given function, catches all exceptions and converts to
    a warning.

    """
    try:
        return func(*args, **kwargs)
    except Exception:
        exc_type, exc_value = sys.exc_info()[0:2]
        warn("%s('%s') ignored" % (exc_type, exc_value))
1840
1841
def ellipses_string(value, len_=25):
    """Return ``value`` truncated to ``len_`` characters with a trailing
    ``...``; values without a length are returned unchanged."""
    try:
        if len(value) <= len_:
            return value
        return "%s..." % value[0:len_]
    except TypeError:
        # unsized / unsliceable values pass through as-is
        return value
1850
1851
class _hash_limit_string(str):
    """A string subclass that can only be hashed on a maximum amount
    of unique values.

    This is used for warnings so that we can send out parameterized warnings
    without the __warningregistry__ of the module, or the non-overridable
    "once" registry within warnings.py, overloading memory,


    """

    # precomputed bucketed hash; see __new__
    _hash: int

    def __new__(
        cls, value: str, num: int, args: Sequence[Any]
    ) -> _hash_limit_string:
        interpolated = (value % args) + (
            " (this warning may be suppressed after %d occurrences)" % num
        )
        self = super().__new__(cls, interpolated)
        # fold the interpolated text into one of ``num`` buckets keyed on
        # the template, so at most ``num`` distinct hash values exist per
        # template from the warnings registry's point of view
        self._hash = hash("%s_%d" % (value, hash(interpolated) % num))
        return self

    def __hash__(self) -> int:
        return self._hash

    def __eq__(self, other: Any) -> bool:
        # equality follows the bucketed hash, not string content
        return hash(self) == hash(other)
1880
1881
def warn(msg: str, code: Optional[str] = None) -> None:
    """Issue a warning.

    If msg is a string, :class:`.exc.SAWarning` is used as
    the category.

    """
    if not code:
        _warnings_warn(msg, exc.SAWarning)
    else:
        # attach the error code to the warning instance itself
        _warnings_warn(exc.SAWarning(msg, code=code))
1893
1894
def warn_limited(msg: str, args: Sequence[Any]) -> None:
    """Issue a warning with a parameterized string, limiting the number
    of registrations.

    """
    if not args:
        _warnings_warn(msg, exc.SAWarning)
    else:
        # bucketed-hash string: at most 10 distinct registry entries
        _warnings_warn(_hash_limit_string(msg, 10, args), exc.SAWarning)
1903
1904
# registry of code objects whose presence on the call stack causes
# warnings to be annotated with a suffix and default category
_warning_tags: Dict[CodeType, Tuple[str, Type[Warning]]] = {}


def tag_method_for_warnings(
    message: str, category: Type[Warning]
) -> Callable[[_F], _F]:
    """Decorator registering a function so warnings emitted while it is on
    the call stack gain ``message`` as a suffix and default to
    ``category``."""

    def decorate(fn):
        _warning_tags[fn.__code__] = (message, category)
        return fn

    return decorate
1916
1917
# module names considered "inside SQLAlchemy" (or Alembic) when locating
# the user's frame; sqlalchemy.testing is deliberately excluded
_not_sa_pattern = re.compile(r"^(?:sqlalchemy\.(?!testing)|alembic\.)")


def _warnings_warn(
    message: Union[str, Warning],
    category: Optional[Type[Warning]] = None,
    stacklevel: int = 2,
) -> None:
    """Emit a warning with ``stacklevel`` adjusted to point outside of
    SQLAlchemy, applying any tags registered in ``_warning_tags`` that are
    found on the current call stack."""

    if category is None and isinstance(message, Warning):
        category = type(message)

    # adjust the given stacklevel to be outside of SQLAlchemy
    try:
        frame = sys._getframe(stacklevel)
    except ValueError:
        # being called from less than 3 (or given) stacklevels, weird,
        # but don't crash
        stacklevel = 0
    except:
        # _getframe() doesn't work, weird interpreter issue, weird,
        # ok, but don't crash
        stacklevel = 0
    else:
        stacklevel_found = warning_tag_found = False
        while frame is not None:
            # using __name__ here requires that we have __name__ in the
            # __globals__ of the decorated string functions we make also.
            # we generate this using {"__name__": fn.__module__}
            if not stacklevel_found and not re.match(
                _not_sa_pattern, frame.f_globals.get("__name__", "")
            ):
                # stop incrementing stack level if an out-of-SQLA line
                # were found.
                stacklevel_found = True

                # however, for the warning tag thing, we have to keep
                # scanning up the whole traceback

            if frame.f_code in _warning_tags:
                warning_tag_found = True
                (_suffix, _category) = _warning_tags[frame.f_code]
                category = category or _category
                message = f"{message} ({_suffix})"

            frame = frame.f_back  # type: ignore[assignment]

            if not stacklevel_found:
                stacklevel += 1
            elif stacklevel_found and warning_tag_found:
                # nothing further to discover; stop walking frames
                break

    if category is not None:
        warnings.warn(message, category, stacklevel=stacklevel + 1)
    else:
        warnings.warn(message, stacklevel=stacklevel + 1)
1974
1975
def only_once(
    fn: Callable[..., _T], retry_on_exception: bool
) -> Callable[..., Optional[_T]]:
    """Decorate the given function to be a no-op after it is called exactly
    once."""

    pending = [fn]

    def go(*arg: Any, **kw: Any) -> Optional[_T]:
        # strong reference fn so that it isn't garbage collected,
        # which interferes with the event system's expectations
        strong_fn = fn  # noqa
        if not pending:
            return None

        target = pending.pop()
        try:
            return target(*arg, **kw)
        except:
            # optionally re-arm so a later call may try again
            if retry_on_exception:
                pending.insert(0, target)
            raise

    return go
2000
2001
_SQLA_RE = re.compile(r"sqlalchemy/([a-z_]+/){0,2}[a-z_]+\.py")
_UNITTEST_RE = re.compile(r"unit(?:2|test2?/)")


def chop_traceback(
    tb: List[str],
    exclude_prefix: re.Pattern[str] = _UNITTEST_RE,
    exclude_suffix: re.Pattern[str] = _SQLA_RE,
) -> List[str]:
    """Chop extraneous lines off beginning and end of a traceback.

    :param tb:
      a list of traceback lines as returned by ``traceback.format_stack()``

    :param exclude_prefix:
      a regular expression object matching lines to skip at beginning of
      ``tb``

    :param exclude_suffix:
      a regular expression object matching lines to skip at end of ``tb``
    """
    lo = 0
    hi = len(tb) - 1
    while lo <= hi and exclude_prefix.search(tb[lo]):
        lo += 1
    while lo <= hi and exclude_suffix.search(tb[hi]):
        hi -= 1
    return tb[lo : hi + 1]
2030
2031
def attrsetter(attrname):
    """Return a ``set(obj, value)`` callable that assigns ``attrname``
    (which may be a dotted path) on ``obj``."""
    # generated code is used because setattr() cannot express a dotted
    # attribute path such as "a.b"
    namespace = {}
    exec("def set(obj, value): obj.%s = value" % attrname, namespace)
    return namespace["set"]
2037
2038
dunders_re = re.compile("^__.+__$")


class TypingOnly:
    """A mixin class that marks a class as 'typing only', meaning it has
    absolutely no methods, attributes, or runtime functionality whatsoever.

    """

    __slots__ = ()

    def __init_subclass__(cls, **kw: Any) -> None:
        if TypingOnly in cls.__bases__:
            # anything in the subclass namespace besides dunders
            # (__module__, __qualname__, __slots__, ...) is a violation
            remaining = {
                attr for attr in cls.__dict__ if not dunders_re.match(attr)
            }
            if remaining:
                raise AssertionError(
                    f"Class {cls} directly inherits TypingOnly but has "
                    f"additional attributes {remaining}."
                )
        super().__init_subclass__(**kw)
2061
2062
class EnsureKWArg:
    r"""Apply translation of functions to accept \**kw arguments if they
    don't already.

    Used to ensure cross-compatibility with third party legacy code, for things
    like compiler visit methods that need to accept ``**kw`` arguments,
    but may have been copied from old code that didn't accept them.

    """

    ensure_kwarg: str
    """a regular expression that indicates method names for which the method
    should accept ``**kw`` arguments.

    The class will scan for methods matching the name template and decorate
    them if necessary to ensure ``**kw`` parameters are accepted.

    """

    def __init_subclass__(cls) -> None:
        pattern = cls.ensure_kwarg
        if pattern:
            for name, value in list(cls.__dict__.items()):
                if not re.match(pattern, name):
                    continue
                spec = compat.inspect_getfullargspec(value)
                # wrap only methods that don't already take **kw
                if not spec.varkw:
                    setattr(cls, name, cls._wrap_w_kw(value))
        super().__init_subclass__()

    @classmethod
    def _wrap_w_kw(cls, fn: Callable[..., Any]) -> Callable[..., Any]:
        def wrap(*arg: Any, **kw: Any) -> Any:
            # discard **kw entirely; the wrapped function takes only *arg
            return fn(*arg)

        return update_wrapper(wrap, fn)
2102
2103
def wrap_callable(wrapper, fn):
    """Augment functools.update_wrapper() to work with objects with
    a ``__call__()`` method.

    :param fn:
      object with __call__ method

    """
    if hasattr(fn, "__name__"):
        return update_wrapper(wrapper, fn)

    # a __call__-only object: copy metadata over by hand
    wrapper.__name__ = fn.__class__.__name__
    if hasattr(fn, "__module__"):
        wrapper.__module__ = fn.__module__

    call_doc = getattr(fn.__call__, "__doc__", None)
    if call_doc:
        wrapper.__doc__ = call_doc
    elif fn.__doc__:
        wrapper.__doc__ = fn.__doc__

    return wrapper
2126
2127
def quoted_token_parser(value):
    """Parse a dotted identifier with accommodation for quoted names.

    Includes support for SQL-style double quotes as a literal character;
    a doubled quote inside a quoted name collapses to a single quote
    character in the result.

    E.g.::

        >>> quoted_token_parser("name")
        ["name"]
        >>> quoted_token_parser("schema.name")
        ["schema", "name"]
        >>> quoted_token_parser('"Schema"."Name"')
        ['Schema', 'Name']
        >>> quoted_token_parser('"Schema"."Name""Foo"')
        ['Schema', 'Name"Foo']

    """

    # fast path: no quoting, plain dotted split
    if '"' not in value:
        return value.split(".")

    # 0 = outside of quotes
    # 1 = inside of quotes
    state = 0
    result: List[List[str]] = [[]]
    idx = 0
    lv = len(value)
    while idx < lv:
        char = value[idx]
        if char == '"':
            if state == 1 and idx < lv - 1 and value[idx + 1] == '"':
                # escaped quote inside a quoted name: emit one quote
                # character and skip the second
                result[-1].append('"')
                idx += 1
            else:
                state ^= 1
        elif char == "." and state == 0:
            # unquoted dot separates tokens
            result.append([])
        else:
            result[-1].append(char)
        idx += 1

    return ["".join(token) for token in result]
2170
2171
def add_parameter_text(params: Any, text: str) -> Callable[[_F], _F]:
    """Decorator factory which appends ``text`` to the ``:param:`` section
    of each name in ``params`` within the decorated function's docstring."""
    params = _collections.to_list(params)

    def decorate(fn):
        doc = fn.__doc__ or ""
        if doc:
            doc = inject_param_text(doc, {param: text for param in params})
        fn.__doc__ = doc
        return fn

    return decorate
2183
2184
def _dedent_docstring(text: str) -> str:
    """Dedent a docstring, tolerating the common style where the first
    line sits flush against the opening quotes."""
    head, sep, rest = text.partition("\n")
    if not sep:
        # single line; nothing to dedent
        return text
    if head.startswith(" "):
        # uniformly indented: dedent the entire text
        return textwrap.dedent(text)
    # unindented first line: dedent only the remainder
    return head + "\n" + textwrap.dedent(rest)
2195
2196
def inject_docstring_text(
    given_doctext: Optional[str], injecttext: str, pos: int
) -> str:
    """Insert ``injecttext`` into a docstring at the position of its
    ``pos``-th blank line (0 inserts at the top), returning the result."""
    doctext: str = _dedent_docstring(given_doctext or "")
    lines = doctext.split("\n")
    if len(lines) == 1:
        lines.append("")
    injectlines = textwrap.dedent(injecttext).split("\n")
    if injectlines[0]:
        # ensure the injected block is preceded by a blank line
        injectlines.insert(0, "")

    # candidate anchors: the docstring start plus each blank line
    blanks = [0]
    blanks.extend(
        idx for idx, line in enumerate(lines) if not line.strip()
    )
    anchor = blanks[min(pos, len(blanks) - 1)]

    return "\n".join(lines[:anchor] + injectlines + lines[anchor:])
2215
2216
# matches ":param <name>:" capturing the leading indent and the name
_param_reg = re.compile(r"(\s+):param (.+?):")


def inject_param_text(doctext: str, inject_params: Dict[str, str]) -> str:
    """Append extra text to the body of selected ``:param:`` sections
    in a docstring.

    :param doctext: docstring text to rewrite.
    :param inject_params: mapping of parameter name to text to append at
     the end of that parameter's section.
    """
    doclines = collections.deque(doctext.splitlines())
    lines = []

    # TODO: this is not working for params like ":param case_sensitive=True:"

    # text pending injection at the end of the current :param: section
    to_inject = None
    while doclines:
        line = doclines.popleft()

        m = _param_reg.match(line)

        if to_inject is None:
            if m:
                param = m.group(2).lstrip("*")
                if param in inject_params:
                    # default indent to that of :param: plus one
                    indent = " " * len(m.group(1)) + " "

                    # but if the next line has text, use that line's
                    # indentation
                    if doclines:
                        m2 = re.match(r"(\s+)\S", doclines[0])
                        if m2:
                            indent = " " * len(m2.group(1))

                    to_inject = indent + inject_params[param]
        elif m:
            # the next :param: begins; flush pending text before it
            lines.extend(["\n", to_inject, "\n"])
            to_inject = None
        elif not line.rstrip():
            # blank line ends the current param section; flush
            lines.extend([line, to_inject, "\n"])
            to_inject = None
        elif line.endswith("::"):
            # TODO: this still won't cover if the code example itself has
            # blank lines in it, need to detect those via indentation.
            lines.extend([line, doclines.popleft()])
            continue
        lines.append(line)

    return "\n".join(lines)
2261
2262
def repr_tuple_names(names: List[str]) -> Optional[str]:
    """Trims a list of strings from the middle and return a string of up to
    four elements. Strings greater than 11 characters will be truncated"""
    if not names:
        return None

    small = len(names) <= 4
    # show all (up to four), or the first three plus the last
    shown = names[:4] if small else names[:3] + names[-1:]
    clipped = [
        name if len(name) <= 11 else "%s.." % name[:11] for name in shown
    ]
    if small:
        return ", ".join(clipped)
    return "%s, ..., %s" % (", ".join(clipped[0:3]), clipped[-1])
2275
2276
def has_compiled_ext(raise_=False):
    """Report whether the cython extension modules are present, optionally
    raising ``ImportError`` when they are expected but missing."""
    from ._has_cython import HAS_CYEXTENSION

    if HAS_CYEXTENSION:
        return True
    if raise_:
        raise ImportError(
            "cython extensions were expected to be installed, "
            "but are not present"
        )
    return False
2289
2290
def load_uncompiled_module(module: _M) -> _M:
    """Load the non-compiled version of a module that is also
    compiled with cython.

    Locates the plain ``.py`` source that sits alongside the parent
    package's ``__init__.py`` and executes it under the same
    fully-qualified name, returning the resulting pure-python module.
    """
    full_name = module.__name__
    assert module.__spec__
    parent_name = module.__spec__.parent
    assert parent_name
    parent_module = sys.modules[parent_name]
    assert parent_module.__spec__
    package_path = parent_module.__spec__.origin
    assert package_path and package_path.endswith("__init__.py")

    name = full_name.split(".")[-1]
    # the .py source lives next to the package __init__.py
    module_path = package_path.replace("__init__.py", f"{name}.py")

    py_spec = importlib.util.spec_from_file_location(full_name, module_path)
    assert py_spec
    py_module = importlib.util.module_from_spec(py_spec)
    assert py_spec.loader
    py_spec.loader.exec_module(py_module)
    return cast(_M, py_module)
2313
2314
class _Missing(enum.Enum):
    """Single-member enum implementing a typed "missing" sentinel."""

    Missing = enum.auto()


# canonical sentinel instance; distinct from None so that None remains a
# valid user-supplied value
Missing = _Missing.Missing
# union alias: either a value of type _T or the Missing sentinel
MissingOr = Union[_T, Literal[_Missing.Missing]]