# sql/annotation.py
# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: https://www.opensource.org/licenses/mit-license.php

"""The :class:`.Annotated` class and related routines; creates hash-equivalent
copies of SQL constructs which contain context-specific markers and
associations.

"""
13
14from . import operators
15from .base import HasCacheKey
16from .traversals import anon_map
17from .visitors import InternalTraversal
18from .. import util
19
# Shared empty immutable mapping; serves as the class-level default value of
# ``SupportsAnnotations._annotations`` for objects that carry no annotations.
EMPTY_ANNOTATIONS = util.immutabledict()
21
22
class SupportsAnnotations(object):
    """Base for objects that carry an ``_annotations`` mapping.

    Supplies the cache-key representation of that mapping.

    """

    _annotations = EMPTY_ANNOTATIONS

    @util.memoized_property
    def _annotations_cache_key(self):
        """Cache key for this object's annotations, generated against a
        fresh ``anon_map``.

        """
        return self._gen_annotations_cache_key(anon_map())

    def _gen_annotations_cache_key(self, anon_map):
        """Return ``("_annotations", ((key, value), ...))``.

        Pairs are ordered by sorted annotation key.  A value that is a
        :class:`.HasCacheKey` is represented by the result of its
        ``_gen_cache_key()`` method; any other value is used as is.

        :param anon_map: the anon map handed to each value's
         ``_gen_cache_key()`` call.

        """
        annotations = self._annotations
        entries = []
        for name in sorted(annotations):
            item = annotations[name]
            if isinstance(item, HasCacheKey):
                item = item._gen_cache_key(anon_map, [])
            entries.append((name, item))
        return ("_annotations", tuple(entries))
48
49
class SupportsCloneAnnotations(SupportsAnnotations):
    """Annotation support for elements that store annotations directly on
    a clone of themselves.

    Every method here builds its result from ``self._clone()`` and then
    removes the ``_annotations_cache_key`` and ``_generate_cache_key``
    entries from the copy's ``__dict__``, so that values computed for the
    previous set of annotations are not reused.

    """

    _clone_annotations_traverse_internals = [
        ("_annotations", InternalTraversal.dp_annotations_key)
    ]

    def _annotate(self, values):
        """return a copy of this ClauseElement with annotations
        updated by the given dictionary.

        :param values: mapping of annotations merged over the existing ones.

        """
        new = self._clone()
        new._annotations = new._annotations.union(values)
        new.__dict__.pop("_annotations_cache_key", None)
        new.__dict__.pop("_generate_cache_key", None)
        return new

    def _with_annotations(self, values):
        """return a copy of this ClauseElement with annotations
        replaced by the given dictionary.

        :param values: mapping of annotations; coerced to an
         ``immutabledict``.

        """
        new = self._clone()
        new._annotations = util.immutabledict(values)
        new.__dict__.pop("_annotations_cache_key", None)
        new.__dict__.pop("_generate_cache_key", None)
        return new

    def _deannotate(self, values=None, clone=False):
        """return a copy of this :class:`_expression.ClauseElement`
        with annotations
        removed.

        :param values: optional tuple of individual values
         to remove.  This implementation does not consult it: all
         annotations are removed regardless.

        :param clone: when True, a copy is returned even if this element
         has no annotations.  When False and there are no annotations,
         ``self`` is returned unchanged.

        """
        if clone or self._annotations:
            # clone is used when we are also copying
            # the expression for a deep deannotation
            new = self._clone()
            new._annotations = util.immutabledict()
            new.__dict__.pop("_annotations_cache_key", None)
            # discard this entry too, consistent with _annotate() and
            # _with_annotations(); the annotations have changed
            new.__dict__.pop("_generate_cache_key", None)
            return new
        else:
            return self
96
97
class SupportsWrappingAnnotations(SupportsAnnotations):
    """Annotation support for elements that are annotated by being wrapped
    in an :class:`.Annotated` object.

    """

    def _annotate(self, values):
        """Return an :class:`.Annotated` built from this element and the
        given annotations dictionary.

        """
        annotated = Annotated(self, values)
        return annotated

    def _with_annotations(self, values):
        """Return an :class:`.Annotated` built from this element whose
        annotations are the given dictionary.

        """
        annotated = Annotated(self, values)
        return annotated

    def _deannotate(self, values=None, clone=False):
        """Return this element without annotations.

        :param values: optional tuple of individual values
         to remove; not consulted by this implementation.

        :param clone: when True, the result of ``self._clone()`` is
         returned; otherwise ``self`` is returned.

        """
        return self._clone() if clone else self
