1# util/compat.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""Handle Python version/platform incompatibilities."""
10
11from __future__ import annotations
12
13import base64
14import dataclasses
15import hashlib
16import inspect
17import operator
18import platform
19import sys
20import sysconfig
21import typing
22from typing import Any
23from typing import Callable
24from typing import cast
25from typing import Dict
26from typing import Iterable
27from typing import List
28from typing import Mapping
29from typing import Optional
30from typing import Sequence
31from typing import Set
32from typing import Tuple
33from typing import Type
34from typing import TypeVar
35
# -- interpreter version flags ---------------------------------------------
# Each flag is True when the running interpreter is at least that release.
py38 = sys.version_info >= (3, 8)
py39 = sys.version_info >= (3, 9)
py310 = sys.version_info >= (3, 10)
py311 = sys.version_info >= (3, 11)
py312 = sys.version_info >= (3, 12)
py313 = sys.version_info >= (3, 13)
py314 = sys.version_info >= (3, 14)
# compares against the full five-field version_info, i.e. 3.14.0 beta 1
py314b1 = sys.version_info >= (3, 14, 0, "beta", 1)

# -- interpreter implementation --------------------------------------------
cpython = platform.python_implementation() == "CPython"
pypy = platform.python_implementation() == "PyPy"
# True when the build-time config variable Py_GIL_DISABLED is truthy
freethreading = bool(sysconfig.get_config_var("Py_GIL_DISABLED"))
# follows the CPython flag directly (already a bool)
has_refcount_gc = cpython

# -- platform ----------------------------------------------------------------
osx = sys.platform.startswith("darwin")
win32 = sys.platform.startswith("win")
# matches machine names containing "aarch", case-insensitively
arm = "aarch" in platform.machine().lower()
is64bit = sys.maxsize > (1 << 32)

# alias: attribute getter supporting dotted names
dottedgetter = operator.attrgetter

_T_co = TypeVar("_T_co", covariant=True)
58
59
if py314:
    # vendor a minimal form of get_annotations per
    # https://github.com/python/cpython/issues/133684#issuecomment-2863841891

    from annotationlib import call_annotate_function  # type: ignore[import-not-found,unused-ignore]  # noqa: E501
    from annotationlib import Format

    def _get_and_call_annotate(obj, format):  # noqa: A002
        """Invoke ``obj.__annotate__`` via ``call_annotate_function()``.

        :param obj: object whose ``__annotate__`` attribute is looked up;
         it is also passed as ``owner`` to ``call_annotate_function()``.
        :param format: annotation format handed to
         ``call_annotate_function()``.
        :return: the dict produced by the annotate function, or ``None``
         when ``obj`` has no ``__annotate__`` attribute or it is ``None``.
        :raises ValueError: if the annotate function returns a non-dict.

        """
        annotate = getattr(obj, "__annotate__", None)
        if annotate is not None:
            ann = call_annotate_function(annotate, format, owner=obj)
            if not isinstance(ann, dict):
                raise ValueError(f"{obj!r}.__annotate__ returned a non-dict")
            return ann
        return None

    # this is ported from py3.13.0a7
    # the ``__annotations__`` descriptor as defined on ``type`` itself; used
    # below for class objects instead of plain attribute access
    _BASE_GET_ANNOTATIONS = type.__dict__["__annotations__"].__get__

    def _get_dunder_annotations(obj):
        """Return a new dict copied from ``obj.__annotations__``.

        For class objects the ``type``-level descriptor is used; for
        anything else a plain ``getattr()`` is used.  A missing or ``None``
        ``__annotations__`` yields an empty dict, so this function never
        returns ``None``.

        :raises ValueError: if ``__annotations__`` is present but is not
         a dict.

        """
        if isinstance(obj, type):
            try:
                ann = _BASE_GET_ANNOTATIONS(obj)
            except AttributeError:
                # For static types, the descriptor raises AttributeError.
                return {}
        else:
            ann = getattr(obj, "__annotations__", None)
            if ann is None:
                return {}

        if not isinstance(ann, dict):
            raise ValueError(
                f"{obj!r}.__annotations__ is neither a dict nor None"
            )
        # hand back a copy rather than the object's own dict
        return dict(ann)

    def _vendored_get_annotations(
        obj: Any, *, format: Format  # noqa: A002
    ) -> Mapping[str, Any]:
        """A sparse implementation of annotationlib.get_annotations()

        ``__annotations__`` is tried first; if reading it raises, the
        object's ``__annotate__`` function is called with the given
        ``format`` instead.  The result is always a new dict.

        """

        try:
            ann = _get_dunder_annotations(obj)
        except Exception:
            # any failure reading __annotations__ (not only NameError)
            # falls through to the __annotate__ path below
            pass
        else:
            # NOTE: _get_dunder_annotations() as written always returns a
            # dict, so this check does not filter anything out
            if ann is not None:
                return dict(ann)

        # But if __annotations__ threw a NameError, we try calling __annotate__
        ann = _get_and_call_annotate(obj, format)
        if ann is None:
            # If that didn't work either, we have a very weird object:
            # evaluating
            # __annotations__ threw NameError and there is no __annotate__.
            # In that case,
            # we fall back to trying __annotations__ again.
            # (this second attempt is not guarded, so an exception raised
            # here propagates to the caller)
            ann = _get_dunder_annotations(obj)

        # NOTE: with the two helpers as written, ``ann`` is a dict by this
        # point, so this branch acts only as a safety net
        if ann is None:
            if isinstance(obj, type) or callable(obj):
                return {}
            raise TypeError(f"{obj!r} does not have annotations")

        if not ann:
            return {}

        return dict(ann)

    def get_annotations(obj: Any) -> Mapping[str, Any]:
        """Return the annotations of ``obj`` using ``Format.FORWARDREF``."""
        # FORWARDREF has the effect of giving us ForwardRefs and not
        # actually trying to evaluate the annotations.  We need this so
        # that the annotations act as much like
        # "from __future__ import annotations" as possible, which is going
        # away in future python as a separate mode
        return _vendored_get_annotations(obj, format=Format.FORWARDREF)

