1# util/compat.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""Handle Python version/platform incompatibilities."""
10
11from __future__ import annotations
12
13import base64
14import dataclasses
15import hashlib
16from importlib import metadata as importlib_metadata
17import inspect
18import operator
19import platform
20import sys
21import sysconfig
22import typing
23from typing import Any
24from typing import Callable
25from typing import Dict
26from typing import Iterable
27from typing import List
28from typing import Mapping
29from typing import Optional
30from typing import Sequence
31from typing import Set
32from typing import Tuple
33from typing import Type
34
35py314b1 = sys.version_info >= (3, 14, 0, "beta", 1)
36py314 = sys.version_info >= (3, 14)
37py313 = sys.version_info >= (3, 13)
38py312 = sys.version_info >= (3, 12)
39py311 = sys.version_info >= (3, 11)
40pypy = platform.python_implementation() == "PyPy"
41cpython = platform.python_implementation() == "CPython"
42freethreading = bool(sysconfig.get_config_var("Py_GIL_DISABLED"))
43
44win32 = sys.platform.startswith("win")
45osx = sys.platform.startswith("darwin")
46arm = "aarch" in platform.machine().lower()
47is64bit = sys.maxsize > 2**32
48
49has_refcount_gc = bool(cpython)
50
51dottedgetter = operator.attrgetter
52
53
54class FullArgSpec(typing.NamedTuple):
55 args: List[str]
56 varargs: Optional[str]
57 varkw: Optional[str]
58 defaults: Optional[Tuple[Any, ...]]
59 kwonlyargs: List[str]
60 kwonlydefaults: Optional[Dict[str, Any]]
61 annotations: Dict[str, Any]
62
63
64def inspect_getfullargspec(func: Callable[..., Any]) -> FullArgSpec:
65 """Fully vendored version of getfullargspec from Python 3.3."""
66
67 if inspect.ismethod(func):
68 func = func.__func__
69 if not inspect.isfunction(func):
70 raise TypeError(f"{func!r} is not a Python function")
71
72 co = func.__code__
73 if not inspect.iscode(co):
74 raise TypeError(f"{co!r} is not a code object")
75
76 nargs = co.co_argcount
77 names = co.co_varnames
78 nkwargs = co.co_kwonlyargcount
79 args = list(names[:nargs])
80 kwonlyargs = list(names[nargs : nargs + nkwargs])
81
82 nargs += nkwargs
83 varargs = None
84 if co.co_flags & inspect.CO_VARARGS:
85 varargs = co.co_varnames[nargs]
86 nargs = nargs + 1
87 varkw = None
88 if co.co_flags & inspect.CO_VARKEYWORDS:
89 varkw = co.co_varnames[nargs]
90
91 return FullArgSpec(
92 args,
93 varargs,
94 varkw,
95 func.__defaults__,
96 kwonlyargs,
97 func.__kwdefaults__,
98 func.__annotations__,
99 )
100
101
102# python stubs don't have a public type for this. not worth
103# making a protocol
104def md5_not_for_security() -> Any:
105 return hashlib.md5(usedforsecurity=False)
106
107
108def importlib_metadata_get(group):
109 ep = importlib_metadata.entry_points()
110 if typing.TYPE_CHECKING or hasattr(ep, "select"):
111 return ep.select(group=group)
112 else:
113 return ep.get(group, ())
114
115
116def b(s):
117 return s.encode("latin-1")
118
119
120def b64decode(x: str) -> bytes:
121 return base64.b64decode(x.encode("ascii"))
122
123
124def b64encode(x: bytes) -> str:
125 return base64.b64encode(x).decode("ascii")
126
127
128def decode_backslashreplace(text: bytes, encoding: str) -> str:
129 return text.decode(encoding, errors="backslashreplace")
130
131
132def cmp(a, b):
133 return (a > b) - (a < b)
134
135
136def _formatannotation(annotation, base_module=None):
137 """vendored from python 3.7"""
138
139 if isinstance(annotation, str):
140 return annotation
141
142 if getattr(annotation, "__module__", None) == "typing":
143 return repr(annotation).replace("typing.", "").replace("~", "")
144 if isinstance(annotation, type):
145 if annotation.__module__ in ("builtins", base_module):
146 return repr(annotation.__qualname__)
147 return annotation.__module__ + "." + annotation.__qualname__
148 elif isinstance(annotation, typing.TypeVar):
149 return repr(annotation).replace("~", "")
150 return repr(annotation).replace("~", "")
151
152
153def inspect_formatargspec(
154 args: List[str],
155 varargs: Optional[str] = None,
156 varkw: Optional[str] = None,
157 defaults: Optional[Sequence[Any]] = None,
158 kwonlyargs: Optional[Sequence[str]] = (),
159 kwonlydefaults: Optional[Mapping[str, Any]] = {},
160 annotations: Mapping[str, Any] = {},
161 formatarg: Callable[[str], str] = str,
162 formatvarargs: Callable[[str], str] = lambda name: "*" + name,
163 formatvarkw: Callable[[str], str] = lambda name: "**" + name,
164 formatvalue: Callable[[Any], str] = lambda value: "=" + repr(value),
165 formatreturns: Callable[[Any], str] = lambda text: " -> " + str(text),
166 formatannotation: Callable[[Any], str] = _formatannotation,
167) -> str:
168 """Copy formatargspec from python 3.7 standard library.
169
170 Python 3 has deprecated formatargspec and requested that Signature
171 be used instead, however this requires a full reimplementation
172 of formatargspec() in terms of creating Parameter objects and such.
173 Instead of introducing all the object-creation overhead and having
174 to reinvent from scratch, just copy their compatibility routine.
175
176 Ultimately we would need to rewrite our "decorator" routine completely
177 which is not really worth it right now, until all Python 2.x support
178 is dropped.
179
180 """
181
182 kwonlydefaults = kwonlydefaults or {}
183 annotations = annotations or {}
184
185 def formatargandannotation(arg):
186 result = formatarg(arg)
187 if arg in annotations:
188 result += ": " + formatannotation(annotations[arg])
189 return result
190
191 specs = []
192 if defaults:
193 firstdefault = len(args) - len(defaults)
194 else:
195 firstdefault = -1
196
197 for i, arg in enumerate(args):
198 spec = formatargandannotation(arg)
199 if defaults and i >= firstdefault:
200 spec = spec + formatvalue(defaults[i - firstdefault])
201 specs.append(spec)
202
203 if varargs is not None:
204 specs.append(formatvarargs(formatargandannotation(varargs)))
205 else:
206 if kwonlyargs:
207 specs.append("*")
208
209 if kwonlyargs:
210 for kwonlyarg in kwonlyargs:
211 spec = formatargandannotation(kwonlyarg)
212 if kwonlydefaults and kwonlyarg in kwonlydefaults:
213 spec += formatvalue(kwonlydefaults[kwonlyarg])
214 specs.append(spec)
215
216 if varkw is not None:
217 specs.append(formatvarkw(formatargandannotation(varkw)))
218
219 result = "(" + ", ".join(specs) + ")"
220 if "return" in annotations:
221 result += formatreturns(formatannotation(annotations["return"]))
222 return result
223
224
225def dataclass_fields(cls: Type[Any]) -> Iterable[dataclasses.Field[Any]]:
226 """Return a sequence of all dataclasses.Field objects associated
227 with a class as an already processed dataclass.
228
229 The class must **already be a dataclass** for Field objects to be returned.
230
231 """
232
233 if dataclasses.is_dataclass(cls):
234 return dataclasses.fields(cls)
235 else:
236 return []
237
238
239def local_dataclass_fields(cls: Type[Any]) -> Iterable[dataclasses.Field[Any]]:
240 """Return a sequence of all dataclasses.Field objects associated with
241 an already processed dataclass, excluding those that originate from a
242 superclass.
243
244 The class must **already be a dataclass** for Field objects to be returned.
245
246 """
247
248 if dataclasses.is_dataclass(cls):
249 super_fields: Set[dataclasses.Field[Any]] = set()
250 for sup in cls.__bases__:
251 super_fields.update(dataclass_fields(sup))
252 return [f for f in dataclasses.fields(cls) if f not in super_fields]
253 else:
254 return []
255
256
257if freethreading:
258 import threading
259
260 mini_gil = threading.RLock()
261 """provide a threading.RLock() under python freethreading only"""
262else:
263 import contextlib
264
265 mini_gil = contextlib.nullcontext() # type: ignore[assignment]