Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/util/compat.py: 56%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

232 statements  

1# util/compat.py 

2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9"""Handle Python version/platform incompatibilities.""" 

10 

11from __future__ import annotations 

12 

13import base64 

14import dataclasses 

15import hashlib 

16import inspect 

17import operator 

18import platform 

19import sys 

20import sysconfig 

21import typing 

22from typing import Any 

23from typing import Callable 

24from typing import cast 

25from typing import Dict 

26from typing import Iterable 

27from typing import List 

28from typing import Mapping 

29from typing import Optional 

30from typing import Sequence 

31from typing import Set 

32from typing import Tuple 

33from typing import Type 

34from typing import TypeVar 

35 

# Interpreter version gates; each flag is True when the running
# interpreter is at least the named release.
py314b1 = sys.version_info >= (3, 14, 0, "beta", 1)
py314 = sys.version_info >= (3, 14)
py313 = sys.version_info >= (3, 13)
py312 = sys.version_info >= (3, 12)
py311 = sys.version_info >= (3, 11)
py310 = sys.version_info >= (3, 10)
py39 = sys.version_info >= (3, 9)
py38 = sys.version_info >= (3, 8)

# Interpreter implementation / build flavor.
pypy = platform.python_implementation() == "PyPy"
cpython = platform.python_implementation() == "CPython"
# truthy "Py_GIL_DISABLED" build config variable -> True
freethreading = bool(sysconfig.get_config_var("Py_GIL_DISABLED"))

# Host platform flags.
win32 = sys.platform.startswith("win")
osx = sys.platform.startswith("darwin")
# matches machine strings that contain "aarch" (case-insensitive)
arm = "aarch" in platform.machine().lower()
is64bit = sys.maxsize > 2**32

# mirrors the ``cpython`` flag
has_refcount_gc = bool(cpython)

# alias for operator.attrgetter
dottedgetter = operator.attrgetter

_T_co = TypeVar("_T_co", covariant=True)

58 

59 

if py314:
    # vendor a minimal form of get_annotations per
    # https://github.com/python/cpython/issues/133684#issuecomment-2863841891

    from annotationlib import call_annotate_function  # type: ignore[import-not-found,unused-ignore]  # noqa: E501
    from annotationlib import Format

    def _get_and_call_annotate(obj, format):  # noqa: A002
        """Invoke ``obj.__annotate__`` in the given format, if present.

        Returns the resulting dict, or None when ``obj`` has no
        ``__annotate__`` (or it is None).  Raises ValueError if the
        annotate function returns something other than a dict.
        """
        annotate = getattr(obj, "__annotate__", None)
        if annotate is not None:
            ann = call_annotate_function(annotate, format, owner=obj)
            if not isinstance(ann, dict):
                raise ValueError(f"{obj!r}.__annotate__ returned a non-dict")
            return ann
        return None

    # this is ported from py3.13.0a7
    _BASE_GET_ANNOTATIONS = type.__dict__["__annotations__"].__get__

    def _get_dunder_annotations(obj):
        """Return a copy of ``obj.__annotations__`` as a dict.

        For classes the ``type.__annotations__`` descriptor is used
        directly; for other objects a plain ``getattr`` is used.  A missing
        or None value yields an empty dict; a non-dict value raises
        ValueError.
        """
        if isinstance(obj, type):
            try:
                ann = _BASE_GET_ANNOTATIONS(obj)
            except AttributeError:
                # For static types, the descriptor raises AttributeError.
                return {}
        else:
            ann = getattr(obj, "__annotations__", None)
            if ann is None:
                return {}

        if not isinstance(ann, dict):
            raise ValueError(
                f"{obj!r}.__annotations__ is neither a dict nor None"
            )
        return dict(ann)

    def _vendored_get_annotations(
        obj: Any, *, format: Format  # noqa: A002
    ) -> Mapping[str, Any]:
        """A sparse implementation of annotationlib.get_annotations()"""

        # First choice: plain __annotations__.  Any failure here is
        # deliberately swallowed so that the __annotate__ path below gets
        # a chance to produce a result instead.
        try:
            ann = _get_dunder_annotations(obj)
        except Exception:
            pass
        else:
            if ann is not None:
                return dict(ann)

        # But if __annotations__ threw a NameError, we try calling __annotate__
        ann = _get_and_call_annotate(obj, format)
        if ann is None:
            # If that didn't work either, we have a very weird object:
            # evaluating
            # __annotations__ threw NameError and there is no __annotate__.
            # In that case,
            # we fall back to trying __annotations__ again.
            ann = _get_dunder_annotations(obj)

        if ann is None:
            if isinstance(obj, type) or callable(obj):
                return {}
            raise TypeError(f"{obj!r} does not have annotations")

        if not ann:
            return {}

        return dict(ann)

    def get_annotations(obj: Any) -> Mapping[str, Any]:
        """Return the annotations of ``obj`` without forcing evaluation."""
        # FORWARDREF has the effect of giving us ForwardRefs and not
        # actually trying to evaluate the annotations.  We need this so
        # that the annotations act as much like
        # "from __future__ import annotations" as possible, which is going
        # away in future python as a separate mode
        return _vendored_get_annotations(obj, format=Format.FORWARDREF)

