1# util/compat.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""Handle Python version/platform incompatibilities."""
10
11from __future__ import annotations
12
13import base64
14import dataclasses
15import hashlib
16import inspect
17import operator
18import platform
19import sys
20import sysconfig
21import typing
22from typing import Any
23from typing import Callable
24from typing import Dict
25from typing import Iterable
26from typing import List
27from typing import Mapping
28from typing import Optional
29from typing import Sequence
30from typing import Set
31from typing import Tuple
32from typing import Type
33from typing import TypeVar
34
35
# Interpreter version flags: each one is True when the running interpreter
# is at least the named release.
py38 = sys.version_info >= (3, 8)
py39 = sys.version_info >= (3, 9)
py310 = sys.version_info >= (3, 10)
py311 = sys.version_info >= (3, 11)
py312 = sys.version_info >= (3, 12)
py313 = sys.version_info >= (3, 13)
py314 = sys.version_info >= (3, 14)
py314b1 = sys.version_info >= (3, 14, 0, "beta", 1)

# Interpreter implementation flags.
cpython = platform.python_implementation() == "CPython"
pypy = platform.python_implementation() == "PyPy"
# bool() turns a missing (None) or zero "Py_GIL_DISABLED" config variable
# into False.
freethreading = bool(sysconfig.get_config_var("Py_GIL_DISABLED"))
has_refcount_gc = bool(cpython)

# Host platform flags.
win32 = sys.platform.startswith("win")
osx = sys.platform.startswith("darwin")
# only machine names containing "aarch" (e.g. "aarch64") are matched
arm = "aarch" in platform.machine().lower()
is64bit = sys.maxsize > 2**32

dottedgetter = operator.attrgetter

_T_co = TypeVar("_T_co", covariant=True)
58
59
class FullArgSpec(typing.NamedTuple):
    """Named tuple describing the parameters of a Python function.

    Instances are produced by :func:`inspect_getfullargspec`, which fills
    the fields from the function's code object and from its
    ``__defaults__``, ``__kwdefaults__`` and ``__annotations__`` attributes.

    """

    # names of the positional parameters, in declaration order
    args: List[str]
    # name of the ``*args`` parameter, or None if the function has none
    varargs: Optional[str]
    # name of the ``**kwargs`` parameter, or None if the function has none
    varkw: Optional[str]
    # the function's ``__defaults__`` (defaults of positional parameters)
    defaults: Optional[Tuple[Any, ...]]
    # names of the keyword-only parameters, in declaration order
    kwonlyargs: List[str]
    # the function's ``__kwdefaults__`` (defaults of keyword-only parameters)
    kwonlydefaults: Optional[Dict[str, Any]]
    # the function's ``__annotations__``
    annotations: Dict[str, Any]
68
69
def inspect_getfullargspec(func: Callable[..., Any]) -> FullArgSpec:
    """Return a :class:`FullArgSpec` describing the parameters of ``func``.

    Fully vendored version of getfullargspec from Python 3.3.  Bound
    methods are unwrapped to their underlying function first.

    :param func: a Python function or bound method.
    :return: a :class:`FullArgSpec` built from the function's code object
     and its ``__defaults__``, ``__kwdefaults__`` and ``__annotations__``.
    :raises TypeError: if ``func`` is not a Python function, or its
     ``__code__`` is not a code object.

    """

    if inspect.ismethod(func):
        func = func.__func__
    if not inspect.isfunction(func):
        raise TypeError(f"{func!r} is not a Python function")

    co = func.__code__
    if not inspect.iscode(co):
        raise TypeError(f"{co!r} is not a code object")

    # co_varnames lists positional names first, then keyword-only names,
    # then the *args name and the **kwargs name when present
    varnames = co.co_varnames
    positional_count = co.co_argcount
    cursor = positional_count + co.co_kwonlyargcount

    positional = list(varnames[:positional_count])
    keyword_only = list(varnames[positional_count:cursor])

    star_args = None
    star_kwargs = None
    if co.co_flags & inspect.CO_VARARGS:
        star_args = varnames[cursor]
        cursor += 1
    if co.co_flags & inspect.CO_VARKEYWORDS:
        star_kwargs = varnames[cursor]

    return FullArgSpec(
        args=positional,
        varargs=star_args,
        varkw=star_kwargs,
        defaults=func.__defaults__,
        kwonlyargs=keyword_only,
        kwonlydefaults=func.__kwdefaults__,
        annotations=func.__annotations__,
    )
106
107
def md5_not_for_security() -> Any:
    """Return a new ``hashlib.md5()`` hash object.

    On Python 3.9 and above the object is created with
    ``usedforsecurity=False``; on older versions that argument is not
    passed.

    """

    # python stubs don't have a public type for the hash object. not worth
    # making a protocol, hence the ``Any`` return type
    if py39:
        return hashlib.md5(usedforsecurity=False)
    return hashlib.md5()
