1# util/langhelpers.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""Routines to help with the creation, loading and introspection of
10modules, classes, hierarchies, attributes, functions, and methods.
11
12"""
13
14from __future__ import annotations
15
16import collections
17import enum
18from functools import update_wrapper
19import importlib.util
20import inspect
21import itertools
22import operator
23import re
24import sys
25import textwrap
26import threading
27import types
28from types import CodeType
29from types import ModuleType
30from typing import Any
31from typing import Callable
32from typing import cast
33from typing import Dict
34from typing import FrozenSet
35from typing import Generic
36from typing import Iterator
37from typing import List
38from typing import Literal
39from typing import NoReturn
40from typing import Optional
41from typing import overload
42from typing import Sequence
43from typing import Set
44from typing import Tuple
45from typing import Type
46from typing import TYPE_CHECKING
47from typing import TypeVar
48from typing import Union
49import warnings
50
51from . import _collections
52from . import compat
53from .. import exc
54
# type variables shared by the helpers throughout this module
_T = TypeVar("_T")
_T_co = TypeVar("_T_co", covariant=True)
_F = TypeVar("_F", bound=Callable[..., Any])
_MA = TypeVar("_MA", bound="HasMemoized.memoized_attribute[Any]")
_M = TypeVar("_M", bound=ModuleType)
60
61
def restore_annotations(
    cls: type, new_annotations: dict[str, Any]
) -> Callable[[], None]:
    """apply alternate annotations to a class, with a callable to restore
    the pristine state of the former.

    This is used strictly to provide dataclasses on a mapped class, where
    in some cases we are making dataclass fields based on an attribute
    that is actually a python descriptor on a superclass which we called
    to get a value.

    if dataclasses were to give us a way to achieve this without swapping
    __annotations__, that would be much better.

    :param cls: the class whose ``__annotations__`` are replaced
    :param new_annotations: the replacement annotations mapping
    :return: a zero-argument callable that restores the memoized state
    """
    # sentinel meaning "the attribute was not present at all"
    delattr_ = object()

    # pep-649 means classes have "__annotate__", and it's a callable. if it's
    # there and is None, we're in "legacy future mode", where it's python 3.14
    # or higher and "from __future__ import annotations" is set. in "legacy
    # future mode" we have to do the same steps we do for older pythons,
    # __annotate__ can be ignored
    is_pep649 = hasattr(cls, "__annotate__") and cls.__annotate__ is not None

    if is_pep649:
        memoized = {
            "__annotate__": getattr(cls, "__annotate__", delattr_),
        }
    else:
        memoized = {
            "__annotations__": getattr(cls, "__annotations__", delattr_)
        }

    cls.__annotations__ = new_annotations

    def restore():
        # put back (or delete) exactly what was memoized above
        for k, v in memoized.items():
            if v is delattr_:
                delattr(cls, k)
            else:
                setattr(cls, k, v)

    return restore
102
103
def md5_hex(x: Any) -> str:
    """Return the hex digest of the (non-security-grade) MD5 hash of
    ``x``, encoding ``x`` as UTF-8 first."""
    digest = compat.md5_not_for_security()
    digest.update(x.encode("utf-8"))
    return cast(str, digest.hexdigest())
109
110
class safe_reraise:
    """Reraise an exception after invoking some
    handler code.

    Stores the existing exception info before
    invoking so that it is maintained across a potential
    coroutine context switch.

    e.g.::

        try:
            sess.commit()
        except:
            with safe_reraise():
                sess.rollback()

    TODO: we should at some point evaluate current behaviors in this regard
    based on current greenlet, gevent/eventlet implementations in Python 3, and
    also see the degree to which our own asyncio (based on greenlet also) is
    impacted by this. .rollback() will cause IO / context switch to occur in
    all these scenarios; what happens to the exception context from an
    "except:" block if we don't explicitly store it? Original issue was #2703.

    """

    __slots__ = ("_exc_info",)

    # exception info captured at __enter__ time; set to None once
    # consumed to break reference cycles
    _exc_info: Union[
        None,
        Tuple[
            Type[BaseException],
            BaseException,
            types.TracebackType,
        ],
        Tuple[None, None, None],
    ]

    def __enter__(self) -> None:
        # capture the exception currently being handled (if any) so it
        # survives whatever the body of the "with" block does
        self._exc_info = sys.exc_info()

    def __exit__(
        self,
        type_: Optional[Type[BaseException]],
        value: Optional[BaseException],
        traceback: Optional[types.TracebackType],
    ) -> NoReturn:
        assert self._exc_info is not None
        # see #2703 for notes
        if type_ is None:
            # the body did not raise; re-raise the exception that was
            # in flight when the block was entered
            exc_type, exc_value, exc_tb = self._exc_info
            assert exc_value is not None
            self._exc_info = None  # remove potential circular references
            raise exc_value.with_traceback(exc_tb)
        else:
            # the body raised its own exception; propagate that one
            self._exc_info = None  # remove potential circular references
            assert value is not None
            raise value.with_traceback(traceback)
168
169
def walk_subclasses(cls: Type[_T]) -> Iterator[Type[_T]]:
    """Yield ``cls`` and every transitive subclass of it, each exactly
    once, in depth-first order."""
    visited: Set[Any] = set()

    to_process = [cls]
    while to_process:
        current = to_process.pop()
        if current in visited:
            continue
        visited.add(current)
        to_process.extend(current.__subclasses__())
        yield current
182
183
def string_or_unprintable(element: Any) -> str:
    """Return ``element`` as a string, never raising.

    :param element: any object; returned unchanged when already a str.
    :return: ``str(element)``, or a placeholder string when ``str()``
     raises.  Previously the placeholder itself used ``%r``, whose call
     to ``repr()`` could also raise and escape this helper; that case
     now falls back to a constant string.
    """
    if isinstance(element, str):
        return element
    try:
        return str(element)
    except Exception:
        # str() failed; try repr() for the placeholder, but guard it
        # too -- a raising __repr__ must not propagate out of a helper
        # whose whole purpose is to always produce printable text
        try:
            return "unprintable element %r" % element
        except Exception:
            return "unprintable element"
192
193
def clsname_as_plain_name(
    cls: Type[Any], use_name: Optional[str] = None
) -> str:
    """Render a CamelCase class name as lower-case words separated by
    spaces, treating the literal token ``SQL`` as one word.

    :param cls: class whose ``__name__`` is used
    :param use_name: optional string used in place of ``cls.__name__``
    """
    source = use_name or cls.__name__
    words = re.findall(r"([A-Z][a-z]+|SQL)", source)
    return " ".join(word.lower() for word in words)
199
200
def method_is_overridden(
    instance_or_cls: Union[Type[Any], object],
    against_method: Callable[..., Any],
) -> bool:
    """Return True if the two class methods don't match."""

    if isinstance(instance_or_cls, type):
        target_cls = instance_or_cls
    else:
        target_cls = instance_or_cls.__class__

    # look up the same-named method on the class actually in use and
    # compare it against the given baseline implementation
    found: types.MethodType = getattr(target_cls, against_method.__name__)
    return found != against_method
217
218
def decode_slice(slc: slice) -> Tuple[Any, ...]:
    """decode a slice object as sent to __getitem__.

    takes into account the 2.5 __index__() method, basically.

    """
    return tuple(
        bound.__index__() if hasattr(bound, "__index__") else bound
        for bound in (slc.start, slc.stop, slc.step)
    )
231
232
233def _unique_symbols(used: Sequence[str], *bases: str) -> Iterator[str]:
234 used_set = set(used)
235 for base in bases:
236 pool = itertools.chain(
237 (base,),
238 map(lambda i: base + str(i), range(1000)),
239 )
240 for sym in pool:
241 if sym not in used_set:
242 used_set.add(sym)
243 yield sym
244 break
245 else:
246 raise NameError("exhausted namespace for symbol base %s" % base)
247
248
def map_bits(fn: Callable[[int], Any], n: int) -> Iterator[Any]:
    """Call the given function given each nonzero bit from n."""

    remaining = n
    while remaining:
        # isolate the lowest set bit, apply fn to it, then clear it
        lowest = remaining & -remaining
        yield fn(lowest)
        remaining &= remaining - 1
256
257
# type variable used by the @decorator factory below
_Fn = TypeVar("_Fn", bound="Callable[..., Any]")