elif py310:

    def get_annotations(obj: Any) -> Mapping[str, Any]:
        """Return the annotations of ``obj`` via inspect.get_annotations."""
        return inspect.get_annotations(obj)

else:

    def get_annotations(obj: Any) -> Mapping[str, Any]:
        """Return the annotations of ``obj``, or an empty mapping if none.

        For classes only the class's own ``__dict__`` is consulted; for
        other objects ``__annotations__`` is read with ``getattr``.
        """
        # it's been observed that cls.__annotations__ can be non present.
        # it's not clear what causes this, running under tox py37/38 it
        # happens, running straight pytest it doesn't

        # https://docs.python.org/3/howto/annotations.html#annotations-howto
        if isinstance(obj, type):
            ann = obj.__dict__.get("__annotations__", None)
        else:
            ann = getattr(obj, "__annotations__", None)

        from . import _collections

        if ann is None:
            return _collections.EMPTY_DICT
        else:
            return cast("Mapping[str, Any]", ann)

162 

163 

class FullArgSpec(typing.NamedTuple):
    """Argument specification of a function, as a named tuple.

    This is the return type of :func:`inspect_getfullargspec`.
    """

    # names of the positional parameters
    args: List[str]
    # name of the ``*args`` parameter, or None
    varargs: Optional[str]
    # name of the ``**kwargs`` parameter, or None
    varkw: Optional[str]
    # the function's ``__defaults__``
    defaults: Optional[Tuple[Any, ...]]
    # names of the keyword-only parameters
    kwonlyargs: List[str]
    # the function's ``__kwdefaults__``
    kwonlydefaults: Optional[Dict[str, Any]]
    # mapping of parameter name (and "return") to annotation
    annotations: Mapping[str, Any]

172 

173 

def inspect_getfullargspec(func: Callable[..., Any]) -> FullArgSpec:
    """Fully vendored version of getfullargspec from Python 3.3.

    Bound methods are unwrapped to their underlying function first.  The
    argument names are read straight from the code object rather than
    through :func:`inspect.signature`.

    :param func: a Python function, a bound method, or another object
     exposing a ``__code__`` attribute.
    :return: a :class:`FullArgSpec` describing ``func``.
    :raises TypeError: if ``func`` is neither a function nor has a
     ``__code__`` attribute, or if ``__code__`` is not a code object.
    """

    if inspect.ismethod(func):
        func = func.__func__
    if not inspect.isfunction(func) and not hasattr(func, "__code__"):
        raise TypeError(f"{func!r} is not a Python function")

    co = func.__code__
    if not inspect.iscode(co):
        raise TypeError(f"{co!r} is not a code object")

    # co_varnames is laid out as: positional args, keyword-only args,
    # then the *args name (if any), then the **kwargs name (if any).
    nargs = co.co_argcount
    names = co.co_varnames
    nkwargs = co.co_kwonlyargcount
    args = list(names[:nargs])
    kwonlyargs = list(names[nargs : nargs + nkwargs])

    nargs += nkwargs
    varargs = None
    if co.co_flags & inspect.CO_VARARGS:
        varargs = names[nargs]
        nargs += 1
    varkw = None
    if co.co_flags & inspect.CO_VARKEYWORDS:
        varkw = names[nargs]

    return FullArgSpec(
        args,
        varargs,
        varkw,
        func.__defaults__,
        kwonlyargs,
        func.__kwdefaults__,
        get_annotations(func),
    )

210 

211 

if py39:
    # python stubs don't have a public type for this.  not worth
    # making a protocol
    def md5_not_for_security() -> Any:
        """Return a new md5 hash object flagged as not used for security."""
        return hashlib.md5(usedforsecurity=False)

