Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/util/compat.py: 72%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

165 statements  

1# util/compat.py 

2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9"""Handle Python version/platform incompatibilities.""" 

10 

11from __future__ import annotations 

12 

13import base64 

14import dataclasses 

15import hashlib 

16from importlib import metadata as importlib_metadata 

17import inspect 

18import operator 

19import platform 

20import sys 

21import sysconfig 

22import typing 

23from typing import Any 

24from typing import Callable 

25from typing import Dict 

26from typing import Iterable 

27from typing import List 

28from typing import Mapping 

29from typing import Optional 

30from typing import Sequence 

31from typing import Set 

32from typing import Tuple 

33from typing import Type 

34 

35py314b1 = sys.version_info >= (3, 14, 0, "beta", 1) 

36py314 = sys.version_info >= (3, 14) 

37py313 = sys.version_info >= (3, 13) 

38py312 = sys.version_info >= (3, 12) 

39py311 = sys.version_info >= (3, 11) 

40pypy = platform.python_implementation() == "PyPy" 

41cpython = platform.python_implementation() == "CPython" 

42freethreading = bool(sysconfig.get_config_var("Py_GIL_DISABLED")) 

43 

44win32 = sys.platform.startswith("win") 

45osx = sys.platform.startswith("darwin") 

46arm = "aarch" in platform.machine().lower() 

47is64bit = sys.maxsize > 2**32 

48 

49has_refcount_gc = bool(cpython) 

50 

51dottedgetter = operator.attrgetter 

52 

53 

54# use sys.version_info to enable mypy version narrowing 

55if sys.version_info >= (3, 14): 

56 

57 import annotationlib 

58 from string.templatelib import Template as Template 

59 

60 def get_annotations(obj: Any) -> Mapping[str, Any]: 

61 return annotationlib.get_annotations( 

62 obj, format=annotationlib.Format.FORWARDREF 

63 ) 

64 

65else: 

66 

67 def get_annotations(obj: Any) -> Mapping[str, Any]: 

68 return inspect.get_annotations(obj) 

69 

70 class Template: 

71 """Minimal Template for Python < 3.14 (test usage only).""" 

72 

73 def __init__(self, *parts: Any): 

74 self._parts = parts 

75 

76 @property 

77 def strings(self) -> Tuple[str, ...]: 

78 return tuple(p for p in self._parts if isinstance(p, str)) 

79 

80 @property 

81 def interpolations(self) -> Tuple[Any, ...]: 

82 return tuple(p for p in self._parts if not isinstance(p, str)) 

83 

84 def __iter__(self) -> Any: 

85 return iter(self._parts) 

86 

87 

88class FullArgSpec(typing.NamedTuple): 

89 args: List[str] 

90 varargs: Optional[str] 

91 varkw: Optional[str] 

92 defaults: Optional[Tuple[Any, ...]] 

93 kwonlyargs: List[str] 

94 kwonlydefaults: Optional[Dict[str, Any]] 

95 annotations: Mapping[str, Any] 

96 

97 

98def inspect_getfullargspec(func: Callable[..., Any]) -> FullArgSpec: 

99 """Fully vendored version of getfullargspec from Python 3.3.""" 

100 

101 if inspect.ismethod(func): 

102 func = func.__func__ 

103 if not inspect.isfunction(func): 

104 raise TypeError(f"{func!r} is not a Python function") 

105 

106 co = func.__code__ 

107 if not inspect.iscode(co): 

108 raise TypeError(f"{co!r} is not a code object") 

109 

110 nargs = co.co_argcount 

111 names = co.co_varnames 

112 nkwargs = co.co_kwonlyargcount 

113 args = list(names[:nargs]) 

114 kwonlyargs = list(names[nargs : nargs + nkwargs]) 

115 

116 nargs += nkwargs 

117 varargs = None 

118 if co.co_flags & inspect.CO_VARARGS: 

119 varargs = co.co_varnames[nargs] 

120 nargs = nargs + 1 

121 varkw = None 

122 if co.co_flags & inspect.CO_VARKEYWORDS: 

123 varkw = co.co_varnames[nargs] 

124 

125 return FullArgSpec( 

126 args, 

127 varargs, 

128 varkw, 

129 func.__defaults__, 

130 kwonlyargs, 

131 func.__kwdefaults__, 

132 get_annotations(func), 

133 ) 

134 

135 

136# python stubs don't have a public type for this. not worth 

137# making a protocol 

138def md5_not_for_security() -> Any: 

139 return hashlib.md5(usedforsecurity=False) 

140 

141 

142def importlib_metadata_get(group): 

143 ep = importlib_metadata.entry_points() 

144 if typing.TYPE_CHECKING or hasattr(ep, "select"): 

145 return ep.select(group=group) 

146 else: 

147 return ep.get(group, ()) 

148 

149 

150def b(s): 

151 return s.encode("latin-1") 

152 

153 

154def b64decode(x: str) -> bytes: 

155 return base64.b64decode(x.encode("ascii")) 

156 

157 

158def b64encode(x: bytes) -> str: 

159 return base64.b64encode(x).decode("ascii") 

160 

161 

162def decode_backslashreplace(text: bytes, encoding: str) -> str: 

