Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/util/compat.py: 56%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

232 statements  

1# util/compat.py 

2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9"""Handle Python version/platform incompatibilities.""" 

10 

11from __future__ import annotations 

12 

13import base64 

14import dataclasses 

15import hashlib 

16import inspect 

17import operator 

18import platform 

19import sys 

20import sysconfig 

21import typing 

22from typing import Any 

23from typing import Callable 

24from typing import cast 

25from typing import Dict 

26from typing import Iterable 

27from typing import List 

28from typing import Mapping 

29from typing import Optional 

30from typing import Sequence 

31from typing import Set 

32from typing import Tuple 

33from typing import Type 

34from typing import TypeVar 

35 

36 

# Interpreter version gates: each flag is True when the running interpreter
# is at least the named release.
py314b1 = sys.version_info >= (3, 14, 0, "beta", 1)
py314, py313, py312, py311, py310, py39, py38 = (
    sys.version_info >= (3, minor) for minor in (14, 13, 12, 11, 10, 9, 8)
)

# Interpreter implementation, as reported by the platform module.
pypy, cpython = (
    platform.python_implementation() == impl for impl in ("PyPy", "CPython")
)

# Truthiness of the "Py_GIL_DISABLED" build variable; a missing (None) or
# zero value gives False.
freethreading = bool(sysconfig.get_config_var("Py_GIL_DISABLED"))

# Host platform probes.
win32, osx = (sys.platform.startswith(prefix) for prefix in ("win", "darwin"))
arm = "aarch" in platform.machine().lower()
is64bit = sys.maxsize > 2**32

# Follows the ``cpython`` flag, which is already a bool.
has_refcount_gc = cpython

dottedgetter = operator.attrgetter

_T_co = TypeVar("_T_co", covariant=True)

59 

60 

if py314:
    # vendor a minimal form of get_annotations per
    # https://github.com/python/cpython/issues/133684#issuecomment-2863841891

    from annotationlib import call_annotate_function  # type: ignore[import-not-found,unused-ignore]  # noqa: E501
    from annotationlib import Format

    def _get_and_call_annotate(obj, format):  # noqa: A002
        """Invoke ``obj.__annotate__`` in the given format, if it exists.

        Returns the dict produced by the annotate function, or None when
        the object has no ``__annotate__``.  Raises ValueError when the
        annotate function produces something other than a dict.
        """
        annotate = getattr(obj, "__annotate__", None)
        if annotate is None:
            return None

        produced = call_annotate_function(annotate, format, owner=obj)
        if isinstance(produced, dict):
            return produced
        raise ValueError(f"{obj!r}.__annotate__ returned a non-dict")

    # this is ported from py3.13.0a7
    _BASE_GET_ANNOTATIONS = type.__dict__["__annotations__"].__get__

    def _get_dunder_annotations(obj):
        """Return a copy of ``obj.__annotations__``, or ``{}`` if absent.

        Raises ValueError when ``__annotations__`` is present but is not a
        dict.
        """
        if isinstance(obj, type):
            try:
                raw = _BASE_GET_ANNOTATIONS(obj)
            except AttributeError:
                # For static types, the descriptor raises AttributeError.
                return {}
        else:
            raw = getattr(obj, "__annotations__", None)
            if raw is None:
                return {}

        if isinstance(raw, dict):
            return dict(raw)
        raise ValueError(f"{obj!r}.__annotations__ is neither a dict nor None")

    def _vendored_get_annotations(
        obj: Any, *, format: Format  # noqa: A002
    ) -> Mapping[str, Any]:
        """A sparse implementation of annotationlib.get_annotations()"""

        # First choice: the plain __annotations__ attribute.  Any failure
        # while reading it is deliberately ignored so that __annotate__
        # can be tried next.
        try:
            found = _get_dunder_annotations(obj)
        except Exception:
            pass
        else:
            if found is not None:
                return dict(found)

        # But if __annotations__ threw a NameError, we try calling
        # __annotate__
        found = _get_and_call_annotate(obj, format)
        if found is None:
            # If that didn't work either, we have a very weird object:
            # evaluating __annotations__ threw NameError and there is no
            # __annotate__.  In that case, we fall back to trying
            # __annotations__ again.
            found = _get_dunder_annotations(obj)

        if found is None:
            if isinstance(obj, type) or callable(obj):
                return {}
            raise TypeError(f"{obj!r} does not have annotations")

        return dict(found) if found else {}

    def get_annotations(obj: Any) -> Mapping[str, Any]:
        """Return the annotations of ``obj`` without evaluating them."""
        # FORWARDREF has the effect of giving us ForwardRefs and not
        # actually trying to evaluate the annotations.  We need this so
        # that the annotations act as much like
        # "from __future__ import annotations" as possible, which is going
        # away in future python as a separate mode
        return _vendored_get_annotations(obj, format=Format.FORWARDREF)

