Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/util/compat.py: 69%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

154 statements  

1# util/compat.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9"""Handle Python version/platform incompatibilities.""" 

10 

11from __future__ import annotations 

12 

13import base64 

14import dataclasses 

15import hashlib 

16from importlib import metadata as importlib_metadata 

17import inspect 

18import operator 

19import platform 

20import sys 

21import typing 

22from typing import Any 

23from typing import Callable 

24from typing import Dict 

25from typing import Iterable 

26from typing import List 

27from typing import Mapping 

28from typing import Optional 

29from typing import Sequence 

30from typing import Set 

31from typing import Tuple 

32from typing import Type 

33 

34py314b1 = sys.version_info >= (3, 14, 0, "beta", 1) 

35py314 = sys.version_info >= (3, 14) 

36py313 = sys.version_info >= (3, 13) 

37py312 = sys.version_info >= (3, 12) 

38py311 = sys.version_info >= (3, 11) 

39py310 = sys.version_info >= (3, 10) 

40pypy = platform.python_implementation() == "PyPy" 

41cpython = platform.python_implementation() == "CPython" 

42 

43win32 = sys.platform.startswith("win") 

44osx = sys.platform.startswith("darwin") 

45arm = "aarch" in platform.machine().lower() 

46is64bit = sys.maxsize > 2**32 

47 

48has_refcount_gc = bool(cpython) 

49 

50dottedgetter = operator.attrgetter 

51 

52 

53class FullArgSpec(typing.NamedTuple): 

54 args: List[str] 

55 varargs: Optional[str] 

56 varkw: Optional[str] 

57 defaults: Optional[Tuple[Any, ...]] 

58 kwonlyargs: List[str] 

59 kwonlydefaults: Optional[Dict[str, Any]] 

60 annotations: Dict[str, Any] 

61 

62 

63def inspect_getfullargspec(func: Callable[..., Any]) -> FullArgSpec: 

64 """Fully vendored version of getfullargspec from Python 3.3.""" 

65 

66 if inspect.ismethod(func): 

67 func = func.__func__ 

68 if not inspect.isfunction(func): 

69 raise TypeError(f"{func!r} is not a Python function") 

70 

71 co = func.__code__ 

72 if not inspect.iscode(co): 

73 raise TypeError(f"{co!r} is not a code object") 

74 

75 nargs = co.co_argcount 

76 names = co.co_varnames 

77 nkwargs = co.co_kwonlyargcount 

78 args = list(names[:nargs]) 

79 kwonlyargs = list(names[nargs : nargs + nkwargs]) 

80 

81 nargs += nkwargs 

82 varargs = None 

83 if co.co_flags & inspect.CO_VARARGS: 

84 varargs = co.co_varnames[nargs] 

85 nargs = nargs + 1 

86 varkw = None 

87 if co.co_flags & inspect.CO_VARKEYWORDS: 

88 varkw = co.co_varnames[nargs] 

89 

90 return FullArgSpec( 

91 args, 

92 varargs, 

93 varkw, 

94 func.__defaults__, 

95 kwonlyargs, 

96 func.__kwdefaults__, 

97 func.__annotations__, 

98 ) 

99 

100 

101# python stubs don't have a public type for this. not worth 

102# making a protocol 

103def md5_not_for_security() -> Any: 

104 return hashlib.md5(usedforsecurity=False) 

105 

106 

107if py310: 

108 anext_ = anext 

109else: 

110 _NOT_PROVIDED = object() 

111 from collections.abc import AsyncIterator 

112 

113 async def anext_(async_iterator, default=_NOT_PROVIDED): 

114 """vendored from https://github.com/python/cpython/pull/8895""" 

115 

116 if not isinstance(async_iterator, AsyncIterator): 

117 raise TypeError( 

118 f"anext expected an AsyncIterator, got {type(async_iterator)}" 

119 ) 

120 anxt = type(async_iterator).__anext__ 

121 try: 

122 return await anxt(async_iterator) 

123 except StopAsyncIteration: 

124 if default is _NOT_PROVIDED: 

125 raise 

126 return default 

127 

128 

129def importlib_metadata_get(group): 

130 ep = importlib_metadata.entry_points() 

131 if typing.TYPE_CHECKING or hasattr(ep, "select"): 

132 return ep.select(group=group) 

133 else: 

134 return ep.get(group, ()) 

135 

136 

137def b(s): 

138 return s.encode("latin-1") 

139 

140 

141def b64decode(x: str) -> bytes: 

142 return base64.b64decode(x.encode("ascii")) 

143 

144 

145def b64encode(x: bytes) -> str: 

146 return base64.b64encode(x).decode("ascii") 

147 

148 

149def decode_backslashreplace(text: bytes, encoding: str) -> str: 

150 return text.decode(encoding, errors="backslashreplace") 

151 

152 

153def cmp(a, b): 