# this seems to be in flux in recent mypy versions
261
262
def decorator(target: Callable[..., Any]) -> Callable[[_Fn], _Fn]:
    """A signature-matching decorator factory.

    Returns a decorator that wraps a function ``fn`` in generated code
    whose signature matches ``fn``; the generated wrapper calls
    ``target(fn, <fn's arguments>)``.
    """

    def decorate(fn: _Fn) -> _Fn:
        if not inspect.isfunction(fn) and not inspect.ismethod(fn):
            raise Exception("not a decoratable function")

        # Python 3.14 defer creating __annotations__ until its used.
        # We do not want to create __annotations__ now.
        annofunc = getattr(fn, "__annotate__", None)
        if annofunc is not None:
            fn.__annotate__ = None  # type: ignore[union-attr]
            try:
                spec = compat.inspect_getfullargspec(fn)
            finally:
                fn.__annotate__ = annofunc  # type: ignore[union-attr]
        else:
            spec = compat.inspect_getfullargspec(fn)

        # Do not generate code for annotations.
        # update_wrapper() copies the annotation from fn to decorated.
        # We use dummy defaults for code generation to avoid having
        # copy of large globals for compiling.
        # We copy __defaults__ and __kwdefaults__ from fn to decorated.
        empty_defaults = (None,) * len(spec.defaults or ())
        empty_kwdefaults = dict.fromkeys(spec.kwonlydefaults or ())
        spec = spec._replace(
            annotations={},
            defaults=empty_defaults,
            kwonlydefaults=empty_kwdefaults,
        )

        # choose names for the generated code's references to the
        # target and the wrapped fn that can't collide with any argument
        names = (
            tuple(cast("Tuple[str, ...]", spec[0]))
            + cast("Tuple[str, ...]", spec[1:3])
            + (fn.__name__,)
        )
        targ_name, fn_name = _unique_symbols(names, "target", "fn")

        metadata: Dict[str, Optional[str]] = dict(target=targ_name, fn=fn_name)
        metadata.update(format_argspec_plus(spec, grouped=False))
        metadata["name"] = fn.__name__

        # coroutines need an async wrapper that awaits the target
        if inspect.iscoroutinefunction(fn):
            metadata["prefix"] = "async "
            metadata["target_prefix"] = "await "
        else:
            metadata["prefix"] = ""
            metadata["target_prefix"] = ""

        # look for __ positional arguments. This is a convention in
        # SQLAlchemy that arguments should be passed positionally
        # rather than as keyword
        # arguments. note that apply_pos doesn't currently work in all cases
        # such as when a kw-only indicator "*" is present, which is why
        # we limit the use of this to just that case we can detect. As we add
        # more kinds of methods that use @decorator, things may have to
        # be further improved in this area
        if "__" in repr(spec[0]):
            code = """\
%(prefix)sdef %(name)s%(grouped_args)s:
    return %(target_prefix)s%(target)s(%(fn)s, %(apply_pos)s)
""" % metadata
        else:
            code = """\
%(prefix)sdef %(name)s%(grouped_args)s:
    return %(target_prefix)s%(target)s(%(fn)s, %(apply_kw)s)
""" % metadata

        env: Dict[str, Any] = {
            targ_name: target,
            fn_name: fn,
            "__name__": fn.__module__,
        }

        decorated = cast(
            types.FunctionType,
            _exec_code_in_env(code, env, fn.__name__),
        )
        # restore the real defaults that were replaced with dummies
        # before code generation
        decorated.__defaults__ = fn.__defaults__
        decorated.__kwdefaults__ = fn.__kwdefaults__  # type: ignore
        return update_wrapper(decorated, fn)  # type: ignore[return-value]

    return update_wrapper(decorate, target)  # type: ignore[return-value]
347
348
349def _exec_code_in_env(
350 code: Union[str, types.CodeType], env: Dict[str, Any], fn_name: str
351) -> Callable[..., Any]:
352 exec(code, env)
353 return env[fn_name] # type: ignore[no-any-return]
354
355
# type variables for the helpers that follow
_PF = TypeVar("_PF")
_TE = TypeVar("_TE")
358
359
class PluginLoader:
    """A registry of named, lazily-loaded plugins, backed by explicit
    registration, an optional resolver callable, and entry points from
    the configured group."""

    def __init__(
        self, group: str, auto_fn: Optional[Callable[..., Any]] = None
    ):
        self.group = group
        self.impls: Dict[str, Any] = {}
        self.auto_fn = auto_fn

    def clear(self):
        """Discard all cached loaders."""
        self.impls.clear()

    def load(self, name: str) -> Any:
        """Resolve and invoke the loader for ``name``, raising
        NoSuchModuleError when it cannot be found anywhere."""
        if name in self.impls:
            return self.impls[name]()

        # consult the optional resolver callable next
        if self.auto_fn is not None:
            loader = self.auto_fn(name)
            if loader:
                self.impls[name] = loader
                return loader()

        # finally scan installed entry points for the group
        for entry in compat.importlib_metadata_get(self.group):
            if entry.name == name:
                self.impls[name] = entry.load
                return entry.load()

        raise exc.NoSuchModuleError(
            "Can't load plugin: %s:%s" % (self.group, name)
        )

    def register(self, name: str, modulepath: str, objname: str) -> None:
        """Register a loader that imports ``objname`` from the module at
        ``modulepath``."""

        def load():
            module = __import__(modulepath)
            # __import__ returns the top-level package; walk down to the
            # actual module named by the dotted path
            for component in modulepath.split(".")[1:]:
                module = getattr(module, component)
            return getattr(module, objname)

        self.impls[name] = load

    def deregister(self, name: str) -> None:
        """Remove the loader registered under ``name``."""
        del self.impls[name]
401
402
403def _inspect_func_args(fn):
404 try:
405 co_varkeywords = inspect.CO_VARKEYWORDS
406 except AttributeError:
407 # https://docs.python.org/3/library/inspect.html
408 # The flags are specific to CPython, and may not be defined in other
409 # Python implementations. Furthermore, the flags are an implementation
410 # detail, and can be removed or deprecated in future Python releases.
411 spec = compat.inspect_getfullargspec(fn)
412 return spec[0], bool(spec[2])
413 else:
414 # use fn.__code__ plus flags to reduce method call overhead
415 co = fn.__code__
416 nargs = co.co_argcount
417 return (
418 list(co.co_varnames[:nargs]),
419 bool(co.co_flags & co_varkeywords),
420 )
421
422
@overload
def get_cls_kwargs(
    cls: type,
    *,
    _set: Optional[Set[str]] = None,
    raiseerr: Literal[True] = ...,
) -> Set[str]: ...


@overload
def get_cls_kwargs(
    cls: type, *, _set: Optional[Set[str]] = None, raiseerr: bool = False
) -> Optional[Set[str]]: ...


def get_cls_kwargs(
    cls: type, *, _set: Optional[Set[str]] = None, raiseerr: bool = False
) -> Optional[Set[str]]:
    r"""Return the full set of inherited kwargs for the given `cls`.

    Probes a class's __init__ method, collecting all named arguments. If the
    __init__ defines a \**kwargs catch-all, then the constructor is presumed
    to pass along unrecognized keywords to its base classes, and the
    collection process is repeated recursively on each of the bases.

    Uses a subset of inspect.getfullargspec() to cut down on method overhead,
    as this is used within the Core typing system to create copies of type
    objects which is a performance-sensitive operation.

    No anonymous tuple arguments please !

    """
    # _set is None only on the outermost call; recursive calls share
    # one accumulator set
    toplevel = _set is None
    if toplevel:
        _set = set()
    assert _set is not None

    ctr = cls.__dict__.get("__init__", False)

    # only a plain Python function __init__ can be introspected cheaply;
    # anything else falls through to the base classes
    has_init = (
        ctr
        and isinstance(ctr, types.FunctionType)
        and isinstance(ctr.__code__, types.CodeType)
    )

    if has_init:
        names, has_kw = _inspect_func_args(ctr)
        _set.update(names)

        # a recursively-visited __init__ without **kwargs can't forward
        # keywords upward, so collection stops here
        if not has_kw and not toplevel:
            if raiseerr:
                raise TypeError(
                    f"given cls {cls} doesn't have an __init__ method"
                )
            else:
                return None
    else:
        has_kw = False

    # recurse into bases when there is no usable __init__ here, or when
    # **kwargs indicates keywords are passed along
    if not has_init or has_kw:
        for c in cls.__bases__:
            if get_cls_kwargs(c, _set=_set) is None:
                break

    _set.discard("self")
    return _set
489
490
def get_func_kwargs(func: Callable[..., Any]) -> List[str]:
    """Return the set of legal kwargs for the given `func`.

    Uses getargspec so is safe to call for methods, functions,
    etc.

    """
    spec = compat.inspect_getfullargspec(func)
    return spec[0]
500
501
def get_callable_argspec(
    fn: Callable[..., Any], no_self: bool = False, _is_init: bool = False
) -> compat.FullArgSpec:
    """Return the argument signature for any callable.

    All pure-Python callables are accepted, including
    functions, methods, classes, objects with __call__;
    builtins and other edge cases like functools.partial() objects
    raise a TypeError.

    :param fn: the callable to inspect
    :param no_self: when True, strip the leading self/cls argument
    :param _is_init: internal flag set when recursing into a class's
     ``__init__``
    """
    if inspect.isbuiltin(fn):
        raise TypeError("Can't inspect builtin: %s" % fn)
    elif inspect.isfunction(fn):
        if _is_init and no_self:
            # plain function that is a class __init__; drop "self"
            spec = compat.inspect_getfullargspec(fn)
            return compat.FullArgSpec(
                spec.args[1:],
                spec.varargs,
                spec.varkw,
                spec.defaults,
                spec.kwonlyargs,
                spec.kwonlydefaults,
                spec.annotations,
            )
        else:
            return compat.inspect_getfullargspec(fn)
    elif inspect.ismethod(fn):
        if no_self and (_is_init or fn.__self__):
            # bound method (or __init__): drop the self/cls argument
            spec = compat.inspect_getfullargspec(fn.__func__)
            return compat.FullArgSpec(
                spec.args[1:],
                spec.varargs,
                spec.varkw,
                spec.defaults,
                spec.kwonlyargs,
                spec.kwonlydefaults,
                spec.annotations,
            )
        else:
            return compat.inspect_getfullargspec(fn.__func__)
    elif inspect.isclass(fn):
        # a class: inspect its constructor, flagged so "self" handling
        # applies above
        return get_callable_argspec(
            fn.__init__, no_self=no_self, _is_init=True
        )
    elif hasattr(fn, "__func__"):
        return compat.inspect_getfullargspec(fn.__func__)
    elif hasattr(fn, "__call__"):
        # a callable object: inspect its __call__ method
        if inspect.ismethod(fn.__call__):
            return get_callable_argspec(fn.__call__, no_self=no_self)
        else:
            raise TypeError("Can't inspect callable: %s" % fn)
    else:
        raise TypeError("Can't inspect callable: %s" % fn)