elif py310:

    def get_annotations(obj: Any) -> Mapping[str, Any]:
        """Return the annotations of ``obj`` via the standard library."""
        return inspect.get_annotations(obj)

else:

    def get_annotations(obj: Any) -> Mapping[str, Any]:
        """Return the annotations of ``obj``, or an empty mapping if none."""
        # it's been observed that cls.__annotations__ can be non present.
        # it's not clear what causes this, running under tox py37/38 it
        # happens, running straight pytest it doesn't

        # https://docs.python.org/3/howto/annotations.html#annotations-howto
        found = (
            obj.__dict__.get("__annotations__", None)
            if isinstance(obj, type)
            else getattr(obj, "__annotations__", None)
        )

        from . import _collections

        if found is not None:
            return cast("Mapping[str, Any]", found)
        return _collections.EMPTY_DICT

163 

164 

class FullArgSpec(typing.NamedTuple):
    """Description of a Python function's parameters.

    This is the return type of :func:`inspect_getfullargspec`.
    """

    # names of the positional parameters, in declaration order
    args: List[str]
    # name of the ``*args`` parameter, or None if there is none
    varargs: Optional[str]
    # name of the ``**kwargs`` parameter, or None if there is none
    varkw: Optional[str]
    # the function's ``__defaults__``
    defaults: Optional[Tuple[Any, ...]]
    # names of the keyword-only parameters, in declaration order
    kwonlyargs: List[str]
    # the function's ``__kwdefaults__``
    kwonlydefaults: Optional[Dict[str, Any]]
    # the function's annotations as returned by get_annotations()
    annotations: Mapping[str, Any]

173 

174 

def inspect_getfullargspec(func: Callable[..., Any]) -> FullArgSpec:
    """Fully vendored version of getfullargspec from Python 3.3.

    Bound methods are unwrapped to their underlying function first.  The
    parameter names are read straight from the function's code object.

    Raises TypeError if ``func`` is not a Python function, or if its
    ``__code__`` is not a code object.
    """

    if inspect.ismethod(func):
        func = func.__func__
    if not inspect.isfunction(func):
        raise TypeError(f"{func!r} is not a Python function")

    code = func.__code__
    if not inspect.iscode(code):
        raise TypeError(f"{code!r} is not a code object")

    # co_varnames lists positional names first, then keyword-only names,
    # then the *args name and the **kwargs name when they are present.
    varnames = code.co_varnames
    positional_end = code.co_argcount
    kwonly_end = positional_end + code.co_kwonlyargcount

    positional = list(varnames[:positional_end])
    kwonly = list(varnames[positional_end:kwonly_end])

    cursor = kwonly_end
    star_name = None
    if code.co_flags & inspect.CO_VARARGS:
        star_name = varnames[cursor]
        cursor += 1

    double_star_name = None
    if code.co_flags & inspect.CO_VARKEYWORDS:
        double_star_name = varnames[cursor]

    return FullArgSpec(
        args=positional,
        varargs=star_name,
        varkw=double_star_name,
        defaults=func.__defaults__,
        kwonlyargs=kwonly,
        kwonlydefaults=func.__kwdefaults__,
        annotations=get_annotations(func),
    )

