Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/sql/annotation.py: 47%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

172 statements  

1# sql/annotation.py 

2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8"""The :class:`.Annotated` class and related routines; creates hash-equivalent 

9copies of SQL constructs which contain context-specific markers and 

10associations. 

11 

12""" 

13 

14from . import operators 

15from .base import HasCacheKey 

16from .traversals import anon_map 

17from .visitors import InternalTraversal 

18from .. import util 

19 

20EMPTY_ANNOTATIONS = util.immutabledict() 

21 

22 

class SupportsAnnotations(object):
    """Base for objects that carry an ``_annotations`` mapping and can
    render that mapping as part of a cache key.

    """

    # class-level default; instances that were never annotated share it
    _annotations = EMPTY_ANNOTATIONS

    @util.memoized_property
    def _annotations_cache_key(self):
        """Cache-key tuple for this object's annotations, built against a
        fresh ``anon_map``.

        """
        return self._gen_annotations_cache_key(anon_map())

    def _gen_annotations_cache_key(self, anon_map):
        """Return ``("_annotations", ((key, value_or_cache_key), ...))``.

        Entries are emitted in sorted key order.  A value that is a
        :class:`.HasCacheKey` is replaced by the result of its
        ``_gen_cache_key(anon_map, [])``; any other value is used as is.

        """
        annotations = self._annotations
        # resolve every value up front, in sorted key order, before any
        # cache keys are generated
        ordered_items = [
            (name, annotations[name]) for name in sorted(annotations)
        ]

        entries = []
        for name, value in ordered_items:
            if isinstance(value, HasCacheKey):
                entries.append((name, value._gen_cache_key(anon_map, [])))
            else:
                entries.append((name, value))

        return ("_annotations", tuple(entries))

48 

49 

class SupportsCloneAnnotations(SupportsAnnotations):
    """Annotation support implemented by cloning the object itself and
    replacing the ``_annotations`` mapping on the clone.

    Each method that changes the annotations also discards the
    ``_annotations_cache_key`` and ``_generate_cache_key`` entries from the
    clone's ``__dict__`` so that values computed against the previous
    annotations are not reused.

    """

    _clone_annotations_traverse_internals = [
        ("_annotations", InternalTraversal.dp_annotations_key)
    ]

    def _annotate(self, values):
        """return a copy of this ClauseElement with annotations
        updated by the given dictionary.

        """
        new = self._clone()
        new._annotations = new._annotations.union(values)
        new.__dict__.pop("_annotations_cache_key", None)
        new.__dict__.pop("_generate_cache_key", None)
        return new

    def _with_annotations(self, values):
        """return a copy of this ClauseElement with annotations
        replaced by the given dictionary.

        """
        new = self._clone()
        new._annotations = util.immutabledict(values)
        new.__dict__.pop("_annotations_cache_key", None)
        new.__dict__.pop("_generate_cache_key", None)
        return new

    def _deannotate(self, values=None, clone=False):
        """return a copy of this :class:`_expression.ClauseElement`
        with annotations
        removed.

        :param values: optional tuple of individual values
         to remove.  NOTE: this implementation does not consult
         ``values``; all annotations are removed from the copy.

        :param clone: when True, a copy is made even if this object
         has no annotations.

        :return: a clone with empty annotations, or ``self`` when there is
         nothing to remove and ``clone`` is False.

        """
        if clone or self._annotations:
            # clone is used when we are also copying
            # the expression for a deep deannotation
            new = self._clone()
            new._annotations = util.immutabledict()
            new.__dict__.pop("_annotations_cache_key", None)
            # keep in step with _annotate() / _with_annotations(): a
            # cache key memoized before deannotation (if _clone() carried
            # it over) would still describe the old annotations
            new.__dict__.pop("_generate_cache_key", None)
            return new
        else:
            return self

96 

97 

class SupportsWrappingAnnotations(SupportsAnnotations):
    """Annotation support implemented by wrapping the object in an
    :class:`.Annotated` instance rather than modifying a plain clone.

    """

    def _annotate(self, values):
        """Return an :class:`.Annotated` version of this element
        carrying the given annotations dictionary.

        """
        return Annotated(self, values)

    def _with_annotations(self, values):
        """Return an :class:`.Annotated` version of this element whose
        annotations are exactly the given dictionary.

        """
        return Annotated(self, values)

    def _deannotate(self, values=None, clone=False):
        """Return this element without annotations.

        :param values: optional tuple of individual values to remove;
         not consulted by this implementation.

        :param clone: when True, return ``self._clone()``; otherwise
         return ``self``.

        """
        return self._clone() if clone else self

127 

128 

