Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/util/compat.py: 70%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

177 statements  

1# util/compat.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9"""Handle Python version/platform incompatibilities.""" 

10 

11from __future__ import annotations 

12 

13import base64 

14import dataclasses 

15import hashlib 

16import inspect 

17import operator 

18import platform 

19import sys 

20import sysconfig 

21import typing 

22from typing import Any 

23from typing import Callable 

24from typing import Dict 

25from typing import Iterable 

26from typing import List 

27from typing import Mapping 

28from typing import Optional 

29from typing import Sequence 

30from typing import Set 

31from typing import Tuple 

32from typing import Type 

33from typing import TypeVar 

34 

35 

# Interpreter version flags: each one is True when the running interpreter
# is at least the named release.
py314b1 = sys.version_info >= (3, 14, 0, "beta", 1)
py314 = sys.version_info >= (3, 14)
py313 = sys.version_info >= (3, 13)
py312 = sys.version_info >= (3, 12)
py311 = sys.version_info >= (3, 11)
py310 = sys.version_info >= (3, 10)
py39 = sys.version_info >= (3, 9)
py38 = sys.version_info >= (3, 8)

# Interpreter implementation, as reported by the platform module.
pypy = platform.python_implementation() == "PyPy"
cpython = platform.python_implementation() == "CPython"

# True when the build configuration reports a truthy Py_GIL_DISABLED.
freethreading = bool(sysconfig.get_config_var("Py_GIL_DISABLED"))

# Host platform flags derived from sys.platform / platform.machine().
win32 = sys.platform.startswith("win")
osx = sys.platform.startswith("darwin")
arm = "aarch" in platform.machine().lower()
is64bit = sys.maxsize > 2**32

# Follows the ``cpython`` flag above.
has_refcount_gc = bool(cpython)

# Alias for operator.attrgetter.
dottedgetter = operator.attrgetter

_T_co = TypeVar("_T_co", covariant=True)

58 

59 

class FullArgSpec(typing.NamedTuple):
    """Named tuple describing the parameters of a Python function.

    This is the value returned by :func:`inspect_getfullargspec`.

    """

    # names of the positional parameters
    args: List[str]
    # name of the ``*args`` parameter, or None
    varargs: Optional[str]
    # name of the ``**kwargs`` parameter, or None
    varkw: Optional[str]
    # the function's ``__defaults__``
    defaults: Optional[Tuple[Any, ...]]
    # names of the keyword-only parameters
    kwonlyargs: List[str]
    # the function's ``__kwdefaults__``
    kwonlydefaults: Optional[Dict[str, Any]]
    # the function's ``__annotations__``
    annotations: Dict[str, Any]

68 

69 

def inspect_getfullargspec(func: Callable[..., Any]) -> FullArgSpec:
    """Fully vendored version of getfullargspec from Python 3.3.

    Bound methods are unwrapped to their underlying function first.  The
    parameter names are read straight from the function's code object.

    :param func: a Python function or bound method.
    :return: a :class:`.FullArgSpec` for the function.
    :raises TypeError: if ``func`` is not a Python function, or its
     ``__code__`` is not a code object.

    """

    if inspect.ismethod(func):
        func = func.__func__
    if not inspect.isfunction(func):
        raise TypeError(f"{func!r} is not a Python function")

    code = func.__code__
    if not inspect.iscode(code):
        raise TypeError(f"{code!r} is not a code object")

    varnames = code.co_varnames
    n_positional = code.co_argcount
    n_kwonly = code.co_kwonlyargcount

    positional = list(varnames[:n_positional])
    kwonly = list(varnames[n_positional : n_positional + n_kwonly])

    # the *args and **kwargs names, when present, follow the named
    # parameters in co_varnames
    cursor = n_positional + n_kwonly
    varargs = None
    if code.co_flags & inspect.CO_VARARGS:
        varargs = varnames[cursor]
        cursor += 1
    varkw = (
        varnames[cursor] if code.co_flags & inspect.CO_VARKEYWORDS else None
    )

    return FullArgSpec(
        args=positional,
        varargs=varargs,
        varkw=varkw,
        defaults=func.__defaults__,
        kwonlyargs=kwonly,
        kwonlydefaults=func.__kwdefaults__,
        annotations=func.__annotations__,
    )

106 

107 

if not py39:

    def md5_not_for_security() -> Any:
        """Return a new md5 hash object."""
        return hashlib.md5()