127
128
class Annotated(object):
    """clones a SupportsAnnotations and applies an 'annotations' dictionary.

    Unlike regular clones, this clone also mimics __hash__() and
    __eq__() of the original element so that it takes its place
    in hashed collections.

    A reference to the original element is maintained, for the important
    reason of keeping its hash value current.  When GC'ed, the
    hash value may be reused, causing conflicts.

    .. note::  The rationale for Annotated producing a brand new class,
       rather than placing the functionality directly within ClauseElement,
       is **performance**.  The __hash__() method is absent on plain
       ClauseElement which leads to significantly reduced function call
       overhead, as the use of sets and dictionaries against ClauseElement
       objects is prevalent, but most are not "annotated".

    """

    # replaced per generated subclass by _new_annotation_type()
    _is_column_operators = False

    def __new__(cls, *args):
        """Allocate an instance.

        With no arguments this is the "clone constructor" and allocates an
        instance of ``cls`` itself.  With ``(element, values)`` the class
        actually instantiated is the Annotated subclass registered for
        ``element.__class__``, generated on first use.

        """
        if not args:
            # clone constructor
            return object.__new__(cls)
        else:
            element, values = args
            # pull appropriate subclass from registry of annotated
            # classes
            try:
                cls = annotated_classes[element.__class__]
            except KeyError:
                cls = _new_annotation_type(element.__class__, cls)
            return object.__new__(cls)

    def __init__(self, element, values):
        """Copy the state of ``element`` and attach ``values`` as the
        annotations.

        :param element: the object being annotated.
        :param values: mapping of annotations.

        """
        self.__dict__ = element.__dict__.copy()
        # cache-key entries copied from the element describe the element
        # without these annotations
        self.__dict__.pop("_annotations_cache_key", None)
        self.__dict__.pop("_generate_cache_key", None)
        self.__element = element
        self._annotations = util.immutabledict(values)
        self._hash = hash(element)

    def _annotate(self, values):
        """Return a copy with ``values`` merged over the current
        annotations.

        """
        _values = self._annotations.union(values)
        return self._with_annotations(_values)

    def _with_annotations(self, values):
        """Return a copy whose annotations are replaced by ``values``.

        :param values: mapping of annotations.  It is coerced to an
         ``immutabledict`` so that a plain dictionary may be passed and
         a subsequent ``_annotate()`` call, which relies on ``.union()``,
         continues to work.

        """
        clone = self.__class__.__new__(self.__class__)
        clone.__dict__ = self.__dict__.copy()
        clone.__dict__.pop("_annotations_cache_key", None)
        clone.__dict__.pop("_generate_cache_key", None)
        clone._annotations = util.immutabledict(values)
        return clone

    def _deannotate(self, values=None, clone=True):
        """Remove annotations.

        :param values: optional collection of annotation keys to remove.
         When ``None``, the original un-annotated element is returned.
         Otherwise a copy of this object lacking those keys is returned.

        :param clone: not consulted by this implementation.

        """
        if values is None:
            return self.__element
        else:
            return self._with_annotations(
                util.immutabledict(
                    {
                        key: value
                        for key, value in self._annotations.items()
                        if key not in values
                    }
                )
            )

    def _compiler_dispatch(self, visitor, **kw):
        """Dispatch using the wrapped element's class, passing this
        annotated object as the subject.

        """
        return self.__element.__class__._compiler_dispatch(self, visitor, **kw)

    @property
    def _constructor(self):
        """The ``_constructor`` of the wrapped element."""
        return self.__element._constructor

    def _clone(self, **kw):
        """Clone the wrapped element and re-wrap the result with the same
        annotations; returns ``self`` if the element's ``_clone()`` returns
        the element itself.

        """
        clone = self.__element._clone(**kw)
        if clone is self.__element:
            # detect immutable, don't change anything
            return self
        else:
            # update the clone with any changes that have occurred
            # to this object's __dict__.
            clone.__dict__.update(self.__dict__)
            return self.__class__(clone, self._annotations)

    def __reduce__(self):
        """Pickle as ``(class, (element, annotations))``."""
        return self.__class__, (self.__element, self._annotations)

    def __hash__(self):
        """Return the hash of the wrapped element, captured at
        construction time.

        """
        return self._hash

    def __eq__(self, other):
        """Compare using the wrapped element class's ``__eq__()`` when
        ``_is_column_operators`` is set; otherwise compare by hash value.

        """
        if self._is_column_operators:
            return self.__element.__class__.__eq__(self, other)
        else:
            return hash(other) == hash(self)

    @property
    def entity_namespace(self):
        """The ``entity_namespace`` of the ``"entity_namespace"``
        annotation if one is present, else that of the wrapped element.

        """
        if "entity_namespace" in self._annotations:
            return self._annotations["entity_namespace"].entity_namespace
        else:
            return self.__element.entity_namespace