else:

    def md5_not_for_security() -> Any:
        """Return a new md5 hash object."""
        return hashlib.md5()

222 

223 

# Use the stdlib importlib.metadata on Python 3.8+ (and for type checking);
# otherwise fall back to the separately installed importlib_metadata package.
if typing.TYPE_CHECKING or py38:
    from importlib import metadata as importlib_metadata
else:
    import importlib_metadata  # noqa

228 

229 

if typing.TYPE_CHECKING or py39:
    # pep 584 dict union
    dict_union = operator.or_  # noqa
else:

    def dict_union(a: dict, b: dict) -> dict:
        """Return a new dict holding ``a`` updated with ``b``.

        Neither argument is modified; keys in ``b`` win on conflict.
        """
        a = a.copy()
        a.update(b)
        return a

239 

240 

if py310:
    # the builtin is available; use it directly
    anext_ = anext
else:
    # sentinel distinguishing "no default given" from an explicit None
    _NOT_PROVIDED = object()
    from collections.abc import AsyncIterator

    async def anext_(async_iterator, default=_NOT_PROVIDED):
        """vendored from https://github.com/python/cpython/pull/8895

        Await and return the next item of ``async_iterator``.  When the
        iterator is exhausted, return ``default`` if one was given,
        otherwise let ``StopAsyncIteration`` propagate.

        :raises TypeError: if ``async_iterator`` is not an AsyncIterator.
        """

        if not isinstance(async_iterator, AsyncIterator):
            raise TypeError(
                f"anext expected an AsyncIterator, got {type(async_iterator)}"
            )
        # look up __anext__ on the type, not the instance
        anxt = type(async_iterator).__anext__
        try:
            return await anxt(async_iterator)
        except StopAsyncIteration:
            if default is _NOT_PROVIDED:
                raise
            return default

261 

262 

def importlib_metadata_get(group):
    """Return the entry points registered under ``group``.

    Uses ``entry_points().select(group=...)`` when the returned object
    offers ``select``; otherwise falls back to a mapping-style
    ``.get(group, ())`` lookup.
    """
    ep = importlib_metadata.entry_points()
    if typing.TYPE_CHECKING or hasattr(ep, "select"):
        return ep.select(group=group)
    else:
        return ep.get(group, ())

269 

270 

def b(s):
    """Return the string ``s`` encoded to bytes using latin-1."""
    return s.encode("latin-1")

273 

274 

def b64decode(x: str) -> bytes:
    """Decode the ASCII base64 text ``x`` and return the raw bytes."""
    return base64.b64decode(x.encode("ascii"))

277 

278 

def b64encode(x: bytes) -> str:
    """Base64-encode the bytes ``x`` and return the result as a str."""
    return base64.b64encode(x).decode("ascii")

281 

282 

def decode_backslashreplace(text: bytes, encoding: str) -> str:
    """Decode ``text`` with ``encoding``, using the ``backslashreplace``
    error handler for bytes that cannot be decoded."""
    return text.decode(encoding, errors="backslashreplace")

285 

286 

def cmp(a, b):
    """Three-way comparison: return 1 if ``a > b``, -1 if ``a < b``,
    otherwise 0."""
    return (a > b) - (a < b)

289 

290 

def _formatannotation(annotation, base_module=None):
    """vendored from python 3.7

    Render a single annotation as text.  Strings pass through unchanged;
    objects whose ``__module__`` is ``typing`` have the ``typing.`` prefix
    and ``~`` markers stripped from their repr; classes from ``builtins``
    or ``base_module`` render as the repr of their qualified name (i.e.
    quoted), other classes as ``module.qualname``; anything else renders
    as its repr with ``~`` removed.
    """

    if isinstance(annotation, str):
        return annotation

    if getattr(annotation, "__module__", None) == "typing":
        return repr(annotation).replace("typing.", "").replace("~", "")
    if isinstance(annotation, type):
        if annotation.__module__ in ("builtins", base_module):
            return repr(annotation.__qualname__)
        return annotation.__module__ + "." + annotation.__qualname__
    elif isinstance(annotation, typing.TypeVar):
        return repr(annotation).replace("~", "")
    return repr(annotation).replace("~", "")


