1# util/compat.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""Handle Python version/platform incompatibilities."""
10
11from __future__ import annotations
12
13import base64
14import dataclasses
15import hashlib
16import inspect
17import operator
18import platform
19import sys
20import typing
21from typing import Any
22from typing import Callable
23from typing import Dict
24from typing import Iterable
25from typing import List
26from typing import Mapping
27from typing import Optional
28from typing import Sequence
29from typing import Set
30from typing import Tuple
31from typing import Type
32from typing import TypeVar
33
34
# Interpreter version flags: each is True when the running Python is at
# least the named release (``py314b1`` means 3.14.0 beta 1 or later).
py314b1 = sys.version_info >= (3, 14, 0, "beta", 1)
py314 = sys.version_info >= (3, 14)
py313 = sys.version_info >= (3, 13)
py312 = sys.version_info >= (3, 12)
py311 = sys.version_info >= (3, 11)
py310 = sys.version_info >= (3, 10)
py39 = sys.version_info >= (3, 9)
py38 = sys.version_info >= (3, 8)

# Interpreter implementation flags, taken from
# platform.python_implementation().
pypy = platform.python_implementation() == "PyPy"
cpython = platform.python_implementation() == "CPython"

# Operating system flags, derived from the prefix of sys.platform.
win32 = sys.platform.startswith("win")
osx = sys.platform.startswith("darwin")

# True when the machine string reported by platform.machine() contains
# "aarch" (case-insensitive); other spellings are not matched.
arm = "aarch" in platform.machine().lower()

# True when sys.maxsize exceeds 2**32.
is64bit = sys.maxsize > 2**32

# Reference-counting garbage collection is assumed exactly when running
# on CPython.
has_refcount_gc = bool(cpython)

# Alias for operator.attrgetter, which also accepts dotted attribute paths.
dottedgetter = operator.attrgetter

# Covariant type variable for use in generic annotations.
_T_co = TypeVar("_T_co", covariant=True)
56
57
class FullArgSpec(typing.NamedTuple):
    """Description of a Python function's signature.

    This is the result type of :func:`inspect_getfullargspec`.

    """

    # names of the positional parameters, in declaration order
    args: List[str]
    # name of the ``*args`` parameter, or None if there is none
    varargs: Optional[str]
    # name of the ``**kwargs`` parameter, or None if there is none
    varkw: Optional[str]
    # the function's ``__defaults__`` (defaults of trailing positional
    # parameters), or None
    defaults: Optional[Tuple[Any, ...]]
    # names of the keyword-only parameters, in declaration order
    kwonlyargs: List[str]
    # the function's ``__kwdefaults__`` (keyword-only defaults), or None
    kwonlydefaults: Optional[Dict[str, Any]]
    # the function's ``__annotations__`` mapping
    annotations: Dict[str, Any]
66
67
def inspect_getfullargspec(func: Callable[..., Any]) -> FullArgSpec:
    """Fully vendored version of getfullargspec from Python 3.3.

    Bound methods are unwrapped to their underlying function first.  The
    parameter names are read directly from the function's code object.

    :param func: a Python function or bound method.
    :return: a :class:`.FullArgSpec` describing the signature.
    :raises TypeError: if ``func`` is not a Python function, or if its
     ``__code__`` attribute is not a code object.

    """

    target = func.__func__ if inspect.ismethod(func) else func
    if not inspect.isfunction(target):
        raise TypeError(f"{target!r} is not a Python function")

    code = target.__code__
    if not inspect.iscode(code):
        raise TypeError(f"{code!r} is not a code object")

    # co_varnames lists positional names first, then keyword-only names,
    # then the *args name and the **kwargs name when those are present.
    varnames = code.co_varnames
    num_positional = code.co_argcount
    cursor = num_positional + code.co_kwonlyargcount

    positional = list(varnames[:num_positional])
    keyword_only = list(varnames[num_positional:cursor])

    if code.co_flags & inspect.CO_VARARGS:
        star_args = varnames[cursor]
        cursor += 1
    else:
        star_args = None

    if code.co_flags & inspect.CO_VARKEYWORDS:
        star_kwargs = varnames[cursor]
    else:
        star_kwargs = None

    return FullArgSpec(
        args=positional,
        varargs=star_args,
        varkw=star_kwargs,
        defaults=target.__defaults__,
        kwonlyargs=keyword_only,
        kwonlydefaults=target.__kwdefaults__,
        annotations=target.__annotations__,
    )