class Annotated(object):
    """clones a SupportsAnnotations element and applies an 'annotations'
    dictionary.

    Unlike regular clones, this clone also mimics __hash__() and
    __eq__() of the original element so that it takes its place
    in hashed collections.

    A reference to the original element is maintained, for the important
    reason of keeping its hash value current.  When GC'ed, the
    hash value may be reused, causing conflicts.

    .. note::  The rationale for Annotated producing a brand new class,
       rather than placing the functionality directly within ClauseElement,
       is **performance**.  The __hash__() method is absent on plain
       ClauseElement which leads to significantly reduced function call
       overhead, as the use of sets and dictionaries against ClauseElement
       objects is prevalent, but most are not "annotated".

    """

    # overwritten per generated subclass by _new_annotation_type()
    _is_column_operators = False

    def __new__(cls, *args):
        """Allocate an instance of the Annotated subclass that matches the
        class of the element being wrapped.

        With no arguments (the "clone constructor" form) a bare instance of
        ``cls`` is returned.  Otherwise ``args`` must be
        ``(element, values)``.

        """
        if not args:
            # clone constructor
            return object.__new__(cls)
        else:
            element, values = args
            # pull appropriate subclass from registry of annotated
            # classes
            try:
                cls = annotated_classes[element.__class__]
            except KeyError:
                cls = _new_annotation_type(element.__class__, cls)
            return object.__new__(cls)

    def __init__(self, element, values):
        """Copy ``element``'s state and attach ``values`` as annotations.

        :param element: the object being annotated.
        :param values: mapping of annotations; stored as an immutabledict.

        """
        self.__dict__ = element.__dict__.copy()
        # cache keys copied from the element do not reflect the new
        # annotations
        self.__dict__.pop("_annotations_cache_key", None)
        self.__dict__.pop("_generate_cache_key", None)
        self.__element = element
        self._annotations = util.immutabledict(values)
        self._hash = hash(element)

    def _annotate(self, values):
        """Return a copy whose annotations are the current ones updated
        with ``values``.

        """
        _values = self._annotations.union(values)
        return self._with_annotations(_values)

    def _with_annotations(self, values):
        """Return a copy whose annotations are replaced by ``values``.

        ``values`` is always stored as an immutabledict, matching
        ``__init__()``, so that a plain dict passed by a caller cannot be
        mutated afterwards and ``_annotate()`` can rely on ``.union()``.

        """
        clone = self.__class__.__new__(self.__class__)
        clone.__dict__ = self.__dict__.copy()
        clone.__dict__.pop("_annotations_cache_key", None)
        clone.__dict__.pop("_generate_cache_key", None)
        clone._annotations = util.immutabledict(values)
        return clone

    def _deannotate(self, values=None, clone=True):
        """Remove annotations.

        :param values: when None, the original un-annotated element is
         returned.  Otherwise a collection of annotation keys to drop; a
         copy retaining the remaining annotations is returned.

        :param clone: accepted for interface compatibility; not consulted
         by this implementation.

        """
        if values is None:
            return self.__element
        else:
            return self._with_annotations(
                util.immutabledict(
                    {
                        key: value
                        for key, value in self._annotations.items()
                        if key not in values
                    }
                )
            )

    def _compiler_dispatch(self, visitor, **kw):
        # use the wrapped element's class implementation, applied to
        # this annotated object
        return self.__element.__class__._compiler_dispatch(self, visitor, **kw)

    @property
    def _constructor(self):
        """The ``_constructor`` of the wrapped element."""
        return self.__element._constructor

    def _clone(self, **kw):
        """Clone the wrapped element and re-annotate the clone.

        Returns ``self`` when the wrapped element's ``_clone()`` returns
        the element itself.

        """
        clone = self.__element._clone(**kw)
        if clone is self.__element:
            # detect immutable, don't change anything
            return self
        else:
            # update the clone with any changes that have occurred
            # to this object's __dict__.
            clone.__dict__.update(self.__dict__)
            return self.__class__(clone, self._annotations)

    def __reduce__(self):
        # rebuild through the (element, values) constructor form
        return self.__class__, (self.__element, self._annotations)

    def __hash__(self):
        # hash of the wrapped element, captured in __init__()
        return self._hash

    def __eq__(self, other):
        """Compare as the wrapped element's class does when that class
        provides column operators; otherwise compare by hash value.

        """
        if self._is_column_operators:
            return self.__element.__class__.__eq__(self, other)
        else:
            return hash(other) == hash(self)

    @property
    def entity_namespace(self):
        """The ``entity_namespace`` of the ``"entity_namespace"``
        annotation if one is present, else that of the wrapped element.

        """
        if "entity_namespace" in self._annotations:
            return self._annotations["entity_namespace"].entity_namespace
        else:
            return self.__element.entity_namespace

235 

236 

237# hard-generate Annotated subclasses. this technique 

238# is used instead of on-the-fly types (i.e. type.__new__()) 

239# so that the resulting objects are pickleable; additionally, other 

240# decisions can be made up front about the type of object being annotated 

241# just once per class rather than per-instance. 

242annotated_classes = {} 

243 

244 

245def _safe_annotate(to_annotate, annotations): 