211 

212 

if py39:
    # python stubs don't have a public type for this. not worth
    # making a protocol
    def md5_not_for_security() -> Any:
        """Return a new md5 hash object created with usedforsecurity=False."""
        return hashlib.md5(usedforsecurity=False)

else:

    def md5_not_for_security() -> Any:
        """Return a new md5 hash object."""
        return hashlib.md5()

223 

224 

225if typing.TYPE_CHECKING or py38: 

226 from importlib import metadata as importlib_metadata 

227else: 

228 import importlib_metadata # noqa 

229 

230 

if typing.TYPE_CHECKING or py39:
    # pep 584 dict union
    dict_union = operator.or_  # noqa
else:

    def dict_union(a: dict, b: dict) -> dict:
        """Return a new dict holding ``a`` updated with ``b``.

        Neither argument is modified; keys in ``b`` win over keys in ``a``.
        """
        merged = a.copy()
        merged.update(b)
        return merged

240 

241 

if py310:
    anext_ = anext
else:
    _NOT_PROVIDED = object()
    from collections.abc import AsyncIterator

    async def anext_(async_iterator, default=_NOT_PROVIDED):
        """vendored from https://github.com/python/cpython/pull/8895"""

        if not isinstance(async_iterator, AsyncIterator):
            raise TypeError(
                f"anext expected an AsyncIterator, got {type(async_iterator)}"
            )

        # look __anext__ up on the type, not the instance
        advance = type(async_iterator).__anext__
        try:
            return await advance(async_iterator)
        except StopAsyncIteration:
            # an exhausted iterator yields the default only if one was given
            if default is not _NOT_PROVIDED:
                return default
            raise

262 

263 

def importlib_metadata_get(group):
    """Return the entry points registered under ``group``.

    Uses ``select(group=...)`` when the entry points object provides it,
    and falls back to a dict-style ``get(group, ())`` otherwise.
    """
    entry_points = importlib_metadata.entry_points()
    if not typing.TYPE_CHECKING and not hasattr(entry_points, "select"):
        return entry_points.get(group, ())
    return entry_points.select(group=group)

270 

271 

def b(s):
    """Return ``s`` encoded as latin-1."""
    encoded = s.encode("latin-1")
    return encoded

274 

275 

def b64decode(x: str) -> bytes:
    """Decode the base64 text ``x`` (an ASCII string) into bytes."""
    ascii_payload = x.encode("ascii")
    return base64.b64decode(ascii_payload)

278 

279 

def b64encode(x: bytes) -> str:
    """Encode the bytes ``x`` as base64 and return it as an ASCII string."""
    encoded = base64.b64encode(x)
    return encoded.decode("ascii")

282 

283 

def decode_backslashreplace(text: bytes, encoding: str) -> str:
    """Decode ``text`` with ``encoding``, using the "backslashreplace"
    error handler for bytes that cannot be decoded.
    """
    return text.decode(encoding, "backslashreplace")

286 

287 

def cmp(a, b):
    """Three-way comparison of ``a`` and ``b``.

    Returns 1 if ``a > b``, -1 if ``a < b`` and 0 if neither holds.
    """
    is_greater = a > b
    is_less = a < b
    return is_greater - is_less

290 

291 

292def _formatannotation(annotation, base_module=None): 

293 """vendored from python 3.7""" 

294 

295 if isinstance(annotation, str): 

296 return annotation 

297 

298 if getattr(annotation, "__module__", None) == "typing": 

299 return repr(annotation).replace("typing.", "").replace("~", "") 

300 if isinstance(annotation, type): 

301 if annotation.__module__ in ("builtins", base_module): 

302 return repr(annotation.__qualname__) 

303 return annotation.__module__ + "." + annotation.__qualname__ 

304 elif isinstance(annotation, typing.TypeVar): 

305 return repr(annotation).replace("~", "") 

306 return repr(annotation).replace("~", "") 

307 

308 