556
557
def format_argspec_plus(
    fn: Union[Callable[..., Any], compat.FullArgSpec], grouped: bool = True
) -> Dict[str, Optional[str]]:
    """Returns a dictionary of formatted, introspected function arguments.

    An enhanced variant of inspect.formatargspec to support code generation.

    fn
       An inspectable callable or tuple of inspect getargspec() results.
    grouped
      Defaults to True; include (parens, around, argument) lists

    Returns:

    args
      Full inspect.formatargspec for fn
    self_arg
      The name of the first positional argument, varargs[0], or None
      if the function defines no positional arguments.
    apply_pos
      args, re-written in calling rather than receiving syntax. Arguments are
      passed positionally.
    apply_kw
      Like apply_pos, except keyword-ish args are passed as keywords.
    apply_pos_proxied
      Like apply_pos but omits the self/cls argument

    Example::

      >>> format_argspec_plus(lambda self, a, b, c=3, **d: 123)
      {'grouped_args': '(self, a, b, c=3, **d)',
       'self_arg': 'self',
       'apply_kw': '(self, a, b, c=c, **d)',
       'apply_pos': '(self, a, b, c, **d)'}

    """
    if callable(fn):
        spec = compat.inspect_getfullargspec(fn)
    else:
        spec = fn

    # full receiving-side signature, including defaults
    args = compat.inspect_formatargspec(*spec)

    # calling-side form with everything passed positionally
    apply_pos = compat.inspect_formatargspec(
        spec[0], spec[1], spec[2], None, spec[4]
    )

    if spec[0]:
        self_arg = spec[0][0]

        apply_pos_proxied = compat.inspect_formatargspec(
            spec[0][1:], spec[1], spec[2], None, spec[4]
        )

    elif spec[1]:
        # no named positional args; per the docstring, refer to
        # varargs[0] as the self/cls slot
        self_arg = "%s[0]" % spec[1]

        apply_pos_proxied = apply_pos
    else:
        self_arg = None
        apply_pos_proxied = apply_pos

    # count trailing positional defaults plus all keyword-only args;
    # these are the names rendered as name=name in apply_kw
    num_defaults = 0
    if spec[3]:
        num_defaults += len(cast(Tuple[Any], spec[3]))
    if spec[4]:
        num_defaults += len(spec[4])

    name_args = spec[0] + spec[4]

    defaulted_vals: Union[List[str], Tuple[()]]

    if num_defaults:
        defaulted_vals = name_args[0 - num_defaults :]
    else:
        defaulted_vals = ()

    # calling-side form where defaulted/kw-only args pass as keywords
    apply_kw = compat.inspect_formatargspec(
        name_args,
        spec[1],
        spec[2],
        defaulted_vals,
        formatvalue=lambda x: "=" + str(x),
    )

    if spec[0]:
        apply_kw_proxied = compat.inspect_formatargspec(
            name_args[1:],
            spec[1],
            spec[2],
            defaulted_vals,
            formatvalue=lambda x: "=" + str(x),
        )
    else:
        apply_kw_proxied = apply_kw

    if grouped:
        return dict(
            grouped_args=args,
            self_arg=self_arg,
            apply_pos=apply_pos,
            apply_kw=apply_kw,
            apply_pos_proxied=apply_pos_proxied,
            apply_kw_proxied=apply_kw_proxied,
        )
    else:
        # ungrouped output strips the surrounding parentheses
        return dict(
            grouped_args=args,
            self_arg=self_arg,
            apply_pos=apply_pos[1:-1],
            apply_kw=apply_kw[1:-1],
            apply_pos_proxied=apply_pos_proxied[1:-1],
            apply_kw_proxied=apply_kw_proxied[1:-1],
        )
673
674
def format_argspec_init(method, grouped=True):
    """format_argspec_plus with considerations for typical __init__ methods

    Wraps format_argspec_plus with error handling strategies for typical
    __init__ cases:

    .. sourcecode:: text

        object.__init__ -> (self)
        other unreflectable (usually C) -> (self, *args, **kwargs)

    """
    if method is object.__init__:
        # object.__init__ takes no arguments beyond self
        grouped_args = "(self)"
        args = grouped_args if grouped else "self"
        proxied = "()" if grouped else ""
    else:
        try:
            return format_argspec_plus(method, grouped=grouped)
        except TypeError:
            # uninspectable (usually C-implemented) __init__; fall back
            # to the most permissive signature
            grouped_args = "(self, *args, **kwargs)"
            args = grouped_args if grouped else "self, *args, **kwargs"
            proxied = "(*args, **kwargs)" if grouped else "*args, **kwargs"
    return dict(
        self_arg="self",
        grouped_args=grouped_args,
        apply_pos=args,
        apply_kw=args,
        apply_pos_proxied=proxied,
        apply_kw_proxied=proxied,
    )
706
707
def create_proxy_methods(
    target_cls: Type[Any],
    target_cls_sphinx_name: str,
    proxy_cls_sphinx_name: str,
    classmethods: Sequence[str] = (),
    methods: Sequence[str] = (),
    attributes: Sequence[str] = (),
    use_intermediate_variable: Sequence[str] = (),
) -> Callable[[_T], _T]:
    """A class decorator indicating attributes should refer to a proxy
    class.

    This decorator is now a "marker" that does nothing at runtime. Instead,
    it is consumed by the tools/generate_proxy_methods.py script to
    statically generate proxy methods and attributes that are fully
    recognized by typing tools such as mypy.

    """

    # the returned decorator is a runtime no-op; the code generator
    # reads the decorator's arguments directly from the source
    def decorate(cls):
        return cls

    return decorate
731
732
def getargspec_init(method):
    """inspect.getargspec with considerations for typical __init__ methods

    Wraps inspect.getargspec with error handling for typical __init__ cases:

    .. sourcecode:: text

        object.__init__ -> (self)
        other unreflectable (usually C) -> (self, *args, **kwargs)

    """
    try:
        return compat.inspect_getfullargspec(method)
    except TypeError:
        # the fallback tuples mirror (args, varargs, varkw, defaults)
        if method is object.__init__:
            return (["self"], None, None, None)
        return (["self"], "args", "kwargs", None)
751
752
def unbound_method_to_callable(func_or_cls):
    """Adjust the incoming callable such that a 'self' argument is not
    required.

    """
    is_unbound = (
        isinstance(func_or_cls, types.MethodType)
        and not func_or_cls.__self__
    )
    return func_or_cls.__func__ if is_unbound else func_or_cls
763
764
class GenericRepr:
    """Encapsulates the logic for creating a generic __repr__() string.

    This class allows for the repr structure to be created, then modified
    (e.g., changing the class name), before being rendered as a string.

    .. versionadded:: 2.1
    """

    __slots__ = (
        "_obj",
        "_additional_kw",
        "_to_inspect",
        "_omit_kwarg",
        "_class_name",
    )

    # the object being repr'd
    _obj: Any
    # extra (name, default) pairs checked beyond the __init__ signature
    _additional_kw: Sequence[Tuple[str, Any]]
    # objects whose __init__ signatures drive the output
    _to_inspect: List[object]
    # keyword names excluded from the output
    _omit_kwarg: Sequence[str]
    # override for the rendered class name; None means use the object's
    _class_name: Optional[str]

    def __init__(
        self,
        obj: Any,
        additional_kw: Sequence[Tuple[str, Any]] = (),
        to_inspect: Optional[Union[object, List[object]]] = None,
        omit_kwarg: Sequence[str] = (),
    ):
        """Create a GenericRepr object.

        :param obj: The object being repr'd
        :param additional_kw: Additional keyword arguments to check for in
         the repr, as a sequence of 2-tuples of (name, default_value)
        :param to_inspect: One or more objects whose __init__ signature
         should be inspected. If not provided, defaults to [obj].
        :param omit_kwarg: Sequence of keyword argument names to omit from
         the repr output
        """
        self._obj = obj
        self._additional_kw = additional_kw
        self._to_inspect = (
            [obj] if to_inspect is None else _collections.to_list(to_inspect)
        )
        self._omit_kwarg = omit_kwarg
        self._class_name = None

    def set_class_name(self, class_name: str) -> GenericRepr:
        """Set the class name to be used in the repr.

        By default, the class name is taken from obj.__class__.__name__.
        This method allows it to be overridden.

        :param class_name: The class name to use
        :return: self, for method chaining
        """
        self._class_name = class_name
        return self

    def __str__(self) -> str:
        """Produce the __repr__() string based on the configured parameters."""
        obj = self._obj
        to_inspect = self._to_inspect
        additional_kw = self._additional_kw
        omit_kwarg = self._omit_kwarg

        # sentinel distinguishing "attribute not present" from None
        missing = object()

        pos_args = []
        kw_args: _collections.OrderedDict[str, Any] = (
            _collections.OrderedDict()
        )
        vargs = None
        for i, insp in enumerate(to_inspect):
            try:
                spec = compat.inspect_getfullargspec(insp.__init__)  # type: ignore[misc] # noqa: E501
            except TypeError:
                # uninspectable __init__; skip this object
                continue
            else:
                default_len = len(spec.defaults) if spec.defaults else 0
                if i == 0:
                    # the first inspected object contributes positional
                    # args and varargs; defaulted args become keywords
                    if spec.varargs:
                        vargs = spec.varargs
                    if default_len:
                        pos_args.extend(spec.args[1:-default_len])
                    else:
                        pos_args.extend(spec.args[1:])
                else:
                    # subsequent objects contribute keyword args only
                    kw_args.update(
                        [(arg, missing) for arg in spec.args[1:-default_len]]
                    )

                if default_len:
                    assert spec.defaults
                    kw_args.update(
                        [
                            (arg, default)
                            for arg, default in zip(
                                spec.args[-default_len:], spec.defaults
                            )
                        ]
                    )
        output: List[str] = []

        output.extend(repr(getattr(obj, arg, None)) for arg in pos_args)

        if vargs is not None and hasattr(obj, vargs):
            output.extend([repr(val) for val in getattr(obj, vargs)])

        # keyword args are rendered only when the attribute exists and
        # differs from its default
        for arg, defval in kw_args.items():
            if arg in omit_kwarg:
                continue
            try:
                val = getattr(obj, arg, missing)
                if val is not missing and val != defval:
                    output.append("%s=%r" % (arg, val))
            except Exception:
                pass

        if additional_kw:
            for arg, defval in additional_kw:
                try:
                    val = getattr(obj, arg, missing)
                    if val is not missing and val != defval:
                        output.append("%s=%r" % (arg, val))
                except Exception:
                    pass

        class_name = (
            self._class_name
            if self._class_name is not None
            else obj.__class__.__name__
        )
        return "%s(%s)" % (class_name, ", ".join(output))
