Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/util/compat.py: 70%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

169 statements  

1# util/compat.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9"""Handle Python version/platform incompatibilities.""" 

10 

11from __future__ import annotations 

12 

13import base64 

14import dataclasses 

15import hashlib 

16import inspect 

17import operator 

18import platform 

19import sys 

20import typing 

21from typing import Any 

22from typing import Callable 

23from typing import Dict 

24from typing import Iterable 

25from typing import List 

26from typing import Mapping 

27from typing import Optional 

28from typing import Sequence 

29from typing import Set 

30from typing import Tuple 

31from typing import Type 

32from typing import TypeVar 

33 

34 

# Interpreter version flags; each is True when the running interpreter is
# at least the named version.
py314b1 = sys.version_info >= (3, 14, 0, "beta", 1)
py314 = sys.version_info >= (3, 14)
py313 = sys.version_info >= (3, 13)
py312 = sys.version_info >= (3, 12)
py311 = sys.version_info >= (3, 11)
py310 = sys.version_info >= (3, 10)
py39 = sys.version_info >= (3, 9)
py38 = sys.version_info >= (3, 8)

# Interpreter implementation flags.
pypy = platform.python_implementation() == "PyPy"
cpython = platform.python_implementation() == "CPython"

# Platform / architecture flags.
win32 = sys.platform.startswith("win")
osx = sys.platform.startswith("darwin")
# True when the reported machine name contains "aarch" (case-insensitive)
arm = "aarch" in platform.machine().lower()
is64bit = sys.maxsize > 2**32

# Mirrors the ``cpython`` flag above.
has_refcount_gc = bool(cpython)

# Alias for operator.attrgetter.
dottedgetter = operator.attrgetter

_T_co = TypeVar("_T_co", covariant=True)

56 

57 

class FullArgSpec(typing.NamedTuple):
    """Argument specification of a Python function.

    This is the tuple returned by :func:`inspect_getfullargspec`.

    """

    # names of the positional parameters
    args: List[str]
    # name of the ``*args`` parameter, or None
    varargs: Optional[str]
    # name of the ``**kwargs`` parameter, or None
    varkw: Optional[str]
    # the function's ``__defaults__``
    defaults: Optional[Tuple[Any, ...]]
    # names of the keyword-only parameters
    kwonlyargs: List[str]
    # the function's ``__kwdefaults__``
    kwonlydefaults: Optional[Dict[str, Any]]
    # the function's ``__annotations__``
    annotations: Dict[str, Any]

66 

67 

def inspect_getfullargspec(func: Callable[..., Any]) -> FullArgSpec:
    """Return the argument specification of a Python function.

    Vendored equivalent of ``inspect.getfullargspec`` (originally from
    Python 3.3) which reads parameter names straight from the function's
    code object.

    :param func: a Python function, or a bound method, in which case the
     underlying ``__func__`` is inspected.
    :return: a :class:`FullArgSpec` tuple.
    :raises TypeError: if ``func`` is not a Python function, or if its
     ``__code__`` attribute is not a code object.

    """

    target = func.__func__ if inspect.ismethod(func) else func
    if not inspect.isfunction(target):
        raise TypeError(f"{target!r} is not a Python function")

    code = target.__code__
    if not inspect.iscode(code):
        raise TypeError(f"{code!r} is not a code object")

    # co_varnames lists positional names first, then keyword-only names,
    # then the *args name and the **kwargs name when present.
    varnames = code.co_varnames
    positional_count = code.co_argcount
    cursor = positional_count + code.co_kwonlyargcount

    positional = list(varnames[:positional_count])
    keyword_only = list(varnames[positional_count:cursor])

    flags = code.co_flags
    varargs = None
    varkw = None
    if flags & inspect.CO_VARARGS:
        varargs = varnames[cursor]
        cursor += 1
    if flags & inspect.CO_VARKEYWORDS:
        varkw = varnames[cursor]

    return FullArgSpec(
        positional,
        varargs,
        varkw,
        target.__defaults__,
        keyword_only,
        target.__kwdefaults__,
        target.__annotations__,
    )

104 

105 