elif py310:

    def get_annotations(obj: Any) -> Mapping[str, Any]:
        """Return the annotations of ``obj`` via inspect.get_annotations()."""
        return inspect.get_annotations(obj)

else:

    def get_annotations(obj: Any) -> Mapping[str, Any]:
        """Return the annotations of ``obj``, or an empty mapping if it
        has none.

        For class objects only the class's own ``__dict__`` is consulted.

        """
        # it's been observed that cls.__annotations__ can be not present.
        # it's not clear what causes this, running under tox py37/38 it
        # happens, running straight pytest it doesn't

        # https://docs.python.org/3/howto/annotations.html#annotations-howto
        if isinstance(obj, type):
            ann = obj.__dict__.get("__annotations__", None)
        else:
            ann = getattr(obj, "__annotations__", None)

        # imported at call time rather than at module import
        # (presumably to avoid an import cycle -- TODO confirm)
        from . import _collections

        if ann is None:
            return _collections.EMPTY_DICT
        else:
            return cast("Mapping[str, Any]", ann)
162
163
class FullArgSpec(typing.NamedTuple):
    """Argument specification of a Python function.

    Field order is significant, as instances are constructed positionally
    by :func:`inspect_getfullargspec`.

    """

    # names of the positional parameters
    args: List[str]
    # name of the ``*args`` parameter, or None if there is none
    varargs: Optional[str]
    # name of the ``**kwargs`` parameter, or None if there is none
    varkw: Optional[str]
    # the function's ``__defaults__``
    defaults: Optional[Tuple[Any, ...]]
    # names of the keyword-only parameters
    kwonlyargs: List[str]
    # the function's ``__kwdefaults__``
    kwonlydefaults: Optional[Dict[str, Any]]
    # mapping of annotated names to their annotations
    annotations: Mapping[str, Any]
172
173
def inspect_getfullargspec(func: Callable[..., Any]) -> FullArgSpec:
    """Fully vendored version of getfullargspec from Python 3.3.

    Bound methods are unwrapped to their underlying function first.  The
    parameter names are read directly from the code object.

    :raises TypeError: if ``func`` is neither a Python function nor an
     object with a ``__code__`` attribute, or if ``__code__`` is not a
     code object.

    """

    target = func.__func__ if inspect.ismethod(func) else func
    if not (inspect.isfunction(target) or hasattr(target, "__code__")):
        raise TypeError(f"{target!r} is not a Python function")

    code = target.__code__
    if not inspect.iscode(code):
        raise TypeError(f"{code!r} is not a code object")

    # co_varnames lists positional names first, then keyword-only names,
    # then the *args name and the **kwargs name when present
    varnames = code.co_varnames
    n_positional = code.co_argcount
    cursor = n_positional + code.co_kwonlyargcount

    positional = list(varnames[:n_positional])
    keyword_only = list(varnames[n_positional:cursor])

    if code.co_flags & inspect.CO_VARARGS:
        star_name = varnames[cursor]
        cursor += 1
    else:
        star_name = None

    if code.co_flags & inspect.CO_VARKEYWORDS:
        double_star_name = varnames[cursor]
    else:
        double_star_name = None

    return FullArgSpec(
        positional,
        star_name,
        double_star_name,
        target.__defaults__,
        keyword_only,
        target.__kwdefaults__,
        get_annotations(target),
    )
