1# util/compat.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""Handle Python version/platform incompatibilities."""
10
11from __future__ import annotations
12
13import base64
14import dataclasses
15import hashlib
16from importlib import metadata as importlib_metadata
17import inspect
18import operator
19import platform
20import sys
21import sysconfig
22import typing
23from typing import Any
24from typing import Callable
25from typing import Dict
26from typing import Iterable
27from typing import List
28from typing import Mapping
29from typing import Optional
30from typing import Sequence
31from typing import Set
32from typing import Tuple
33from typing import Type
34from typing import TYPE_CHECKING
35
36py314b1 = sys.version_info >= (3, 14, 0, "beta", 1)
37py314 = sys.version_info >= (3, 14)
38py313 = sys.version_info >= (3, 13)
39py312 = sys.version_info >= (3, 12)
40py311 = sys.version_info >= (3, 11)
41pypy = platform.python_implementation() == "PyPy"
42cpython = platform.python_implementation() == "CPython"
43freethreading = bool(sysconfig.get_config_var("Py_GIL_DISABLED"))
44
45win32 = sys.platform.startswith("win")
46osx = sys.platform.startswith("darwin")
47arm = "aarch" in platform.machine().lower()
48is64bit = sys.maxsize > 2**32
49
50has_refcount_gc = bool(cpython)
51
52dottedgetter = operator.attrgetter
53
54if py314 or TYPE_CHECKING:
55 from string.templatelib import Template as Template
56else:
57
58 class Template: # type: ignore[no-redef]
59 """Minimal Template for Python < 3.14 (test usage only)."""
60
61 def __init__(self, *parts: Any):
62 self._parts = parts
63
64 @property
65 def strings(self) -> Tuple[str, ...]:
66 return tuple(p for p in self._parts if isinstance(p, str))
67
68 @property
69 def interpolations(self) -> Tuple[Any, ...]:
70 return tuple(p for p in self._parts if not isinstance(p, str))
71
72 def __iter__(self) -> Any:
73 return iter(self._parts)
74
75
76class FullArgSpec(typing.NamedTuple):
77 args: List[str]
78 varargs: Optional[str]
79 varkw: Optional[str]
80 defaults: Optional[Tuple[Any, ...]]
81 kwonlyargs: List[str]
82 kwonlydefaults: Optional[Dict[str, Any]]
83 annotations: Dict[str, Any]
84
85
86def inspect_getfullargspec(func: Callable[..., Any]) -> FullArgSpec:
87 """Fully vendored version of getfullargspec from Python 3.3."""
88
89 if inspect.ismethod(func):
90 func = func.__func__
91 if not inspect.isfunction(func):
92 raise TypeError(f"{func!r} is not a Python function")
93
94 co = func.__code__
95 if not inspect.iscode(co):
96 raise TypeError(f"{co!r} is not a code object")
97
98 nargs = co.co_argcount
99 names = co.co_varnames
100 nkwargs = co.co_kwonlyargcount
101 args = list(names[:nargs])
102 kwonlyargs = list(names[nargs : nargs + nkwargs])
103
104 nargs += nkwargs
105 varargs = None
106 if co.co_flags & inspect.CO_VARARGS:
107 varargs = co.co_varnames[nargs]
108 nargs = nargs + 1
109 varkw = None
110 if co.co_flags & inspect.CO_VARKEYWORDS:
111 varkw = co.co_varnames[nargs]
112
113 return FullArgSpec(
114 args,
115 varargs,
116 varkw,
117 func.__defaults__,
118 kwonlyargs,
119 func.__kwdefaults__,
120 func.__annotations__,
121 )
122
123
124# python stubs don't have a public type for this. not worth
125# making a protocol
126def md5_not_for_security() -> Any:
127 return hashlib.md5(usedforsecurity=False)
128
129
130def importlib_metadata_get(group):
131 ep = importlib_metadata.entry_points()
132 if typing.TYPE_CHECKING or hasattr(ep, "select"):
133 return ep.select(group=group)
134 else:
135 return ep.get(group, ())
136
137
138def b(s):
139 return s.encode("latin-1")
140
141
142def b64decode(x: str) -> bytes:
143 return base64.b64decode(x.encode("ascii"))
144
145
146def b64encode(x: bytes) -> str:
147 return base64.b64encode(x).decode("ascii")
148
149
150def decode_backslashreplace(text: bytes, encoding: str) -> str:
151 return text.decode(encoding, errors="backslashreplace")
152
153
154def cmp(a, b):
155 return (a > b) - (a < b)
156
157
158def _formatannotation(annotation, base_module=None):
159 """vendored from python 3.7"""
160
161 if isinstance(annotation, str):
162 return annotation
163
164 if getattr(annotation, "__module__", None) == "typing":
165 return repr(annotation).replace("typing.", "").replace("~", "")
166 if isinstance(annotation, type):
167 if annotation.__module__ in ("builtins", base_module):
168 return repr(annotation.__qualname__)
169 return annotation.__module__ + "." + annotation.__qualname__
170 elif isinstance(annotation, typing.TypeVar):
171 return repr(annotation).replace("~", "")
172 return repr(annotation).replace("~", "")
173
174
175def inspect_formatargspec(
176 args: List[str],
177 varargs: Optional[str] = None,
178 varkw: Optional[str] = None,
179 defaults: Optional[Sequence[Any]] = None,
180 kwonlyargs: Optional[Sequence[str]] = (),
181 kwonlydefaults: Optional[Mapping[str, Any]] = {},
182 annotations: Mapping[str, Any] = {},
183 formatarg: Callable[[str], str] = str,
184 formatvarargs: Callable[[str], str] = lambda name: "*" + name,
185 formatvarkw: Callable[[str], str] = lambda name: "**" + name,
186 formatvalue: Callable[[Any], str] = lambda value: "=" + repr(value),
187 formatreturns: Callable[[Any], str] = lambda text: " -> " + str(text),
188 formatannotation: Callable[[Any], str] = _formatannotation,
189) -> str:
190 """Copy formatargspec from python 3.7 standard library.
191
192 Python 3 has deprecated formatargspec and requested that Signature
193 be used instead, however this requires a full reimplementation
194 of formatargspec() in terms of creating Parameter objects and such.
195 Instead of introducing all the object-creation overhead and having
196 to reinvent from scratch, just copy their compatibility routine.
197
198 Ultimately we would need to rewrite our "decorator" routine completely
199 which is not really worth it right now, until all Python 2.x support
200 is dropped.
201
202 """
203
204 kwonlydefaults = kwonlydefaults or {}
205 annotations = annotations or {}
206
207 def formatargandannotation(arg):
208 result = formatarg(arg)
209 if arg in annotations:
210 result += ": " + formatannotation(annotations[arg])
211 return result
212
213 specs = []
214 if defaults:
215 firstdefault = len(args) - len(defaults)
216 else:
217 firstdefault = -1
218
219 for i, arg in enumerate(args):
220 spec = formatargandannotation(arg)
221 if defaults and i >= firstdefault:
222 spec = spec + formatvalue(defaults[i - firstdefault])
223 specs.append(spec)
224
225 if varargs is not None:
226 specs.append(formatvarargs(formatargandannotation(varargs)))
227 else:
228 if kwonlyargs:
229 specs.append("*")
230
231 if kwonlyargs:
232 for kwonlyarg in kwonlyargs:
233 spec = formatargandannotation(kwonlyarg)
234 if kwonlydefaults and kwonlyarg in kwonlydefaults:
235 spec += formatvalue(kwonlydefaults[kwonlyarg])
236 specs.append(spec)
237
238 if varkw is not None:
239 specs.append(formatvarkw(formatargandannotation(varkw)))
240
241 result = "(" + ", ".join(specs) + ")"
242 if "return" in annotations:
243 result += formatreturns(formatannotation(annotations["return"]))
244 return result
245
246
247def dataclass_fields(cls: Type[Any]) -> Iterable[dataclasses.Field[Any]]:
248 """Return a sequence of all dataclasses.Field objects associated
249 with a class as an already processed dataclass.
250
251 The class must **already be a dataclass** for Field objects to be returned.
252
253 """
254
255 if dataclasses.is_dataclass(cls):
256 return dataclasses.fields(cls)
257 else:
258 return []
259
260
261def local_dataclass_fields(cls: Type[Any]) -> Iterable[dataclasses.Field[Any]]:
262 """Return a sequence of all dataclasses.Field objects associated with
263 an already processed dataclass, excluding those that originate from a
264 superclass.
265
266 The class must **already be a dataclass** for Field objects to be returned.
267
268 """
269
270 if dataclasses.is_dataclass(cls):
271 super_fields: Set[dataclasses.Field[Any]] = set()
272 for sup in cls.__bases__:
273 super_fields.update(dataclass_fields(sup))
274 return [f for f in dataclasses.fields(cls) if f not in super_fields]
275 else:
276 return []
277
278
279if freethreading:
280 import threading
281
282 mini_gil = threading.RLock()
283 """provide a threading.RLock() under python freethreading only"""
284else:
285 import contextlib
286
287 mini_gil = contextlib.nullcontext() # type: ignore[assignment]