163 return text.decode(encoding, errors="backslashreplace") 

164 

165 

166def cmp(a, b): 

167 return (a > b) - (a < b) 

168 

169 

170def _formatannotation(annotation, base_module=None): 

171 """vendored from python 3.7""" 

172 

173 if isinstance(annotation, str): 

174 return annotation 

175 

176 if getattr(annotation, "__module__", None) == "typing": 

177 return repr(annotation).replace("typing.", "").replace("~", "") 

178 if isinstance(annotation, type): 

179 if annotation.__module__ in ("builtins", base_module): 

180 return repr(annotation.__qualname__) 

181 return annotation.__module__ + "." + annotation.__qualname__ 

182 elif isinstance(annotation, typing.TypeVar): 

183 return repr(annotation).replace("~", "") 

184 return repr(annotation).replace("~", "") 

185 

186 

187def inspect_formatargspec( 

188 args: List[str], 

189 varargs: Optional[str] = None, 

190 varkw: Optional[str] = None, 

191 defaults: Optional[Sequence[Any]] = None, 

192 kwonlyargs: Optional[Sequence[str]] = (), 

193 kwonlydefaults: Optional[Mapping[str, Any]] = {}, 

194 annotations: Mapping[str, Any] = {}, 

195 formatarg: Callable[[str], str] = str, 

196 formatvarargs: Callable[[str], str] = lambda name: "*" + name, 

197 formatvarkw: Callable[[str], str] = lambda name: "**" + name, 

198 formatvalue: Callable[[Any], str] = lambda value: "=" + repr(value), 

199 formatreturns: Callable[[Any], str] = lambda text: " -> " + str(text), 

200 formatannotation: Callable[[Any], str] = _formatannotation, 

201) -> str: 

202 """Copy formatargspec from python 3.7 standard library. 

203 

204 Python 3 has deprecated formatargspec and requested that Signature 

205 be used instead, however this requires a full reimplementation 

206 of formatargspec() in terms of creating Parameter objects and such. 

207 Instead of introducing all the object-creation overhead and having 

208 to reinvent from scratch, just copy their compatibility routine. 

209 

210 Ultimately we would need to rewrite our "decorator" routine completely 

211 which is not really worth it right now, until all Python 2.x support 

212 is dropped. 

213 

214 """ 

215 

216 kwonlydefaults = kwonlydefaults or {} 

217 annotations = annotations or {} 

218 

219 def formatargandannotation(arg): 

220 result = formatarg(arg) 

221 if arg in annotations: 

222 result += ": " + formatannotation(annotations[arg]) 

223 return result 

224 

225 specs = [] 

226 if defaults: 

227 firstdefault = len(args) - len(defaults) 

228 else: 

229 firstdefault = -1 

230 

231 for i, arg in enumerate(args): 

232 spec = formatargandannotation(arg) 

233 if defaults and i >= firstdefault: 

234 spec = spec + formatvalue(defaults[i - firstdefault]) 

235 specs.append(spec) 

236 

237 if varargs is not None: 

238 specs.append(formatvarargs(formatargandannotation(varargs))) 

239 else: 

240 if kwonlyargs: 

241 specs.append("*") 

242 

243 if kwonlyargs: 

244 for kwonlyarg in kwonlyargs: 

245 spec = formatargandannotation(kwonlyarg) 

246 if kwonlydefaults and kwonlyarg in kwonlydefaults: 

247 spec += formatvalue(kwonlydefaults[kwonlyarg]) 

248 specs.append(spec) 

249 

250 if varkw is not None: 

251 specs.append(formatvarkw(formatargandannotation(varkw))) 

252 

253 result = "(" + ", ".join(specs) + ")" 

254 if "return" in annotations: 

255 result += formatreturns(formatannotation(annotations["return"])) 

256 return result 

257 

258 

259def dataclass_fields(cls: Type[Any]) -> Iterable[dataclasses.Field[Any]]: 

260 """Return a sequence of all dataclasses.Field objects associated 

261 with a class as an already processed dataclass. 

262 

263 The class must **already be a dataclass** for Field objects to be returned. 

264 

265 """ 

266 

267 if dataclasses.is_dataclass(cls): 

268 return dataclasses.fields(cls) 

269 else: 

270 return [] 

271 

272 

273def local_dataclass_fields(cls: Type[Any]) -> Iterable[dataclasses.Field[Any]]: 

274 """Return a sequence of all dataclasses.Field objects associated with 

275 an already processed dataclass, excluding those that originate from a 

276 superclass. 

277 

278 The class must **already be a dataclass** for Field objects to be returned. 

279 

280 """ 

281 

282 if dataclasses.is_dataclass(cls): 

283 super_fields: Set[dataclasses.Field[Any]] = set() 

284 for sup in cls.__bases__: 

285 super_fields.update(dataclass_fields(sup)) 

286 return [f for f in dataclasses.fields(cls) if f not in super_fields] 

287 else: 

288 return [] 

289 

290 

291if freethreading: 

292 import threading 

293 

294 mini_gil = threading.RLock() 

295 """provide a threading.RLock() under python freethreading only""" 

296else: 

297 import contextlib 

298 

299 mini_gil = contextlib.nullcontext() # type: ignore[assignment]