900
901
def generic_repr(
    obj: Any,
    additional_kw: Sequence[Tuple[str, Any]] = (),
    to_inspect: Optional[Union[object, List[object]]] = None,
    omit_kwarg: Sequence[str] = (),
) -> str:
    """Produce a __repr__() based on direct association of the __init__()
    specification vs. same-named attributes present.

    """
    renderer = GenericRepr(
        obj,
        additional_kw=additional_kw,
        to_inspect=to_inspect,
        omit_kwarg=omit_kwarg,
    )
    return str(renderer)
920
921
def class_hierarchy(cls):
    """Return an unordered sequence of all classes related to cls.

    Traverses diamond hierarchies.

    Fibs slightly: subclasses of builtin types are not returned. Thus
    class_hierarchy(class A(object)) returns (A, object), not A plus every
    class systemwide that derives from object.

    """

    seen = {cls}
    to_process = list(cls.__mro__)
    while to_process:
        current = to_process.pop()

        # walk upward through unseen base classes
        for base in current.__bases__:
            if base not in seen:
                to_process.append(base)
                seen.add(base)

        # never expand subclasses of builtins (e.g. object) -- that
        # would pull in unrelated classes systemwide
        if current.__module__ == "builtins" or not hasattr(
            current, "__subclasses__"
        ):
            continue

        # metaclasses need the explicit one-argument form
        if issubclass(current, type):
            subclasses = current.__subclasses__(current)
        else:
            subclasses = current.__subclasses__()

        for sub in subclasses:
            if sub not in seen:
                to_process.append(sub)
                seen.add(sub)
    return list(seen)
958
959
def iterate_attributes(cls):
    """iterate all the keys and attributes associated
    with a class, without using getattr().

    Does not use getattr() so that class-sensitive
    descriptors (i.e. property.__get__()) are not called.

    """
    for key in dir(cls):
        # find the first class in the MRO that defines the name and
        # yield its raw __dict__ entry, bypassing descriptor protocol
        for klass in cls.__mro__:
            if key in klass.__dict__:
                yield (key, klass.__dict__[key])
                break
974
975
def monkeypatch_proxied_specials(
    into_cls,
    from_cls,
    skip=None,
    only=None,
    name="self.proxy",
    from_instance=None,
):
    """Automates delegation of __specials__ for a proxying type.

    :param into_cls: class that receives the generated dunder methods
    :param from_cls: class whose callable dunder methods are mirrored
    :param skip: dunder names never to proxy; a default list applies
     when ``only`` is not given
    :param only: explicit list of names to proxy, bypassing discovery
    :param name: expression used in the generated method bodies to
     locate the proxied object
    :param from_instance: optional object bound into the generated
     function's namespace under ``name``
    """

    if only:
        dunders = only
    else:
        if skip is None:
            skip = (
                "__slots__",
                "__del__",
                "__getattribute__",
                "__metaclass__",
                "__getstate__",
                "__setstate__",
            )
        # all callable dunders on from_cls not already present on
        # into_cls and not explicitly skipped
        dunders = [
            m
            for m in dir(from_cls)
            if (
                m.startswith("__")
                and m.endswith("__")
                and not hasattr(into_cls, m)
                and m not in skip
            )
        ]

    for method in dunders:
        try:
            maybe_fn = getattr(from_cls, method)
            if not hasattr(maybe_fn, "__call__"):
                continue
            maybe_fn = getattr(maybe_fn, "__func__", maybe_fn)
            fn = cast(types.FunctionType, maybe_fn)

        except AttributeError:
            continue
        try:
            spec = compat.inspect_getfullargspec(fn)
            fn_args = compat.inspect_formatargspec(spec[0])
            d_args = compat.inspect_formatargspec(spec[0][1:])
        except TypeError:
            # uninspectable; fall back to a generic signature
            fn_args = "(self, *args, **kw)"
            d_args = "(*args, **kw)"

        # source for a pass-through method that forwards to the
        # proxied object
        py = (
            "def %(method)s%(fn_args)s: "
            "return %(name)s.%(method)s%(d_args)s" % locals()
        )

        env: Dict[str, types.FunctionType] = (
            from_instance is not None and {name: from_instance} or {}
        )
        exec(py, env)
        try:
            env[method].__defaults__ = fn.__defaults__
        except AttributeError:
            pass
        setattr(into_cls, method, env[method])
1041
1042
def methods_equivalent(meth1, meth2):
    """Return True if the two methods are the same implementation."""

    # unwrap bound methods to their underlying functions, then compare
    # by identity
    fn1 = getattr(meth1, "__func__", meth1)
    fn2 = getattr(meth2, "__func__", meth2)
    return fn1 is fn2
1049
1050
def as_interface(obj, cls=None, methods=None, required=None):
    """Ensure basic interface compliance for an instance or dict of callables.

    Checks that ``obj`` implements public methods of ``cls`` or has members
    listed in ``methods``. If ``required`` is not supplied, implementing at
    least one interface method is sufficient. Methods present on ``obj`` that
    are not in the interface are ignored.

    If ``obj`` is a dict and ``dict`` does not meet the interface
    requirements, the keys of the dictionary are inspected. Keys present in
    ``obj`` that are not in the interface will raise TypeErrors.

    Raises TypeError if ``obj`` does not meet the interface criteria.

    In all passing cases, an object with callable members is returned. In the
    simple case, ``obj`` is returned as-is; if dict processing kicks in then
    an anonymous class is returned.

    obj
      A type, instance, or dictionary of callables.
    cls
      Optional, a type. All public methods of cls are considered the
      interface. An ``obj`` instance of cls will always pass, ignoring
      ``required``..
    methods
      Optional, a sequence of method names to consider as the interface.
    required
      Optional, a sequence of mandatory implementations. If omitted, an
      ``obj`` that provides at least one interface method is considered
      sufficient. As a convenience, required may be a type, in which case
      all public methods of the type are required.

    """
    if not cls and not methods:
        raise TypeError("a class or collection of method names are required")

    # instances of the interface class trivially comply
    if isinstance(cls, type) and isinstance(obj, cls):
        return obj

    interface = set(methods or [m for m in dir(cls) if not m.startswith("_")])
    implemented = set(dir(obj))

    # ge: the implemented interface members must be a superset of
    # ``required``; gt (used when nothing is required): there must be at
    # least one implemented member, i.e. a strict superset of the empty set
    complies = operator.ge
    if isinstance(required, type):
        required = interface
    elif not required:
        required = set()
        complies = operator.gt
    else:
        required = set(required)

    if complies(implemented.intersection(interface), required):
        return obj

    # No dict duck typing here.
    if not isinstance(obj, dict):
        qualifier = complies is operator.gt and "any of" or "all of"
        raise TypeError(
            "%r does not implement %s: %s"
            % (obj, qualifier, ", ".join(interface))
        )

    class AnonymousInterface:
        """A callable-holding shell."""

    if cls:
        AnonymousInterface.__name__ = "Anonymous" + cls.__name__
    found = set()

    # promote each dict entry to a staticmethod on the shell class
    for method, impl in dictlike_iteritems(obj):
        if method not in interface:
            raise TypeError("%r: unknown in this interface" % method)
        if not callable(impl):
            raise TypeError("%r=%r is not callable" % (method, impl))
        setattr(AnonymousInterface, method, staticmethod(impl))
        found.add(method)

    if complies(found, required):
        return AnonymousInterface

    raise TypeError(
        "dictionary does not contain required keys %s"
        % ", ".join(required - found)
    )
1135
1136
1137_GFD = TypeVar("_GFD", bound="generic_fn_descriptor[Any]")
1138
1139
class generic_fn_descriptor(Generic[_T_co]):
    """Descriptor which proxies a function when the attribute is not
    present in dict

    This superclass is organized in a particular way with "memoized" and
    "non-memoized" implementation classes that are hidden from type checkers,
    as Mypy seems to not be able to handle seeing multiple kinds of descriptor
    classes used for the same attribute.

    """

    # the wrapped function invoked on attribute access
    fget: Callable[..., _T_co]
    __doc__: Optional[str]
    __name__: str

    def __init__(self, fget: Callable[..., _T_co], doc: Optional[str] = None):
        self.fget = fget
        self.__doc__ = doc or fget.__doc__
        self.__name__ = fget.__name__

    @overload
    def __get__(self: _GFD, obj: None, cls: Any) -> _GFD: ...

    @overload
    def __get__(self, obj: object, cls: Any) -> _T_co: ...

    def __get__(self: _GFD, obj: Any, cls: Any) -> Union[_GFD, _T_co]:
        # concrete access behavior is supplied by subclasses
        raise NotImplementedError()

    if TYPE_CHECKING:
        # declared for typing only so the attribute appears settable
        # and deletable to type checkers; no runtime implementation

        def __set__(self, instance: Any, value: Any) -> None: ...

        def __delete__(self, instance: Any) -> None: ...

    def _reset(self, obj: Any) -> None:
        # memoizing subclasses implement per-instance invalidation here
        raise NotImplementedError()

    @classmethod
    def reset(cls, obj: Any, name: str) -> None:
        # memoizing subclasses implement named invalidation here
        raise NotImplementedError()