235
236
# hard-generate Annotated subclasses.  this technique
# is used instead of on-the-fly types (i.e. type.__new__())
# so that the resulting objects are pickleable; additionally, other
# decisions can be made up front about the type of object being annotated
# just once per class rather than per-instance.
#
# registry mapping an element class to its generated Annotated subclass;
# populated by _new_annotation_type() and consulted by Annotated.__new__().
annotated_classes = {}
243
244
245def _safe_annotate(to_annotate, annotations):
246 try:
247 _annotate = to_annotate._annotate
248 except AttributeError:
249 # skip objects that don't actually have an `_annotate`
250 # attribute, namely QueryableAttribute inside of a join
251 # condition
252 return to_annotate
253 else:
254 return _annotate(annotations)
255
256
257def _deep_annotate(
258 element, annotations, exclude=None, detect_subquery_cols=False
259):
260 """Deep copy the given ClauseElement, annotating each element
261 with the given annotations dictionary.
262
263 Elements within the exclude collection will be cloned but not annotated.
264
265 """
266
267 # annotated objects hack the __hash__() method so if we want to
268 # uniquely process them we have to use id()
269
270 cloned_ids = {}
271
272 def clone(elem, **kw):
273 kw["detect_subquery_cols"] = detect_subquery_cols
274 id_ = id(elem)
275
276 if id_ in cloned_ids:
277 return cloned_ids[id_]
278
279 if (
280 exclude
281 and hasattr(elem, "proxy_set")
282 and elem.proxy_set.intersection(exclude)
283 ):
284 newelem = elem._clone(clone=clone, **kw)
285 elif annotations != elem._annotations:
286 if detect_subquery_cols and elem._is_immutable:
287 newelem = _safe_annotate(
288 elem._clone(clone=clone, **kw), annotations
289 )
290 else:
291 newelem = _safe_annotate(elem, annotations)
292 else:
293 newelem = elem
294 newelem._copy_internals(clone=clone)
295 cloned_ids[id_] = newelem
296 return newelem
297
298 if element is not None:
299 element = clone(element)
300 clone = None # remove gc cycles
301 return element
302
303
304def _deep_deannotate(element, values=None):
305 """Deep copy the given element, removing annotations."""
306
307 cloned = {}
308
309 def clone(elem, **kw):
310 if values:
311 key = id(elem)
312 else:
313 key = elem
314
315 if key not in cloned:
316 newelem = elem._deannotate(values=values, clone=True)
317 newelem._copy_internals(clone=clone)
318 cloned[key] = newelem
319 return newelem
320 else:
321 return cloned[key]
322
323 if element is not None:
324 element = clone(element)
325 clone = None # remove gc cycles
326 return element
327
328
329def _shallow_annotate(element, annotations):
330 """Annotate the given ClauseElement and copy its internals so that
331 internal objects refer to the new annotated object.
332
333 Basically used to apply a "don't traverse" annotation to a
334 selectable, without digging throughout the whole
335 structure wasting time.
336 """
337 element = element._annotate(annotations)
338 element._copy_internals()
339 return element
340
341
def _new_annotation_type(cls, base_cls):
    """Return the Annotated subclass for ``cls``, generating and
    registering it if it does not exist yet.

    :param cls: the class to produce an annotated version of.  If it is
     already an :class:`.Annotated` subclass it is returned unchanged.

    :param base_cls: the Annotated base to derive from, unless a class in
     the MRO of ``cls`` already has a registered Annotated subclass, in
     which case that one is used.

    The generated class is stored in ``annotated_classes`` and also placed
    in this module's globals under the name ``Annotated<ClassName>``.

    """
    if issubclass(cls, Annotated):
        return cls
    if cls in annotated_classes:
        return annotated_classes[cls]

    # check if an Annotated subclass more specific than
    # the given base_cls is already registered, such
    # as AnnotatedColumnElement; the first hit along the MRO wins.
    base_cls = next(
        (
            annotated_classes[super_]
            for super_ in cls.__mro__
            if super_ in annotated_classes
        ),
        base_cls,
    )

    anno_name = "Annotated%s" % cls.__name__
    anno_cls = type(anno_name, (base_cls, cls), {})
    annotated_classes[cls] = anno_cls
    globals()[anno_name] = anno_cls

    own_attrs = cls.__dict__
    inherits_cache = own_attrs.get("inherit_cache", False)

    if "_traverse_internals" in own_attrs or inherits_cache:
        anno_cls._traverse_internals = list(cls._traverse_internals) + [
            ("_annotations", InternalTraversal.dp_annotations_key)
        ]

    # some classes include this even if they have traverse_internals
    # e.g. BindParameter, add it if present.
    if inherits_cache:
        anno_cls.inherit_cache = True

    anno_cls._is_column_operators = issubclass(cls, operators.ColumnOperators)

    return anno_cls
378
379
def _prepare_annotations(target_hierarchy, base_cls):
    """Generate an Annotated subclass for every class produced by
    ``util.walk_subclasses(target_hierarchy)``, passing ``base_cls`` to
    ``_new_annotation_type()`` for each.

    """
    for target_cls in util.walk_subclasses(target_hierarchy):
        _new_annotation_type(target_cls, base_cls)