Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/sql/annotation.py: 48%

166 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1# sql/annotation.py 

2# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8"""The :class:`.Annotated` class and related routines; creates hash-equivalent 

9copies of SQL constructs which contain context-specific markers and 

10associations. 

11 

12""" 

13 

14from . import operators 

15from .base import HasCacheKey 

16from .traversals import anon_map 

17from .visitors import InternalTraversal 

18from .. import util 

19 

20EMPTY_ANNOTATIONS = util.immutabledict() 

21 

22 

23class SupportsAnnotations(object): 

24 _annotations = EMPTY_ANNOTATIONS 

25 

26 @util.memoized_property 

27 def _annotations_cache_key(self): 

28 anon_map_ = anon_map() 

29 

30 return self._gen_annotations_cache_key(anon_map_) 

31 

32 def _gen_annotations_cache_key(self, anon_map): 

33 return ( 

34 "_annotations", 

35 tuple( 

36 ( 

37 key, 

38 value._gen_cache_key(anon_map, []) 

39 if isinstance(value, HasCacheKey) 

40 else value, 

41 ) 

42 for key, value in [ 

43 (key, self._annotations[key]) 

44 for key in sorted(self._annotations) 

45 ] 

46 ), 

47 ) 

48 

49 

50class SupportsCloneAnnotations(SupportsAnnotations): 

51 

52 _clone_annotations_traverse_internals = [ 

53 ("_annotations", InternalTraversal.dp_annotations_key) 

54 ] 

55 

56 def _annotate(self, values): 

57 """return a copy of this ClauseElement with annotations 

58 updated by the given dictionary. 

59 

60 """ 

61 new = self._clone() 

62 new._annotations = new._annotations.union(values) 

63 new.__dict__.pop("_annotations_cache_key", None) 

64 new.__dict__.pop("_generate_cache_key", None) 

65 return new 

66 

67 def _with_annotations(self, values): 

68 """return a copy of this ClauseElement with annotations 

69 replaced by the given dictionary. 

70 

71 """ 

72 new = self._clone() 

73 new._annotations = util.immutabledict(values) 

74 new.__dict__.pop("_annotations_cache_key", None) 

75 new.__dict__.pop("_generate_cache_key", None) 

76 return new 

77 

78 def _deannotate(self, values=None, clone=False): 

79 """return a copy of this :class:`_expression.ClauseElement` 

80 with annotations 

81 removed. 

82 

83 :param values: optional tuple of individual values 

84 to remove. 

85 

86 """ 

87 if clone or self._annotations: 

88 # clone is used when we are also copying 

89 # the expression for a deep deannotation 

90 new = self._clone() 

91 new._annotations = util.immutabledict() 

92 new.__dict__.pop("_annotations_cache_key", None) 

93 return new 

94 else: 

95 return self 

96 

97 

98class SupportsWrappingAnnotations(SupportsAnnotations): 

99 def _annotate(self, values): 

100 """return a copy of this ClauseElement with annotations 

101 updated by the given dictionary. 

102 

103 """ 

104 return Annotated(self, values) 

105 

106 def _with_annotations(self, values): 

107 """return a copy of this ClauseElement with annotations 

108 replaced by the given dictionary. 

109 

110 """ 

111 return Annotated(self, values) 

112 

113 def _deannotate(self, values=None, clone=False): 

114 """return a copy of this :class:`_expression.ClauseElement` 

115 with annotations 

116 removed. 

117 

118 :param values: optional tuple of individual values 

119 to remove. 

120 

121 """ 

122 if clone: 

123 s = self._clone() 

124 return s 

125 else: 

126 return self 

127 

128 

129class Annotated(object): 

130 """clones a SupportsAnnotated and applies an 'annotations' dictionary. 

131 

132 Unlike regular clones, this clone also mimics __hash__() and 

133 __cmp__() of the original element so that it takes its place 

134 in hashed collections. 

135 

136 A reference to the original element is maintained, for the important 