else:
    # python stubs don't have a public type for this.  not worth
    # making a protocol
    def md5_not_for_security() -> Any:
        """Return a new md5 hash object created with
        ``usedforsecurity=False``."""
        return hashlib.md5(usedforsecurity=False)

118 

119 

120if typing.TYPE_CHECKING or py38: 

121 from importlib import metadata as importlib_metadata 

122else: 

123 import importlib_metadata # noqa 

124 

125 

if typing.TYPE_CHECKING or py39:
    # pep 584 dict union: the ``|`` operator does the work
    dict_union = operator.or_  # noqa
else:

    def dict_union(a: dict, b: dict) -> dict:
        """Return a copy of ``a`` updated with the items of ``b``.

        Neither argument is modified.

        """
        merged = a.copy()
        merged.update(b)
        return merged

135 

136 

if py310:
    anext_ = anext
else:
    from collections.abc import AsyncIterator

    # sentinel meaning "no default was passed"
    _NOT_PROVIDED = object()

    async def anext_(async_iterator, default=_NOT_PROVIDED):
        """vendored from https://github.com/python/cpython/pull/8895

        Await and return the next item of ``async_iterator``.  When the
        iterator is exhausted, return ``default`` if one was given,
        otherwise let ``StopAsyncIteration`` propagate.

        :raises TypeError: if ``async_iterator`` is not an AsyncIterator.

        """

        if not isinstance(async_iterator, AsyncIterator):
            raise TypeError(
                f"anext expected an AsyncIterator, got {type(async_iterator)}"
            )

        # __anext__ is looked up on the type, not the instance
        step = type(async_iterator).__anext__
        try:
            value = await step(async_iterator)
        except StopAsyncIteration:
            if default is not _NOT_PROVIDED:
                return default
            raise
        return value

157 

158 

def importlib_metadata_get(group):
    """Return the entry points registered under ``group``.

    Uses ``select(group=...)`` when the object returned by
    ``entry_points()`` provides it, and falls back to a dict-style
    ``get()`` with an empty tuple default otherwise.

    """
    entry_points = importlib_metadata.entry_points()
    if not typing.TYPE_CHECKING and not hasattr(entry_points, "select"):
        return entry_points.get(group, ())
    return entry_points.select(group=group)

165 

166 

def b(s):
    """Encode the string ``s`` to bytes using the latin-1 codec."""
    encoded = s.encode("latin-1")
    return encoded

169 

170 

def b64decode(x: str) -> bytes:
    """Decode the base64 text ``x``, which must be ASCII, into bytes."""
    ascii_payload = x.encode("ascii")
    return base64.b64decode(ascii_payload)

173 

174 

def b64encode(x: bytes) -> str:
    """Encode the bytes ``x`` as base64 and return it as an ASCII string."""
    encoded = base64.b64encode(x)
    return encoded.decode("ascii")

177 

178 

def decode_backslashreplace(text: bytes, encoding: str) -> str:
    """Decode ``text`` with ``encoding``, using the ``backslashreplace``
    error handler for bytes that cannot be decoded."""
    return text.decode(encoding, "backslashreplace")

181 

182 

def cmp(a, b):
    """Three-way comparison of ``a`` and ``b``.

    Returns the result of ``a > b`` minus the result of ``a < b``: 1, 0
    or -1 for values whose comparisons return booleans.

    """
    is_greater = a > b
    is_less = a < b
    return is_greater - is_less

185 

186 

def _formatannotation(annotation, base_module=None):
    """vendored from python 3.7

    Render ``annotation`` as text:

    * strings are returned unchanged
    * objects whose ``__module__`` is ``typing`` use their repr with the
      ``typing.`` prefix and any ``~`` removed
    * classes from ``builtins`` or ``base_module`` render as the repr of
      their qualified name (i.e. quoted); other classes render as
      ``module.qualname``
    * anything else uses its repr with any ``~`` removed

    """

    if isinstance(annotation, str):
        return annotation

    if getattr(annotation, "__module__", None) == "typing":
        return repr(annotation).replace("typing.", "").replace("~", "")

    if isinstance(annotation, type):
        module_name = annotation.__module__
        qualname = annotation.__qualname__
        if module_name in ("builtins", base_module):
            return repr(qualname)
        return module_name + "." + qualname

    # TypeVar objects and all remaining annotations render the same way
    return repr(annotation).replace("~", "")

202 

203 

