1# util/langhelpers.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""Routines to help with the creation, loading and introspection of
10modules, classes, hierarchies, attributes, functions, and methods.
11
12"""
13from __future__ import annotations
14
15import collections
16import enum
17from functools import update_wrapper
18import importlib.util
19import inspect
20import itertools
21import operator
22import re
23import sys
24import textwrap
25import threading
26import types
27from types import CodeType
28from types import ModuleType
29from typing import Any
30from typing import Callable
31from typing import cast
32from typing import Dict
33from typing import FrozenSet
34from typing import Generic
35from typing import Iterator
36from typing import List
37from typing import Literal
38from typing import Mapping
39from typing import NoReturn
40from typing import Optional
41from typing import overload
42from typing import Sequence
43from typing import Set
44from typing import Tuple
45from typing import Type
46from typing import TYPE_CHECKING
47from typing import TypeVar
48from typing import Union
49import warnings
50
51from . import _collections
52from . import compat
53from .. import exc
54
55_T = TypeVar("_T")
56_T_co = TypeVar("_T_co", covariant=True)
57_F = TypeVar("_F", bound=Callable[..., Any])
58_MA = TypeVar("_MA", bound="HasMemoized.memoized_attribute[Any]")
59_M = TypeVar("_M", bound=ModuleType)
60
61if compat.py314:
62
63 import annotationlib
64
65 def get_annotations(obj: Any) -> Mapping[str, Any]:
66 return annotationlib.get_annotations(
67 obj, format=annotationlib.Format.FORWARDREF
68 )
69
70else:
71
72 def get_annotations(obj: Any) -> Mapping[str, Any]:
73 return inspect.get_annotations(obj)
74
75
76def restore_annotations(
77 cls: type, new_annotations: dict[str, Any]
78) -> Callable[[], None]:
    """Apply alternate annotations to a class, returning a callable that
    restores the class's pristine state.

    This is used strictly to provide dataclasses on a mapped class, where
    in some cases we are making dataclass fields based on an attribute
    that is actually a Python descriptor on a superclass which we called
    to get a value.

    If dataclasses were to give us a way to achieve this without swapping
    __annotations__, that would be much better.
    """
88 delattr_ = object()
89
    # pep-649 means classes have "__annotate__", and it's a callable. if it's
    # there and is None, we're in "legacy future mode", where it's Python 3.14
    # or higher and "from __future__ import annotations" is set. in "legacy
    # future mode" we have to do the same steps we do for older Pythons, and
    # __annotate__ can be ignored.
95 is_pep649 = hasattr(cls, "__annotate__") and cls.__annotate__ is not None
96
97 if is_pep649:
98 memoized = {
99 "__annotate__": getattr(cls, "__annotate__", delattr_),
100 }
101 else:
102 memoized = {
103 "__annotations__": getattr(cls, "__annotations__", delattr_)
104 }
105
106 cls.__annotations__ = new_annotations
107
108 def restore():
109 for k, v in memoized.items():
110 if v is delattr_:
111 delattr(cls, k)
112 else:
113 setattr(cls, k, v)
114
115 return restore
116
117
118def md5_hex(x: Any) -> str:
119 x = x.encode("utf-8")
120 m = compat.md5_not_for_security()
121 m.update(x)
122 return cast(str, m.hexdigest())
123
124
125class safe_reraise:
126 """Reraise an exception after invoking some
127 handler code.
128
129 Stores the existing exception info before
130 invoking so that it is maintained across a potential
131 coroutine context switch.
132
133 e.g.::
134
135 try:
136 sess.commit()
137 except:
138 with safe_reraise():
139 sess.rollback()
140
141 TODO: we should at some point evaluate current behaviors in this regard
142 based on current greenlet, gevent/eventlet implementations in Python 3, and
143 also see the degree to which our own asyncio (based on greenlet also) is
144 impacted by this. .rollback() will cause IO / context switch to occur in
145 all these scenarios; what happens to the exception context from an
146 "except:" block if we don't explicitly store it? Original issue was #2703.
147
148 """
149
150 __slots__ = ("_exc_info",)
151
152 _exc_info: Union[
153 None,
154 Tuple[
155 Type[BaseException],
156 BaseException,
157 types.TracebackType,
158 ],
159 Tuple[None, None, None],
160 ]
161
162 def __enter__(self) -> None:
163 self._exc_info = sys.exc_info()
164
165 def __exit__(
166 self,
167 type_: Optional[Type[BaseException]],
168 value: Optional[BaseException],
169 traceback: Optional[types.TracebackType],
170 ) -> NoReturn:
171 assert self._exc_info is not None
172 # see #2703 for notes
173 if type_ is None:
174 exc_type, exc_value, exc_tb = self._exc_info
175 assert exc_value is not None
176 self._exc_info = None # remove potential circular references
177 raise exc_value.with_traceback(exc_tb)
178 else:
179 self._exc_info = None # remove potential circular references
180 assert value is not None
181 raise value.with_traceback(traceback)
182
183
184def walk_subclasses(cls: Type[_T]) -> Iterator[Type[_T]]:
185 seen: Set[Any] = set()
186
187 stack = [cls]
188 while stack:
189 cls = stack.pop()
190 if cls in seen:
191 continue
192 else:
193 seen.add(cls)
194 stack.extend(cls.__subclasses__())
195 yield cls
196
197
198def string_or_unprintable(element: Any) -> str:
199 if isinstance(element, str):
200 return element
201 else:
202 try:
203 return str(element)
204 except Exception:
205 return "unprintable element %r" % element
206
207
208def clsname_as_plain_name(
209 cls: Type[Any], use_name: Optional[str] = None
210) -> str:
211 name = use_name or cls.__name__
212 return " ".join(n.lower() for n in re.findall(r"([A-Z][a-z]+|SQL)", name))
213
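# Illustrative usage sketch, not part of the original module: given a
# hypothetical CamelCase class, clsname_as_plain_name() splits the name into
# lowercased words, e.g.:
#
#     >>> class NotSupportedError: ...
#     >>> clsname_as_plain_name(NotSupportedError)
#     'not supported error'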
214
215def method_is_overridden(
216 instance_or_cls: Union[Type[Any], object],
217 against_method: Callable[..., Any],
218) -> bool:
219 """Return True if the two class methods don't match."""
220
221 if not isinstance(instance_or_cls, type):
222 current_cls = instance_or_cls.__class__
223 else:
224 current_cls = instance_or_cls
225
226 method_name = against_method.__name__
227
228 current_method: types.MethodType = getattr(current_cls, method_name)
229
230 return current_method != against_method
231
232
233def decode_slice(slc: slice) -> Tuple[Any, ...]:
    """Decode a slice object as sent to __getitem__.

    Takes into account the __index__() method, introduced in Python 2.5.

    """
239 ret: List[Any] = []
240 for x in slc.start, slc.stop, slc.step:
241 if hasattr(x, "__index__"):
242 x = x.__index__()
243 ret.append(x)
244 return tuple(ret)
245
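# Illustrative sketch, not part of the original module: decode_slice()
# normalizes the start/stop/step of a slice via __index__(), e.g.:
#
#     >>> decode_slice(slice(1, 10, 2))
#     (1, 10, 2)
#     >>> decode_slice(slice(None, 5, None))
#     (None, 5, None)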
246
247def _unique_symbols(used: Sequence[str], *bases: str) -> Iterator[str]:
248 used_set = set(used)
249 for base in bases:
250 pool = itertools.chain(
251 (base,),
252 map(lambda i: base + str(i), range(1000)),
253 )
254 for sym in pool:
255 if sym not in used_set:
256 used_set.add(sym)
257 yield sym
258 break
259 else:
260 raise NameError("exhausted namespace for symbol base %s" % base)
261
262
263def map_bits(fn: Callable[[int], Any], n: int) -> Iterator[Any]:
    """Call the given function with each nonzero bit from n."""
265
266 while n:
267 b = n & (~n + 1)
268 yield fn(b)
269 n ^= b
270
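# Illustrative sketch, not part of the original module: map_bits() isolates
# each set bit using the two's-complement trick ``n & (~n + 1)``, e.g.:
#
#     >>> list(map_bits(lambda bit: bit, 0b1010))
#     [2, 8]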
271
272_Fn = TypeVar("_Fn", bound="Callable[..., Any]")
273
274# this seems to be in flux in recent mypy versions
275
276
277def decorator(target: Callable[..., Any]) -> Callable[[_Fn], _Fn]:
278 """A signature-matching decorator factory."""
279
280 def decorate(fn: _Fn) -> _Fn:
281 if not inspect.isfunction(fn) and not inspect.ismethod(fn):
282 raise Exception("not a decoratable function")
283
        # Python 3.14 defers creating __annotations__ until it's used.
        # We do not want to create __annotations__ now.
286 annofunc = getattr(fn, "__annotate__", None)
287 if annofunc is not None:
288 fn.__annotate__ = None # type: ignore[union-attr]
289 try:
290 spec = compat.inspect_getfullargspec(fn)
291 finally:
292 fn.__annotate__ = annofunc # type: ignore[union-attr]
293 else:
294 spec = compat.inspect_getfullargspec(fn)
295
        # Do not generate code for annotations.
        # update_wrapper() copies the annotations from fn to decorated.
        # We use dummy defaults for code generation to avoid having a
        # copy of large globals for compiling.
        # We copy __defaults__ and __kwdefaults__ from fn to decorated.
301 empty_defaults = (None,) * len(spec.defaults or ())
302 empty_kwdefaults = dict.fromkeys(spec.kwonlydefaults or ())
303 spec = spec._replace(
304 annotations={},
305 defaults=empty_defaults,
306 kwonlydefaults=empty_kwdefaults,
307 )
308
309 names = (
310 tuple(cast("Tuple[str, ...]", spec[0]))
311 + cast("Tuple[str, ...]", spec[1:3])
312 + (fn.__name__,)
313 )
314 targ_name, fn_name = _unique_symbols(names, "target", "fn")
315
316 metadata: Dict[str, Optional[str]] = dict(target=targ_name, fn=fn_name)
317 metadata.update(format_argspec_plus(spec, grouped=False))
318 metadata["name"] = fn.__name__
319
320 if inspect.iscoroutinefunction(fn):
321 metadata["prefix"] = "async "
322 metadata["target_prefix"] = "await "
323 else:
324 metadata["prefix"] = ""
325 metadata["target_prefix"] = ""
326
        # look for "__" prefixed positional arguments. This is a convention
        # in SQLAlchemy indicating that such arguments should be passed
        # positionally rather than as keyword arguments. note that apply_pos
        # doesn't currently work in all cases, such as when a kw-only
        # indicator "*" is present, which is why we limit its use to just the
        # cases we can detect. As we add more kinds of methods that use
        # @decorator, things may have to be further improved in this area.
335 if "__" in repr(spec[0]):
336 code = (
337 """\
338%(prefix)sdef %(name)s%(grouped_args)s:
339 return %(target_prefix)s%(target)s(%(fn)s, %(apply_pos)s)
340"""
341 % metadata
342 )
343 else:
344 code = (
345 """\
346%(prefix)sdef %(name)s%(grouped_args)s:
347 return %(target_prefix)s%(target)s(%(fn)s, %(apply_kw)s)
348"""
349 % metadata
350 )
351
352 env: Dict[str, Any] = {
353 targ_name: target,
354 fn_name: fn,
355 "__name__": fn.__module__,
356 }
357
358 decorated = cast(
359 types.FunctionType,
360 _exec_code_in_env(code, env, fn.__name__),
361 )
362 decorated.__defaults__ = fn.__defaults__
363 decorated.__kwdefaults__ = fn.__kwdefaults__ # type: ignore
364 return update_wrapper(decorated, fn) # type: ignore[return-value]
365
366 return update_wrapper(decorate, target) # type: ignore[return-value]
367
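# Illustrative usage sketch, not part of the original module: the wrapper
# generated by @decorator preserves the wrapped function's signature and
# calls ``target(fn, <original arguments>)``.  Hypothetical usage:
#
#     >>> @decorator
#     ... def traced(fn, *args, **kw):
#     ...     print("calling", fn.__name__)
#     ...     return fn(*args, **kw)
#     >>> @traced
#     ... def add(x, y=1):
#     ...     return x + y
#     >>> add(2, y=3)
#     calling add
#     5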
368
369def _exec_code_in_env(
370 code: Union[str, types.CodeType], env: Dict[str, Any], fn_name: str
371) -> Callable[..., Any]:
372 exec(code, env)
373 return env[fn_name] # type: ignore[no-any-return]
374
375
376_PF = TypeVar("_PF")
377_TE = TypeVar("_TE")
378
379
380class PluginLoader:
381 def __init__(
382 self, group: str, auto_fn: Optional[Callable[..., Any]] = None
383 ):
384 self.group = group
385 self.impls: Dict[str, Any] = {}
386 self.auto_fn = auto_fn
387
388 def clear(self):
389 self.impls.clear()
390
391 def load(self, name: str) -> Any:
392 if name in self.impls:
393 return self.impls[name]()
394
395 if self.auto_fn:
396 loader = self.auto_fn(name)
397 if loader:
398 self.impls[name] = loader
399 return loader()
400
401 for impl in compat.importlib_metadata_get(self.group):
402 if impl.name == name:
403 self.impls[name] = impl.load
404 return impl.load()
405
406 raise exc.NoSuchModuleError(
407 "Can't load plugin: %s:%s" % (self.group, name)
408 )
409
410 def register(self, name: str, modulepath: str, objname: str) -> None:
411 def load():
412 mod = __import__(modulepath)
413 for token in modulepath.split(".")[1:]:
414 mod = getattr(mod, token)
415 return getattr(mod, objname)
416
417 self.impls[name] = load
418
419 def deregister(self, name: str) -> None:
420 del self.impls[name]
421
422
423def _inspect_func_args(fn):
424 try:
425 co_varkeywords = inspect.CO_VARKEYWORDS
426 except AttributeError:
427 # https://docs.python.org/3/library/inspect.html
428 # The flags are specific to CPython, and may not be defined in other
429 # Python implementations. Furthermore, the flags are an implementation
430 # detail, and can be removed or deprecated in future Python releases.
431 spec = compat.inspect_getfullargspec(fn)
432 return spec[0], bool(spec[2])
433 else:
434 # use fn.__code__ plus flags to reduce method call overhead
435 co = fn.__code__
436 nargs = co.co_argcount
437 return (
438 list(co.co_varnames[:nargs]),
439 bool(co.co_flags & co_varkeywords),
440 )
441
442
443@overload
444def get_cls_kwargs(
445 cls: type,
446 *,
447 _set: Optional[Set[str]] = None,
448 raiseerr: Literal[True] = ...,
449) -> Set[str]: ...
450
451
452@overload
453def get_cls_kwargs(
454 cls: type, *, _set: Optional[Set[str]] = None, raiseerr: bool = False
455) -> Optional[Set[str]]: ...
456
457
458def get_cls_kwargs(
459 cls: type, *, _set: Optional[Set[str]] = None, raiseerr: bool = False
460) -> Optional[Set[str]]:
461 r"""Return the full set of inherited kwargs for the given `cls`.
462
463 Probes a class's __init__ method, collecting all named arguments. If the
464 __init__ defines a \**kwargs catch-all, then the constructor is presumed
465 to pass along unrecognized keywords to its base classes, and the
466 collection process is repeated recursively on each of the bases.
467
    Uses a subset of inspect.getfullargspec() to cut down on method overhead,
    as this is used within the Core typing system to create copies of type
    objects, which is a performance-sensitive operation.

    No anonymous tuple arguments please!
473
474 """
475 toplevel = _set is None
476 if toplevel:
477 _set = set()
478 assert _set is not None
479
480 ctr = cls.__dict__.get("__init__", False)
481
482 has_init = (
483 ctr
484 and isinstance(ctr, types.FunctionType)
485 and isinstance(ctr.__code__, types.CodeType)
486 )
487
488 if has_init:
489 names, has_kw = _inspect_func_args(ctr)
490 _set.update(names)
491
492 if not has_kw and not toplevel:
493 if raiseerr:
494 raise TypeError(
495 f"given cls {cls} doesn't have an __init__ method"
496 )
497 else:
498 return None
499 else:
500 has_kw = False
501
502 if not has_init or has_kw:
503 for c in cls.__bases__:
504 if get_cls_kwargs(c, _set=_set) is None:
505 break
506
507 _set.discard("self")
508 return _set
509
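# Illustrative sketch, not part of the original module, using hypothetical
# classes: constructor keywords are collected up the hierarchy as long as
# each __init__ declares a **kwargs catch-all:
#
#     >>> class Base:
#     ...     def __init__(self, a=None): ...
#     >>> class Child(Base):
#     ...     def __init__(self, b=None, **kwargs): ...
#     >>> sorted(get_cls_kwargs(Child))
#     ['a', 'b']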
510
511def get_func_kwargs(func: Callable[..., Any]) -> List[str]:
512 """Return the set of legal kwargs for the given `func`.
513
    Uses getargspec so it is safe to call for methods, functions,
515 etc.
516
517 """
518
519 return compat.inspect_getfullargspec(func)[0]
520
521
522def get_callable_argspec(
523 fn: Callable[..., Any], no_self: bool = False, _is_init: bool = False
524) -> compat.FullArgSpec:
525 """Return the argument signature for any callable.
526
527 All pure-Python callables are accepted, including
528 functions, methods, classes, objects with __call__;
529 builtins and other edge cases like functools.partial() objects
530 raise a TypeError.
531
532 """
533 if inspect.isbuiltin(fn):
534 raise TypeError("Can't inspect builtin: %s" % fn)
535 elif inspect.isfunction(fn):
536 if _is_init and no_self:
537 spec = compat.inspect_getfullargspec(fn)
538 return compat.FullArgSpec(
539 spec.args[1:],
540 spec.varargs,
541 spec.varkw,
542 spec.defaults,
543 spec.kwonlyargs,
544 spec.kwonlydefaults,
545 spec.annotations,
546 )
547 else:
548 return compat.inspect_getfullargspec(fn)
549 elif inspect.ismethod(fn):
550 if no_self and (_is_init or fn.__self__):
551 spec = compat.inspect_getfullargspec(fn.__func__)
552 return compat.FullArgSpec(
553 spec.args[1:],
554 spec.varargs,
555 spec.varkw,
556 spec.defaults,
557 spec.kwonlyargs,
558 spec.kwonlydefaults,
559 spec.annotations,
560 )
561 else:
562 return compat.inspect_getfullargspec(fn.__func__)
563 elif inspect.isclass(fn):
564 return get_callable_argspec(
565 fn.__init__, no_self=no_self, _is_init=True
566 )
567 elif hasattr(fn, "__func__"):
568 return compat.inspect_getfullargspec(fn.__func__)
569 elif hasattr(fn, "__call__"):
570 if inspect.ismethod(fn.__call__):
571 return get_callable_argspec(fn.__call__, no_self=no_self)
572 else:
573 raise TypeError("Can't inspect callable: %s" % fn)
574 else:
575 raise TypeError("Can't inspect callable: %s" % fn)
576
577
578def format_argspec_plus(
579 fn: Union[Callable[..., Any], compat.FullArgSpec], grouped: bool = True
580) -> Dict[str, Optional[str]]:
581 """Returns a dictionary of formatted, introspected function arguments.
582
    An enhanced variant of inspect.formatargspec to support code generation.
584
585 fn
586 An inspectable callable or tuple of inspect getargspec() results.
587 grouped
588 Defaults to True; include (parens, around, argument) lists
589
590 Returns:
591
592 args
593 Full inspect.formatargspec for fn
594 self_arg
595 The name of the first positional argument, varargs[0], or None
596 if the function defines no positional arguments.
597 apply_pos
598 args, re-written in calling rather than receiving syntax. Arguments are
599 passed positionally.
600 apply_kw
601 Like apply_pos, except keyword-ish args are passed as keywords.
602 apply_pos_proxied
603 Like apply_pos but omits the self/cls argument
604
605 Example::
606
607 >>> format_argspec_plus(lambda self, a, b, c=3, **d: 123)
608 {'grouped_args': '(self, a, b, c=3, **d)',
609 'self_arg': 'self',
610 'apply_kw': '(self, a, b, c=c, **d)',
611 'apply_pos': '(self, a, b, c, **d)'}
612
613 """
614 if callable(fn):
615 spec = compat.inspect_getfullargspec(fn)
616 else:
617 spec = fn
618
619 args = compat.inspect_formatargspec(*spec)
620
621 apply_pos = compat.inspect_formatargspec(
622 spec[0], spec[1], spec[2], None, spec[4]
623 )
624
625 if spec[0]:
626 self_arg = spec[0][0]
627
628 apply_pos_proxied = compat.inspect_formatargspec(
629 spec[0][1:], spec[1], spec[2], None, spec[4]
630 )
631
632 elif spec[1]:
633 # I'm not sure what this is
634 self_arg = "%s[0]" % spec[1]
635
636 apply_pos_proxied = apply_pos
637 else:
638 self_arg = None
639 apply_pos_proxied = apply_pos
640
641 num_defaults = 0
642 if spec[3]:
643 num_defaults += len(cast(Tuple[Any], spec[3]))
644 if spec[4]:
645 num_defaults += len(spec[4])
646
647 name_args = spec[0] + spec[4]
648
649 defaulted_vals: Union[List[str], Tuple[()]]
650
651 if num_defaults:
652 defaulted_vals = name_args[0 - num_defaults :]
653 else:
654 defaulted_vals = ()
655
656 apply_kw = compat.inspect_formatargspec(
657 name_args,
658 spec[1],
659 spec[2],
660 defaulted_vals,
661 formatvalue=lambda x: "=" + str(x),
662 )
663
664 if spec[0]:
665 apply_kw_proxied = compat.inspect_formatargspec(
666 name_args[1:],
667 spec[1],
668 spec[2],
669 defaulted_vals,
670 formatvalue=lambda x: "=" + str(x),
671 )
672 else:
673 apply_kw_proxied = apply_kw
674
675 if grouped:
676 return dict(
677 grouped_args=args,
678 self_arg=self_arg,
679 apply_pos=apply_pos,
680 apply_kw=apply_kw,
681 apply_pos_proxied=apply_pos_proxied,
682 apply_kw_proxied=apply_kw_proxied,
683 )
684 else:
685 return dict(
686 grouped_args=args,
687 self_arg=self_arg,
688 apply_pos=apply_pos[1:-1],
689 apply_kw=apply_kw[1:-1],
690 apply_pos_proxied=apply_pos_proxied[1:-1],
691 apply_kw_proxied=apply_kw_proxied[1:-1],
692 )
693
694
695def format_argspec_init(method, grouped=True):
696 """format_argspec_plus with considerations for typical __init__ methods
697
698 Wraps format_argspec_plus with error handling strategies for typical
699 __init__ cases:
700
701 .. sourcecode:: text
702
703 object.__init__ -> (self)
704 other unreflectable (usually C) -> (self, *args, **kwargs)
705
706 """
707 if method is object.__init__:
708 grouped_args = "(self)"
709 args = "(self)" if grouped else "self"
710 proxied = "()" if grouped else ""
711 else:
712 try:
713 return format_argspec_plus(method, grouped=grouped)
714 except TypeError:
715 grouped_args = "(self, *args, **kwargs)"
716 args = grouped_args if grouped else "self, *args, **kwargs"
717 proxied = "(*args, **kwargs)" if grouped else "*args, **kwargs"
718 return dict(
719 self_arg="self",
720 grouped_args=grouped_args,
721 apply_pos=args,
722 apply_kw=args,
723 apply_pos_proxied=proxied,
724 apply_kw_proxied=proxied,
725 )
726
727
728def create_proxy_methods(
729 target_cls: Type[Any],
730 target_cls_sphinx_name: str,
731 proxy_cls_sphinx_name: str,
732 classmethods: Sequence[str] = (),
733 methods: Sequence[str] = (),
734 attributes: Sequence[str] = (),
735 use_intermediate_variable: Sequence[str] = (),
736) -> Callable[[_T], _T]:
737 """A class decorator indicating attributes should refer to a proxy
738 class.
739
740 This decorator is now a "marker" that does nothing at runtime. Instead,
741 it is consumed by the tools/generate_proxy_methods.py script to
742 statically generate proxy methods and attributes that are fully
743 recognized by typing tools such as mypy.
744
745 """
746
747 def decorate(cls):
748 return cls
749
750 return decorate
751
752
753def getargspec_init(method):
754 """inspect.getargspec with considerations for typical __init__ methods
755
756 Wraps inspect.getargspec with error handling for typical __init__ cases:
757
758 .. sourcecode:: text
759
760 object.__init__ -> (self)
761 other unreflectable (usually C) -> (self, *args, **kwargs)
762
763 """
764 try:
765 return compat.inspect_getfullargspec(method)
766 except TypeError:
767 if method is object.__init__:
768 return (["self"], None, None, None)
769 else:
770 return (["self"], "args", "kwargs", None)
771
772
773def unbound_method_to_callable(func_or_cls):
774 """Adjust the incoming callable such that a 'self' argument is not
775 required.
776
777 """
778
779 if isinstance(func_or_cls, types.MethodType) and not func_or_cls.__self__:
780 return func_or_cls.__func__
781 else:
782 return func_or_cls
783
784
785def generic_repr(
786 obj: Any,
787 additional_kw: Sequence[Tuple[str, Any]] = (),
788 to_inspect: Optional[Union[object, List[object]]] = None,
789 omit_kwarg: Sequence[str] = (),
790) -> str:
791 """Produce a __repr__() based on direct association of the __init__()
792 specification vs. same-named attributes present.
793
794 """
795 if to_inspect is None:
796 to_inspect = [obj]
797 else:
798 to_inspect = _collections.to_list(to_inspect)
799
800 missing = object()
801
802 pos_args = []
803 kw_args: _collections.OrderedDict[str, Any] = _collections.OrderedDict()
804 vargs = None
805 for i, insp in enumerate(to_inspect):
806 try:
807 spec = compat.inspect_getfullargspec(insp.__init__)
808 except TypeError:
809 continue
810 else:
811 default_len = len(spec.defaults) if spec.defaults else 0
812 if i == 0:
813 if spec.varargs:
814 vargs = spec.varargs
815 if default_len:
816 pos_args.extend(spec.args[1:-default_len])
817 else:
818 pos_args.extend(spec.args[1:])
819 else:
820 kw_args.update(
821 [(arg, missing) for arg in spec.args[1:-default_len]]
822 )
823
824 if default_len:
825 assert spec.defaults
826 kw_args.update(
827 [
828 (arg, default)
829 for arg, default in zip(
830 spec.args[-default_len:], spec.defaults
831 )
832 ]
833 )
834 output: List[str] = []
835
836 output.extend(repr(getattr(obj, arg, None)) for arg in pos_args)
837
838 if vargs is not None and hasattr(obj, vargs):
839 output.extend([repr(val) for val in getattr(obj, vargs)])
840
841 for arg, defval in kw_args.items():
842 if arg in omit_kwarg:
843 continue
844 try:
845 val = getattr(obj, arg, missing)
846 if val is not missing and val != defval:
847 output.append("%s=%r" % (arg, val))
848 except Exception:
849 pass
850
851 if additional_kw:
852 for arg, defval in additional_kw:
853 try:
854 val = getattr(obj, arg, missing)
855 if val is not missing and val != defval:
856 output.append("%s=%r" % (arg, val))
857 except Exception:
858 pass
859
860 return "%s(%s)" % (obj.__class__.__name__, ", ".join(output))
861
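# Illustrative sketch, not part of the original module: generic_repr()
# renders positional arguments directly and keyword arguments only when they
# differ from their defaults, e.g. for a hypothetical class:
#
#     >>> class Point:
#     ...     def __init__(self, x, y, label=None):
#     ...         self.x, self.y, self.label = x, y, label
#     >>> generic_repr(Point(1, 2))
#     'Point(1, 2)'
#     >>> generic_repr(Point(1, 2, label="origin"))
#     "Point(1, 2, label='origin')"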
862
863def class_hierarchy(cls):
864 """Return an unordered sequence of all classes related to cls.
865
866 Traverses diamond hierarchies.
867
868 Fibs slightly: subclasses of builtin types are not returned. Thus
869 class_hierarchy(class A(object)) returns (A, object), not A plus every
870 class systemwide that derives from object.
871
872 """
873
874 hier = {cls}
875 process = list(cls.__mro__)
876 while process:
877 c = process.pop()
878 bases = (_ for _ in c.__bases__ if _ not in hier)
879
880 for b in bases:
881 process.append(b)
882 hier.add(b)
883
884 if c.__module__ == "builtins" or not hasattr(c, "__subclasses__"):
885 continue
886
887 for s in [
888 _
889 for _ in (
890 c.__subclasses__()
891 if not issubclass(c, type)
892 else c.__subclasses__(c)
893 )
894 if _ not in hier
895 ]:
896 process.append(s)
897 hier.add(s)
898 return list(hier)
899
900
901def iterate_attributes(cls):
902 """iterate all the keys and attributes associated
903 with a class, without using getattr().
904
905 Does not use getattr() so that class-sensitive
906 descriptors (i.e. property.__get__()) are not called.
907
908 """
909 keys = dir(cls)
910 for key in keys:
911 for c in cls.__mro__:
912 if key in c.__dict__:
913 yield (key, c.__dict__[key])
914 break
915
916
917def monkeypatch_proxied_specials(
918 into_cls,
919 from_cls,
920 skip=None,
921 only=None,
922 name="self.proxy",
923 from_instance=None,
924):
925 """Automates delegation of __specials__ for a proxying type."""
926
927 if only:
928 dunders = only
929 else:
930 if skip is None:
931 skip = (
932 "__slots__",
933 "__del__",
934 "__getattribute__",
935 "__metaclass__",
936 "__getstate__",
937 "__setstate__",
938 )
939 dunders = [
940 m
941 for m in dir(from_cls)
942 if (
943 m.startswith("__")
944 and m.endswith("__")
945 and not hasattr(into_cls, m)
946 and m not in skip
947 )
948 ]
949
950 for method in dunders:
951 try:
952 maybe_fn = getattr(from_cls, method)
953 if not hasattr(maybe_fn, "__call__"):
954 continue
955 maybe_fn = getattr(maybe_fn, "__func__", maybe_fn)
956 fn = cast(types.FunctionType, maybe_fn)
957
958 except AttributeError:
959 continue
960 try:
961 spec = compat.inspect_getfullargspec(fn)
962 fn_args = compat.inspect_formatargspec(spec[0])
963 d_args = compat.inspect_formatargspec(spec[0][1:])
964 except TypeError:
965 fn_args = "(self, *args, **kw)"
966 d_args = "(*args, **kw)"
967
968 py = (
969 "def %(method)s%(fn_args)s: "
970 "return %(name)s.%(method)s%(d_args)s" % locals()
971 )
972
973 env: Dict[str, types.FunctionType] = (
974 from_instance is not None and {name: from_instance} or {}
975 )
976 exec(py, env)
977 try:
978 env[method].__defaults__ = fn.__defaults__
979 except AttributeError:
980 pass
981 setattr(into_cls, method, env[method])
982
983
984def methods_equivalent(meth1, meth2):
985 """Return True if the two methods are the same implementation."""
986
987 return getattr(meth1, "__func__", meth1) is getattr(
988 meth2, "__func__", meth2
989 )
990
991
992def as_interface(obj, cls=None, methods=None, required=None):
993 """Ensure basic interface compliance for an instance or dict of callables.
994
995 Checks that ``obj`` implements public methods of ``cls`` or has members
996 listed in ``methods``. If ``required`` is not supplied, implementing at
997 least one interface method is sufficient. Methods present on ``obj`` that
998 are not in the interface are ignored.
999
1000 If ``obj`` is a dict and ``dict`` does not meet the interface
1001 requirements, the keys of the dictionary are inspected. Keys present in
1002 ``obj`` that are not in the interface will raise TypeErrors.
1003
1004 Raises TypeError if ``obj`` does not meet the interface criteria.
1005
1006 In all passing cases, an object with callable members is returned. In the
1007 simple case, ``obj`` is returned as-is; if dict processing kicks in then
1008 an anonymous class is returned.
1009
1010 obj
1011 A type, instance, or dictionary of callables.
1012 cls
1013 Optional, a type. All public methods of cls are considered the
1014 interface. An ``obj`` instance of cls will always pass, ignoring
1015 ``required``..
1016 methods
1017 Optional, a sequence of method names to consider as the interface.
1018 required
1019 Optional, a sequence of mandatory implementations. If omitted, an
1020 ``obj`` that provides at least one interface method is considered
1021 sufficient. As a convenience, required may be a type, in which case
1022 all public methods of the type are required.
1023
1024 """
1025 if not cls and not methods:
1026 raise TypeError("a class or collection of method names are required")
1027
1028 if isinstance(cls, type) and isinstance(obj, cls):
1029 return obj
1030
1031 interface = set(methods or [m for m in dir(cls) if not m.startswith("_")])
1032 implemented = set(dir(obj))
1033
1034 complies = operator.ge
1035 if isinstance(required, type):
1036 required = interface
1037 elif not required:
1038 required = set()
1039 complies = operator.gt
1040 else:
1041 required = set(required)
1042
1043 if complies(implemented.intersection(interface), required):
1044 return obj
1045
1046 # No dict duck typing here.
1047 if not isinstance(obj, dict):
1048 qualifier = complies is operator.gt and "any of" or "all of"
1049 raise TypeError(
1050 "%r does not implement %s: %s"
1051 % (obj, qualifier, ", ".join(interface))
1052 )
1053
1054 class AnonymousInterface:
1055 """A callable-holding shell."""
1056
1057 if cls:
1058 AnonymousInterface.__name__ = "Anonymous" + cls.__name__
1059 found = set()
1060
1061 for method, impl in dictlike_iteritems(obj):
1062 if method not in interface:
1063 raise TypeError("%r: unknown in this interface" % method)
1064 if not callable(impl):
1065 raise TypeError("%r=%r is not callable" % (method, impl))
1066 setattr(AnonymousInterface, method, staticmethod(impl))
1067 found.add(method)
1068
1069 if complies(found, required):
1070 return AnonymousInterface
1071
1072 raise TypeError(
1073 "dictionary does not contain required keys %s"
1074 % ", ".join(required - found)
1075 )
1076
1077
1078_GFD = TypeVar("_GFD", bound="generic_fn_descriptor[Any]")
1079
1080
1081class generic_fn_descriptor(Generic[_T_co]):
    """Descriptor which proxies a function when the attribute is not
    present in the instance's __dict__.
1084
1085 This superclass is organized in a particular way with "memoized" and
1086 "non-memoized" implementation classes that are hidden from type checkers,
1087 as Mypy seems to not be able to handle seeing multiple kinds of descriptor
1088 classes used for the same attribute.
1089
1090 """
1091
1092 fget: Callable[..., _T_co]
1093 __doc__: Optional[str]
1094 __name__: str
1095
1096 def __init__(self, fget: Callable[..., _T_co], doc: Optional[str] = None):
1097 self.fget = fget
1098 self.__doc__ = doc or fget.__doc__
1099 self.__name__ = fget.__name__
1100
1101 @overload
1102 def __get__(self: _GFD, obj: None, cls: Any) -> _GFD: ...
1103
1104 @overload
1105 def __get__(self, obj: object, cls: Any) -> _T_co: ...
1106
1107 def __get__(self: _GFD, obj: Any, cls: Any) -> Union[_GFD, _T_co]:
1108 raise NotImplementedError()
1109
1110 if TYPE_CHECKING:
1111
1112 def __set__(self, instance: Any, value: Any) -> None: ...
1113
1114 def __delete__(self, instance: Any) -> None: ...
1115
1116 def _reset(self, obj: Any) -> None:
1117 raise NotImplementedError()
1118
1119 @classmethod
1120 def reset(cls, obj: Any, name: str) -> None:
1121 raise NotImplementedError()
1122
1123
1124class _non_memoized_property(generic_fn_descriptor[_T_co]):
1125 """a plain descriptor that proxies a function.
1126
1127 primary rationale is to provide a plain attribute that's
1128 compatible with memoized_property which is also recognized as equivalent
1129 by mypy.
1130
1131 """
1132
1133 if not TYPE_CHECKING:
1134
1135 def __get__(self, obj, cls):
1136 if obj is None:
1137 return self
1138 return self.fget(obj)
1139
1140
1141class _memoized_property(generic_fn_descriptor[_T_co]):
1142 """A read-only @property that is only evaluated once."""
1143
1144 if not TYPE_CHECKING:
1145
1146 def __get__(self, obj, cls):
1147 if obj is None:
1148 return self
1149 obj.__dict__[self.__name__] = result = self.fget(obj)
1150 return result
1151
1152 def _reset(self, obj):
1153 _memoized_property.reset(obj, self.__name__)
1154
1155 @classmethod
1156 def reset(cls, obj, name):
1157 obj.__dict__.pop(name, None)
1158
1159
1160# despite many attempts to get Mypy to recognize an overridden descriptor
1161# where one is memoized and the other isn't, there seems to be no reliable
1162# way other than completely deceiving the type checker into thinking there
1163# is just one single descriptor type everywhere. Otherwise, if a superclass
1164# has non-memoized and subclass has memoized, that requires
1165# "class memoized(non_memoized)". but then if a superclass has memoized and
1166# superclass has non-memoized, the class hierarchy of the descriptors
1167# would need to be reversed; "class non_memoized(memoized)". so there's no
1168# way to achieve this.
1169# additional issues, RO properties:
1170# https://github.com/python/mypy/issues/12440
1171if TYPE_CHECKING:
1172 # allow memoized and non-memoized to be freely mixed by having them
1173 # be the same class
1174 memoized_property = generic_fn_descriptor
1175 non_memoized_property = generic_fn_descriptor
1176
1177 # for read only situations, mypy only sees @property as read only.
1178 # read only is needed when a subtype specializes the return type
1179 # of a property, meaning assignment needs to be disallowed
1180 ro_memoized_property = property
1181 ro_non_memoized_property = property
1182
1183else:
1184 memoized_property = ro_memoized_property = _memoized_property
1185 non_memoized_property = ro_non_memoized_property = _non_memoized_property
1186
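# Illustrative sketch, not part of the original module: memoized_property
# computes its value once, then stores it in the instance __dict__ so the
# descriptor is bypassed on subsequent access:
#
#     >>> class Thing:
#     ...     @memoized_property
#     ...     def expensive(self):
#     ...         print("computing")
#     ...         return 42
#     >>> t = Thing()
#     >>> t.expensive
#     computing
#     42
#     >>> t.expensive
#     42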
1187
1188def memoized_instancemethod(fn: _F) -> _F:
    """Decorate a method to memoize its return value.
1190
1191 Best applied to no-arg methods: memoization is not sensitive to
1192 argument values, and will always return the same value even when
1193 called with different arguments.
1194
1195 """
1196
1197 def oneshot(self, *args, **kw):
1198 result = fn(self, *args, **kw)
1199
1200 def memo(*a, **kw):
1201 return result
1202
1203 memo.__name__ = fn.__name__
1204 memo.__doc__ = fn.__doc__
1205 self.__dict__[fn.__name__] = memo
1206 return result
1207
1208 return update_wrapper(oneshot, fn) # type: ignore
1209
1210
1211class HasMemoized:
    """A mixin class that maintains the names of memoized elements in a
    collection for easy cache clearing, generative copying, etc.
1214
1215 """
1216
1217 if not TYPE_CHECKING:
1218 # support classes that want to have __slots__ with an explicit
1219 # slot for __dict__. not sure if that requires base __slots__ here.
1220 __slots__ = ()
1221
1222 _memoized_keys: FrozenSet[str] = frozenset()
1223
1224 def _reset_memoizations(self) -> None:
1225 for elem in self._memoized_keys:
1226 self.__dict__.pop(elem, None)
1227
1228 def _assert_no_memoizations(self) -> None:
1229 for elem in self._memoized_keys:
1230 assert elem not in self.__dict__
1231
1232 def _set_memoized_attribute(self, key: str, value: Any) -> None:
1233 self.__dict__[key] = value
1234 self._memoized_keys |= {key}
1235
1236 class memoized_attribute(memoized_property[_T]):
1237 """A read-only @property that is only evaluated once.
1238
1239 :meta private:
1240
1241 """
1242
1243 fget: Callable[..., _T]
1244 __doc__: Optional[str]
1245 __name__: str
1246
1247 def __init__(self, fget: Callable[..., _T], doc: Optional[str] = None):
1248 self.fget = fget
1249 self.__doc__ = doc or fget.__doc__
1250 self.__name__ = fget.__name__
1251
1252 @overload
1253 def __get__(self: _MA, obj: None, cls: Any) -> _MA: ...
1254
1255 @overload
1256 def __get__(self, obj: Any, cls: Any) -> _T: ...
1257
1258 def __get__(self, obj, cls):
1259 if obj is None:
1260 return self
1261 obj.__dict__[self.__name__] = result = self.fget(obj)
1262 obj._memoized_keys |= {self.__name__}
1263 return result
1264
1265 @classmethod
1266 def memoized_instancemethod(cls, fn: _F) -> _F:
        """Decorate a method to memoize its return value.
1268
1269 :meta private:
1270
1271 """
1272
1273 def oneshot(self: Any, *args: Any, **kw: Any) -> Any:
1274 result = fn(self, *args, **kw)
1275
1276 def memo(*a, **kw):
1277 return result
1278
1279 memo.__name__ = fn.__name__
1280 memo.__doc__ = fn.__doc__
1281 self.__dict__[fn.__name__] = memo
1282 self._memoized_keys |= {fn.__name__}
1283 return result
1284
1285 return update_wrapper(oneshot, fn) # type: ignore
1286
1287
1288if TYPE_CHECKING:
1289 HasMemoized_ro_memoized_attribute = property
1290else:
1291 HasMemoized_ro_memoized_attribute = HasMemoized.memoized_attribute
1292
1293
1294class MemoizedSlots:
1295 """Apply memoized items to an object using a __getattr__ scheme.
1296
1297 This allows the functionality of memoized_property and
1298 memoized_instancemethod to be available to a class using __slots__.
1299
1300 The memoized get is not threadsafe under freethreading and the
1301 creator method may in extremely rare cases be called more than once.
1302
1303 """
1304
1305 __slots__ = ()
1306
1307 def _fallback_getattr(self, key):
1308 raise AttributeError(key)
1309
1310 def __getattr__(self, key: str) -> Any:
1311 if key.startswith("_memoized_attr_") or key.startswith(
1312 "_memoized_method_"
1313 ):
1314 raise AttributeError(key)
1315 # to avoid recursion errors when interacting with other __getattr__
1316 # schemes that refer to this one, when testing for memoized method
1317 # look at __class__ only rather than going into __getattr__ again.
1318 elif hasattr(self.__class__, f"_memoized_attr_{key}"):
1319 value = getattr(self, f"_memoized_attr_{key}")()
1320 setattr(self, key, value)
1321 return value
1322 elif hasattr(self.__class__, f"_memoized_method_{key}"):
1323 meth = getattr(self, f"_memoized_method_{key}")
1324
1325 def oneshot(*args, **kw):
1326 result = meth(*args, **kw)
1327
1328 def memo(*a, **kw):
1329 return result
1330
1331 memo.__name__ = meth.__name__
1332 memo.__doc__ = meth.__doc__
1333 setattr(self, key, memo)
1334 return result
1335
1336 oneshot.__doc__ = meth.__doc__
1337 return oneshot
1338 else:
1339 return self._fallback_getattr(key)
1340
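# Illustrative sketch, not part of the original module: with MemoizedSlots,
# a class using __slots__ names a slot for the attribute and provides a
# ``_memoized_attr_<name>`` creator method:
#
#     >>> class Widget(MemoizedSlots):
#     ...     __slots__ = ("config",)
#     ...     def _memoized_attr_config(self):
#     ...         print("building")
#     ...         return {"x": 1}
#     >>> w = Widget()
#     >>> w.config
#     building
#     {'x': 1}
#     >>> w.config
#     {'x': 1}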
1341
1342# from paste.deploy.converters
1343def asbool(obj: Any) -> bool:
1344 if isinstance(obj, str):
1345 obj = obj.strip().lower()
1346 if obj in ["true", "yes", "on", "y", "t", "1"]:
1347 return True
1348 elif obj in ["false", "no", "off", "n", "f", "0"]:
1349 return False
1350 else:
1351 raise ValueError("String is not true/false: %r" % obj)
1352 return bool(obj)
1353
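# Illustrative sketch, not part of the original module:
#
#     >>> asbool("  Yes ")
#     True
#     >>> asbool("0")
#     False
#     >>> asbool(5)
#     True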
1354
1355def bool_or_str(*text: str) -> Callable[[str], Union[str, bool]]:
1356 """Return a callable that will evaluate a string as
1357 boolean, or one of a set of "alternate" string values.
1358
1359 """
1360
1361 def bool_or_value(obj: str) -> Union[str, bool]:
1362 if obj in text:
1363 return obj
1364 else:
1365 return asbool(obj)
1366
1367 return bool_or_value
1368
1369
1370def asint(value: Any) -> Optional[int]:
1371 """Coerce to integer."""
1372
1373 if value is None:
1374 return value
1375 return int(value)
1376
1377
1378def coerce_kw_type(
1379 kw: Dict[str, Any],
1380 key: str,
1381 type_: Type[Any],
1382 flexi_bool: bool = True,
1383 dest: Optional[Dict[str, Any]] = None,
1384) -> None:
1385 r"""If 'key' is present in dict 'kw', coerce its value to type 'type\_' if
1386 necessary. If 'flexi_bool' is True, the string '0' is considered false
1387 when coercing to boolean.
1388 """
1389
1390 if dest is None:
1391 dest = kw
1392
1393 if (
1394 key in kw
1395 and (not isinstance(type_, type) or not isinstance(kw[key], type_))
1396 and kw[key] is not None
1397 ):
1398 if type_ is bool and flexi_bool:
1399 dest[key] = asbool(kw[key])
1400 else:
1401 dest[key] = type_(kw[key])
1402
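# Illustrative sketch, not part of the original module: coerce_kw_type()
# coerces in place (or into ``dest``) only when the value is present and not
# already of the target type:
#
#     >>> kw = {"port": "5432", "echo": "0"}
#     >>> coerce_kw_type(kw, "port", int)
#     >>> coerce_kw_type(kw, "echo", bool)
#     >>> kw
#     {'port': 5432, 'echo': False}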
1403
1404def constructor_key(obj: Any, cls: Type[Any]) -> Tuple[Any, ...]:
1405 """Produce a tuple structure that is cacheable using the __dict__ of
1406 obj to retrieve values
1407
1408 """
1409 names = get_cls_kwargs(cls)
1410 return (cls,) + tuple(
1411 (k, obj.__dict__[k]) for k in names if k in obj.__dict__
1412 )
1413
1414
1415def constructor_copy(obj: _T, cls: Type[_T], *args: Any, **kw: Any) -> _T:
1416 """Instantiate cls using the __dict__ of obj as constructor arguments.
1417
1418 Uses inspect to match the named arguments of ``cls``.
1419
1420 """
1421
1422 names = get_cls_kwargs(cls)
1423 kw.update(
1424 (k, obj.__dict__[k]) for k in names.difference(kw) if k in obj.__dict__
1425 )
1426 return cls(*args, **kw)
1427
1428
1429def counter() -> Callable[[], int]:
1430 """Return a threadsafe counter function."""
1431
1432 lock = threading.Lock()
1433 counter = itertools.count(1)
1434
1435 # avoid the 2to3 "next" transformation...
1436 def _next():
1437 with lock:
1438 return next(counter)
1439
1440 return _next
1441
1442
1443def duck_type_collection(
1444 specimen: Any, default: Optional[Type[Any]] = None
1445) -> Optional[Type[Any]]:
1446 """Given an instance or class, guess if it is or is acting as one of
1447 the basic collection types: list, set and dict. If the __emulates__
1448 property is present, return that preferentially.
1449 """
1450
1451 if hasattr(specimen, "__emulates__"):
1452 # canonicalize set vs sets.Set to a standard: the builtin set
1453 if specimen.__emulates__ is not None and issubclass(
1454 specimen.__emulates__, set
1455 ):
1456 return set
1457 else:
1458 return specimen.__emulates__ # type: ignore
1459
1460 isa = issubclass if isinstance(specimen, type) else isinstance
1461 if isa(specimen, list):
1462 return list
1463 elif isa(specimen, set):
1464 return set
1465 elif isa(specimen, dict):
1466 return dict
1467
1468 if hasattr(specimen, "append"):
1469 return list
1470 elif hasattr(specimen, "add"):
1471 return set
1472 elif hasattr(specimen, "set"):
1473 return dict
1474 else:
1475 return default
1476
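# Illustrative sketch, not part of the original module: the guess is based
# on isinstance()/issubclass() first, then on duck-typed method names:
#
#     >>> duck_type_collection([])
#     <class 'list'>
#     >>> class AppendOnly:
#     ...     def append(self, item): ...
#     >>> duck_type_collection(AppendOnly())
#     <class 'list'>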
1477
1478def assert_arg_type(
1479 arg: Any, argtype: Union[Tuple[Type[Any], ...], Type[Any]], name: str
1480) -> Any:
1481 if isinstance(arg, argtype):
1482 return arg
1483 else:
1484 if isinstance(argtype, tuple):
1485 raise exc.ArgumentError(
1486 "Argument '%s' is expected to be one of type %s, got '%s'"
1487 % (name, " or ".join("'%s'" % a for a in argtype), type(arg))
1488 )
1489 else:
1490 raise exc.ArgumentError(
1491 "Argument '%s' is expected to be of type '%s', got '%s'"
1492 % (name, argtype, type(arg))
1493 )
1494
1495
1496def dictlike_iteritems(dictlike):
1497 """Return a (key, value) iterator for almost any dict-like object."""
1498
1499 if hasattr(dictlike, "items"):
1500 return list(dictlike.items())
1501
1502 getter = getattr(dictlike, "__getitem__", getattr(dictlike, "get", None))
1503 if getter is None:
1504 raise TypeError("Object '%r' is not dict-like" % dictlike)
1505
1506 if hasattr(dictlike, "iterkeys"):
1507
1508 def iterator():
1509 for key in dictlike.iterkeys():
1510 assert getter is not None
1511 yield key, getter(key)
1512
1513 return iterator()
1514 elif hasattr(dictlike, "keys"):
1515 return iter((key, getter(key)) for key in dictlike.keys())
1516 else:
1517 raise TypeError("Object '%r' is not dict-like" % dictlike)
1518
1519
1520class classproperty(property):
    """A decorator that behaves like @property except that it operates
    on classes rather than instances.
1523
1524 The decorator is currently special when using the declarative
1525 module, but note that the
1526 :class:`~.sqlalchemy.ext.declarative.declared_attr`
1527 decorator should be used for this purpose with declarative.
1528
1529 """
1530
1531 fget: Callable[[Any], Any]
1532
1533 def __init__(self, fget: Callable[[Any], Any], *arg: Any, **kw: Any):
1534 super().__init__(fget, *arg, **kw)
1535 self.__doc__ = fget.__doc__
1536
1537 def __get__(self, obj: Any, cls: Optional[type] = None) -> Any:
1538 return self.fget(cls)
1539
1540
1541class hybridproperty(Generic[_T]):
1542 def __init__(self, func: Callable[..., _T]):
1543 self.func = func
1544 self.clslevel = func
1545
1546 def __get__(self, instance: Any, owner: Any) -> _T:
1547 if instance is None:
1548 clsval = self.clslevel(owner)
1549 return clsval
1550 else:
1551 return self.func(instance)
1552
1553 def classlevel(self, func: Callable[..., Any]) -> hybridproperty[_T]:
1554 self.clslevel = func
1555 return self
1556
1557
1558class rw_hybridproperty(Generic[_T]):
1559 def __init__(self, func: Callable[..., _T]):
1560 self.func = func
1561 self.clslevel = func
1562 self.setfn: Optional[Callable[..., Any]] = None
1563
1564 def __get__(self, instance: Any, owner: Any) -> _T:
1565 if instance is None:
1566 clsval = self.clslevel(owner)
1567 return clsval
1568 else:
1569 return self.func(instance)
1570
1571 def __set__(self, instance: Any, value: Any) -> None:
1572 assert self.setfn is not None
1573 self.setfn(instance, value)
1574
1575 def setter(self, func: Callable[..., Any]) -> rw_hybridproperty[_T]:
1576 self.setfn = func
1577 return self
1578
1579 def classlevel(self, func: Callable[..., Any]) -> rw_hybridproperty[_T]:
1580 self.clslevel = func
1581 return self
1582
1583
1584class hybridmethod(Generic[_T]):
1585 """Decorate a function as cls- or instance- level."""
1586
1587 def __init__(self, func: Callable[..., _T]):
1588 self.func = self.__func__ = func
1589 self.clslevel = func
1590
1591 def __get__(self, instance: Any, owner: Any) -> Callable[..., _T]:
1592 if instance is None:
1593 return self.clslevel.__get__(owner, owner.__class__) # type:ignore
1594 else:
1595 return self.func.__get__(instance, owner) # type:ignore
1596
1597 def classlevel(self, func: Callable[..., Any]) -> hybridmethod[_T]:
1598 self.clslevel = func
1599 return self
1600
1601
1602class symbol(int):
1603 """A constant symbol.
1604
1605 >>> symbol("foo") is symbol("foo")
1606 True
1607 >>> symbol("foo")
    symbol('foo')
1609
1610 A slight refinement of the MAGICCOOKIE=object() pattern. The primary
1611 advantage of symbol() is its repr(). They are also singletons.
1612
1613 Repeated calls of symbol('name') will all return the same instance.
1614
1615 """
1616
1617 name: str
1618
1619 symbols: Dict[str, symbol] = {}
1620 _lock = threading.Lock()
1621
1622 def __new__(
1623 cls,
1624 name: str,
1625 doc: Optional[str] = None,
1626 canonical: Optional[int] = None,
1627 ) -> symbol:
1628 with cls._lock:
1629 sym = cls.symbols.get(name)
1630 if sym is None:
1631 assert isinstance(name, str)
1632 if canonical is None:
1633 canonical = hash(name)
1634 sym = int.__new__(symbol, canonical)
1635 sym.name = name
1636 if doc:
1637 sym.__doc__ = doc
1638
1639 # NOTE: we should ultimately get rid of this global thing,
1640 # however, currently it is to support pickling. The best
1641 # change would be when we are on py3.11 at a minimum, we
1642 # switch to stdlib enum.IntFlag.
1643 cls.symbols[name] = sym
1644 else:
1645 if canonical and canonical != sym:
1646 raise TypeError(
1647 f"Can't replace canonical symbol for {name!r} "
1648 f"with new int value {canonical}"
1649 )
1650 return sym
1651
1652 def __reduce__(self):
1653 return symbol, (self.name, "x", int(self))
1654
1655 def __str__(self):
1656 return repr(self)
1657
1658 def __repr__(self):
1659 return f"symbol({self.name!r})"
1660
1661
1662class _IntFlagMeta(type):
1663 def __init__(
1664 cls,
1665 classname: str,
1666 bases: Tuple[Type[Any], ...],
1667 dict_: Dict[str, Any],
1668 **kw: Any,
1669 ) -> None:
1670 items: List[symbol]
1671 cls._items = items = []
1672 for k, v in dict_.items():
1673 if re.match(r"^__.*__$", k):
1674 continue
1675 if isinstance(v, int):
1676 sym = symbol(k, canonical=v)
1677 elif not k.startswith("_"):
1678 raise TypeError("Expected integer values for IntFlag")
1679 else:
1680 continue
1681 setattr(cls, k, sym)
1682 items.append(sym)
1683
1684 cls.__members__ = _collections.immutabledict(
1685 {sym.name: sym for sym in items}
1686 )
1687
1688 def __iter__(self) -> Iterator[symbol]:
1689 raise NotImplementedError(
1690 "iter not implemented to ensure compatibility with "
1691 "Python 3.11 IntFlag. Please use __members__. See "
1692 "https://github.com/python/cpython/issues/99304"
1693 )
1694
1695
1696class _FastIntFlag(metaclass=_IntFlagMeta):
1697 """An 'IntFlag' copycat that isn't slow when performing bitwise
1698 operations.
1699
    The ``FastIntFlag`` name resolves to ``enum.IntFlag`` under TYPE_CHECKING
    and to ``_FastIntFlag`` otherwise.
1702
1703 """
1704
1705
1706if TYPE_CHECKING:
1707 from enum import IntFlag
1708
1709 FastIntFlag = IntFlag
1710else:
1711 FastIntFlag = _FastIntFlag
1712
1713
1714_E = TypeVar("_E", bound=enum.Enum)
1715
1716
1717def parse_user_argument_for_enum(
1718 arg: Any,
1719 choices: Dict[_E, List[Any]],
1720 name: str,
1721 resolve_symbol_names: bool = False,
1722) -> Optional[_E]:
1723 """Given a user parameter, parse the parameter into a chosen value
1724 from a list of choice objects, typically Enum values.
1725
1726 The user argument can be a string name that matches the name of a
1727 symbol, or the symbol object itself, or any number of alternate choices
    such as True/False/None, etc.
1729
1730 :param arg: the user argument.
1731 :param choices: dictionary of enum values to lists of possible
1732 entries for each.
1733 :param name: name of the argument. Used in an :class:`.ArgumentError`
1734 that is raised if the parameter doesn't match any available argument.
1735
1736 """
1737 for enum_value, choice in choices.items():
1738 if arg is enum_value:
1739 return enum_value
1740 elif resolve_symbol_names and arg == enum_value.name:
1741 return enum_value
1742 elif arg in choice:
1743 return enum_value
1744
1745 if arg is None:
1746 return None
1747
1748 raise exc.ArgumentError(f"Invalid value for '{name}': {arg!r}")
1749
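# Illustrative sketch, not part of the original module, using a hypothetical
# enum and choices mapping:
#
#     >>> import enum
#     >>> class Cascade(enum.Enum):
#     ...     ALL = "all"
#     ...     NONE = "none"
#     >>> choices = {Cascade.ALL: ["all", True], Cascade.NONE: ["none", False]}
#     >>> parse_user_argument_for_enum(True, choices, "cascade")
#     <Cascade.ALL: 'all'>
#     >>> parse_user_argument_for_enum("none", choices, "cascade")
#     <Cascade.NONE: 'none'>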
1750
1751_creation_order = 1
1752
1753
1754def set_creation_order(instance: Any) -> None:
1755 """Assign a '_creation_order' sequence to the given instance.
1756
1757 This allows multiple instances to be sorted in order of creation
1758 (typically within a single thread; the counter is not particularly
1759 threadsafe).
1760
1761 """
1762 global _creation_order
1763 instance._creation_order = _creation_order
1764 _creation_order += 1
1765
1766
1767def warn_exception(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
    """executes the given function, catches all exceptions and converts them
    to a warning.
1770
1771 """
1772 try:
1773 return func(*args, **kwargs)
1774 except Exception:
1775 warn("%s('%s') ignored" % sys.exc_info()[0:2])
1776
1777
1778def ellipses_string(value, len_=25):
1779 try:
1780 if len(value) > len_:
1781 return "%s..." % value[0:len_]
1782 else:
1783 return value
1784 except TypeError:
1785 return value
1786
1787
1788class _hash_limit_string(str):
    """A string subclass that can only be hashed on a maximum number
    of unique values.
1791
1792 This is used for warnings so that we can send out parameterized warnings
1793 without the __warningregistry__ of the module, or the non-overridable
    "once" registry within warnings.py, overloading memory.
1795
1796
1797 """
1798
1799 _hash: int
1800
1801 def __new__(
1802 cls, value: str, num: int, args: Sequence[Any]
1803 ) -> _hash_limit_string:
1804 interpolated = (value % args) + (
1805 " (this warning may be suppressed after %d occurrences)" % num
1806 )
1807 self = super().__new__(cls, interpolated)
1808 self._hash = hash("%s_%d" % (value, hash(interpolated) % num))
1809 return self
1810
1811 def __hash__(self) -> int:
1812 return self._hash
1813
1814 def __eq__(self, other: Any) -> bool:
1815 return hash(self) == hash(other)
1816
1817
1818def warn(msg: str, code: Optional[str] = None) -> None:
1819 """Issue a warning.
1820
1821 If msg is a string, :class:`.exc.SAWarning` is used as
1822 the category.
1823
1824 """
1825 if code:
1826 _warnings_warn(exc.SAWarning(msg, code=code))
1827 else:
1828 _warnings_warn(msg, exc.SAWarning)
1829
1830
1831def warn_limited(msg: str, args: Sequence[Any]) -> None:
1832 """Issue a warning with a parameterized string, limiting the number
1833 of registrations.
1834
1835 """
1836 if args:
1837 msg = _hash_limit_string(msg, 10, args)
1838 _warnings_warn(msg, exc.SAWarning)
1839
1840
1841_warning_tags: Dict[CodeType, Tuple[str, Type[Warning]]] = {}
1842
1843
1844def tag_method_for_warnings(
1845 message: str, category: Type[Warning]
1846) -> Callable[[_F], _F]:
1847 def go(fn):
1848 _warning_tags[fn.__code__] = (message, category)
1849 return fn
1850
1851 return go
1852
1853
1854_not_sa_pattern = re.compile(r"^(?:sqlalchemy\.(?!testing)|alembic\.)")
1855
1856
1857def _warnings_warn(
1858 message: Union[str, Warning],
1859 category: Optional[Type[Warning]] = None,
1860 stacklevel: int = 2,
1861) -> None:
1862
1863 if category is None and isinstance(message, Warning):
1864 category = type(message)
1865
1866 # adjust the given stacklevel to be outside of SQLAlchemy
1867 try:
1868 frame = sys._getframe(stacklevel)
1869 except ValueError:
1870 # being called from less than 3 (or given) stacklevels, weird,
1871 # but don't crash
1872 stacklevel = 0
1873 except:
        # _getframe() doesn't work, weird interpreter issue,
        # but don't crash
1876 stacklevel = 0
1877 else:
1878 stacklevel_found = warning_tag_found = False
1879 while frame is not None:
1880 # using __name__ here requires that we have __name__ in the
1881 # __globals__ of the decorated string functions we make also.
1882 # we generate this using {"__name__": fn.__module__}
1883 if not stacklevel_found and not re.match(
1884 _not_sa_pattern, frame.f_globals.get("__name__", "")
1885 ):
                # stop incrementing stack level if an out-of-SQLA line
                # was found.
1888 stacklevel_found = True
1889
1890 # however, for the warning tag thing, we have to keep
1891 # scanning up the whole traceback
1892
1893 if frame.f_code in _warning_tags:
1894 warning_tag_found = True
1895 (_suffix, _category) = _warning_tags[frame.f_code]
1896 category = category or _category
1897 message = f"{message} ({_suffix})"
1898
1899 frame = frame.f_back # type: ignore[assignment]
1900
1901 if not stacklevel_found:
1902 stacklevel += 1
1903 elif stacklevel_found and warning_tag_found:
1904 break
1905
1906 if category is not None:
1907 warnings.warn(message, category, stacklevel=stacklevel + 1)
1908 else:
1909 warnings.warn(message, stacklevel=stacklevel + 1)
1910
1911
1912def only_once(
1913 fn: Callable[..., _T], retry_on_exception: bool
1914) -> Callable[..., Optional[_T]]:
1915 """Decorate the given function to be a no-op after it is called exactly
1916 once."""
1917
1918 once = [fn]
1919
1920 def go(*arg: Any, **kw: Any) -> Optional[_T]:
1921 # strong reference fn so that it isn't garbage collected,
1922 # which interferes with the event system's expectations
1923 strong_fn = fn # noqa
1924 if once:
1925 once_fn = once.pop()
1926 try:
1927 return once_fn(*arg, **kw)
1928 except:
1929 if retry_on_exception:
1930 once.insert(0, once_fn)
1931 raise
1932
1933 return None
1934
1935 return go
1936
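# Illustrative sketch, not part of the original module:
#
#     >>> def setup():
#     ...     print("setting up")
#     >>> once = only_once(setup, retry_on_exception=False)
#     >>> once()
#     setting up
#     >>> once()  # subsequent calls are no-ops that return None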
1937
1938_SQLA_RE = re.compile(r"sqlalchemy/([a-z_]+/){0,2}[a-z_]+\.py")
1939_UNITTEST_RE = re.compile(r"unit(?:2|test2?/)")
1940
1941
1942def chop_traceback(
1943 tb: List[str],
1944 exclude_prefix: re.Pattern[str] = _UNITTEST_RE,
1945 exclude_suffix: re.Pattern[str] = _SQLA_RE,
1946) -> List[str]:
1947 """Chop extraneous lines off beginning and end of a traceback.
1948
1949 :param tb:
1950 a list of traceback lines as returned by ``traceback.format_stack()``
1951
1952 :param exclude_prefix:
1953 a regular expression object matching lines to skip at beginning of
1954 ``tb``
1955
1956 :param exclude_suffix:
1957 a regular expression object matching lines to skip at end of ``tb``
1958 """
1959 start = 0
1960 end = len(tb) - 1
1961 while start <= end and exclude_prefix.search(tb[start]):
1962 start += 1
1963 while start <= end and exclude_suffix.search(tb[end]):
1964 end -= 1
1965 return tb[start : end + 1]
1966
1967
1968def attrsetter(attrname):
1969 code = "def set(obj, value): obj.%s = value" % attrname
1970 env = locals().copy()
1971 exec(code, env)
1972 return env["set"]
1973
1974
1975_dunders = re.compile("^__.+__$")
1976
1977
1978class TypingOnly:
1979 """A mixin class that marks a class as 'typing only', meaning it has
1980 absolutely no methods, attributes, or runtime functionality whatsoever.
1981
1982 """
1983
1984 __slots__ = ()
1985
1986 def __init_subclass__(cls) -> None:
1987 if TypingOnly in cls.__bases__:
1988 remaining = {
1989 name for name in cls.__dict__ if not _dunders.match(name)
1990 }
1991 if remaining:
1992 raise AssertionError(
1993 f"Class {cls} directly inherits TypingOnly but has "
1994 f"additional attributes {remaining}."
1995 )
1996 super().__init_subclass__()
1997
1998
1999class EnsureKWArg:
2000 r"""Apply translation of functions to accept \**kw arguments if they
2001 don't already.
2002
2003 Used to ensure cross-compatibility with third party legacy code, for things
2004 like compiler visit methods that need to accept ``**kw`` arguments,
2005 but may have been copied from old code that didn't accept them.
2006
2007 """
2008
2009 ensure_kwarg: str
2010 """a regular expression that indicates method names for which the method
2011 should accept ``**kw`` arguments.
2012
2013 The class will scan for methods matching the name template and decorate
2014 them if necessary to ensure ``**kw`` parameters are accepted.
2015
2016 """
2017
2018 def __init_subclass__(cls) -> None:
2019 fn_reg = cls.ensure_kwarg
2020 clsdict = cls.__dict__
2021 if fn_reg:
2022 for key in clsdict:
2023 m = re.match(fn_reg, key)
2024 if m:
2025 fn = clsdict[key]
2026 spec = compat.inspect_getfullargspec(fn)
2027 if not spec.varkw:
2028 wrapped = cls._wrap_w_kw(fn)
2029 setattr(cls, key, wrapped)
2030 super().__init_subclass__()
2031
2032 @classmethod
2033 def _wrap_w_kw(cls, fn: Callable[..., Any]) -> Callable[..., Any]:
2034 def wrap(*arg: Any, **kw: Any) -> Any:
2035 return fn(*arg)
2036
2037 return update_wrapper(wrap, fn)
2038
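# Illustrative sketch, not part of the original module: subclasses declare
# ``ensure_kwarg`` as a pattern; matching methods that lack ``**kw`` are
# wrapped so that extra keywords are silently accepted:
#
#     >>> class MyVisitor(EnsureKWArg):
#     ...     ensure_kwarg = r"visit_.*"
#     ...     def visit_thing(self, thing):
#     ...         return thing
#     >>> MyVisitor().visit_thing("x", extra_flag=True)
#     'x'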
2039
2040def wrap_callable(wrapper, fn):
2041 """Augment functools.update_wrapper() to work with objects with
2042 a ``__call__()`` method.
2043
2044 :param fn:
2045 object with __call__ method
2046
2047 """
2048 if hasattr(fn, "__name__"):
2049 return update_wrapper(wrapper, fn)
2050 else:
2051 _f = wrapper
2052 _f.__name__ = fn.__class__.__name__
2053 if hasattr(fn, "__module__"):
2054 _f.__module__ = fn.__module__
2055
2056 if hasattr(fn.__call__, "__doc__") and fn.__call__.__doc__:
2057 _f.__doc__ = fn.__call__.__doc__
2058 elif fn.__doc__:
2059 _f.__doc__ = fn.__doc__
2060
2061 return _f
2062
2063
2064def quoted_token_parser(value):
2065 """Parse a dotted identifier with accommodation for quoted names.
2066
2067 Includes support for SQL-style double quotes as a literal character.
2068
2069 E.g.::
2070
2071 >>> quoted_token_parser("name")
2072 ["name"]
2073 >>> quoted_token_parser("schema.name")
2074 ["schema", "name"]
2075 >>> quoted_token_parser('"Schema"."Name"')
2076 ['Schema', 'Name']
2077 >>> quoted_token_parser('"Schema"."Name""Foo"')
        ['Schema', 'Name"Foo']
2079
2080 """
2081
2082 if '"' not in value:
2083 return value.split(".")
2084
2085 # 0 = outside of quotes
2086 # 1 = inside of quotes
2087 state = 0
2088 result: List[List[str]] = [[]]
2089 idx = 0
2090 lv = len(value)
2091 while idx < lv:
2092 char = value[idx]
2093 if char == '"':
2094 if state == 1 and idx < lv - 1 and value[idx + 1] == '"':
2095 result[-1].append('"')
2096 idx += 1
2097 else:
2098 state ^= 1
2099 elif char == "." and state == 0:
2100 result.append([])
2101 else:
2102 result[-1].append(char)
2103 idx += 1
2104
2105 return ["".join(token) for token in result]
2106
2107
2108def add_parameter_text(params: Any, text: str) -> Callable[[_F], _F]:
2109 params = _collections.to_list(params)
2110
2111 def decorate(fn):
2112 doc = fn.__doc__ is not None and fn.__doc__ or ""
2113 if doc:
2114 doc = inject_param_text(doc, {param: text for param in params})
2115 fn.__doc__ = doc
2116 return fn
2117
2118 return decorate
2119
2120
2121def _dedent_docstring(text: str) -> str:
2122 split_text = text.split("\n", 1)
2123 if len(split_text) == 1:
2124 return text
2125 else:
2126 firstline, remaining = split_text
2127 if not firstline.startswith(" "):
2128 return firstline + "\n" + textwrap.dedent(remaining)
2129 else:
2130 return textwrap.dedent(text)
2131
2132
2133def inject_docstring_text(
2134 given_doctext: Optional[str], injecttext: str, pos: int
2135) -> str:
2136 doctext: str = _dedent_docstring(given_doctext or "")
2137 lines = doctext.split("\n")
2138 if len(lines) == 1:
2139 lines.append("")
2140 injectlines = textwrap.dedent(injecttext).split("\n")
2141 if injectlines[0]:
2142 injectlines.insert(0, "")
2143
2144 blanks = [num for num, line in enumerate(lines) if not line.strip()]
2145 blanks.insert(0, 0)
2146
2147 inject_pos = blanks[min(pos, len(blanks) - 1)]
2148
2149 lines = lines[0:inject_pos] + injectlines + lines[inject_pos:]
2150 return "\n".join(lines)
2151
2152
2153_param_reg = re.compile(r"(\s+):param (.+?):")
2154
2155
2156def inject_param_text(doctext: str, inject_params: Dict[str, str]) -> str:
2157 doclines = collections.deque(doctext.splitlines())
2158 lines = []
2159
2160 # TODO: this is not working for params like ":param case_sensitive=True:"
2161
2162 to_inject = None
2163 while doclines:
2164 line = doclines.popleft()
2165
2166 m = _param_reg.match(line)
2167
2168 if to_inject is None:
2169 if m:
2170 param = m.group(2).lstrip("*")
2171 if param in inject_params:
2172 # default indent to that of :param: plus one
2173 indent = " " * len(m.group(1)) + " "
2174
2175 # but if the next line has text, use that line's
2176 # indentation
2177 if doclines:
2178 m2 = re.match(r"(\s+)\S", doclines[0])
2179 if m2:
2180 indent = " " * len(m2.group(1))
2181
2182 to_inject = indent + inject_params[param]
2183 elif m:
2184 lines.extend(["\n", to_inject, "\n"])
2185 to_inject = None
2186 elif not line.rstrip():
2187 lines.extend([line, to_inject, "\n"])
2188 to_inject = None
2189 elif line.endswith("::"):
2190 # TODO: this still won't cover if the code example itself has
2191 # blank lines in it, need to detect those via indentation.
2192 lines.extend([line, doclines.popleft()])
2193 continue
2194 lines.append(line)
2195
2196 return "\n".join(lines)
2197
2198
2199def repr_tuple_names(names: List[str]) -> Optional[str]:
    """Trim a list of strings from the middle and return a string of up to
    four elements. Strings greater than 11 characters will be truncated."""
2202 if len(names) == 0:
2203 return None
2204 flag = len(names) <= 4
2205 names = names[0:4] if flag else names[0:3] + names[-1:]
2206 res = ["%s.." % name[:11] if len(name) > 11 else name for name in names]
2207 if flag:
2208 return ", ".join(res)
2209 else:
2210 return "%s, ..., %s" % (", ".join(res[0:3]), res[-1])
2211
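# Illustrative sketch, not part of the original module:
#
#     >>> repr_tuple_names(["a", "b", "c", "d"])
#     'a, b, c, d'
#     >>> repr_tuple_names(["col%d" % i for i in range(10)])
#     'col0, col1, col2, ..., col9'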
2212
2213def has_compiled_ext(raise_=False):
2214 from ._has_cython import HAS_CYEXTENSION
2215
2216 if HAS_CYEXTENSION:
2217 return True
2218 elif raise_:
2219 raise ImportError(
2220 "cython extensions were expected to be installed, "
2221 "but are not present"
2222 )
2223 else:
2224 return False
2225
2226
2227def load_uncompiled_module(module: _M) -> _M:
    """Load the non-compiled version of a module that is also
2229 compiled with cython.
2230 """
2231 full_name = module.__name__
2232 assert module.__spec__
2233 parent_name = module.__spec__.parent
2234 assert parent_name
2235 parent_module = sys.modules[parent_name]
2236 assert parent_module.__spec__
2237 package_path = parent_module.__spec__.origin
2238 assert package_path and package_path.endswith("__init__.py")
2239
2240 name = full_name.split(".")[-1]
2241 module_path = package_path.replace("__init__.py", f"{name}.py")
2242
2243 py_spec = importlib.util.spec_from_file_location(full_name, module_path)
2244 assert py_spec
2245 py_module = importlib.util.module_from_spec(py_spec)
2246 assert py_spec.loader
2247 py_spec.loader.exec_module(py_module)
2248 return cast(_M, py_module)
2249
2250
2251class _Missing(enum.Enum):
2252 Missing = enum.auto()
2253
2254
2255Missing = _Missing.Missing
2256MissingOr = Union[_T, Literal[_Missing.Missing]]