Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/util/compat.py: 74%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

147 statements  

1# util/compat.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9"""Handle Python version/platform incompatibilities.""" 

10 

11from __future__ import annotations 

12 

13import base64 

14import dataclasses 

15import hashlib 

16from importlib import metadata as importlib_metadata 

17import inspect 

18import operator 

19import platform 

20import sys 

21import sysconfig 

22import typing 

23from typing import Any 

24from typing import Callable 

25from typing import Dict 

26from typing import Iterable 

27from typing import List 

28from typing import Mapping 

29from typing import Optional 

30from typing import Sequence 

31from typing import Set 

32from typing import Tuple 

33from typing import Type 

34 

35py314b1 = sys.version_info >= (3, 14, 0, "beta", 1) 

36py314 = sys.version_info >= (3, 14) 

37py313 = sys.version_info >= (3, 13) 

38py312 = sys.version_info >= (3, 12) 

39py311 = sys.version_info >= (3, 11) 

40pypy = platform.python_implementation() == "PyPy" 

41cpython = platform.python_implementation() == "CPython" 

42freethreading = bool(sysconfig.get_config_var("Py_GIL_DISABLED")) 

43 

44win32 = sys.platform.startswith("win") 

45osx = sys.platform.startswith("darwin") 

46arm = "aarch" in platform.machine().lower() 

47is64bit = sys.maxsize > 2**32 

48 

49has_refcount_gc = bool(cpython) 

50 

51dottedgetter = operator.attrgetter 

52 

53 

54class FullArgSpec(typing.NamedTuple): 

55 args: List[str] 

56 varargs: Optional[str] 

57 varkw: Optional[str] 

58 defaults: Optional[Tuple[Any, ...]] 

59 kwonlyargs: List[str] 

60 kwonlydefaults: Optional[Dict[str, Any]] 

61 annotations: Dict[str, Any] 

62 

63 

64def inspect_getfullargspec(func: Callable[..., Any]) -> FullArgSpec: 

65 """Fully vendored version of getfullargspec from Python 3.3.""" 

66 

67 if inspect.ismethod(func): 

68 func = func.__func__ 

69 if not inspect.isfunction(func): 

70 raise TypeError(f"{func!r} is not a Python function") 

71 

72 co = func.__code__ 

73 if not inspect.iscode(co): 

74 raise TypeError(f"{co!r} is not a code object") 

75 

76 nargs = co.co_argcount 

77 names = co.co_varnames 

78 nkwargs = co.co_kwonlyargcount 

79 args = list(names[:nargs]) 

80 kwonlyargs = list(names[nargs : nargs + nkwargs]) 

81 

82 nargs += nkwargs 

83 varargs = None 

84 if co.co_flags & inspect.CO_VARARGS: 

85 varargs = co.co_varnames[nargs] 

86 nargs = nargs + 1 

87 varkw = None 

88 if co.co_flags & inspect.CO_VARKEYWORDS: 

89 varkw = co.co_varnames[nargs] 

90 

91 return FullArgSpec( 

92 args, 

93 varargs, 

94 varkw, 

95 func.__defaults__, 

96 kwonlyargs, 

97 func.__kwdefaults__, 

98 func.__annotations__, 

99 ) 

100 

101 

102# python stubs don't have a public type for this. not worth 

103# making a protocol 

104def md5_not_for_security() -> Any: 

105 return hashlib.md5(usedforsecurity=False) 

106 

107 

108def importlib_metadata_get(group): 

109 ep = importlib_metadata.entry_points() 

110 if typing.TYPE_CHECKING or hasattr(ep, "select"): 

111 return ep.select(group=group) 

112 else: 

113 return ep.get(group, ()) 

114 

115 

116def b(s): 

117 return s.encode("latin-1") 

118 

119 

120def b64decode(x: str) -> bytes: 

121 return base64.b64decode(x.encode("ascii")) 

122 

123 

124def b64encode(x: bytes) -> str: 

125 return base64.b64encode(x).decode("ascii") 

126 

127 

128def decode_backslashreplace(text: bytes, encoding: str) -> str: 

129 return text.decode(encoding, errors="backslashreplace") 

130 

131 

132def cmp(a, b): 

133 return (a > b) - (a < b) 

134 

135 

136def _formatannotation(annotation, base_module=None): 

137 """vendored from python 3.7""" 

138 

139 if isinstance(annotation, str): 

140 return annotation 

141 

142 if getattr(annotation, "__module__", None) == "typing": 

143 return repr(annotation).replace("typing.", "").replace("~", "") 

144 if isinstance(annotation, type): 

145 if annotation.__module__ in ("builtins", base_module): 