104
105
def md5_not_for_security() -> Any:
    """Return a new md5 hash object intended for non-security use.

    On Python 3.9 and above the hash is created with
    ``usedforsecurity=False``; older versions do not accept that argument
    so a plain ``hashlib.md5()`` is returned instead.

    """
    # python stubs don't have a public type for this. not worth
    # making a protocol
    if py39:
        return hashlib.md5(usedforsecurity=False)
    return hashlib.md5()
116
117
118if typing.TYPE_CHECKING or py38:
119 from importlib import metadata as importlib_metadata
120else:
121 import importlib_metadata # noqa
122
123
if typing.TYPE_CHECKING or py39:
    # PEP 584: dictionaries support the ``|`` operator directly
    dict_union = operator.or_  # noqa
else:

    def dict_union(a: dict, b: dict) -> dict:
        """Return a new dict holding the keys of ``a`` updated with ``b``.

        Neither argument is modified.

        """
        merged = a.copy()
        merged.update(b)
        return merged
133
134
if py310:
    # the builtin is available; use it directly
    anext_ = anext
else:
    from collections.abc import AsyncIterator

    # sentinel distinguishing "no default given" from an explicit None
    _NOT_PROVIDED = object()

    async def anext_(async_iterator, default=_NOT_PROVIDED):
        """vendored from https://github.com/python/cpython/pull/8895

        Await and return the next item of ``async_iterator``.  When the
        iterator is exhausted, ``default`` is returned if one was passed;
        otherwise ``StopAsyncIteration`` propagates.

        :raises TypeError: if ``async_iterator`` is not an AsyncIterator.

        """

        if not isinstance(async_iterator, AsyncIterator):
            raise TypeError(
                f"anext expected an AsyncIterator, got {type(async_iterator)}"
            )

        # look up __anext__ on the type, not the instance
        fetch_next = type(async_iterator).__anext__
        try:
            return await fetch_next(async_iterator)
        except StopAsyncIteration:
            if default is not _NOT_PROVIDED:
                return default
            raise
155
156
def importlib_metadata_get(group):
    """Return the installed entry points registered under ``group``.

    Uses the ``select()`` API of the entry points object when it is
    available, and falls back to a dictionary-style ``get()`` otherwise.

    """
    entry_points = importlib_metadata.entry_points()
    if not typing.TYPE_CHECKING and not hasattr(entry_points, "select"):
        # older API: a mapping of group name to entry points
        return entry_points.get(group, ())
    return entry_points.select(group=group)
163
164
def b(s):
    """Encode the string ``s`` as latin-1 and return the result."""
    encoded = s.encode("latin-1")
    return encoded
167
168
def b64decode(x: str) -> bytes:
    """Decode the base64 text ``x`` (ASCII only) into raw bytes."""
    ascii_payload = x.encode("ascii")
    return base64.b64decode(ascii_payload)
171
172
def b64encode(x: bytes) -> str:
    """Encode the bytes ``x`` as base64 and return it as an ASCII string."""
    encoded = base64.b64encode(x)
    return encoded.decode("ascii")
175
176
def decode_backslashreplace(text: bytes, encoding: str) -> str:
    """Decode ``text`` using ``encoding``, rendering undecodable bytes as
    backslash escape sequences instead of raising.

    """
    return text.decode(encoding, "backslashreplace")
179
180
def cmp(a, b):
    """Three-way comparison: the result of ``a > b`` minus the result of
    ``a < b``, i.e. 1, 0 or -1 for ordinary comparable values.

    """
    is_greater = a > b
    is_less = a < b
    return is_greater - is_less
183
184
def _formatannotation(annotation, base_module=None):
    """vendored from python 3.7

    Render ``annotation`` as source text.  Strings are returned as is;
    objects whose ``__module__`` is ``typing`` have the ``typing.`` prefix
    removed from their repr; classes are rendered by qualified name
    (quoted when they live in ``builtins`` or ``base_module``, otherwise
    prefixed with their module); anything else is rendered via ``repr()``.
    The ``~`` marker that appears in type variable reprs is always
    stripped.

    """

    if isinstance(annotation, str):
        return annotation

    if getattr(annotation, "__module__", None) == "typing":
        text = repr(annotation).replace("typing.", "")
        return text.replace("~", "")

    if isinstance(annotation, type):
        module_name = annotation.__module__
        if module_name in ("builtins", base_module):
            return repr(annotation.__qualname__)
        return module_name + "." + annotation.__qualname__

    # TypeVar instances and every remaining object share the same rendering
    return repr(annotation).replace("~", "")