if not py39:

    def md5_not_for_security() -> Any:
        """Return a new md5 hash object (interpreters before 3.9)."""
        return hashlib.md5()

else:
    # python stubs don't have a public type for the hash object, and it
    # is not worth making a protocol; hence the ``Any`` return type
    def md5_not_for_security() -> Any:
        """Return a new md5 hash object created with
        ``usedforsecurity=False``."""
        return hashlib.md5(usedforsecurity=False)

116 

117 

118if typing.TYPE_CHECKING or py38: 

119 from importlib import metadata as importlib_metadata 

120else: 

121 import importlib_metadata # noqa 

122 

123 

if typing.TYPE_CHECKING or py39:
    # pep 584 dict union: ``a | b``
    dict_union = operator.or_  # noqa
else:

    def dict_union(a: dict, b: dict) -> dict:
        """Return a copy of ``a`` updated with the contents of ``b``;
        neither argument is modified."""
        merged = a.copy()
        merged.update(b)
        return merged

133 

134 

if py310:
    # the builtin is available; use it directly
    anext_ = anext
else:
    _NOT_PROVIDED = object()
    from collections.abc import AsyncIterator

    async def anext_(async_iterator, default=_NOT_PROVIDED):
        """Return the next item from an async iterator.

        Vendored from https://github.com/python/cpython/pull/8895

        :param async_iterator: an ``AsyncIterator``.
        :param default: optional value returned when the iterator is
         exhausted; if omitted, ``StopAsyncIteration`` propagates.
        :raises TypeError: if ``async_iterator`` is not an
         ``AsyncIterator``.

        """

        if not isinstance(async_iterator, AsyncIterator):
            raise TypeError(
                f"anext expected an AsyncIterator, got {type(async_iterator)}"
            )
        # look up __anext__ on the type rather than on the instance
        fetch_next = type(async_iterator).__anext__
        try:
            return await fetch_next(async_iterator)
        except StopAsyncIteration:
            if default is not _NOT_PROVIDED:
                return default
            raise

155 

156 

def importlib_metadata_get(group):
    """Return the installed entry points registered under ``group``.

    Uses the ``select()`` API of the entry points object when it is
    available, otherwise falls back to dictionary-style ``get()``.

    """
    entry_points = importlib_metadata.entry_points()
    if not typing.TYPE_CHECKING and not hasattr(entry_points, "select"):
        # older API: mapping of group name to entry points
        return entry_points.get(group, ())
    return entry_points.select(group=group)

163 

164 

def b(s):
    """Return ``s`` encoded as latin-1."""
    encoded = s.encode("latin-1")
    return encoded

167 

168 

def b64decode(x: str) -> bytes:
    """Decode the base64 text ``x`` (an ASCII string) into bytes."""
    ascii_payload = x.encode("ascii")
    return base64.b64decode(ascii_payload)

171 

172 

def b64encode(x: bytes) -> str:
    """Encode the bytes ``x`` as base64 and return it as an ASCII string."""
    encoded = base64.b64encode(x)
    return encoded.decode("ascii")

175 

176 

def decode_backslashreplace(text: bytes, encoding: str) -> str:
    """Decode ``text`` using ``encoding``, rendering undecodable bytes as
    backslash escapes rather than raising."""
    return text.decode(encoding, "backslashreplace")

179 

180 

def cmp(a, b):
    """Three-way comparison: positive if ``a > b``, negative if
    ``a < b``, zero otherwise."""
    is_greater = a > b
    is_less = a < b
    return is_greater - is_less

183 

184 

def _formatannotation(annotation, base_module=None):
    """Render an annotation as text for a formatted signature.

    Vendored from python 3.7 (``inspect.formatannotation``), with local
    adjustments.

    :param annotation: the annotation object, or a string which is
     returned unchanged.
    :param base_module: optional module name; classes defined in this
     module are rendered like builtins, without a module prefix.
    :return: the string form of the annotation.

    """

    if isinstance(annotation, str):
        return annotation

    if getattr(annotation, "__module__", None) == "typing":
        # drop the "typing." prefix as well as the "~" TypeVar marker
        return repr(annotation).replace("typing.", "").replace("~", "")

    if isinstance(annotation, type):
        if annotation.__module__ in ("builtins", base_module):
            # rendered as a quoted name, e.g. 'int'
            return repr(annotation.__qualname__)
        return annotation.__module__ + "." + annotation.__qualname__

    # any other object, TypeVar instances included: repr without the
    # "~" marker
    return repr(annotation).replace("~", "")