1181
1182
class _non_memoized_property(generic_fn_descriptor[_T_co]):
    """A plain, non-caching descriptor that proxies a function.

    Exists so that an uncached attribute can be interchanged with
    memoized_property while still being seen as equivalent by mypy.

    """

    if not TYPE_CHECKING:

        def __get__(self, obj, cls):
            # class-level access yields the descriptor itself; instance
            # access re-invokes the wrapped function every time
            return self if obj is None else self.fget(obj)
1198
1199
class _memoized_property(generic_fn_descriptor[_T_co]):
    """A read-only @property whose function runs at most once per object."""

    if not TYPE_CHECKING:

        def __get__(self, obj, cls):
            if obj is None:
                return self
            value = self.fget(obj)
            # storing under the function's name in __dict__ shadows this
            # (non-data) descriptor on subsequent access
            obj.__dict__[self.__name__] = value
            return value

        def _reset(self, obj):
            _memoized_property.reset(obj, self.__name__)

        @classmethod
        def reset(cls, obj, name):
            # drop the cached value; next access recomputes
            obj.__dict__.pop(name, None)
1217
1218
# despite many attempts to get Mypy to recognize an overridden descriptor
# where one is memoized and the other isn't, there seems to be no reliable
# way other than completely deceiving the type checker into thinking there
# is just one single descriptor type everywhere. Otherwise, if a superclass
# has non-memoized and subclass has memoized, that requires
# "class memoized(non_memoized)". but then if a superclass has memoized and
# superclass has non-memoized, the class hierarchy of the descriptors
# would need to be reversed; "class non_memoized(memoized)". so there's no
# way to achieve this.
# additional issues, RO properties:
# https://github.com/python/mypy/issues/12440
if TYPE_CHECKING:
    # allow memoized and non-memoized to be freely mixed by having them
    # be the same class
    memoized_property = generic_fn_descriptor
    non_memoized_property = generic_fn_descriptor

    # for read only situations, mypy only sees @property as read only.
    # read only is needed when a subtype specializes the return type
    # of a property, meaning assignment needs to be disallowed
    ro_memoized_property = property
    ro_non_memoized_property = property

else:
    # at runtime all four names resolve to the two concrete descriptor
    # implementations above
    memoized_property = ro_memoized_property = _memoized_property
    non_memoized_property = ro_non_memoized_property = _non_memoized_property
1245
1246
def memoized_instancemethod(fn: _F) -> _F:
    """Decorate a method so that its return value is memoized on the
    instance.

    Best applied to no-arg methods: memoization is not sensitive to
    argument values, and will always return the same value even when
    called with different arguments.

    """

    def first_call(self, *args, **kw):
        value = fn(self, *args, **kw)

        def memoized(*a, **kw):
            return value

        memoized.__name__ = fn.__name__
        memoized.__doc__ = fn.__doc__
        # shadow the decorated method on the instance with a stand-in
        # that returns the first computed value
        self.__dict__[fn.__name__] = memoized
        return value

    return update_wrapper(first_call, fn)  # type: ignore
1268
1269
class HasMemoized:
    """A mixin class that maintains the names of memoized elements in a
    collection for easy cache clearing, generative, etc.

    """

    if not TYPE_CHECKING:
        # support classes that want to have __slots__ with an explicit
        # slot for __dict__. not sure if that requires base __slots__ here.
        __slots__ = ()

    # names of attributes memoized on this instance; rebound (never
    # mutated in place) so the empty default can be shared class-wide
    _memoized_keys: FrozenSet[str] = frozenset()

    def _reset_memoizations(self) -> None:
        # discard all memoized values; they are recomputed on next access
        for elem in self._memoized_keys:
            self.__dict__.pop(elem, None)

    def _assert_no_memoizations(self) -> None:
        for elem in self._memoized_keys:
            assert elem not in self.__dict__

    def _set_memoized_attribute(self, key: str, value: Any) -> None:
        # store a pre-computed value and track its key for cache clearing
        self.__dict__[key] = value
        self._memoized_keys |= {key}

    class memoized_attribute(memoized_property[_T]):
        """A read-only @property that is only evaluated once.

        :meta private:

        """

        fget: Callable[..., _T]
        __doc__: Optional[str]
        __name__: str

        def __init__(self, fget: Callable[..., _T], doc: Optional[str] = None):
            self.fget = fget
            self.__doc__ = doc or fget.__doc__
            self.__name__ = fget.__name__

        @overload
        def __get__(self: _MA, obj: None, cls: Any) -> _MA: ...

        @overload
        def __get__(self, obj: Any, cls: Any) -> _T: ...

        def __get__(self, obj, cls):
            if obj is None:
                return self
            # memoize in the instance __dict__ and track the key so that
            # _reset_memoizations() can clear it
            obj.__dict__[self.__name__] = result = self.fget(obj)
            obj._memoized_keys |= {self.__name__}
            return result

    @classmethod
    def memoized_instancemethod(cls, fn: _F) -> _F:
        """Decorate a method to memoize its return value.

        :meta private:

        """

        def oneshot(self: Any, *args: Any, **kw: Any) -> Any:
            result = fn(self, *args, **kw)

            def memo(*a, **kw):
                return result

            memo.__name__ = fn.__name__
            memo.__doc__ = fn.__doc__
            # replace the method on the instance with a stand-in
            # returning the first result, and track the key
            self.__dict__[fn.__name__] = memo
            self._memoized_keys |= {fn.__name__}
            return result

        return update_wrapper(oneshot, fn)  # type: ignore
1345
1346
if TYPE_CHECKING:
    # present as a plain property to the type checker so subclasses may
    # narrow the attribute's return type (read-only semantics)
    HasMemoized_ro_memoized_attribute = property
else:
    HasMemoized_ro_memoized_attribute = HasMemoized.memoized_attribute
1351
1352
class MemoizedSlots:
    """Apply memoized items to an object using a __getattr__ scheme.

    Gives classes that use ``__slots__`` the equivalent of
    memoized_property and memoized_instancemethod: attribute ``name`` is
    served by a ``_memoized_attr_name`` creator method, and method
    ``name`` by ``_memoized_method_name``.

    The memoized get is not threadsafe under freethreading and the
    creator method may in extremely rare cases be called more than once.

    """

    __slots__ = ()

    def _fallback_getattr(self, key):
        # hook for subclasses that chain additional __getattr__ behavior
        raise AttributeError(key)

    def __getattr__(self, key: str) -> Any:
        # a missing creator method must not recurse back into this hook
        if key.startswith("_memoized_attr_") or key.startswith(
            "_memoized_method_"
        ):
            raise AttributeError(key)

        # probe __class__ directly rather than self, so other __getattr__
        # schemes that refer back here can't cause infinite recursion
        cls = self.__class__

        if hasattr(cls, f"_memoized_attr_{key}"):
            computed = getattr(self, f"_memoized_attr_{key}")()
            setattr(self, key, computed)
            return computed

        if hasattr(cls, f"_memoized_method_{key}"):
            real_method = getattr(self, f"_memoized_method_{key}")

            def oneshot(*args, **kw):
                outcome = real_method(*args, **kw)

                def memo(*a, **kw):
                    return outcome

                memo.__name__ = real_method.__name__
                memo.__doc__ = real_method.__doc__
                # the memoized stand-in replaces this method entirely
                setattr(self, key, memo)
                return outcome

            oneshot.__doc__ = real_method.__doc__
            return oneshot

        return self._fallback_getattr(key)
1399
1400
# from paste.deploy.converters
def asbool(obj: Any) -> bool:
    """Coerce a value to a boolean, recognizing common true/false string
    spellings; raises ValueError for an unrecognized string."""
    if not isinstance(obj, str):
        return bool(obj)
    text = obj.strip().lower()
    if text in ("true", "yes", "on", "y", "t", "1"):
        return True
    if text in ("false", "no", "off", "n", "f", "0"):
        return False
    raise ValueError("String is not true/false: %r" % text)
1412
1413
def bool_or_str(*text: str) -> Callable[[str], Union[str, bool]]:
    """Return a callable that evaluates a string as a boolean, unless it
    matches one of the given "alternate" values, in which case the string
    is passed through unchanged.

    """

    def bool_or_value(obj: str) -> Union[str, bool]:
        # recognized alternates pass through; everything else must be
        # a boolean-ish string
        return obj if obj in text else asbool(obj)

    return bool_or_value
1427
1428
def asint(value: Any) -> Optional[int]:
    """Coerce the value to an integer, passing ``None`` through."""
    return None if value is None else int(value)