246 try: 

247 _annotate = to_annotate._annotate 

248 except AttributeError: 

249 # skip objects that don't actually have an `_annotate` 

250 # attribute, namely QueryableAttribute inside of a join 

251 # condition 

252 return to_annotate 

253 else: 

254 return _annotate(annotations) 

255 

256 

def _deep_annotate(
    element, annotations, exclude=None, detect_subquery_cols=False
):
    """Return a deep copy of ``element`` in which every sub-element
    carries the ``annotations`` dictionary.

    Elements whose ``proxy_set`` intersects the ``exclude`` collection are
    cloned but not annotated.  Elements whose annotations already compare
    equal to ``annotations`` are used as is.  ``None`` is passed through.

    """

    # Annotated objects take over the __hash__() of the element they
    # wrap, so the memo of processed elements has to be keyed on id()
    seen = {}

    def clone(elem, **kw):
        kw["detect_subquery_cols"] = detect_subquery_cols
        elem_id = id(elem)

        if elem_id in seen:
            return seen[elem_id]

        is_excluded = (
            exclude
            and hasattr(elem, "proxy_set")
            and elem.proxy_set.intersection(exclude)
        )

        if is_excluded:
            result = elem._clone(clone=clone, **kw)
        elif annotations != elem._annotations:
            target = elem
            if detect_subquery_cols and elem._is_immutable:
                # annotate a clone rather than the element itself
                target = elem._clone(clone=clone, **kw)
            result = _safe_annotate(target, annotations)
        else:
            result = elem

        result._copy_internals(clone=clone)
        seen[elem_id] = result
        return result

    if element is not None:
        element = clone(element)
    clone = None  # remove gc cycles
    return element

302 

303 

304def _deep_deannotate(element, values=None): 

305 """Deep copy the given element, removing annotations.""" 

306 

307 cloned = {} 

308 

309 def clone(elem, **kw): 

310 if values: 

311 key = id(elem) 

312 else: 

313 key = elem 

314 

315 if key not in cloned: 

316 newelem = elem._deannotate(values=values, clone=True) 

317 newelem._copy_internals(clone=clone) 

318 cloned[key] = newelem 

319 return newelem 

320 else: 

321 return cloned[key] 

322 

323 if element is not None: 

324 element = clone(element) 

325 clone = None # remove gc cycles 

326 return element 

327 

328 

329def _shallow_annotate(element, annotations): 

330 """Annotate the given ClauseElement and copy its internals so that 

331 internal objects refer to the new annotated object. 

332 

333 Basically used to apply a "don't traverse" annotation to a 

334 selectable, without digging throughout the whole 

335 structure wasting time. 

336 """ 

337 element = element._annotate(annotations) 

338 element._copy_internals() 

339 return element 

340 

341 

def _new_annotation_type(cls, base_cls):
    """Return the Annotated subclass for ``cls``, creating and registering
    it if necessary.

    :param cls: the class to produce an annotated variant of.  Returned
     unchanged if it already is an :class:`.Annotated` subclass.

    :param base_cls: the Annotated base to derive from, unless a class in
     ``cls.__mro__`` already has a registered annotated class, in which
     case that one is used as the base instead.

    A newly created class is named ``Annotated<cls.__name__>``, stored in
    ``annotated_classes`` and published in this module's globals.

    """
    if issubclass(cls, Annotated):
        return cls
    if cls in annotated_classes:
        return annotated_classes[cls]

    for ancestor in cls.__mro__:
        # check if an Annotated subclass more specific than
        # the given base_cls is already registered, such
        # as AnnotatedColumnElement.
        if ancestor in annotated_classes:
            base_cls = annotated_classes[ancestor]
            break

    anno_name = "Annotated%s" % cls.__name__
    anno_cls = type(anno_name, (base_cls, cls), {})
    annotated_classes[cls] = anno_cls
    globals()[anno_name] = anno_cls

    inherits_cache = cls.__dict__.get("inherit_cache", False)

    # extend the traversal spec with the annotations key when the class
    # either declares its own spec or is flagged as inheriting one
    if "_traverse_internals" in cls.__dict__ or inherits_cache:
        anno_cls._traverse_internals = list(cls._traverse_internals) + [
            ("_annotations", InternalTraversal.dp_annotations_key)
        ]

    # some classes include this even if they have traverse_internals
    # e.g. BindParameter, add it if present.
    if inherits_cache:
        anno_cls.inherit_cache = True

    anno_cls._is_column_operators = issubclass(cls, operators.ColumnOperators)

    return anno_cls

378 

379 

def _prepare_annotations(target_hierarchy, base_cls):
    """Create the Annotated subclass for every class produced by
    ``util.walk_subclasses(target_hierarchy)``, using ``base_cls`` as the
    default Annotated base.

    """
    for subclass in util.walk_subclasses(target_hierarchy):
        _new_annotation_type(subclass, base_cls)