118
119
120if typing.TYPE_CHECKING or py38:
121 from importlib import metadata as importlib_metadata
122else:
123 import importlib_metadata # noqa
124
125
if typing.TYPE_CHECKING or py39:
    # pep 584 dict union
    dict_union = operator.or_  # noqa
else:

    def dict_union(a: dict, b: dict) -> dict:
        """Return a copy of ``a`` updated with the items of ``b``.

        Fallback for interpreters without the pep 584 ``|`` operator;
        neither argument is modified.

        """
        merged = a.copy()
        merged.update(b)
        return merged
135
136
if py310:
    anext_ = anext
else:
    from collections.abc import AsyncIterator

    # sentinel that distinguishes "no default given" from ``default=None``
    _NOT_PROVIDED = object()

    async def anext_(async_iterator, default=_NOT_PROVIDED):
        """vendored from https://github.com/python/cpython/pull/8895

        Await and return the next item of ``async_iterator``.  When the
        iterator is exhausted, ``default`` is returned if one was given,
        otherwise ``StopAsyncIteration`` propagates.  A ``TypeError`` is
        raised if ``async_iterator`` is not an ``AsyncIterator``.

        """

        if not isinstance(async_iterator, AsyncIterator):
            raise TypeError(
                f"anext expected an AsyncIterator, got {type(async_iterator)}"
            )

        # __anext__ is looked up on the type, not on the instance
        fetch_next = type(async_iterator).__anext__
        try:
            item = await fetch_next(async_iterator)
        except StopAsyncIteration:
            if default is not _NOT_PROVIDED:
                return default
            raise
        else:
            return item
157
158
def importlib_metadata_get(group):
    """Return the entry points registered under ``group``.

    Uses ``entry_points().select(group=...)`` when the returned object
    offers ``select``; otherwise falls back to a mapping-style
    ``.get(group, ())`` lookup.

    """
    entry_points = importlib_metadata.entry_points()
    # under TYPE_CHECKING only the select() form is visible to the checker
    if not typing.TYPE_CHECKING and not hasattr(entry_points, "select"):
        return entry_points.get(group, ())
    return entry_points.select(group=group)
165
166
def b(s):
    """Return ``s`` encoded to bytes using the ``latin-1`` codec."""
    encoded = s.encode("latin-1")
    return encoded
169
170
def b64decode(x: str) -> bytes:
    """Decode the base64 text ``x`` (an ASCII string) into bytes."""
    ascii_payload = x.encode("ascii")
    return base64.b64decode(ascii_payload)
173
174
def b64encode(x: bytes) -> str:
    """Encode the bytes ``x`` as base64 and return it as an ASCII string."""
    encoded = base64.b64encode(x)
    return encoded.decode("ascii")
177
178
def decode_backslashreplace(text: bytes, encoding: str) -> str:
    """Decode ``text`` with ``encoding``, using the ``backslashreplace``
    error handler for bytes that cannot be decoded.

    """
    error_handler = "backslashreplace"
    return text.decode(encoding, errors=error_handler)
181
182
def cmp(a, b):
    """Three-way comparison: ``(a > b) - (a < b)``.

    For ordinary comparable values this is 1 when ``a > b``, -1 when
    ``a < b`` and 0 otherwise.

    """
    is_greater = a > b
    is_less = a < b
    return is_greater - is_less