1435
1436
def coerce_kw_type(
    kw: Dict[str, Any],
    key: str,
    type_: Type[Any],
    flexi_bool: bool = True,
    dest: Optional[Dict[str, Any]] = None,
) -> None:
    r"""Coerce ``kw[key]`` to type 'type\_' if present, writing the result
    into ``dest`` (which defaults to ``kw`` itself).

    Values that are None or already of the requested type are left
    untouched. If 'flexi_bool' is True, boolean coercion accepts flexible
    string spellings such as '0' meaning false.
    """

    target = kw if dest is None else dest

    if key not in kw:
        return

    value = kw[key]
    if value is None:
        return
    if isinstance(type_, type) and isinstance(value, type_):
        # already the requested type; nothing to do
        return

    if type_ is bool and flexi_bool:
        target[key] = asbool(value)
    else:
        target[key] = type_(value)
1461
1462
def constructor_key(obj: Any, cls: Type[Any]) -> Tuple[Any, ...]:
    """Produce a cacheable tuple ``(cls, (name, value), ...)`` drawn from
    the ``__dict__`` of obj, limited to the constructor arguments of cls.

    """
    state = obj.__dict__
    pairs = tuple(
        (name, state[name])
        for name in get_cls_kwargs(cls)
        if name in state
    )
    return (cls,) + pairs
1472
1473
def constructor_copy(obj: _T, cls: Type[_T], *args: Any, **kw: Any) -> _T:
    """Instantiate cls using the __dict__ of obj as constructor arguments.

    Uses inspect to match the named arguments of ``cls``; explicitly
    passed ``kw`` entries take precedence over values found on ``obj``.

    """

    state = obj.__dict__
    for name in get_cls_kwargs(cls).difference(kw):
        if name in state:
            kw[name] = state[name]
    return cls(*args, **kw)
1486
1487
def counter() -> Callable[[], int]:
    """Return a threadsafe counter function starting at 1."""

    mutex = threading.Lock()
    sequence = itertools.count(1)

    def _next() -> int:
        # serialize access so concurrent callers never observe a
        # duplicated or skipped value
        with mutex:
            return next(sequence)

    return _next
1500
1501
def duck_type_collection(
    specimen: Any, default: Optional[Type[Any]] = None
) -> Optional[Type[Any]]:
    """Guess which basic collection type (list, set or dict) the given
    instance or class is, or is acting as.

    An ``__emulates__`` attribute, when present, takes precedence.
    """

    if hasattr(specimen, "__emulates__"):
        # canonicalize any set-like emulation to the builtin set
        emulated = specimen.__emulates__
        if emulated is not None and issubclass(emulated, set):
            return set
        return emulated  # type: ignore

    isa = issubclass if isinstance(specimen, type) else isinstance
    for collection_cls in (list, set, dict):
        if isa(specimen, collection_cls):
            return collection_cls

    # fall back to duck-typing on characteristic mutator methods
    if hasattr(specimen, "append"):
        return list
    if hasattr(specimen, "add"):
        return set
    if hasattr(specimen, "set"):
        return dict
    return default
1535
1536
def assert_arg_type(
    arg: Any, argtype: Union[Tuple[Type[Any], ...], Type[Any]], name: str
) -> Any:
    """Return ``arg`` if it is an instance of ``argtype``; otherwise raise
    an ArgumentError naming the expected type(s)."""
    if not isinstance(arg, argtype):
        if isinstance(argtype, tuple):
            raise exc.ArgumentError(
                "Argument '%s' is expected to be one of type %s, got '%s'"
                % (name, " or ".join("'%s'" % a for a in argtype), type(arg))
            )
        raise exc.ArgumentError(
            "Argument '%s' is expected to be of type '%s', got '%s'"
            % (name, argtype, type(arg))
        )
    return arg
1553
1554
def dictlike_iteritems(dictlike):
    """Return a (key, value) iterator for almost any dict-like object."""

    if hasattr(dictlike, "items"):
        return list(dictlike.items())

    getter = getattr(dictlike, "__getitem__", getattr(dictlike, "get", None))
    if getter is None:
        raise TypeError("Object '%r' is not dict-like" % dictlike)

    if hasattr(dictlike, "iterkeys"):
        # legacy iterkeys protocol: produce pairs lazily
        def pair_iterator():
            for key in dictlike.iterkeys():
                yield key, getter(key)

        return pair_iterator()

    if hasattr(dictlike, "keys"):
        return ((key, getter(key)) for key in dictlike.keys())

    raise TypeError("Object '%r' is not dict-like" % dictlike)
1577
1578
class classproperty(property):
    """A @property analogue that operates on classes rather than
    instances.

    This decorator receives special treatment when using the declarative
    module, but note that the
    :class:`~.sqlalchemy.ext.declarative.declared_attr`
    decorator should be used for this purpose with declarative.

    """

    fget: Callable[[Any], Any]

    def __init__(self, fget: Callable[[Any], Any], *arg: Any, **kw: Any):
        super().__init__(fget, *arg, **kw)
        # surface the wrapped function's docstring as our own
        self.__doc__ = fget.__doc__

    def __get__(self, obj: Any, cls: Optional[type] = None) -> Any:
        # always invoke against the class; the instance is ignored
        return self.fget(cls)
1598
1599
class hybridproperty(Generic[_T]):
    """A property-like descriptor whose getter may differ for class-level
    versus instance-level access."""

    def __init__(self, func: Callable[..., _T]):
        self.func = func
        # class-level getter defaults to the same function until
        # overridden via .classlevel()
        self.clslevel = func

    def __get__(self, instance: Any, owner: Any) -> _T:
        if instance is None:
            return self.clslevel(owner)
        return self.func(instance)

    def classlevel(self, func: Callable[..., Any]) -> hybridproperty[_T]:
        """Register a distinct getter invoked for class-level access."""
        self.clslevel = func
        return self
1615
1616
class rw_hybridproperty(Generic[_T]):
    """A read/write variant of hybridproperty, supporting an optional
    setter registered via :meth:`.setter`."""

    def __init__(self, func: Callable[..., _T]):
        self.func = func
        self.clslevel = func
        self.setfn: Optional[Callable[..., Any]] = None

    def __get__(self, instance: Any, owner: Any) -> _T:
        if instance is None:
            return self.clslevel(owner)
        return self.func(instance)

    def __set__(self, instance: Any, value: Any) -> None:
        # a setter must have been registered via .setter()
        assert self.setfn is not None
        self.setfn(instance, value)

    def setter(self, func: Callable[..., Any]) -> rw_hybridproperty[_T]:
        """Register the function invoked on attribute assignment."""
        self.setfn = func
        return self

    def classlevel(self, func: Callable[..., Any]) -> rw_hybridproperty[_T]:
        """Register a distinct getter invoked for class-level access."""
        self.clslevel = func
        return self
1641
1642
class hybridmethod(Generic[_T]):
    """Decorate a function as cls- or instance- level."""

    def __init__(self, func: Callable[..., _T]):
        self.func = self.__func__ = func
        self.clslevel = func

    def __get__(self, instance: Any, owner: Any) -> Callable[..., _T]:
        if instance is None:
            # bind the class-level function to the owning class
            return self.clslevel.__get__(  # type: ignore[no-any-return]
                owner, owner.__class__
            )
        # bind the instance-level function to the instance
        return self.func.__get__(  # type: ignore[no-any-return]
            instance, owner
        )

    def classlevel(self, func: Callable[..., Any]) -> hybridmethod[_T]:
        """Register a distinct function invoked for class-level access."""
        self.clslevel = func
        return self
1663
1664
class symbol(int):
    """A constant symbol.

    >>> symbol("foo") is symbol("foo")
    True
    >>> symbol("foo")
    symbol('foo')

    A slight refinement of the MAGICCOOKIE=object() pattern. The primary
    advantage of symbol() is its repr(). They are also singletons.

    Repeated calls of symbol('name') will all return the same instance.

    """

    # the symbolic name; the integer value is hash(name) unless an
    # explicit ``canonical`` value was given
    name: str

    # global registry of interned symbols, keyed by name
    symbols: Dict[str, symbol] = {}
    _lock = threading.Lock()

    def __new__(
        cls,
        name: str,
        doc: Optional[str] = None,
        canonical: Optional[int] = None,
    ) -> symbol:
        with cls._lock:
            sym = cls.symbols.get(name)
            if sym is None:
                assert isinstance(name, str)
                if canonical is None:
                    canonical = hash(name)
                sym = int.__new__(symbol, canonical)
                sym.name = name
                if doc:
                    sym.__doc__ = doc

                # NOTE: we should ultimately get rid of this global thing,
                # however, currently it is to support pickling. The best
                # change would be when we are on py3.11 at a minimum, we
                # switch to stdlib enum.IntFlag.
                cls.symbols[name] = sym
            else:
                # re-requesting an existing symbol with a conflicting
                # integer value is an error
                if canonical and canonical != sym:
                    raise TypeError(
                        f"Can't replace canonical symbol for {name!r} "
                        f"with new int value {canonical}"
                    )
        return sym

    def __reduce__(self):
        # pickle by name; the "x" doc placeholder is ignored when the
        # already-interned symbol is looked up on reconstruction
        return symbol, (self.name, "x", int(self))

    def __str__(self):
        return repr(self)

    def __repr__(self):
        return f"symbol({self.name!r})"
1723
1724
class _IntFlagMeta(type):
    """Metaclass for _FastIntFlag: converts integer class attributes into
    interned :class:`.symbol` instances and builds ``__members__``."""

    def __init__(
        cls,
        classname: str,
        bases: Tuple[Type[Any], ...],
        dict_: Dict[str, Any],
        **kw: Any,
    ) -> None:
        items: List[symbol]
        cls._items = items = []
        for k, v in dict_.items():
            if re.match(r"^__.*__$", k):
                # skip dunders such as __qualname__ / __doc__
                continue
            if isinstance(v, int):
                sym = symbol(k, canonical=v)
            elif not k.startswith("_"):
                raise TypeError("Expected integer values for IntFlag")
            else:
                continue
            setattr(cls, k, sym)
            items.append(sym)

        # mapping interface mimicking enum.IntFlag.__members__
        cls.__members__ = _collections.immutabledict(
            {sym.name: sym for sym in items}
        )

    def __iter__(self) -> Iterator[symbol]:
        raise NotImplementedError(
            "iter not implemented to ensure compatibility with "
            "Python 3.11 IntFlag. Please use __members__. See "
            "https://github.com/python/cpython/issues/99304"
        )
