1# util/compat.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""Handle Python version/platform incompatibilities."""
10
11from __future__ import annotations
12
13import base64
14import dataclasses
15import hashlib
16from importlib import metadata as importlib_metadata
17import inspect
18import operator
19import platform
20import sys
21import typing
22from typing import Any
23from typing import Callable
24from typing import Dict
25from typing import Iterable
26from typing import List
27from typing import Mapping
28from typing import Optional
29from typing import Sequence
30from typing import Set
31from typing import Tuple
32from typing import Type
33
# Python version gates, oldest first.  Each flag is True when the running
# interpreter is at least the named release.
py310 = sys.version_info >= (3, 10)
py311 = sys.version_info >= (3, 11)
py312 = sys.version_info >= (3, 12)
py313 = sys.version_info >= (3, 13)
py314 = sys.version_info >= (3, 14)
py314b1 = sys.version_info >= (3, 14, 0, "beta", 1)

# Interpreter implementation, as reported by the ``platform`` module.
cpython = platform.python_implementation() == "CPython"
pypy = platform.python_implementation() == "PyPy"

# True only when running on CPython.
has_refcount_gc = bool(cpython)

# Host operating system and hardware.
osx = sys.platform.startswith("darwin")
win32 = sys.platform.startswith("win")
arm = "aarch" in platform.machine().lower()
is64bit = sys.maxsize > 2**32

# Alias: operator.attrgetter already resolves dotted attribute paths.
dottedgetter = operator.attrgetter
51
52
class FullArgSpec(typing.NamedTuple):
    """Argument specification of a Python function, in the shape produced
    by ``inspect_getfullargspec()`` in this module.
    """

    # names of the positional parameters, in declaration order
    args: List[str]
    # name of the ``*args`` parameter, or None if there is none
    varargs: Optional[str]
    # name of the ``**kwargs`` parameter, or None if there is none
    varkw: Optional[str]
    # the function's ``__defaults__``
    defaults: Optional[Tuple[Any, ...]]
    # names of the keyword-only parameters, in declaration order
    kwonlyargs: List[str]
    # the function's ``__kwdefaults__``
    kwonlydefaults: Optional[Dict[str, Any]]
    # the function's ``__annotations__``
    annotations: Dict[str, Any]
61
62
def inspect_getfullargspec(func: Callable[..., Any]) -> FullArgSpec:
    """Return the :class:`.FullArgSpec` of a Python function or bound method.

    This is a vendored copy of ``inspect.getfullargspec()`` from
    Python 3.3, which reads the argument layout directly off of the
    function's code object.

    :param func: a Python-level function, or a method wrapping one; a
     method is unwrapped to its underlying ``__func__`` first.
    :raises TypeError: if ``func`` is not a Python function, or its
     ``__code__`` attribute is not a code object.

    """

    target = func.__func__ if inspect.ismethod(func) else func
    if not inspect.isfunction(target):
        raise TypeError(f"{target!r} is not a Python function")

    code = target.__code__
    if not inspect.iscode(code):
        raise TypeError(f"{code!r} is not a code object")

    # co_varnames lists positional names first, then keyword-only names,
    # then the *args name and finally the **kwargs name when present.
    varnames = code.co_varnames
    num_positional = code.co_argcount
    cursor = num_positional + code.co_kwonlyargcount

    positional = list(varnames[:num_positional])
    keyword_only = list(varnames[num_positional:cursor])

    star_args = None
    if code.co_flags & inspect.CO_VARARGS:
        star_args = varnames[cursor]
        cursor += 1

    star_kwargs = None
    if code.co_flags & inspect.CO_VARKEYWORDS:
        star_kwargs = varnames[cursor]

    return FullArgSpec(
        args=positional,
        varargs=star_args,
        varkw=star_kwargs,
        defaults=target.__defaults__,
        kwonlyargs=keyword_only,
        kwonlydefaults=target.__kwdefaults__,
        annotations=target.__annotations__,
    )