210
211
if py39:
    # python stubs don't have a public type for this.  not worth
    # making a protocol
    def md5_not_for_security() -> Any:
        """Return a new MD5 hash object created with
        ``usedforsecurity=False``."""
        digest = hashlib.md5(usedforsecurity=False)
        return digest

else:

    def md5_not_for_security() -> Any:
        """Return a new MD5 hash object.

        This variant does not pass the ``usedforsecurity`` argument.

        """
        digest = hashlib.md5()
        return digest
222
223
# Bind ``importlib_metadata`` to the standard library's importlib.metadata
# for type checkers and on Python 3.8+; otherwise import the top-level
# ``importlib_metadata`` module of the same name.
if typing.TYPE_CHECKING or py38:
    from importlib import metadata as importlib_metadata
else:
    import importlib_metadata  # noqa
228
229
if typing.TYPE_CHECKING or py39:
    # pep 584 dict union: ``dict_union(a, b)`` is ``a | b``
    dict_union = operator.or_  # noqa
else:

    def dict_union(a: dict, b: dict) -> dict:
        """Return a copy of ``a`` updated with the items of ``b``.

        Neither argument is modified.

        """
        merged = a.copy()
        merged.update(b)
        return merged
239
240
if py310:
    anext_ = anext
else:
    from collections.abc import AsyncIterator

    # sentinel distinguishing "no default given" from an explicit None
    _NOT_PROVIDED = object()

    async def anext_(async_iterator, default=_NOT_PROVIDED):
        """vendored from https://github.com/python/cpython/pull/8895"""

        if not isinstance(async_iterator, AsyncIterator):
            raise TypeError(
                f"anext expected an AsyncIterator, got {type(async_iterator)}"
            )

        # look __anext__ up on the type, not on the instance
        advance = type(async_iterator).__anext__
        try:
            return await advance(async_iterator)
        except StopAsyncIteration:
            if default is not _NOT_PROVIDED:
                return default
            raise
261
262
def importlib_metadata_get(group):
    """Return the entry points registered under ``group``.

    Uses ``select(group=...)`` when the object returned by
    ``entry_points()`` provides it, and ``get(group, ())`` otherwise.

    """
    entry_points = importlib_metadata.entry_points()
    if not typing.TYPE_CHECKING and not hasattr(entry_points, "select"):
        return entry_points.get(group, ())
    return entry_points.select(group=group)
269
270
def b(s):
    """Return ``s`` encoded with the ``latin-1`` codec."""
    encoded = s.encode("latin-1")
    return encoded
273
274
def b64decode(x: str) -> bytes:
    """Decode the base64 text ``x`` and return the resulting bytes.

    ``x`` is encoded as ASCII before being decoded.

    """
    ascii_payload = x.encode("ascii")
    return base64.b64decode(ascii_payload)
277
278
def b64encode(x: bytes) -> str:
    """Base64-encode the bytes ``x`` and return the result as a ``str``."""
    encoded = base64.b64encode(x)
    return encoded.decode("ascii")
281
282
def decode_backslashreplace(text: bytes, encoding: str) -> str:
    """Decode ``text`` using ``encoding`` with the ``backslashreplace``
    error handler."""
    decoded = text.decode(encoding, "backslashreplace")
    return decoded
285
286
def cmp(a, b):
    """Three-way comparison of ``a`` and ``b``.

    Returns ``(a > b) - (a < b)``: 1, -1 or 0 for ordinary ordered values.

    """
    is_greater = a > b
    is_less = a < b
    return is_greater - is_less
289
290
def _formatannotation(annotation, base_module=None):
    """vendored from python 3.7

    Render ``annotation`` as text.  Strings are returned unchanged.  For
    classes, the repr() of ``__qualname__`` is returned when the class
    lives in ``builtins`` or ``base_module``, else ``module.qualname``.
    Everything else is rendered with repr(), dropping any ``~`` and, for
    objects whose ``__module__`` is ``typing``, the ``typing.`` prefix.

    """

    if isinstance(annotation, str):
        return annotation

    from_typing = getattr(annotation, "__module__", None) == "typing"

    if not from_typing and isinstance(annotation, type):
        module_name = annotation.__module__
        if module_name in ("builtins", base_module):
            return repr(annotation.__qualname__)
        return module_name + "." + annotation.__qualname__

    # remaining cases (typing constructs, TypeVars, anything else) are
    # all repr()-based
    text = repr(annotation)
    if from_typing:
        text = text.replace("typing.", "")
    return text.replace("~", "")
