1# util/compat.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""Handle Python version/platform incompatibilities."""
10
11from __future__ import annotations
12
13import base64
14import dataclasses
15import hashlib
16import inspect
17import operator
18import platform
19import sys
20import sysconfig
21import typing
22from typing import Any
23from typing import Callable
24from typing import cast
25from typing import Dict
26from typing import Iterable
27from typing import List
28from typing import Mapping
29from typing import Optional
30from typing import Sequence
31from typing import Set
32from typing import Tuple
33from typing import Type
34from typing import TypeVar
35
36
37py314b1 = sys.version_info >= (3, 14, 0, "beta", 1)
38py314 = sys.version_info >= (3, 14)
39py313 = sys.version_info >= (3, 13)
40py312 = sys.version_info >= (3, 12)
41py311 = sys.version_info >= (3, 11)
42py310 = sys.version_info >= (3, 10)
43py39 = sys.version_info >= (3, 9)
44py38 = sys.version_info >= (3, 8)
45pypy = platform.python_implementation() == "PyPy"
46cpython = platform.python_implementation() == "CPython"
47freethreading = bool(sysconfig.get_config_var("Py_GIL_DISABLED"))
48
49win32 = sys.platform.startswith("win")
50osx = sys.platform.startswith("darwin")
51arm = "aarch" in platform.machine().lower()
52is64bit = sys.maxsize > 2**32
53
54has_refcount_gc = bool(cpython)
55
56dottedgetter = operator.attrgetter
57
58_T_co = TypeVar("_T_co", covariant=True)
59
60
61if py314:
62 # vendor a minimal form of get_annotations per
63 # https://github.com/python/cpython/issues/133684#issuecomment-2863841891
64
65 from annotationlib import call_annotate_function # type: ignore[import-not-found,unused-ignore] # noqa: E501
66 from annotationlib import Format
67
68 def _get_and_call_annotate(obj, format): # noqa: A002
69 annotate = getattr(obj, "__annotate__", None)
70 if annotate is not None:
71 ann = call_annotate_function(annotate, format, owner=obj)
72 if not isinstance(ann, dict):
73 raise ValueError(f"{obj!r}.__annotate__ returned a non-dict")
74 return ann
75 return None
76
77 # this is ported from py3.13.0a7
78 _BASE_GET_ANNOTATIONS = type.__dict__["__annotations__"].__get__
79
80 def _get_dunder_annotations(obj):
81 if isinstance(obj, type):
82 try:
83 ann = _BASE_GET_ANNOTATIONS(obj)
84 except AttributeError:
85 # For static types, the descriptor raises AttributeError.
86 return {}
87 else:
88 ann = getattr(obj, "__annotations__", None)
89 if ann is None:
90 return {}
91
92 if not isinstance(ann, dict):
93 raise ValueError(
94 f"{obj!r}.__annotations__ is neither a dict nor None"
95 )
96 return dict(ann)
97
98 def _vendored_get_annotations(
99 obj: Any, *, format: Format # noqa: A002
100 ) -> Mapping[str, Any]:
101 """A sparse implementation of annotationlib.get_annotations()"""
102
103 try:
104 ann = _get_dunder_annotations(obj)
105 except Exception:
106 pass
107 else:
108 if ann is not None:
109 return dict(ann)
110
111 # But if __annotations__ threw a NameError, we try calling __annotate__
112 ann = _get_and_call_annotate(obj, format)
113 if ann is None:
114 # If that didn't work either, we have a very weird object:
115 # evaluating
116 # __annotations__ threw NameError and there is no __annotate__.
117 # In that case,
118 # we fall back to trying __annotations__ again.
119 ann = _get_dunder_annotations(obj)
120
121 if ann is None:
122 if isinstance(obj, type) or callable(obj):
123 return {}
124 raise TypeError(f"{obj!r} does not have annotations")
125
126 if not ann:
127 return {}
128
129 return dict(ann)
130
131 def get_annotations(obj: Any) -> Mapping[str, Any]:
132 # FORWARDREF has the effect of giving us ForwardRefs and not
133 # actually trying to evaluate the annotations. We need this so
134 # that the annotations act as much like
135 # "from __future__ import annotations" as possible, which is going
136 # away in future python as a separate mode
137 return _vendored_get_annotations(obj, format=Format.FORWARDREF)
138
139elif py310:
140
141 def get_annotations(obj: Any) -> Mapping[str, Any]:
142 return inspect.get_annotations(obj)
143
144else:
145
146 def get_annotations(obj: Any) -> Mapping[str, Any]:
147 # it's been observed that cls.__annotations__ can be non present.
148 # it's not clear what causes this, running under tox py37/38 it
149 # happens, running straight pytest it doesnt
150
151 # https://docs.python.org/3/howto/annotations.html#annotations-howto
152 if isinstance(obj, type):
153 ann = obj.__dict__.get("__annotations__", None)
154 else:
155 ann = getattr(obj, "__annotations__", None)
156
157 from . import _collections
158
159 if ann is None:
160 return _collections.EMPTY_DICT
161 else:
162 return cast("Mapping[str, Any]", ann)
163
164
165class FullArgSpec(typing.NamedTuple):
166 args: List[str]
167 varargs: Optional[str]
168 varkw: Optional[str]
169 defaults: Optional[Tuple[Any, ...]]
170 kwonlyargs: List[str]
171 kwonlydefaults: Optional[Dict[str, Any]]
172 annotations: Mapping[str, Any]
173
174
175def inspect_getfullargspec(func: Callable[..., Any]) -> FullArgSpec:
176 """Fully vendored version of getfullargspec from Python 3.3."""
177
178 if inspect.ismethod(func):
179 func = func.__func__
180 if not inspect.isfunction(func):
181 raise TypeError(f"{func!r} is not a Python function")
182
183 co = func.__code__
184 if not inspect.iscode(co):
185 raise TypeError(f"{co!r} is not a code object")
186
187 nargs = co.co_argcount
188 names = co.co_varnames
189 nkwargs = co.co_kwonlyargcount
190 args = list(names[:nargs])
191 kwonlyargs = list(names[nargs : nargs + nkwargs])
192
193 nargs += nkwargs
194 varargs = None
195 if co.co_flags & inspect.CO_VARARGS:
196 varargs = co.co_varnames[nargs]
197 nargs = nargs + 1
198 varkw = None
199 if co.co_flags & inspect.CO_VARKEYWORDS:
200 varkw = co.co_varnames[nargs]
201
202 return FullArgSpec(
203 args,
204 varargs,
205 varkw,
206 func.__defaults__,
207 kwonlyargs,
208 func.__kwdefaults__,
209 get_annotations(func),
210 )
211
212
213if py39:
214 # python stubs don't have a public type for this. not worth
215 # making a protocol
216 def md5_not_for_security() -> Any:
217 return hashlib.md5(usedforsecurity=False)
218
219else:
220
221 def md5_not_for_security() -> Any:
222 return hashlib.md5()
223
224
225if typing.TYPE_CHECKING or py38:
226 from importlib import metadata as importlib_metadata
227else:
228 import importlib_metadata # noqa
229
230
231if typing.TYPE_CHECKING or py39:
232 # pep 584 dict union
233 dict_union = operator.or_ # noqa
234else:
235
236 def dict_union(a: dict, b: dict) -> dict:
237 a = a.copy()
238 a.update(b)
239 return a
240
241
242if py310:
243 anext_ = anext
244else:
245 _NOT_PROVIDED = object()
246 from collections.abc import AsyncIterator
247
248 async def anext_(async_iterator, default=_NOT_PROVIDED):
249 """vendored from https://github.com/python/cpython/pull/8895"""
250
251 if not isinstance(async_iterator, AsyncIterator):
252 raise TypeError(
253 f"anext expected an AsyncIterator, got {type(async_iterator)}"
254 )
255 anxt = type(async_iterator).__anext__
256 try:
257 return await anxt(async_iterator)
258 except StopAsyncIteration:
259 if default is _NOT_PROVIDED:
260 raise
261 return default
262
263
264def importlib_metadata_get(group):
265 ep = importlib_metadata.entry_points()
266 if typing.TYPE_CHECKING or hasattr(ep, "select"):
267 return ep.select(group=group)
268 else:
269 return ep.get(group, ())
270
271
272def b(s):
273 return s.encode("latin-1")
274
275
276def b64decode(x: str) -> bytes:
277 return base64.b64decode(x.encode("ascii"))
278
279
280def b64encode(x: bytes) -> str:
281 return base64.b64encode(x).decode("ascii")
282
283
284def decode_backslashreplace(text: bytes, encoding: str) -> str:
285 return text.decode(encoding, errors="backslashreplace")
286
287
288def cmp(a, b):
289 return (a > b) - (a < b)
290
291
292def _formatannotation(annotation, base_module=None):
293 """vendored from python 3.7"""
294
295 if isinstance(annotation, str):
296 return annotation
297
298 if getattr(annotation, "__module__", None) == "typing":
299 return repr(annotation).replace("typing.", "").replace("~", "")
300 if isinstance(annotation, type):
301 if annotation.__module__ in ("builtins", base_module):
302 return repr(annotation.__qualname__)
303 return annotation.__module__ + "." + annotation.__qualname__
304 elif isinstance(annotation, typing.TypeVar):
305 return repr(annotation).replace("~", "")
306 return repr(annotation).replace("~", "")
307
308
309def inspect_formatargspec(
310 args: List[str],
311 varargs: Optional[str] = None,
312 varkw: Optional[str] = None,
313 defaults: Optional[Sequence[Any]] = None,
314 kwonlyargs: Optional[Sequence[str]] = (),
315 kwonlydefaults: Optional[Mapping[str, Any]] = {},
316 annotations: Mapping[str, Any] = {},
317 formatarg: Callable[[str], str] = str,
318 formatvarargs: Callable[[str], str] = lambda name: "*" + name,
319 formatvarkw: Callable[[str], str] = lambda name: "**" + name,
320 formatvalue: Callable[[Any], str] = lambda value: "=" + repr(value),
321 formatreturns: Callable[[Any], str] = lambda text: " -> " + str(text),
322 formatannotation: Callable[[Any], str] = _formatannotation,
323) -> str:
324 """Copy formatargspec from python 3.7 standard library.
325
326 Python 3 has deprecated formatargspec and requested that Signature
327 be used instead, however this requires a full reimplementation
328 of formatargspec() in terms of creating Parameter objects and such.
329 Instead of introducing all the object-creation overhead and having
330 to reinvent from scratch, just copy their compatibility routine.
331
332 Ultimately we would need to rewrite our "decorator" routine completely
333 which is not really worth it right now, until all Python 2.x support
334 is dropped.
335
336 """
337
338 kwonlydefaults = kwonlydefaults or {}
339 annotations = annotations or {}
340
341 def formatargandannotation(arg):
342 result = formatarg(arg)
343 if arg in annotations:
344 result += ": " + formatannotation(annotations[arg])
345 return result
346
347 specs = []
348 if defaults:
349 firstdefault = len(args) - len(defaults)
350 else:
351 firstdefault = -1
352
353 for i, arg in enumerate(args):
354 spec = formatargandannotation(arg)
355 if defaults and i >= firstdefault:
356 spec = spec + formatvalue(defaults[i - firstdefault])
357 specs.append(spec)
358
359 if varargs is not None:
360 specs.append(formatvarargs(formatargandannotation(varargs)))
361 else:
362 if kwonlyargs:
363 specs.append("*")
364
365 if kwonlyargs:
366 for kwonlyarg in kwonlyargs:
367 spec = formatargandannotation(kwonlyarg)
368 if kwonlydefaults and kwonlyarg in kwonlydefaults:
369 spec += formatvalue(kwonlydefaults[kwonlyarg])
370 specs.append(spec)
371
372 if varkw is not None:
373 specs.append(formatvarkw(formatargandannotation(varkw)))
374
375 result = "(" + ", ".join(specs) + ")"
376 if "return" in annotations:
377 result += formatreturns(formatannotation(annotations["return"]))
378 return result
379
380
381def dataclass_fields(cls: Type[Any]) -> Iterable[dataclasses.Field[Any]]:
382 """Return a sequence of all dataclasses.Field objects associated
383 with a class as an already processed dataclass.
384
385 The class must **already be a dataclass** for Field objects to be returned.
386
387 """
388
389 if dataclasses.is_dataclass(cls):
390 return dataclasses.fields(cls)
391 else:
392 return []
393
394
395def local_dataclass_fields(cls: Type[Any]) -> Iterable[dataclasses.Field[Any]]:
396 """Return a sequence of all dataclasses.Field objects associated with
397 an already processed dataclass, excluding those that originate from a
398 superclass.
399
400 The class must **already be a dataclass** for Field objects to be returned.
401
402 """
403
404 if dataclasses.is_dataclass(cls):
405 super_fields: Set[dataclasses.Field[Any]] = set()
406 for sup in cls.__bases__:
407 super_fields.update(dataclass_fields(sup))
408 return [f for f in dataclasses.fields(cls) if f not in super_fields]
409 else:
410 return []
411
412
413if freethreading:
414 import threading
415
416 mini_gil = threading.RLock()
417 """provide a threading.RLock() under python freethreading only"""
418else:
419 import contextlib
420
421 mini_gil = contextlib.nullcontext() # type: ignore[assignment]