154 return (a > b) - (a < b) 

155 

156 

157def _formatannotation(annotation, base_module=None): 

158 """vendored from python 3.7""" 

159 

160 if isinstance(annotation, str): 

161 return annotation 

162 

163 if getattr(annotation, "__module__", None) == "typing": 

164 return repr(annotation).replace("typing.", "").replace("~", "") 

165 if isinstance(annotation, type): 

166 if annotation.__module__ in ("builtins", base_module): 

167 return repr(annotation.__qualname__) 

168 return annotation.__module__ + "." + annotation.__qualname__ 

169 elif isinstance(annotation, typing.TypeVar): 

170 return repr(annotation).replace("~", "") 

171 return repr(annotation).replace("~", "") 

172 

173 

174def inspect_formatargspec( 

175 args: List[str], 

176 varargs: Optional[str] = None, 

177 varkw: Optional[str] = None, 

178 defaults: Optional[Sequence[Any]] = None, 

179 kwonlyargs: Optional[Sequence[str]] = (), 

180 kwonlydefaults: Optional[Mapping[str, Any]] = {}, 

181 annotations: Mapping[str, Any] = {}, 

182 formatarg: Callable[[str], str] = str, 

183 formatvarargs: Callable[[str], str] = lambda name: "*" + name, 

184 formatvarkw: Callable[[str], str] = lambda name: "**" + name, 

185 formatvalue: Callable[[Any], str] = lambda value: "=" + repr(value), 

186 formatreturns: Callable[[Any], str] = lambda text: " -> " + str(text), 

187 formatannotation: Callable[[Any], str] = _formatannotation, 

188) -> str: 

189 """Copy formatargspec from python 3.7 standard library. 

190 

191 Python 3 has deprecated formatargspec and requested that Signature 

192 be used instead, however this requires a full reimplementation 

193 of formatargspec() in terms of creating Parameter objects and such. 

194 Instead of introducing all the object-creation overhead and having 

195 to reinvent from scratch, just copy their compatibility routine. 

196 

197 Ultimately we would need to rewrite our "decorator" routine completely 

198 which is not really worth it right now, until all Python 2.x support 

199 is dropped. 

200 

201 """ 

202 

203 kwonlydefaults = kwonlydefaults or {} 

204 annotations = annotations or {} 

205 

206 def formatargandannotation(arg): 

207 result = formatarg(arg) 

208 if arg in annotations: 

209 result += ": " + formatannotation(annotations[arg]) 

210 return result 

211 

212 specs = [] 

213 if defaults: 

214 firstdefault = len(args) - len(defaults) 

215 else: 

216 firstdefault = -1 

217 

218 for i, arg in enumerate(args): 

219 spec = formatargandannotation(arg) 

220 if defaults and i >= firstdefault: 

221 spec = spec + formatvalue(defaults[i - firstdefault]) 

222 specs.append(spec) 

223 

224 if varargs is not None: 

225 specs.append(formatvarargs(formatargandannotation(varargs))) 

226 else: 

227 if kwonlyargs: 

228 specs.append("*") 

229 

230 if kwonlyargs: 

231 for kwonlyarg in kwonlyargs: 

232 spec = formatargandannotation(kwonlyarg) 

233 if kwonlydefaults and kwonlyarg in kwonlydefaults: 

234 spec += formatvalue(kwonlydefaults[kwonlyarg]) 

235 specs.append(spec) 

236 

237 if varkw is not None: 

238 specs.append(formatvarkw(formatargandannotation(varkw))) 

239 

240 result = "(" + ", ".join(specs) + ")" 

241 if "return" in annotations: 

242 result += formatreturns(formatannotation(annotations["return"])) 

243 return result 

244 

245 

246def dataclass_fields(cls: Type[Any]) -> Iterable[dataclasses.Field[Any]]: 

247 """Return a sequence of all dataclasses.Field objects associated 

248 with a class as an already processed dataclass. 

249 

250 The class must **already be a dataclass** for Field objects to be returned. 

251 

252 """ 

253 

254 if dataclasses.is_dataclass(cls): 

255 return dataclasses.fields(cls) 

256 else: 

257 return [] 

258 

259 

260def local_dataclass_fields(cls: Type[Any]) -> Iterable[dataclasses.Field[Any]]: 

261 """Return a sequence of all dataclasses.Field objects associated with 

262 an already processed dataclass, excluding those that originate from a 

263 superclass. 

264 

265 The class must **already be a dataclass** for Field objects to be returned. 

266 

267 """ 

268 

269 if dataclasses.is_dataclass(cls): 

270 super_fields: Set[dataclasses.Field[Any]] = set() 

271 for sup in cls.__bases__: 

272 super_fields.update(dataclass_fields(sup)) 

273 return [f for f in dataclasses.fields(cls) if f not in super_fields] 

274 else: 

275 return []