1757
1758
class _FastIntFlag(metaclass=_IntFlagMeta):
    """An 'IntFlag' copycat that isn't slow when performing bitwise
    operations.

    the ``FastIntFlag`` class will return ``enum.IntFlag`` under TYPE_CHECKING
    and ``_FastIntFlag`` otherwise.

    """


if TYPE_CHECKING:
    from enum import IntFlag

    # type checkers see the real IntFlag for full enum semantics
    FastIntFlag = IntFlag
else:
    # at runtime, use the faster symbol-based implementation above
    FastIntFlag = _FastIntFlag
1775
1776
1777_E = TypeVar("_E", bound=enum.Enum)
1778
1779
def parse_user_argument_for_enum(
    arg: Any,
    choices: Dict[_E, List[Any]],
    name: str,
    resolve_symbol_names: bool = False,
) -> Optional[_E]:
    """Resolve a user-supplied parameter into one of a set of enumerated
    values.

    The argument may be the enum member itself, the member's name string
    (when ``resolve_symbol_names`` is enabled), or any of the alternate
    spellings listed for that member, such as True/False/None etc.

    :param arg: the user argument.
    :param choices: dictionary of enum values to lists of possible
     entries for each.
    :param name: name of the argument. Used in an :class:`.ArgumentError`
     that is raised if the parameter doesn't match any available argument.

    """
    for member, alternates in choices.items():
        if arg is member:
            return member
        if resolve_symbol_names and arg == member.name:
            return member
        if arg in alternates:
            return member

    # None is permitted even when not listed as an alternate
    if arg is None:
        return None

    raise exc.ArgumentError(f"Invalid value for '{name}': {arg!r}")
1812
1813
# module-global sequence backing set_creation_order()
_creation_order = 1


def set_creation_order(instance: Any) -> None:
    """Assign a '_creation_order' sequence to the given instance.

    This allows multiple instances to be sorted in order of creation
    (typically within a single thread; the counter is not particularly
    threadsafe).

    """
    global _creation_order
    instance._creation_order = _creation_order
    # unlocked increment; concurrent callers may observe duplicates
    _creation_order += 1