137 reason of keeping its hash value current. When GC'ed, the 

138 hash value may be reused, causing conflicts. 

139 

140 .. note:: The rationale for Annotated producing a brand new class, 

141 rather than placing the functionality directly within ClauseElement, 

142 is **performance**. The __hash__() method is absent on plain 

143 ClauseElement which leads to significantly reduced function call 

144 overhead, as the use of sets and dictionaries against ClauseElement 

145 objects is prevalent, but most are not "annotated". 

146 

147 """ 

148 

149 _is_column_operators = False 

150 

151 def __new__(cls, *args): 

152 if not args: 

153 # clone constructor 

154 return object.__new__(cls) 

155 else: 

156 element, values = args 

157 # pull appropriate subclass from registry of annotated 

158 # classes 

159 try: 

160 cls = annotated_classes[element.__class__] 

161 except KeyError: 

162 cls = _new_annotation_type(element.__class__, cls) 

163 return object.__new__(cls) 

164 

165 def __init__(self, element, values): 

166 self.__dict__ = element.__dict__.copy() 

167 self.__dict__.pop("_annotations_cache_key", None) 

168 self.__dict__.pop("_generate_cache_key", None) 

169 self.__element = element 

170 self._annotations = util.immutabledict(values) 

171 self._hash = hash(element) 

172 

173 def _annotate(self, values): 

174 _values = self._annotations.union(values) 

175 return self._with_annotations(_values) 

176 

177 def _with_annotations(self, values): 

178 clone = self.__class__.__new__(self.__class__) 

179 clone.__dict__ = self.__dict__.copy() 

180 clone.__dict__.pop("_annotations_cache_key", None) 

181 clone.__dict__.pop("_generate_cache_key", None) 

182 clone._annotations = values 

183 return clone 

184 

185 def _deannotate(self, values=None, clone=True): 

186 if values is None: 

187 return self.__element 

188 else: 

189 return self._with_annotations( 

190 util.immutabledict( 

191 { 

192 key: value 

193 for key, value in self._annotations.items() 

194 if key not in values 

195 } 

196 ) 

197 ) 

198 

199 def _compiler_dispatch(self, visitor, **kw): 

200 return self.__element.__class__._compiler_dispatch(self, visitor, **kw) 

201 

202 @property 

203 def _constructor(self): 

204 return self.__element._constructor 

205 

206 def _clone(self, **kw): 

207 clone = self.__element._clone(**kw) 

208 if clone is self.__element: 

209 # detect immutable, don't change anything 

210 return self 

211 else: 

212 # update the clone with any changes that have occurred 

213 # to this object's __dict__. 

214 clone.__dict__.update(self.__dict__) 

215 return self.__class__(clone, self._annotations) 

216 

217 def __reduce__(self): 

218 return self.__class__, (self.__element, self._annotations) 

219 

220 def __hash__(self): 

221 return self._hash 

222 

223 def __eq__(self, other): 

224 if self._is_column_operators: 

225 return self.__element.__class__.__eq__(self, other) 

226 else: 

227 return hash(other) == hash(self) 

228 

229 @property 

230 def entity_namespace(self): 

231 if "entity_namespace" in self._annotations: 

232 return self._annotations["entity_namespace"].entity_namespace 

233 else: 

234 return self.__element.entity_namespace 

235 

236 

237# hard-generate Annotated subclasses. this technique 

238# is used instead of on-the-fly types (i.e. type.__new__()) 

239# so that the resulting objects are pickleable; additionally, other 

240# decisions can be made up front about the type of object being annotated 

241# just once per class rather than per-instance. 

242annotated_classes = {} 

243 

244 

245def _deep_annotate( 

246 element, annotations, exclude=None, detect_subquery_cols=False 

247): 

248 """Deep copy the given ClauseElement, annotating each element 

249 with the given annotations dictionary. 

250 

251 Elements within the exclude collection will be cloned but not annotated. 

252 