146 return repr(annotation.__qualname__) 

147 return annotation.__module__ + "." + annotation.__qualname__ 

148 elif isinstance(annotation, typing.TypeVar): 

149 return repr(annotation).replace("~", "") 

150 return repr(annotation).replace("~", "") 

151 

152 

153def inspect_formatargspec( 

154 args: List[str], 

155 varargs: Optional[str] = None, 

156 varkw: Optional[str] = None, 

157 defaults: Optional[Sequence[Any]] = None, 

158 kwonlyargs: Optional[Sequence[str]] = (), 

159 kwonlydefaults: Optional[Mapping[str, Any]] = {}, 

160 annotations: Mapping[str, Any] = {}, 

161 formatarg: Callable[[str], str] = str, 

162 formatvarargs: Callable[[str], str] = lambda name: "*" + name, 

163 formatvarkw: Callable[[str], str] = lambda name: "**" + name, 

164 formatvalue: Callable[[Any], str] = lambda value: "=" + repr(value), 

165 formatreturns: Callable[[Any], str] = lambda text: " -> " + str(text), 

166 formatannotation: Callable[[Any], str] = _formatannotation, 

167) -> str: 

168 """Copy formatargspec from python 3.7 standard library. 

169 

170 Python 3 has deprecated formatargspec and requested that Signature 

171 be used instead, however this requires a full reimplementation 

172 of formatargspec() in terms of creating Parameter objects and such. 

173 Instead of introducing all the object-creation overhead and having 

174 to reinvent from scratch, just copy their compatibility routine. 

175 

176 Ultimately we would need to rewrite our "decorator" routine completely 

177 which is not really worth it right now, until all Python 2.x support 

178 is dropped. 

179 

180 """ 

181 

182 kwonlydefaults = kwonlydefaults or {} 

183 annotations = annotations or {} 

184 

185 def formatargandannotation(arg): 

186 result = formatarg(arg) 

187 if arg in annotations: 

188 result += ": " + formatannotation(annotations[arg]) 

189 return result 

190 

191 specs = [] 

192 if defaults: 

193 firstdefault = len(args) - len(defaults) 

194 else: 

195 firstdefault = -1 

196 

197 for i, arg in enumerate(args): 

198 spec = formatargandannotation(arg) 

199 if defaults and i >= firstdefault: 

200 spec = spec + formatvalue(defaults[i - firstdefault]) 

201 specs.append(spec) 

202 

203 if varargs is not None: 

204 specs.append(formatvarargs(formatargandannotation(varargs))) 

205 else: 

206 if kwonlyargs: 

207 specs.append("*") 

208 

209 if kwonlyargs: 

210 for kwonlyarg in kwonlyargs: 

211 spec = formatargandannotation(kwonlyarg) 

212 if kwonlydefaults and kwonlyarg in kwonlydefaults: 

213 spec += formatvalue(kwonlydefaults[kwonlyarg]) 

214 specs.append(spec) 

215 

216 if varkw is not None: 

217 specs.append(formatvarkw(formatargandannotation(varkw))) 

218 

219 result = "(" + ", ".join(specs) + ")" 

220 if "return" in annotations: 

221 result += formatreturns(formatannotation(annotations["return"])) 

222 return result 

223 

224 

225def dataclass_fields(cls: Type[Any]) -> Iterable[dataclasses.Field[Any]]: 

226 """Return a sequence of all dataclasses.Field objects associated 

227 with a class as an already processed dataclass. 

228 

229 The class must **already be a dataclass** for Field objects to be returned. 

230 

231 """ 

232 

233 if dataclasses.is_dataclass(cls): 

234 return dataclasses.fields(cls) 

235 else: 

236 return [] 

237 

238 

239def local_dataclass_fields(cls: Type[Any]) -> Iterable[dataclasses.Field[Any]]: 

240 """Return a sequence of all dataclasses.Field objects associated with 

241 an already processed dataclass, excluding those that originate from a 

242 superclass. 

243 

244 The class must **already be a dataclass** for Field objects to be returned. 

245 

246 """ 

247 

248 if dataclasses.is_dataclass(cls): 

249 super_fields: Set[dataclasses.Field[Any]] = set() 

250 for sup in cls.__bases__: 

251 super_fields.update(dataclass_fields(sup)) 

252 return [f for f in dataclasses.fields(cls) if f not in super_fields] 

253 else: 

254 return [] 

255 

256 

257if freethreading: 

258 import threading 

259 

260 mini_gil = threading.RLock() 

261 """provide a threading.RLock() under python freethreading only""" 

262else: 

263 import contextlib 

264 

265 mini_gil = contextlib.nullcontext() # type: ignore[assignment]