def inspect_formatargspec(
    args: List[str],
    varargs: Optional[str] = None,
    varkw: Optional[str] = None,
    defaults: Optional[Sequence[Any]] = None,
    kwonlyargs: Optional[Sequence[str]] = (),
    kwonlydefaults: Optional[Mapping[str, Any]] = {},
    annotations: Mapping[str, Any] = {},
    formatarg: Callable[[str], str] = str,
    formatvarargs: Callable[[str], str] = lambda name: "*" + name,
    formatvarkw: Callable[[str], str] = lambda name: "**" + name,
    formatvalue: Callable[[Any], str] = lambda value: "=" + repr(value),
    formatreturns: Callable[[Any], str] = lambda text: " -> " + str(text),
    formatannotation: Callable[[Any], str] = _formatannotation,
) -> str:
    """Copy of formatargspec from the Python 3.7 standard library.

    Builds the parenthesized argument-list text for a function from the
    pieces of an argument specification.  The standard library deprecated
    formatargspec in favor of Signature objects; its compatibility routine
    is copied here rather than reimplemented in terms of Parameter
    objects.

    :param args: names of the positional parameters.
    :param varargs: name of the ``*args`` parameter, if any.
    :param varkw: name of the ``**kwargs`` parameter, if any.
    :param defaults: default values for the trailing entries of ``args``.
    :param kwonlyargs: names of the keyword-only parameters.
    :param kwonlydefaults: default values keyed by keyword-only name.
    :param annotations: annotations keyed by parameter name; the
     ``"return"`` key, when present, is rendered after the argument list.
    :param formatarg: renders a parameter name.
    :param formatvarargs: renders the ``*args`` entry.
    :param formatvarkw: renders the ``**kwargs`` entry.
    :param formatvalue: renders a default value.
    :param formatreturns: renders the return annotation.
    :param formatannotation: renders a single annotation.
    :return: the formatted argument list, e.g. ``(a, b=1, *c, **d)``.

    """

    # the mapping defaults are never mutated; None is treated as empty
    kwonlydefaults = kwonlydefaults or {}
    annotations = annotations or {}

    def formatargandannotation(arg):
        rendered = formatarg(arg)
        if arg in annotations:
            rendered += ": " + formatannotation(annotations[arg])
        return rendered

    # index within ``args`` of the first parameter that has a default
    firstdefault = len(args) - len(defaults) if defaults else -1

    specs = []
    for position, arg in enumerate(args):
        spec = formatargandannotation(arg)
        if defaults and position >= firstdefault:
            spec = spec + formatvalue(defaults[position - firstdefault])
        specs.append(spec)

    if varargs is not None:
        specs.append(formatvarargs(formatargandannotation(varargs)))
    elif kwonlyargs:
        # a bare "*" introduces keyword-only names when there is no *args
        specs.append("*")

    for kwonlyarg in kwonlyargs or ():
        spec = formatargandannotation(kwonlyarg)
        if kwonlydefaults and kwonlyarg in kwonlydefaults:
            spec += formatvalue(kwonlydefaults[kwonlyarg])
        specs.append(spec)

    if varkw is not None:
        specs.append(formatvarkw(formatargandannotation(varkw)))

    signature = "(" + ", ".join(specs) + ")"
    if "return" in annotations:
        signature += formatreturns(formatannotation(annotations["return"]))
    return signature

274 

275 

def dataclass_fields(cls: Type[Any]) -> Iterable[dataclasses.Field[Any]]:
    """Return all dataclasses.Field objects of an already processed
    dataclass.

    The class must **already be a dataclass** for Field objects to be
    returned; for any other class an empty list is returned.

    """

    if not dataclasses.is_dataclass(cls):
        return []
    return dataclasses.fields(cls)

288 

289 

def local_dataclass_fields(cls: Type[Any]) -> Iterable[dataclasses.Field[Any]]:
    """Return the dataclasses.Field objects of an already processed
    dataclass, leaving out those that also appear among the fields of one
    of its direct base classes.

    The class must **already be a dataclass** for Field objects to be
    returned; for any other class an empty list is returned.

    """

    if not dataclasses.is_dataclass(cls):
        return []

    # Field objects reported by each direct base class
    inherited: Set[dataclasses.Field[Any]] = {
        field for base in cls.__bases__ for field in dataclass_fields(base)
    }
    return [
        field for field in dataclasses.fields(cls) if field not in inherited
    ]

306 

307 

# ``mini_gil`` is always usable as a context manager: a real re-entrant
# lock when ``freethreading`` is set, a no-op context manager otherwise.
if freethreading:
    import threading

    mini_gil = threading.RLock()
    """provide a threading.RLock() under python freethreading only"""
else:
    import contextlib

    mini_gil = contextlib.nullcontext()  # type: ignore[assignment]