1828
1829
def warn_exception(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
    """Invoke ``func``, converting any exception raised into a warning.

    Returns the function's result on success; returns None when an
    exception was converted to a warning.
    """
    try:
        return func(*args, **kwargs)
    except Exception:
        # report the exception type and value, but keep going
        warn("%s('%s') ignored" % sys.exc_info()[0:2])
1839
1840
def ellipses_string(value, len_=25):
    """Truncate ``value`` to ``len_`` characters, appending '...' when
    trimmed; objects without a length pass through unchanged."""
    try:
        too_long = len(value) > len_
    except TypeError:
        # value has no len(); return it untouched
        return value
    return "%s..." % value[0:len_] if too_long else value
1849
1850
1851class _hash_limit_string(str):
1852 """A string subclass that can only be hashed on a maximum amount
1853 of unique values.
1854
1855 This is used for warnings so that we can send out parameterized warnings
1856 without the __warningregistry__ of the module, or the non-overridable
1857 "once" registry within warnings.py, overloading memory,
1858
1859
1860 """
1861
1862 _hash: int
1863
1864 def __new__(
1865 cls, value: str, num: int, args: Sequence[Any]
1866 ) -> _hash_limit_string:
1867 interpolated = (value % args) + (
1868 " (this warning may be suppressed after %d occurrences)" % num
1869 )
1870 self = super().__new__(cls, interpolated)
1871 self._hash = hash("%s_%d" % (value, hash(interpolated) % num))
1872 return self
1873
1874 def __hash__(self) -> int:
1875 return self._hash
1876
1877 def __eq__(self, other: Any) -> bool:
1878 return hash(self) == hash(other)
1879
1880
def warn(msg: str, code: Optional[str] = None) -> None:
    """Issue a warning using :class:`.exc.SAWarning` as the category.

    When ``code`` is given, the message is wrapped in an SAWarning
    instance carrying that error code.
    """
    if code:
        _warnings_warn(exc.SAWarning(msg, code=code))
        return
    _warnings_warn(msg, exc.SAWarning)
1892
1893
def warn_limited(msg: str, args: Sequence[Any]) -> None:
    """Issue a warning with a parameterized string, limiting the number
    of registrations (to 10 hash buckets per template).

    """
    message: str = msg
    if args:
        message = _hash_limit_string(msg, 10, args)
    _warnings_warn(message, exc.SAWarning)
1902
1903
1904_warning_tags: Dict[CodeType, Tuple[str, Type[Warning]]] = {}
1905
1906
1907def tag_method_for_warnings(
1908 message: str, category: Type[Warning]
1909) -> Callable[[_F], _F]:
1910 def go(fn):
1911 _warning_tags[fn.__code__] = (message, category)
1912 return fn
1913
1914 return go
1915
1916
1917_not_sa_pattern = re.compile(r"^(?:sqlalchemy\.(?!testing)|alembic\.)")
1918
1919
def _warnings_warn(
    message: Union[str, Warning],
    category: Optional[Type[Warning]] = None,
    stacklevel: int = 2,
) -> None:
    """Emit ``message`` via :func:`warnings.warn`, adjusting ``stacklevel``
    so the warning points at the first caller outside of SQLAlchemy /
    Alembic, and appending any suffix registered via
    :func:`.tag_method_for_warnings` for frames found in the call stack."""

    # an incoming Warning instance implies its own category
    if category is None and isinstance(message, Warning):
        category = type(message)

    # adjust the given stacklevel to be outside of SQLAlchemy
    try:
        frame = sys._getframe(stacklevel)
    except ValueError:
        # being called from less than 3 (or given) stacklevels, weird,
        # but don't crash
        stacklevel = 0
    except:
        # deliberately broad: _getframe() doesn't work, weird interpreter
        # issue, ok, but don't crash
        stacklevel = 0
    else:
        stacklevel_found = warning_tag_found = False
        while frame is not None:
            # using __name__ here requires that we have __name__ in the
            # __globals__ of the decorated string functions we make also.
            # we generate this using {"__name__": fn.__module__}
            if not stacklevel_found and not re.match(
                _not_sa_pattern, frame.f_globals.get("__name__", "")
            ):
                # stop incrementing stack level if an out-of-SQLA line
                # were found.
                stacklevel_found = True

                # however, for the warning tag thing, we have to keep
                # scanning up the whole traceback

            if frame.f_code in _warning_tags:
                warning_tag_found = True
                _suffix, _category = _warning_tags[frame.f_code]
                category = category or _category
                message = f"{message} ({_suffix})"

            frame = frame.f_back  # type: ignore[assignment]

            if not stacklevel_found:
                stacklevel += 1
            elif stacklevel_found and warning_tag_found:
                break

    # +1 accounts for this helper's own frame
    if category is not None:
        warnings.warn(message, category, stacklevel=stacklevel + 1)
    else:
        warnings.warn(message, stacklevel=stacklevel + 1)
1973
1974
def only_once(
    fn: Callable[..., _T], retry_on_exception: bool
) -> Callable[..., Optional[_T]]:
    """Decorate the given function to be a no-op after it is called exactly
    once.

    With ``retry_on_exception`` set, a call that raises does not count as
    the single call, and the next invocation will try again.
    """

    pending = [fn]

    def go(*arg: Any, **kw: Any) -> Optional[_T]:
        # hold a strong reference to fn so it isn't garbage collected,
        # which would interfere with the event system's expectations
        strong_fn = fn  # noqa
        if not pending:
            return None
        target = pending.pop()
        try:
            return target(*arg, **kw)
        except:
            if retry_on_exception:
                pending.insert(0, target)
            raise

    return go
1999
2000
_SQLA_RE = re.compile(r"sqlalchemy/([a-z_]+/){0,2}[a-z_]+\.py")
_UNITTEST_RE = re.compile(r"unit(?:2|test2?/)")


def chop_traceback(
    tb: List[str],
    exclude_prefix: re.Pattern[str] = _UNITTEST_RE,
    exclude_suffix: re.Pattern[str] = _SQLA_RE,
) -> List[str]:
    """Chop extraneous lines off beginning and end of a traceback.

    :param tb:
        a list of traceback lines as returned by ``traceback.format_stack()``

    :param exclude_prefix:
        a regular expression object matching lines to skip at beginning of
        ``tb``

    :param exclude_suffix:
        a regular expression object matching lines to skip at end of ``tb``
    """
    first = 0
    last = len(tb) - 1
    # advance past leading lines matched by the prefix pattern
    while first <= last and exclude_prefix.search(tb[first]):
        first += 1
    # retreat past trailing lines matched by the suffix pattern
    while first <= last and exclude_suffix.search(tb[last]):
        last -= 1
    return tb[first : last + 1]
2029
2030
def attrsetter(attrname):
    """Return a setter function ``set(obj, value)`` performing
    ``obj.<attrname> = value``; exec is used so that dotted attribute
    paths are supported."""
    code = "def set(obj, value): obj.%s = value" % attrname
    namespace = locals().copy()
    exec(code, namespace)
    return namespace["set"]
2036
2037
# matches dunder names such as __module__ / __qualname__
dunders_re = re.compile("^__.+__$")


class TypingOnly:
    """A mixin class that marks a class as 'typing only', meaning it has
    absolutely no methods, attributes, or runtime functionality whatsoever.

    """

    __slots__ = ()

    def __init_subclass__(cls, **kw: Any) -> None:
        # enforced on direct subclasses only; grandchildren may add members
        if TypingOnly in cls.__bases__:
            remaining = {
                name for name in cls.__dict__ if not dunders_re.match(name)
            }
            if remaining:
                raise AssertionError(
                    f"Class {cls} directly inherits TypingOnly but has "
                    f"additional attributes {remaining}."
                )
        super().__init_subclass__(**kw)
2060
2061
class EnsureKWArg:
    r"""Apply translation of functions to accept \**kw arguments if they
    don't already.

    Used to ensure cross-compatibility with third party legacy code, for
    things like compiler visit methods that need to accept ``**kw``
    arguments, but may have been copied from old code that didn't accept
    them.

    """

    ensure_kwarg: str
    """a regular expression that indicates method names for which the method
    should accept ``**kw`` arguments.

    The class will scan for methods matching the name template and decorate
    them if necessary to ensure ``**kw`` parameters are accepted.

    """

    def __init_subclass__(cls) -> None:
        pattern = cls.ensure_kwarg
        if pattern:
            for attr_name, fn in list(cls.__dict__.items()):
                if not re.match(pattern, attr_name):
                    continue
                spec = compat.inspect_getfullargspec(fn)
                if not spec.varkw:
                    # wrap so extra **kw passed by callers is tolerated
                    setattr(cls, attr_name, cls._wrap_w_kw(fn))
        super().__init_subclass__()

    @classmethod
    def _wrap_w_kw(cls, fn: Callable[..., Any]) -> Callable[..., Any]:
        # discard **kw entirely; the wrapped function never sees it
        def wrap(*arg: Any, **kw: Any) -> Any:
            return fn(*arg)

        return update_wrapper(wrap, fn)
2101
2102
def wrap_callable(wrapper, fn):
    """Augment functools.update_wrapper() to work with objects with
    a ``__call__()`` method.

    :param fn:
      object with __call__ method

    """
    if hasattr(fn, "__name__"):
        return update_wrapper(wrapper, fn)

    # plain callable object: copy metadata over manually
    wrapper.__name__ = fn.__class__.__name__
    if hasattr(fn, "__module__"):
        wrapper.__module__ = fn.__module__

    call_doc = getattr(fn.__call__, "__doc__", None)
    if call_doc:
        wrapper.__doc__ = call_doc
    elif fn.__doc__:
        wrapper.__doc__ = fn.__doc__

    return wrapper
2125
2126
def quoted_token_parser(value):
    """Parse a dotted identifier with accommodation for quoted names.

    Includes support for SQL-style double quotes as a literal character:
    a doubled quote inside a quoted token is unescaped to a single
    literal quote in the result.

    E.g.::

        >>> quoted_token_parser("name")
        ['name']
        >>> quoted_token_parser("schema.name")
        ['schema', 'name']
        >>> quoted_token_parser('"Schema"."Name"')
        ['Schema', 'Name']
        >>> quoted_token_parser('"Schema"."Name""Foo"')
        ['Schema', 'Name"Foo']

    """

    # fast path: no quoting in play, plain dotted split
    if '"' not in value:
        return value.split(".")

    # 0 = outside of quotes
    # 1 = inside of quotes
    state = 0
    result: List[List[str]] = [[]]
    idx = 0
    lv = len(value)
    while idx < lv:
        char = value[idx]
        if char == '"':
            if state == 1 and idx < lv - 1 and value[idx + 1] == '"':
                # SQL-escaped doubled quote: emit one literal quote
                result[-1].append('"')
                idx += 1
            else:
                state ^= 1
        elif char == "." and state == 0:
            # unquoted dot separates tokens
            result.append([])
        else:
            result[-1].append(char)
        idx += 1

    return ["".join(token) for token in result]
2169
2170
def add_parameter_text(params: Any, text: str) -> Callable[[_F], _F]:
    """Return a decorator injecting ``text`` as documentation beneath the
    named ``:param:`` entries in the decorated function's docstring."""

    params = _collections.to_list(params)

    def decorate(fn):
        existing = fn.__doc__ or ""
        if existing:
            existing = inject_param_text(
                existing, {param: text for param in params}
            )
        fn.__doc__ = existing
        return fn

    return decorate
2182
2183
2184def _dedent_docstring(text: str) -> str:
2185 split_text = text.split("\n", 1)
2186 if len(split_text) == 1:
2187 return text
2188 else:
2189 firstline, remaining = split_text
2190 if not firstline.startswith(" "):
2191 return firstline + "\n" + textwrap.dedent(remaining)
2192 else:
2193 return textwrap.dedent(text)
2194
2195
def inject_docstring_text(
    given_doctext: Optional[str], injecttext: str, pos: int
) -> str:
    """Return ``given_doctext`` with ``injecttext`` inserted at the
    ``pos``-th blank line of the (dedented) docstring."""
    doctext: str = _dedent_docstring(given_doctext or "")
    lines = doctext.split("\n")
    if len(lines) == 1:
        # guarantee at least one insertion point after the summary line
        lines.append("")
    injectlines = textwrap.dedent(injecttext).split("\n")
    if injectlines[0]:
        # ensure the injected text is separated by a leading blank line
        injectlines.insert(0, "")

    blanks = [num for num, line in enumerate(lines) if not line.strip()]
    blanks.insert(0, 0)

    inject_pos = blanks[min(pos, len(blanks) - 1)]

    return "\n".join(lines[:inject_pos] + injectlines + lines[inject_pos:])
2214
2215
# matches a ":param <name>:" line, capturing its indentation and name
_param_reg = re.compile(r"(\s+):param (.+?):")


def inject_param_text(doctext: str, inject_params: Dict[str, str]) -> str:
    """Return ``doctext`` with the text from ``inject_params`` appended to
    the end of each matching ``:param <name>:`` block (i.e. just before
    the next ``:param:`` entry or blank line)."""
    doclines = collections.deque(doctext.splitlines())
    lines = []

    # TODO: this is not working for params like ":param case_sensitive=True:"

    # text waiting to be flushed at the end of the current param block
    to_inject = None
    while doclines:
        line = doclines.popleft()

        m = _param_reg.match(line)

        if to_inject is None:
            if m:
                param = m.group(2).lstrip("*")
                if param in inject_params:
                    # default indent to that of :param: plus one
                    indent = " " * len(m.group(1)) + " "

                    # but if the next line has text, use that line's
                    # indentation
                    if doclines:
                        m2 = re.match(r"(\s+)\S", doclines[0])
                        if m2:
                            indent = " " * len(m2.group(1))

                    to_inject = indent + inject_params[param]
        elif m:
            # next :param: reached; flush the pending injection first
            lines.extend(["\n", to_inject, "\n"])
            to_inject = None
        elif not line.rstrip():
            # blank line ends the current param block; flush injection
            lines.extend([line, to_inject, "\n"])
            to_inject = None
        elif line.endswith("::"):
            # TODO: this still won't cover if the code example itself has
            # blank lines in it, need to detect those via indentation.
            lines.extend([line, doclines.popleft()])
            continue
        lines.append(line)

    return "\n".join(lines)
2260
2261
def repr_tuple_names(names: List[str]) -> Optional[str]:
    """Render up to four of the given names as a comma-separated string,
    eliding the middle of longer lists; individual names longer than 11
    characters are truncated with '..'."""
    if not names:
        return None
    short_enough = len(names) <= 4
    shown = names[:4] if short_enough else names[:3] + names[-1:]
    trimmed = [
        "%s.." % name[:11] if len(name) > 11 else name for name in shown
    ]
    if short_enough:
        return ", ".join(trimmed)
    return "%s, ..., %s" % (", ".join(trimmed[0:3]), trimmed[-1])
2274
2275
def has_compiled_ext(raise_=False):
    """Report whether the cython extensions are present, optionally
    raising ImportError when they are expected but missing."""
    from ._has_cython import HAS_CYEXTENSION

    if HAS_CYEXTENSION:
        return True
    if raise_:
        raise ImportError(
            "cython extensions were expected to be installed, "
            "but are not present"
        )
    return False
2288
2289
def load_uncompiled_module(module: _M) -> _M:
    """Load the non-compiled (plain Python) version of a module that is
    also compiled with cython.
    """
    full_name = module.__name__
    assert module.__spec__
    parent_name = module.__spec__.parent
    assert parent_name
    parent_module = sys.modules[parent_name]
    assert parent_module.__spec__
    package_path = parent_module.__spec__.origin
    assert package_path and package_path.endswith("__init__.py")

    # the .py source lives alongside the package's __init__.py
    leaf = full_name.split(".")[-1]
    module_path = package_path.replace("__init__.py", f"{leaf}.py")

    py_spec = importlib.util.spec_from_file_location(full_name, module_path)
    assert py_spec
    py_module = importlib.util.module_from_spec(py_spec)
    assert py_spec.loader
    py_spec.loader.exec_module(py_module)
    return cast(_M, py_module)
2312
2313
class _Missing(enum.Enum):
    # sentinel enum; an enum member allows Literal[_Missing.Missing] typing
    Missing = enum.auto()


# canonical sentinel for "no value passed", distinct from None
Missing = _Missing.Missing
# union of a real value with the Missing sentinel
MissingOr = Union[_T, Literal[_Missing.Missing]]