200
201
def inspect_formatargspec(
    args: List[str],
    varargs: Optional[str] = None,
    varkw: Optional[str] = None,
    defaults: Optional[Sequence[Any]] = None,
    kwonlyargs: Optional[Sequence[str]] = (),
    kwonlydefaults: Optional[Mapping[str, Any]] = {},
    annotations: Mapping[str, Any] = {},
    formatarg: Callable[[str], str] = str,
    formatvarargs: Callable[[str], str] = lambda name: "*" + name,
    formatvarkw: Callable[[str], str] = lambda name: "**" + name,
    formatvalue: Callable[[Any], str] = lambda value: "=" + repr(value),
    formatreturns: Callable[[Any], str] = lambda text: " -> " + str(text),
    formatannotation: Callable[[Any], str] = _formatannotation,
) -> str:
    """Copy formatargspec from python 3.7 standard library.

    Python 3 has deprecated formatargspec and requested that Signature
    be used instead, however that would mean reimplementing this routine
    in terms of Parameter objects, with the object-creation overhead that
    implies.  The compatibility routine is copied here instead.

    The first seven arguments correspond to the fields of
    :class:`.FullArgSpec`.  The ``format*`` callables control how each
    piece of the signature is rendered.

    :return: the parenthesized argument list as a string, followed by the
     rendered return annotation when ``annotations`` has a ``"return"``
     key.

    """

    # None is accepted for both mappings; the shared {} defaults are only
    # ever read, never mutated.
    kwonlydefaults = kwonlydefaults or {}
    annotations = annotations or {}

    def render_name(name):
        rendered = formatarg(name)
        if name in annotations:
            rendered += ": " + formatannotation(annotations[name])
        return rendered

    # index of the first positional argument that has a default value;
    # -1 when there are no defaults
    default_start = len(args) - len(defaults) if defaults else -1

    pieces = []
    for position, name in enumerate(args):
        piece = render_name(name)
        if defaults and position >= default_start:
            piece = piece + formatvalue(defaults[position - default_start])
        pieces.append(piece)

    if varargs is not None:
        pieces.append(formatvarargs(render_name(varargs)))
    elif kwonlyargs:
        # a bare "*" separates keyword-only arguments when there is no
        # *args parameter to do so
        pieces.append("*")

    if kwonlyargs:
        for name in kwonlyargs:
            piece = render_name(name)
            if kwonlydefaults and name in kwonlydefaults:
                piece += formatvalue(kwonlydefaults[name])
            pieces.append(piece)

    if varkw is not None:
        pieces.append(formatvarkw(render_name(varkw)))

    signature = "(" + ", ".join(pieces) + ")"
    if "return" in annotations:
        signature += formatreturns(formatannotation(annotations["return"]))
    return signature
272
273
def dataclass_fields(cls: Type[Any]) -> Iterable[dataclasses.Field[Any]]:
    """Return every dataclasses.Field object of an already processed
    dataclass, including fields inherited from superclasses.

    The class must **already be a dataclass** for Field objects to be
    returned; for any other class an empty list is returned.

    """

    if not dataclasses.is_dataclass(cls):
        return []
    return dataclasses.fields(cls)
286
287
def local_dataclass_fields(cls: Type[Any]) -> Iterable[dataclasses.Field[Any]]:
    """Return the dataclasses.Field objects of an already processed
    dataclass that do not originate from one of its superclasses.

    The class must **already be a dataclass** for Field objects to be
    returned; for any other class an empty list is returned.

    """

    if not dataclasses.is_dataclass(cls):
        return []

    # every Field object reachable through the direct bases
    inherited: Set[dataclasses.Field[Any]] = {
        field for base in cls.__bases__ for field in dataclass_fields(base)
    }
    return [
        field for field in dataclasses.fields(cls) if field not in inherited
    ]