185
186
187def _formatannotation(annotation, base_module=None):
188 """vendored from python 3.7"""
189
190 if isinstance(annotation, str):
191 return annotation
192
193 if getattr(annotation, "__module__", None) == "typing":
194 return repr(annotation).replace("typing.", "").replace("~", "")
195 if isinstance(annotation, type):
196 if annotation.__module__ in ("builtins", base_module):
197 return repr(annotation.__qualname__)
198 return annotation.__module__ + "." + annotation.__qualname__
199 elif isinstance(annotation, typing.TypeVar):
200 return repr(annotation).replace("~", "")
201 return repr(annotation).replace("~", "")
202
203
def inspect_formatargspec(
    args: List[str],
    varargs: Optional[str] = None,
    varkw: Optional[str] = None,
    defaults: Optional[Sequence[Any]] = None,
    kwonlyargs: Optional[Sequence[str]] = (),
    kwonlydefaults: Optional[Mapping[str, Any]] = None,
    annotations: Optional[Mapping[str, Any]] = None,
    formatarg: Callable[[str], str] = str,
    formatvarargs: Callable[[str], str] = lambda name: "*" + name,
    formatvarkw: Callable[[str], str] = lambda name: "**" + name,
    formatvalue: Callable[[Any], str] = lambda value: "=" + repr(value),
    formatreturns: Callable[[Any], str] = lambda text: " -> " + str(text),
    formatannotation: Callable[[Any], str] = _formatannotation,
) -> str:
    """Copy formatargspec from python 3.7 standard library.

    Python 3 has deprecated formatargspec and requested that Signature
    be used instead, however this requires a full reimplementation
    of formatargspec() in terms of creating Parameter objects and such.
    Instead of introducing all the object-creation overhead and having
    to reinvent from scratch, just copy their compatibility routine.

    The first seven parameters correspond to the fields of
    :class:`FullArgSpec`.  E.g. ``args=["a", "b"]``, ``defaults=(1,)``,
    ``varargs="args"``, ``kwonlyargs=["c"]``, ``varkw="kw"`` renders as
    ``"(a, b=1, *args, c, **kw)"``.

    :param args: names of the positional parameters.
    :param varargs: name of the ``*args`` parameter, if any.
    :param varkw: name of the ``**kwargs`` parameter, if any.
    :param defaults: default values, aligned with the tail of ``args``.
    :param kwonlyargs: names of the keyword-only parameters.
    :param kwonlydefaults: defaults for keyword-only parameters, keyed by
     name; ``None`` or empty means no defaults.
    :param annotations: annotations keyed by parameter name, with the
     ``"return"`` key holding the return annotation; ``None`` or empty
     means no annotations.
    :param formatarg: renders a parameter name.
    :param formatvarargs: renders the ``*args`` parameter.
    :param formatvarkw: renders the ``**kwargs`` parameter.
    :param formatvalue: renders a default value, including the ``=``.
    :param formatreturns: renders the return annotation, including ``->``.
    :param formatannotation: renders a single annotation object.
    :return: the parenthesized signature string.

    """

    # None (rather than a shared ``{}`` literal in the signature) is the
    # "not given" marker; falsy values are normalized to a fresh dict
    kwonlydefaults = kwonlydefaults or {}
    annotations = annotations or {}

    def formatargandannotation(arg):
        result = formatarg(arg)
        if arg in annotations:
            result += ": " + formatannotation(annotations[arg])
        return result

    specs = []
    if defaults:
        # defaults line up with the last len(defaults) entries of args
        firstdefault = len(args) - len(defaults)
    else:
        firstdefault = -1

    for i, arg in enumerate(args):
        spec = formatargandannotation(arg)
        if defaults and i >= firstdefault:
            spec = spec + formatvalue(defaults[i - firstdefault])
        specs.append(spec)

    if varargs is not None:
        specs.append(formatvarargs(formatargandannotation(varargs)))
    elif kwonlyargs:
        # a bare "*" separates keyword-only parameters when there is no
        # *args parameter to do so
        specs.append("*")

    if kwonlyargs:
        for kwonlyarg in kwonlyargs:
            spec = formatargandannotation(kwonlyarg)
            if kwonlyarg in kwonlydefaults:
                spec += formatvalue(kwonlydefaults[kwonlyarg])
            specs.append(spec)

    if varkw is not None:
        specs.append(formatvarkw(formatargandannotation(varkw)))

    result = "(" + ", ".join(specs) + ")"
    if "return" in annotations:
        result += formatreturns(formatannotation(annotations["return"]))
    return result
274
275
def dataclass_fields(cls: Type[Any]) -> Iterable[dataclasses.Field[Any]]:
    """Return the ``dataclasses.Field`` objects of ``cls``, as reported by
    ``dataclasses.fields()``.

    The class must **already be a dataclass**; for any other class an empty
    list is returned.

    """

    if not dataclasses.is_dataclass(cls):
        return []
    return dataclasses.fields(cls)
288
289
def local_dataclass_fields(cls: Type[Any]) -> Iterable[dataclasses.Field[Any]]:
    """Return the ``dataclasses.Field`` objects of ``cls`` that are not
    also reported for any of its direct base classes.

    The class must **already be a dataclass**; for any other class an empty
    list is returned.

    """

    if not dataclasses.is_dataclass(cls):
        return []

    # every Field object visible on a direct base is treated as inherited
    inherited: Set[dataclasses.Field[Any]] = {
        field for base in cls.__bases__ for field in dataclass_fields(base)
    }
    return [
        field for field in dataclasses.fields(cls) if field not in inherited
    ]
306
307
if freethreading:
    import threading

    # free-threaded build: ``mini_gil`` is a real re-entrant lock
    mini_gil = threading.RLock()
    """provide a threading.RLock() under python freethreading only"""
else:
    import contextlib

    # otherwise ``mini_gil`` is a no-op context manager, so entering it
    # acquires nothing
    mini_gil = contextlib.nullcontext()  # type: ignore[assignment]