200 

201 

def inspect_formatargspec(
    args: List[str],
    varargs: Optional[str] = None,
    varkw: Optional[str] = None,
    defaults: Optional[Sequence[Any]] = None,
    kwonlyargs: Optional[Sequence[str]] = (),
    kwonlydefaults: Optional[Mapping[str, Any]] = {},
    annotations: Mapping[str, Any] = {},
    formatarg: Callable[[str], str] = str,
    formatvarargs: Callable[[str], str] = lambda name: "*" + name,
    formatvarkw: Callable[[str], str] = lambda name: "**" + name,
    formatvalue: Callable[[Any], str] = lambda value: "=" + repr(value),
    formatreturns: Callable[[Any], str] = lambda text: " -> " + str(text),
    formatannotation: Callable[[Any], str] = _formatannotation,
) -> str:
    """Format an argument specification as a parenthesized signature string.

    This is a copy of ``formatargspec`` from the python 3.7 standard
    library, which has since been deprecated there in favor of
    ``Signature``.  Copying the routine avoids reimplementing it in terms
    of ``Parameter`` objects.

    The first seven parameters correspond to the fields of
    :class:`FullArgSpec`; the ``format*`` callables control how each
    piece is rendered.

    :return: a string of the form ``(a, b=1, *args, c, **kw) -> ret``,
     where the return portion is present only if ``annotations`` has a
     ``"return"`` key.

    """

    kwonlydefaults = kwonlydefaults or {}
    annotations = annotations or {}

    def render(name):
        # the parameter name, plus ": <annotation>" when one is present
        text = formatarg(name)
        if name in annotations:
            text += ": " + formatannotation(annotations[name])
        return text

    # index of the first positional argument that has a default value
    first_default = len(args) - len(defaults) if defaults else -1

    pieces = []
    for position, name in enumerate(args):
        piece = render(name)
        if defaults and position >= first_default:
            piece = piece + formatvalue(defaults[position - first_default])
        pieces.append(piece)

    if varargs is not None:
        pieces.append(formatvarargs(render(varargs)))
    elif kwonlyargs:
        # bare "*" introduces keyword-only arguments when there is no *args
        pieces.append("*")

    for name in kwonlyargs or ():
        piece = render(name)
        if name in kwonlydefaults:
            piece += formatvalue(kwonlydefaults[name])
        pieces.append(piece)

    if varkw is not None:
        pieces.append(formatvarkw(render(varkw)))

    signature = "(" + ", ".join(pieces) + ")"
    if "return" in annotations:
        signature += formatreturns(formatannotation(annotations["return"]))
    return signature

272 

273 

def dataclass_fields(cls: Type[Any]) -> Iterable[dataclasses.Field[Any]]:
    """Return all ``dataclasses.Field`` objects of an already processed
    dataclass.

    If ``cls`` has **not** already been made into a dataclass, an empty
    list is returned.

    """

    if not dataclasses.is_dataclass(cls):
        return []
    return dataclasses.fields(cls)

286 

287 

def local_dataclass_fields(cls: Type[Any]) -> Iterable[dataclasses.Field[Any]]:
    """Return the ``dataclasses.Field`` objects of an already processed
    dataclass, leaving out the fields that also belong to one of its
    direct base classes.

    If ``cls`` has **not** already been made into a dataclass, an empty
    list is returned.

    """

    if not dataclasses.is_dataclass(cls):
        return []

    # collect the fields contributed by the direct bases
    inherited: Set[dataclasses.Field[Any]] = set()
    for base in cls.__bases__:
        inherited.update(dataclass_fields(base))

    return [
        field for field in dataclasses.fields(cls) if field not in inherited
    ]

302 else: 

303 return []