99
100
# The Python stubs have no public type for hash objects, and writing a
# Protocol just for this is not worth it, hence the ``Any`` return type.
def md5_not_for_security() -> Any:
    """Return a new MD5 hash object created with ``usedforsecurity=False``.

    As the name says, the resulting digest is meant for non-security
    purposes only.

    """
    hasher = hashlib.md5(usedforsecurity=False)
    return hasher
105
106
if py310:
    # the builtin is available; use it directly
    anext_ = anext
else:
    from collections.abc import AsyncIterator

    # sentinel that tells "no default passed" apart from an explicit None
    _NOT_PROVIDED = object()

    async def anext_(async_iterator, default=_NOT_PROVIDED):
        """Return the next item from an async iterator.

        Stand-in for the ``anext()`` builtin, vendored from
        https://github.com/python/cpython/pull/8895

        :param async_iterator: an ``AsyncIterator`` instance.
        :param default: if given, returned instead of raising when the
         iterator is exhausted.
        :raises TypeError: if ``async_iterator`` is not an ``AsyncIterator``.
        :raises StopAsyncIteration: if the iterator is exhausted and no
         ``default`` was given.

        """

        if not isinstance(async_iterator, AsyncIterator):
            raise TypeError(
                f"anext expected an AsyncIterator, got {type(async_iterator)}"
            )

        # look the method up on the type, as is done for special methods
        advance = type(async_iterator).__anext__
        try:
            return await advance(async_iterator)
        except StopAsyncIteration:
            if default is not _NOT_PROVIDED:
                return default
            raise
127
128
def importlib_metadata_get(group):
    """Return the installed entry points registered under ``group``.

    Uses the ``select()`` API of the object returned by
    ``importlib.metadata.entry_points()`` when it is present, and
    otherwise falls back to a ``get()`` lookup with an empty tuple as
    the default.

    """
    entry_points = importlib_metadata.entry_points()
    if typing.TYPE_CHECKING or hasattr(entry_points, "select"):
        return entry_points.select(group=group)
    return entry_points.get(group, ())
135
136
def b(s):
    """Return ``s`` encoded as latin-1."""
    encoded = s.encode("latin-1")
    return encoded
139
140
def b64decode(x: str) -> bytes:
    """Decode the base64 text ``x``, an ASCII-only string, into bytes."""
    ascii_payload = x.encode("ascii")
    return base64.b64decode(ascii_payload)
143
144
def b64encode(x: bytes) -> str:
    """Encode the bytes ``x`` as base64 and return the result as a string."""
    encoded = base64.b64encode(x)
    return encoded.decode("ascii")
147
148
def decode_backslashreplace(text: bytes, encoding: str) -> str:
    """Decode ``text`` using ``encoding`` without failing on bad input.

    Bytes that cannot be decoded are rendered as backslash escape
    sequences in the result rather than raising ``UnicodeDecodeError``.

    """
    return text.decode(encoding, "backslashreplace")
151
152
def cmp(a, b):
    """Three-way comparison of ``a`` and ``b``.

    Returns ``1`` if ``a > b``, ``-1`` if ``a < b`` and ``0`` otherwise.

    """
    is_greater = a > b
    is_less = a < b
    return is_greater - is_less
