Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/util/compat.py: 73%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

161 statements  

1# util/compat.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9"""Handle Python version/platform incompatibilities.""" 

10 

11from __future__ import annotations 

12 

13import base64 

14import dataclasses 

15import hashlib 

16from importlib import metadata as importlib_metadata 

17import inspect 

18import operator 

19import platform 

20import sys 

21import sysconfig 

22import typing 

23from typing import Any 

24from typing import Callable 

25from typing import Dict 

26from typing import Iterable 

27from typing import List 

28from typing import Mapping 

29from typing import Optional 

30from typing import Sequence 

31from typing import Set 

32from typing import Tuple 

33from typing import Type 

34from typing import TYPE_CHECKING 

35 

36py314b1 = sys.version_info >= (3, 14, 0, "beta", 1) 

37py314 = sys.version_info >= (3, 14) 

38py313 = sys.version_info >= (3, 13) 

39py312 = sys.version_info >= (3, 12) 

40py311 = sys.version_info >= (3, 11) 

41pypy = platform.python_implementation() == "PyPy" 

42cpython = platform.python_implementation() == "CPython" 

43freethreading = bool(sysconfig.get_config_var("Py_GIL_DISABLED")) 

44 

45win32 = sys.platform.startswith("win") 

46osx = sys.platform.startswith("darwin") 

47arm = "aarch" in platform.machine().lower() 

48is64bit = sys.maxsize > 2**32 

49 

50has_refcount_gc = bool(cpython) 

51 

52dottedgetter = operator.attrgetter 

53 

54if py314 or TYPE_CHECKING: 

55 from string.templatelib import Template as Template 

56else: 

57 

58 class Template: # type: ignore[no-redef] 

59 """Minimal Template for Python < 3.14 (test usage only).""" 

60 

61 def __init__(self, *parts: Any): 

62 self._parts = parts 

63 

64 @property 

65 def strings(self) -> Tuple[str, ...]: 

66 return tuple(p for p in self._parts if isinstance(p, str)) 

67 

68 @property 

69 def interpolations(self) -> Tuple[Any, ...]: 

70 return tuple(p for p in self._parts if not isinstance(p, str)) 

71 

72 def __iter__(self) -> Any: 

73 return iter(self._parts) 

74 

75 

76class FullArgSpec(typing.NamedTuple): 

77 args: List[str] 

78 varargs: Optional[str] 

79 varkw: Optional[str] 

80 defaults: Optional[Tuple[Any, ...]] 

81 kwonlyargs: List[str] 

82 kwonlydefaults: Optional[Dict[str, Any]] 

83 annotations: Dict[str, Any] 

84 

85 

86def inspect_getfullargspec(func: Callable[..., Any]) -> FullArgSpec: 

87 """Fully vendored version of getfullargspec from Python 3.3.""" 

88 

89 if inspect.ismethod(func): 

90 func = func.__func__ 

91 if not inspect.isfunction(func): 

92 raise TypeError(f"{func!r} is not a Python function") 

93 

94 co = func.__code__ 

95 if not inspect.iscode(co): 

96 raise TypeError(f"{co!r} is not a code object") 

97 

98 nargs = co.co_argcount 

99 names = co.co_varnames 

100 nkwargs = co.co_kwonlyargcount 

101 args = list(names[:nargs]) 

102 kwonlyargs = list(names[nargs : nargs + nkwargs]) 

103 

104 nargs += nkwargs 

105 varargs = None 

106 if co.co_flags & inspect.CO_VARARGS: 

107 varargs = co.co_varnames[nargs] 

108 nargs = nargs + 1 

109 varkw = None 

110 if co.co_flags & inspect.CO_VARKEYWORDS: 

111 varkw = co.co_varnames[nargs] 

112 

113 return FullArgSpec( 

114 args, 

115 varargs, 

116 varkw, 

117 func.__defaults__, 

118 kwonlyargs, 

119 func.__kwdefaults__, 

120 func.__annotations__, 

121 ) 

122 

123 

124# python stubs don't have a public type for this. not worth 

125# making a protocol 

126def md5_not_for_security() -> Any: 

127 return hashlib.md5(usedforsecurity=False) 

128 

129 

130def importlib_metadata_get(group): 

131 ep = importlib_metadata.entry_points() 

132 if typing.TYPE_CHECKING or hasattr(ep, "select"): 

133 return ep.select(group=group) 

134 else: 

135 return ep.get(group, ()) 

136 

137 

138def b(s): 

139 return s.encode("latin-1") 

140 

141 

142def b64decode(x: str) -> bytes: 

143 return base64.b64decode(x.encode("ascii")) 

144 

145 

146def b64encode(x: bytes) -> str: 

147 return base64.b64encode(x).decode("ascii") 

148 

149 

150def decode_backslashreplace(text: bytes, encoding: str) -> str: 

151 return text.decode(encoding, errors="backslashreplace") 

152 

153 

154def cmp(a, b): 

155 return (a > b) - (a < b) 

156 

157 

158def _formatannotation(annotation, base_module=None): 