253 """ 

254 

255 # annotated objects hack the __hash__() method so if we want to 

256 # uniquely process them we have to use id() 

257 

258 cloned_ids = {} 

259 

260 def clone(elem, **kw): 

261 kw["detect_subquery_cols"] = detect_subquery_cols 

262 id_ = id(elem) 

263 

264 if id_ in cloned_ids: 

265 return cloned_ids[id_] 

266 

267 if ( 

268 exclude 

269 and hasattr(elem, "proxy_set") 

270 and elem.proxy_set.intersection(exclude) 

271 ): 

272 newelem = elem._clone(clone=clone, **kw) 

273 elif annotations != elem._annotations: 

274 if detect_subquery_cols and elem._is_immutable: 

275 newelem = elem._clone(clone=clone, **kw)._annotate(annotations) 

276 else: 

277 newelem = elem._annotate(annotations) 

278 else: 

279 newelem = elem 

280 newelem._copy_internals(clone=clone) 

281 cloned_ids[id_] = newelem 

282 return newelem 

283 

284 if element is not None: 

285 element = clone(element) 

286 clone = None # remove gc cycles 

287 return element 

288 

289 

290def _deep_deannotate(element, values=None): 

291 """Deep copy the given element, removing annotations.""" 

292 

293 cloned = {} 

294 

295 def clone(elem, **kw): 

296 if values: 

297 key = id(elem) 

298 else: 

299 key = elem 

300 

301 if key not in cloned: 

302 newelem = elem._deannotate(values=values, clone=True) 

303 newelem._copy_internals(clone=clone) 

304 cloned[key] = newelem 

305 return newelem 

306 else: 

307 return cloned[key] 

308 

309 if element is not None: 

310 element = clone(element) 

311 clone = None # remove gc cycles 

312 return element 

313 

314 

315def _shallow_annotate(element, annotations): 

316 """Annotate the given ClauseElement and copy its internals so that 

317 internal objects refer to the new annotated object. 

318 

319 Basically used to apply a "don't traverse" annotation to a 

320 selectable, without digging throughout the whole 

321 structure wasting time. 

322 """ 

323 element = element._annotate(annotations) 

324 element._copy_internals() 

325 return element 

326 

327 

328def _new_annotation_type(cls, base_cls): 

329 if issubclass(cls, Annotated): 

330 return cls 

331 elif cls in annotated_classes: 

332 return annotated_classes[cls] 

333 

334 for super_ in cls.__mro__: 

335 # check if an Annotated subclass more specific than 

336 # the given base_cls is already registered, such 

337 # as AnnotatedColumnElement. 

338 if super_ in annotated_classes: 

339 base_cls = annotated_classes[super_] 

340 break 

341 

342 annotated_classes[cls] = anno_cls = type( 

343 "Annotated%s" % cls.__name__, (base_cls, cls), {} 

344 ) 

345 globals()["Annotated%s" % cls.__name__] = anno_cls 

346 

347 if "_traverse_internals" in cls.__dict__: 

348 anno_cls._traverse_internals = list(cls._traverse_internals) + [ 

349 ("_annotations", InternalTraversal.dp_annotations_key) 

350 ] 

351 elif cls.__dict__.get("inherit_cache", False): 

352 anno_cls._traverse_internals = list(cls._traverse_internals) + [ 

353 ("_annotations", InternalTraversal.dp_annotations_key) 

354 ] 

355 

356 # some classes include this even if they have traverse_internals 

357 # e.g. BindParameter, add it if present. 

358 if cls.__dict__.get("inherit_cache", False): 

359 anno_cls.inherit_cache = True 

360 

361 anno_cls._is_column_operators = issubclass(cls, operators.ColumnOperators) 

362 

363 return anno_cls 

364 

365 

366def _prepare_annotations(target_hierarchy, base_cls): 

367 for cls in util.walk_subclasses(target_hierarchy): 

368 _new_annotation_type(cls, base_cls)