def inspect_formatargspec(
    args: List[str],
    varargs: Optional[str] = None,
    varkw: Optional[str] = None,
    defaults: Optional[Sequence[Any]] = None,
    kwonlyargs: Optional[Sequence[str]] = (),
    kwonlydefaults: Optional[Mapping[str, Any]] = {},
    annotations: Mapping[str, Any] = {},
    formatarg: Callable[[str], str] = str,
    formatvarargs: Callable[[str], str] = lambda name: "*" + name,
    formatvarkw: Callable[[str], str] = lambda name: "**" + name,
    formatvalue: Callable[[Any], str] = lambda value: "=" + repr(value),
    formatreturns: Callable[[Any], str] = lambda text: " -> " + str(text),
    formatannotation: Callable[[Any], str] = _formatannotation,
) -> str:
    """Copy formatargspec from python 3.7 standard library.

    Python 3 has deprecated formatargspec and requested that Signature
    be used instead, however this requires a full reimplementation
    of formatargspec() in terms of creating Parameter objects and such.
    Instead of introducing all the object-creation overhead and having
    to reinvent from scratch, just copy their compatibility routine.

    The first seven parameters correspond to the fields of
    :class:`FullArgSpec`; the ``format*`` callables control how each
    piece is rendered.  Returns the parenthesized argument list, followed
    by the rendered return annotation when ``annotations`` has a
    ``"return"`` key.

    """

    # the dict defaults in the signature are only ever read, never mutated
    kwonlydefaults = kwonlydefaults or {}
    annotations = annotations or {}

    def render(name):
        # the name, plus ": <annotation>" when the name is annotated
        text = formatarg(name)
        if name in annotations:
            text += ": " + formatannotation(annotations[name])
        return text

    # index of the first positional argument that carries a default
    firstdefault = len(args) - len(defaults) if defaults else -1

    pieces = []
    for position, name in enumerate(args):
        piece = render(name)
        if defaults and position >= firstdefault:
            piece = piece + formatvalue(defaults[position - firstdefault])
        pieces.append(piece)

    if varargs is not None:
        pieces.append(formatvarargs(render(varargs)))
    elif kwonlyargs:
        # a bare "*" introduces keyword-only arguments when there is no
        # *args parameter
        pieces.append("*")

    for name in kwonlyargs or ():
        piece = render(name)
        if name in kwonlydefaults:
            piece += formatvalue(kwonlydefaults[name])
        pieces.append(piece)

    if varkw is not None:
        pieces.append(formatvarkw(render(varkw)))

    signature = "(" + ", ".join(pieces) + ")"
    if "return" in annotations:
        signature += formatreturns(formatannotation(annotations["return"]))
    return signature

379 

380 

def dataclass_fields(cls: Type[Any]) -> Iterable[dataclasses.Field[Any]]:
    """Return every dataclasses.Field object of an already processed
    dataclass.

    The class must **already be a dataclass** for Field objects to be
    returned; for anything else an empty list is returned.

    """

    if not dataclasses.is_dataclass(cls):
        return []
    return dataclasses.fields(cls)

393 

394 

def local_dataclass_fields(cls: Type[Any]) -> Iterable[dataclasses.Field[Any]]:
    """Return the dataclasses.Field objects of an already processed
    dataclass, leaving out those that also belong to one of its direct
    base classes.

    The class must **already be a dataclass** for Field objects to be
    returned; for anything else an empty list is returned.

    """

    if not dataclasses.is_dataclass(cls):
        return []

    # every Field found on any of the direct bases
    inherited: Set[dataclasses.Field[Any]] = {
        field for base in cls.__bases__ for field in dataclass_fields(base)
    }
    return [
        field for field in dataclasses.fields(cls) if field not in inherited
    ]

411 

412 

# ``mini_gil`` is always usable as a context manager: an RLock under
# free-threaded builds, a null context otherwise.
if freethreading:
    import threading

    mini_gil = threading.RLock()
    """provide a threading.RLock() under python freethreading only"""
else:
    import contextlib

    mini_gil = contextlib.nullcontext()  # type: ignore[assignment]