159 """vendored from python 3.7""" 

160 

161 if isinstance(annotation, str): 

162 return annotation 

163 

164 if getattr(annotation, "__module__", None) == "typing": 

165 return repr(annotation).replace("typing.", "").replace("~", "") 

166 if isinstance(annotation, type): 

167 if annotation.__module__ in ("builtins", base_module): 

168 return repr(annotation.__qualname__) 

169 return annotation.__module__ + "." + annotation.__qualname__ 

170 elif isinstance(annotation, typing.TypeVar): 

171 return repr(annotation).replace("~", "") 

172 return repr(annotation).replace("~", "") 

173 

174 

175def inspect_formatargspec( 

176 args: List[str], 

177 varargs: Optional[str] = None, 

178 varkw: Optional[str] = None, 

179 defaults: Optional[Sequence[Any]] = None, 

180 kwonlyargs: Optional[Sequence[str]] = (), 

181 kwonlydefaults: Optional[Mapping[str, Any]] = {}, 

182 annotations: Mapping[str, Any] = {}, 

183 formatarg: Callable[[str], str] = str, 

184 formatvarargs: Callable[[str], str] = lambda name: "*" + name, 

185 formatvarkw: Callable[[str], str] = lambda name: "**" + name, 

186 formatvalue: Callable[[Any], str] = lambda value: "=" + repr(value), 

187 formatreturns: Callable[[Any], str] = lambda text: " -> " + str(text), 

188 formatannotation: Callable[[Any], str] = _formatannotation, 

189) -> str: 

190 """Copy formatargspec from python 3.7 standard library. 

191 

192 Python 3 has deprecated formatargspec and requested that Signature 

193 be used instead, however this requires a full reimplementation 

194 of formatargspec() in terms of creating Parameter objects and such. 

195 Instead of introducing all the object-creation overhead and having 

196 to reinvent from scratch, just copy their compatibility routine. 

197 

198 Ultimately we would need to rewrite our "decorator" routine completely 

199 which is not really worth it right now, until all Python 2.x support 

200 is dropped. 

201 

202 """ 

203 

204 kwonlydefaults = kwonlydefaults or {} 

205 annotations = annotations or {} 

206 

207 def formatargandannotation(arg): 

208 result = formatarg(arg) 

209 if arg in annotations: 

210 result += ": " + formatannotation(annotations[arg]) 

211 return result 

212 

213 specs = [] 

214 if defaults: 

215 firstdefault = len(args) - len(defaults) 

216 else: 

217 firstdefault = -1 

218 

219 for i, arg in enumerate(args): 

220 spec = formatargandannotation(arg) 

221 if defaults and i >= firstdefault: 

222 spec = spec + formatvalue(defaults[i - firstdefault]) 

223 specs.append(spec) 

224 

225 if varargs is not None: 

226 specs.append(formatvarargs(formatargandannotation(varargs))) 

227 else: 

228 if kwonlyargs: 

229 specs.append("*") 

230 

231 if kwonlyargs: 

232 for kwonlyarg in kwonlyargs: 

233 spec = formatargandannotation(kwonlyarg) 

234 if kwonlydefaults and kwonlyarg in kwonlydefaults: 

235 spec += formatvalue(kwonlydefaults[kwonlyarg]) 

236 specs.append(spec) 

237 

238 if varkw is not None: 

239 specs.append(formatvarkw(formatargandannotation(varkw))) 

240 

241 result = "(" + ", ".join(specs) + ")" 

242 if "return" in annotations: 

243 result += formatreturns(formatannotation(annotations["return"])) 

244 return result 

245 

246 

247def dataclass_fields(cls: Type[Any]) -> Iterable[dataclasses.Field[Any]]: 

248 """Return a sequence of all dataclasses.Field objects associated 

249 with a class as an already processed dataclass. 

250 

251 The class must **already be a dataclass** for Field objects to be returned. 

252 

253 """ 

254 

255 if dataclasses.is_dataclass(cls): 

256 return dataclasses.fields(cls) 

257 else: 

258 return [] 

259 

260 

261def local_dataclass_fields(cls: Type[Any]) -> Iterable[dataclasses.Field[Any]]: 

262 """Return a sequence of all dataclasses.Field objects associated with 

263 an already processed dataclass, excluding those that originate from a 

264 superclass. 

265 

266 The class must **already be a dataclass** for Field objects to be returned. 

267 

268 """ 

269 

270 if dataclasses.is_dataclass(cls): 

271 super_fields: Set[dataclasses.Field[Any]] = set() 

272 for sup in cls.__bases__: 

273 super_fields.update(dataclass_fields(sup)) 

274 return [f for f in dataclasses.fields(cls) if f not in super_fields] 

275 else: 

276 return [] 

277 

278 

279if freethreading: 

280 import threading 

281 

282 mini_gil = threading.RLock() 

283 """provide a threading.RLock() under python freethreading only""" 

284else: 

285 import contextlib 

286 

287 mini_gil = contextlib.nullcontext() # type: ignore[assignment]