155
156
157def _formatannotation(annotation, base_module=None):
158 """vendored from python 3.7"""
159
160 if isinstance(annotation, str):
161 return annotation
162
163 if getattr(annotation, "__module__", None) == "typing":
164 return repr(annotation).replace("typing.", "").replace("~", "")
165 if isinstance(annotation, type):
166 if annotation.__module__ in ("builtins", base_module):
167 return repr(annotation.__qualname__)
168 return annotation.__module__ + "." + annotation.__qualname__
169 elif isinstance(annotation, typing.TypeVar):
170 return repr(annotation).replace("~", "")
171 return repr(annotation).replace("~", "")
172
173
def inspect_formatargspec(
    args: List[str],
    varargs: Optional[str] = None,
    varkw: Optional[str] = None,
    defaults: Optional[Sequence[Any]] = None,
    kwonlyargs: Optional[Sequence[str]] = (),
    kwonlydefaults: Optional[Mapping[str, Any]] = {},
    annotations: Mapping[str, Any] = {},
    formatarg: Callable[[str], str] = str,
    formatvarargs: Callable[[str], str] = lambda name: "*" + name,
    formatvarkw: Callable[[str], str] = lambda name: "**" + name,
    formatvalue: Callable[[Any], str] = lambda value: "=" + repr(value),
    formatreturns: Callable[[Any], str] = lambda text: " -> " + str(text),
    formatannotation: Callable[[Any], str] = _formatannotation,
) -> str:
    """Render an argument specification as a parenthesized signature string.

    This is a copy of ``formatargspec()`` from the Python 3.7 standard
    library.  Python deprecated that function in favor of ``Signature``,
    however switching would mean building ``Parameter`` objects and
    rewriting the callers of this routine, so the compatibility routine
    is kept as is.

    The first seven parameters are the fields of a full argument
    specification.  The ``format*`` callables control how each piece is
    turned into text: ``formatarg`` for a parameter name,
    ``formatvarargs`` / ``formatvarkw`` for the star parameters,
    ``formatvalue`` for a default value, ``formatannotation`` for an
    annotation and ``formatreturns`` for the return annotation.

    :return: the signature text, e.g. ``(a, b=5, *arg, **kw)``, followed
     by the formatted return annotation when ``annotations`` has a
     ``"return"`` key.

    """

    # rebind rather than mutate, so that the shared default dictionaries
    # are never modified and a None argument is accepted
    kwonlydefaults = kwonlydefaults or {}
    annotations = annotations or {}

    def annotated(name):
        text = formatarg(name)
        if name in annotations:
            text += ": " + formatannotation(annotations[name])
        return text

    # defaults line up with the tail end of the positional arguments
    default_offset = len(args) - len(defaults) if defaults else -1

    pieces = []
    for position, name in enumerate(args):
        piece = annotated(name)
        if defaults and position >= default_offset:
            piece = piece + formatvalue(defaults[position - default_offset])
        pieces.append(piece)

    if varargs is not None:
        pieces.append(formatvarargs(annotated(varargs)))
    elif kwonlyargs:
        # a bare "*" separates keyword-only arguments when there is no *args
        pieces.append("*")

    for name in kwonlyargs or ():
        piece = annotated(name)
        if kwonlydefaults and name in kwonlydefaults:
            piece += formatvalue(kwonlydefaults[name])
        pieces.append(piece)

    if varkw is not None:
        pieces.append(formatvarkw(annotated(varkw)))

    signature = "(" + ", ".join(pieces) + ")"
    if "return" in annotations:
        signature += formatreturns(formatannotation(annotations["return"]))
    return signature
244
245
def dataclass_fields(cls: Type[Any]) -> Iterable[dataclasses.Field[Any]]:
    """Return all ``dataclasses.Field`` objects for a class that has
    already been processed as a dataclass.

    If ``cls`` is not a dataclass, an empty list is returned.

    """

    if not dataclasses.is_dataclass(cls):
        return []
    return dataclasses.fields(cls)
258
259
def local_dataclass_fields(cls: Type[Any]) -> Iterable[dataclasses.Field[Any]]:
    """Return the ``dataclasses.Field`` objects of an already processed
    dataclass, leaving out the ones that are also fields of one of its
    direct base classes.

    If ``cls`` is not a dataclass, an empty list is returned.

    """

    if not dataclasses.is_dataclass(cls):
        return []

    # fields reported by the immediate bases; bases which are not
    # dataclasses contribute nothing
    inherited: Set[dataclasses.Field[Any]] = {
        fld for base in cls.__bases__ for fld in dataclass_fields(base)
    }
    return [fld for fld in dataclasses.fields(cls) if fld not in inherited]