306
307
def inspect_formatargspec(
    args: List[str],
    varargs: Optional[str] = None,
    varkw: Optional[str] = None,
    defaults: Optional[Sequence[Any]] = None,
    kwonlyargs: Optional[Sequence[str]] = (),
    kwonlydefaults: Optional[Mapping[str, Any]] = {},
    annotations: Mapping[str, Any] = {},
    formatarg: Callable[[str], str] = str,
    formatvarargs: Callable[[str], str] = lambda name: "*" + name,
    formatvarkw: Callable[[str], str] = lambda name: "**" + name,
    formatvalue: Callable[[Any], str] = lambda value: "=" + repr(value),
    formatreturns: Callable[[Any], str] = lambda text: " -> " + str(text),
    formatannotation: Callable[[Any], str] = _formatannotation,
) -> str:
    """Copy formatargspec from python 3.7 standard library.

    Builds the parenthesized, comma-separated argument list described by
    the given argument specification, followed by the rendered return
    annotation when ``annotations`` has a ``"return"`` key.  The
    ``format*`` callables control how each individual piece is rendered.

    Python 3 has deprecated formatargspec and requested that Signature
    be used instead, however this requires a full reimplementation
    of formatargspec() in terms of creating Parameter objects and such.
    Instead of introducing all the object-creation overhead and having
    to reinvent from scratch, just copy their compatibility routine.

    """

    # the shared ``{}`` defaults are only read, never mutated
    kwonlydefaults = kwonlydefaults or {}
    annotations = annotations or {}

    def render_name(name):
        rendered = formatarg(name)
        if name in annotations:
            rendered += ": " + formatannotation(annotations[name])
        return rendered

    # index within ``args`` of the first argument that has a default
    firstdefault = len(args) - len(defaults) if defaults else -1

    pieces = []
    for position, name in enumerate(args):
        piece = render_name(name)
        if defaults and position >= firstdefault:
            piece += formatvalue(defaults[position - firstdefault])
        pieces.append(piece)

    if varargs is not None:
        pieces.append(formatvarargs(render_name(varargs)))
    elif kwonlyargs:
        # a bare "*" separates keyword-only names when there is no *args
        pieces.append("*")

    for name in kwonlyargs or ():
        piece = render_name(name)
        if name in kwonlydefaults:
            piece += formatvalue(kwonlydefaults[name])
        pieces.append(piece)

    if varkw is not None:
        pieces.append(formatvarkw(render_name(varkw)))

    signature = "(" + ", ".join(pieces) + ")"
    if "return" in annotations:
        signature += formatreturns(formatannotation(annotations["return"]))
    return signature
378
379
def dataclass_fields(cls: Type[Any]) -> Iterable[dataclasses.Field[Any]]:
    """Return the ``dataclasses.Field`` objects of an already processed
    dataclass.

    If ``cls`` is not a dataclass, an empty list is returned.

    """

    if not dataclasses.is_dataclass(cls):
        return []
    return dataclasses.fields(cls)
392
393
def local_dataclass_fields(cls: Type[Any]) -> Iterable[dataclasses.Field[Any]]:
    """Return the ``dataclasses.Field`` objects of an already processed
    dataclass, leaving out those that are also fields of one of its
    direct bases.

    If ``cls`` is not a dataclass, an empty list is returned.

    """

    if not dataclasses.is_dataclass(cls):
        return []

    # every Field reported by any class listed in cls.__bases__
    inherited: Set[dataclasses.Field[Any]] = {
        field for base in cls.__bases__ for field in dataclass_fields(base)
    }
    return [
        field for field in dataclasses.fields(cls) if field not in inherited
    ]
410
411
# ``mini_gil`` is always usable as a context manager: a re-entrant lock
# when ``freethreading`` is set, otherwise a do-nothing nullcontext.
if freethreading:
    import threading

    mini_gil = threading.RLock()
    """provide a threading.RLock() under python freethreading only"""
else:
    import contextlib

    mini_gil = contextlib.nullcontext()  # type: ignore[assignment]