def inspect_formatargspec(
    args: List[str],
    varargs: Optional[str] = None,
    varkw: Optional[str] = None,
    defaults: Optional[Sequence[Any]] = None,
    kwonlyargs: Optional[Sequence[str]] = (),
    kwonlydefaults: Optional[Mapping[str, Any]] = None,
    annotations: Optional[Mapping[str, Any]] = None,
    formatarg: Callable[[str], str] = str,
    formatvarargs: Callable[[str], str] = lambda name: "*" + name,
    formatvarkw: Callable[[str], str] = lambda name: "**" + name,
    formatvalue: Callable[[Any], str] = lambda value: "=" + repr(value),
    formatreturns: Callable[[Any], str] = lambda text: " -> " + str(text),
    formatannotation: Callable[[Any], str] = _formatannotation,
) -> str:
    """Copy formatargspec from python 3.7 standard library.

    Python 3 has deprecated formatargspec and requested that Signature
    be used instead, however this requires a full reimplementation
    of formatargspec() in terms of creating Parameter objects and such.
    Instead of introducing all the object-creation overhead and having
    to reinvent from scratch, just copy their compatibility routine.

    Ultimately we would need to rewrite our "decorator" routine completely
    which is not really worth it right now, until all Python 2.x support
    is dropped.

    The first seven parameters correspond to the fields of
    :class:`FullArgSpec`; the ``format*`` callables control how each
    piece is rendered.  ``kwonlydefaults`` and ``annotations`` may be
    None or empty, both of which are treated as "no entries".

    :return: the parenthesized argument list, followed by the formatted
     return annotation when ``annotations`` contains a ``"return"`` key.

    """

    # normalize None / empty to a fresh empty mapping
    kwonlydefaults = kwonlydefaults or {}
    annotations = annotations or {}

    def formatargandannotation(arg):
        result = formatarg(arg)
        if arg in annotations:
            result += ": " + formatannotation(annotations[arg])
        return result

    specs = []
    # index of the first positional argument that carries a default
    if defaults:
        firstdefault = len(args) - len(defaults)
    else:
        firstdefault = -1

    for i, arg in enumerate(args):
        spec = formatargandannotation(arg)
        if defaults and i >= firstdefault:
            spec = spec + formatvalue(defaults[i - firstdefault])
        specs.append(spec)

    if varargs is not None:
        specs.append(formatvarargs(formatargandannotation(varargs)))
    else:
        # a bare "*" separates keyword-only args when there is no *args
        if kwonlyargs:
            specs.append("*")

    if kwonlyargs:
        for kwonlyarg in kwonlyargs:
            spec = formatargandannotation(kwonlyarg)
            if kwonlydefaults and kwonlyarg in kwonlydefaults:
                spec += formatvalue(kwonlydefaults[kwonlyarg])
            specs.append(spec)

    if varkw is not None:
        specs.append(formatvarkw(formatargandannotation(varkw)))

    result = "(" + ", ".join(specs) + ")"
    if "return" in annotations:
        result += formatreturns(formatannotation(annotations["return"]))
    return result

378 

379 

def dataclass_fields(cls: Type[Any]) -> Iterable[dataclasses.Field[Any]]:
    """Return a sequence of all dataclasses.Field objects associated
    with a class as an already processed dataclass.

    The class must **already be a dataclass** for Field objects to be returned.

    :return: the result of ``dataclasses.fields(cls)`` when ``cls`` is a
     dataclass, otherwise an empty list.

    """

    if dataclasses.is_dataclass(cls):
        return dataclasses.fields(cls)
    else:
        return []

392 

393 

def local_dataclass_fields(cls: Type[Any]) -> Iterable[dataclasses.Field[Any]]:
    """Return a sequence of all dataclasses.Field objects associated with
    an already processed dataclass, excluding those that originate from a
    superclass.

    The class must **already be a dataclass** for Field objects to be returned.

    :return: the fields of ``cls`` that are not also fields of any of its
     direct bases, or an empty list when ``cls`` is not a dataclass.

    """

    if dataclasses.is_dataclass(cls):
        # collect every Field object reported by the direct bases
        super_fields: Set[dataclasses.Field[Any]] = set()
        for sup in cls.__bases__:
            super_fields.update(dataclass_fields(sup))
        return [f for f in dataclasses.fields(cls) if f not in super_fields]
    else:
        return []

410 

411 

if freethreading:
    import threading

    mini_gil = threading.RLock()
    """provide a threading.RLock() under python freethreading only"""
else:
    import contextlib

    # no-op context manager, so ``with mini_gil:`` works on either build
    mini_gil = contextlib.nullcontext()  # type: ignore[assignment]