# sql/annotation.py
# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: https://www.opensource.org/licenses/mit-license.php

"""The :class:`.Annotated` class and related routines; creates hash-equivalent
copies of SQL constructs which contain context-specific markers and
associations.

Note that the :class:`.Annotated` concept as implemented in this module is not
related in any way to the pep-593 concept of "Annotated".


"""
17
18from __future__ import annotations
19
20import typing
21from typing import Any
22from typing import Callable
23from typing import cast
24from typing import Dict
25from typing import FrozenSet
26from typing import Mapping
27from typing import Optional
28from typing import overload
29from typing import Sequence
30from typing import Tuple
31from typing import Type
32from typing import TYPE_CHECKING
33from typing import TypeVar
34
35from . import operators
36from .cache_key import HasCacheKey
37from .visitors import anon_map
38from .visitors import ExternallyTraversible
39from .visitors import InternalTraversal
40from .. import util
41from ..util.typing import Literal
42from ..util.typing import Self
43
44if TYPE_CHECKING:
45 from .base import _EntityNamespace
46 from .visitors import _TraverseInternalsType
47
# Read-only mapping of annotation name to an arbitrary value; the type
# accepted wherever a set of annotations is passed in.
_AnnotationDict = Mapping[str, Any]

# Shared "no annotations" value; used as the class-level default of
# ``SupportsAnnotations._annotations``.
EMPTY_ANNOTATIONS: util.immutabledict[str, Any] = util.EMPTY_DICT
51
52
class SupportsAnnotations(ExternallyTraversible):
    """Interface for traversible elements that carry an ``_annotations``
    dictionary.

    ``_annotate()`` and ``_deannotate()`` raise ``NotImplementedError`` at
    this level; the class itself supplies only the default (empty)
    annotations and the helpers which render the annotations into a
    cache key.

    """

    __slots__ = ()

    # class-level default: an element starts out with no annotations
    _annotations: util.immutabledict[str, Any] = EMPTY_ANNOTATIONS

    proxy_set: util.generic_fn_descriptor[FrozenSet[Any]]

    _is_immutable: bool

    def _annotate(self, values: _AnnotationDict) -> Self:
        """Annotate this element with ``values``; not implemented at
        this level.

        """
        raise NotImplementedError()

    @overload
    def _deannotate(
        self,
        values: Literal[None] = ...,
        clone: bool = ...,
    ) -> Self: ...

    @overload
    def _deannotate(
        self,
        values: Sequence[str] = ...,
        clone: bool = ...,
    ) -> SupportsAnnotations: ...

    def _deannotate(
        self,
        values: Optional[Sequence[str]] = None,
        clone: bool = False,
    ) -> SupportsAnnotations:
        """Remove annotations from this element; not implemented at
        this level.

        """
        raise NotImplementedError()

    @util.memoized_property
    def _annotations_cache_key(self) -> Tuple[Any, ...]:
        """Cache key for the annotations, generated against a fresh
        ``anon_map``.

        """
        anon_map_ = anon_map()

        return self._gen_annotations_cache_key(anon_map_)

    def _gen_annotations_cache_key(
        self, anon_map: anon_map
    ) -> Tuple[Any, ...]:
        """Return ``("_annotations", ((key, value_key), ...))``.

        Entries are ordered by sorted annotation key.  A value that is a
        ``HasCacheKey`` contributes the result of its own
        ``_gen_cache_key()``; any other value is placed into the key
        as-is.

        :param anon_map: the ``anon_map`` handed to each nested
         ``_gen_cache_key()`` call.

        """
        annotations = self._annotations
        entries = []
        # iterate in sorted key order so that the resulting tuple does not
        # depend on dictionary ordering
        for key in sorted(annotations):
            value = annotations[key]
            if isinstance(value, HasCacheKey):
                value = value._gen_cache_key(anon_map, [])
            entries.append((key, value))
        return ("_annotations", tuple(entries))
112
113
class SupportsWrappingAnnotations(SupportsAnnotations):
    """Annotation support in which annotating an element produces an
    :class:`.Annotated` wrapper instance, built by
    ``Annotated._as_annotated_instance()``.

    """

    __slots__ = ()

    _constructor: Callable[..., SupportsWrappingAnnotations]

    if TYPE_CHECKING:

        @util.ro_non_memoized_property
        def entity_namespace(self) -> _EntityNamespace: ...

    def _annotate(self, values: _AnnotationDict) -> Self:
        """return a copy of this ClauseElement with annotations
        updated by the given dictionary.

        """
        return Annotated._as_annotated_instance(self, values)  # type: ignore

    def _with_annotations(self, values: _AnnotationDict) -> Self:
        """return a copy of this ClauseElement with annotations
        replaced by the given dictionary.

        """
        return Annotated._as_annotated_instance(self, values)  # type: ignore

    @overload
    def _deannotate(
        self,
        values: Literal[None] = ...,
        clone: bool = ...,
    ) -> Self: ...

    @overload
    def _deannotate(
        self,
        values: Sequence[str] = ...,
        clone: bool = ...,
    ) -> SupportsAnnotations: ...

    def _deannotate(
        self,
        values: Optional[Sequence[str]] = None,
        clone: bool = False,
    ) -> SupportsAnnotations:
        """return a copy of this :class:`_expression.ClauseElement`
        with annotations
        removed.

        This implementation does not touch any annotations and does not
        consult ``values``; it returns ``self._clone()`` when ``clone`` is
        true and ``self`` otherwise.

        :param values: optional tuple of individual values
         to remove; unused by this implementation.

        :param clone: when True, return the result of ``_clone()`` rather
         than this object itself.

        """
        if clone:
            return self._clone()
        return self
170
171
class SupportsCloneAnnotations(SupportsWrappingAnnotations):
    """Annotation support in which the annotations are stored directly on
    a clone of the element (made with ``_clone()``), rather than on an
    :class:`.Annotated` wrapper.

    """

    # SupportsCloneAnnotations extends from SupportsWrappingAnnotations
    # to support the structure of having the base ClauseElement
    # be a subclass of SupportsWrappingAnnotations.  Any ClauseElement
    # subclass that wants to extend from SupportsCloneAnnotations
    # will inherently also be subclassing SupportsWrappingAnnotations, so
    # make that specific here.

    if not typing.TYPE_CHECKING:
        __slots__ = ()

    _clone_annotations_traverse_internals: _TraverseInternalsType = [
        ("_annotations", InternalTraversal.dp_annotations_key)
    ]

    def _annotate(self, values: _AnnotationDict) -> Self:
        """return a copy of this ClauseElement with annotations
        updated by the given dictionary.

        """
        new = self._clone()
        new._annotations = new._annotations.union(values)
        # memoized cache keys copied along with the clone would describe
        # the old annotations; discard them
        new.__dict__.pop("_annotations_cache_key", None)
        new.__dict__.pop("_generate_cache_key", None)
        return new

    def _with_annotations(self, values: _AnnotationDict) -> Self:
        """return a copy of this ClauseElement with annotations
        replaced by the given dictionary.

        """
        new = self._clone()
        new._annotations = util.immutabledict(values)
        # memoized cache keys copied along with the clone would describe
        # the old annotations; discard them
        new.__dict__.pop("_annotations_cache_key", None)
        new.__dict__.pop("_generate_cache_key", None)
        return new

    @overload
    def _deannotate(
        self,
        values: Literal[None] = ...,
        clone: bool = ...,
    ) -> Self: ...

    @overload
    def _deannotate(
        self,
        values: Sequence[str] = ...,
        clone: bool = ...,
    ) -> SupportsAnnotations: ...

    def _deannotate(
        self,
        values: Optional[Sequence[str]] = None,
        clone: bool = False,
    ) -> SupportsAnnotations:
        """return a copy of this :class:`_expression.ClauseElement`
        with annotations
        removed.

        When ``clone`` is false and there are no annotations present,
        this object itself is returned.

        :param values: optional tuple of individual values
         to remove.

         NOTE(review): this implementation does not consult ``values``;
         all annotations are removed from the returned copy.

        :param clone: when True, a copy is made even if this element
         has no annotations.

        """
        if not clone and not self._annotations:
            return self

        # clone is used when we are also copying
        # the expression for a deep deannotation
        new = self._clone()
        new._annotations = util.immutabledict()
        # NOTE(review): unlike _annotate() / _with_annotations(), only
        # "_annotations_cache_key" is discarded here, not
        # "_generate_cache_key"; confirm whether that is intentional.
        new.__dict__.pop("_annotations_cache_key", None)
        return new
245
246
class Annotated(SupportsAnnotations):
    """clones a SupportsAnnotations and applies an 'annotations' dictionary.

    Unlike regular clones, this clone also mimics __hash__() and
    __eq__() of the original element so that it takes its place
    in hashed collections.

    A reference to the original element is maintained, for the important
    reason of keeping its hash value current.  When GC'ed, the
    hash value may be reused, causing conflicts.

    .. note::  The rationale for Annotated producing a brand new class,
       rather than placing the functionality directly within ClauseElement,
       is **performance**.  The __hash__() method is absent on plain
       ClauseElement which leads to significantly reduced function call
       overhead, as the use of sets and dictionaries against ClauseElement
       objects is prevalent, but most are not "annotated".

    """

    # set per generated subclass by _new_annotation_type(); selects which
    # __eq__() behavior is used
    _is_column_operators = False

    @classmethod
    def _as_annotated_instance(
        cls, element: SupportsWrappingAnnotations, values: _AnnotationDict
    ) -> Annotated:
        """Wrap ``element`` in the Annotated subclass registered for its
        class, generating that subclass first if it does not exist yet.

        """
        try:
            cls = annotated_classes[element.__class__]
        except KeyError:
            cls = _new_annotation_type(element.__class__, cls)
        return cls(element, values)

    _annotations: util.immutabledict[str, Any]
    __element: SupportsWrappingAnnotations
    _hash: int

    def __new__(cls: Type[Self], *args: Any) -> Self:
        # bypass any __new__() of the proxied class; construction
        # arguments are consumed by __init__() only
        return object.__new__(cls)

    def __init__(
        self, element: SupportsWrappingAnnotations, values: _AnnotationDict
    ) -> None:
        """Take on a copy of ``element``'s state, plus ``values`` as the
        annotations.

        """
        self.__dict__ = element.__dict__.copy()
        # memoized cache keys belonging to the un-annotated element don't
        # apply to this object
        self.__dict__.pop("_annotations_cache_key", None)
        self.__dict__.pop("_generate_cache_key", None)
        self.__element = element
        self._annotations = util.immutabledict(values)
        # hash the same as the wrapped element
        self._hash = hash(element)

    def _annotate(self, values: _AnnotationDict) -> Self:
        """Return a copy whose annotations are the existing ones merged
        with ``values``.

        """
        return self._with_annotations(self._annotations.union(values))

    def _with_annotations(self, values: _AnnotationDict) -> Self:
        """Return a copy whose annotations are replaced by ``values``."""
        clone = self.__class__.__new__(self.__class__)
        clone.__dict__ = self.__dict__.copy()
        clone.__dict__.pop("_annotations_cache_key", None)
        clone.__dict__.pop("_generate_cache_key", None)
        clone._annotations = util.immutabledict(values)
        return clone

    @overload
    def _deannotate(
        self,
        values: Literal[None] = ...,
        clone: bool = ...,
    ) -> Self: ...

    @overload
    def _deannotate(
        self,
        values: Sequence[str] = ...,
        clone: bool = ...,
    ) -> Annotated: ...

    def _deannotate(
        self,
        values: Optional[Sequence[str]] = None,
        clone: bool = True,
    ) -> SupportsAnnotations:
        """Remove annotations.

        :param values: when ``None``, the original wrapped element is
         returned.  Otherwise, a copy of this object is returned which
         omits the annotation keys named in ``values``.

        :param clone: accepted for signature compatibility; not consulted
         by this implementation.

        """
        if values is None:
            return self.__element

        return self._with_annotations(
            util.immutabledict(
                {
                    key: value
                    for key, value in self._annotations.items()
                    if key not in values
                }
            )
        )

    if not typing.TYPE_CHECKING:
        # manually proxy some methods that need extra attention
        def _compiler_dispatch(self, visitor: Any, **kw: Any) -> Any:
            return self.__element.__class__._compiler_dispatch(
                self, visitor, **kw
            )

        @property
        def _constructor(self):
            return self.__element._constructor

    def _clone(self, **kw: Any) -> Self:
        """Clone the wrapped element and re-wrap the clone with the same
        annotations; returns ``self`` if the wrapped element's
        ``_clone()`` returned the element itself.

        """
        clone = self.__element._clone(**kw)
        if clone is self.__element:
            # detect immutable, don't change anything
            return self

        # update the clone with any changes that have occurred
        # to this object's __dict__.
        clone.__dict__.update(self.__dict__)
        return self.__class__(clone, self._annotations)

    def __reduce__(self) -> Tuple[Type[Annotated], Tuple[Any, ...]]:
        # reconstruct through __init__() using the wrapped element and
        # the annotations
        return self.__class__, (self.__element, self._annotations)

    def __hash__(self) -> int:
        return self._hash

    def __eq__(self, other: Any) -> bool:
        if self._is_column_operators:
            # defer to the __eq__() of the proxied class
            return self.__element.__class__.__eq__(self, other)
        else:
            return hash(other) == hash(self)

    @util.ro_non_memoized_property
    def entity_namespace(self) -> _EntityNamespace:
        """The ``entity_namespace`` of the object stored under the
        ``"entity_namespace"`` annotation if present, else that of the
        wrapped element.

        """
        if "entity_namespace" in self._annotations:
            return cast(
                SupportsWrappingAnnotations,
                self._annotations["entity_namespace"],
            ).entity_namespace
        else:
            return self.__element.entity_namespace
384
385
# hard-generate Annotated subclasses.  this technique
# is used instead of on-the-fly types (i.e. type.__new__())
# so that the resulting objects are pickleable; additionally, other
# decisions can be made up front about the type of object being annotated
# just once per class rather than per-instance.
#
# maps an annotatable class to the Annotated subclass generated for it.
annotated_classes: Dict[Type[SupportsWrappingAnnotations], Type[Annotated]] = (
    {}
)

# type variable for functions that return the same kind of element
# that they were given
_SA = TypeVar("_SA", bound="SupportsAnnotations")
396
397
def _safe_annotate(to_annotate: _SA, annotations: _AnnotationDict) -> _SA:
    """Annotate ``to_annotate`` if it has an ``_annotate`` attribute.

    :param to_annotate: the object to annotate.
    :param annotations: the annotations passed to ``_annotate()``.
    :return: the result of ``to_annotate._annotate(annotations)``, or
     ``to_annotate`` itself, unchanged, when it has no ``_annotate``
     attribute.

    """
    try:
        _annotate = to_annotate._annotate
    except AttributeError:
        # skip objects that don't actually have an `_annotate`
        # attribute, namely QueryableAttribute inside of a join
        # condition
        return to_annotate
    else:
        # called outside of the try: so that an AttributeError raised
        # within _annotate() itself is not swallowed
        return _annotate(annotations)
408
409
410def _deep_annotate(
411 element: _SA,
412 annotations: _AnnotationDict,
413 exclude: Optional[Sequence[SupportsAnnotations]] = None,
414 *,
415 detect_subquery_cols: bool = False,
416 ind_cols_on_fromclause: bool = False,
417 annotate_callable: Optional[
418 Callable[[SupportsAnnotations, _AnnotationDict], SupportsAnnotations]
419 ] = None,
420) -> _SA:
421 """Deep copy the given ClauseElement, annotating each element
422 with the given annotations dictionary.
423
424 Elements within the exclude collection will be cloned but not annotated.
425
426 """
427
428 # annotated objects hack the __hash__() method so if we want to
429 # uniquely process them we have to use id()
430
431 cloned_ids: Dict[int, SupportsAnnotations] = {}
432
433 def clone(elem: SupportsAnnotations, **kw: Any) -> SupportsAnnotations:
434 # ind_cols_on_fromclause means make sure an AnnotatedFromClause
435 # has its own .c collection independent of that which its proxying.
436 # this is used specifically by orm.LoaderCriteriaOption to break
437 # a reference cycle that it's otherwise prone to building,
438 # see test_relationship_criteria->
439 # test_loader_criteria_subquery_w_same_entity. logic here was
440 # changed for #8796 and made explicit; previously it occurred
441 # by accident
442
443 kw["detect_subquery_cols"] = detect_subquery_cols
444 id_ = id(elem)
445
446 if id_ in cloned_ids:
447 return cloned_ids[id_]
448
449 if (
450 exclude
451 and hasattr(elem, "proxy_set")
452 and elem.proxy_set.intersection(exclude)
453 ):
454 newelem = elem._clone(clone=clone, **kw)
455 elif annotations != elem._annotations:
456 if detect_subquery_cols and elem._is_immutable:
457 to_annotate = elem._clone(clone=clone, **kw)
458 else:
459 to_annotate = elem
460 if annotate_callable:
461 newelem = annotate_callable(to_annotate, annotations)
462 else:
463 newelem = _safe_annotate(to_annotate, annotations)
464 else:
465 newelem = elem
466
467 newelem._copy_internals(
468 clone=clone,
469 ind_cols_on_fromclause=ind_cols_on_fromclause,
470 _annotations_traversal=True,
471 )
472
473 cloned_ids[id_] = newelem
474 return newelem
475
476 if element is not None:
477 element = cast(_SA, clone(element))
478 clone = None # type: ignore # remove gc cycles
479 return element
480
481
482@overload
483def _deep_deannotate(
484 element: Literal[None], values: Optional[Sequence[str]] = None
485) -> Literal[None]: ...
486
487
488@overload
489def _deep_deannotate(
490 element: _SA, values: Optional[Sequence[str]] = None
491) -> _SA: ...
492
493
494def _deep_deannotate(
495 element: Optional[_SA], values: Optional[Sequence[str]] = None
496) -> Optional[_SA]:
497 """Deep copy the given element, removing annotations."""
498
499 cloned: Dict[Any, SupportsAnnotations] = {}
500
501 def clone(elem: SupportsAnnotations, **kw: Any) -> SupportsAnnotations:
502 key: Any
503 if values:
504 key = id(elem)
505 else:
506 key = elem
507
508 if key not in cloned:
509 newelem = elem._deannotate(values=values, clone=True)
510 newelem._copy_internals(clone=clone, _annotations_traversal=True)
511 cloned[key] = newelem
512 return newelem
513 else:
514 return cloned[key]
515
516 if element is not None:
517 element = cast(_SA, clone(element))
518 clone = None # type: ignore # remove gc cycles
519 return element
520
521
def _shallow_annotate(element: _SA, annotations: _AnnotationDict) -> _SA:
    """Annotate the given ClauseElement and copy its internals so that
    internal objects refer to the new annotated object.

    Basically used to apply a "don't traverse" annotation to a
    selectable, without digging throughout the whole
    structure wasting time.

    :param element: the element to annotate.
    :param annotations: the annotations passed to ``element._annotate()``.
    :return: the annotated element, after its ``_copy_internals()`` has
     been invoked.

    """
    annotated = element._annotate(annotations)
    annotated._copy_internals(_annotations_traversal=True)
    return annotated
533
534
def _new_annotation_type(
    cls: Type[SupportsWrappingAnnotations], base_cls: Type[Annotated]
) -> Type[Annotated]:
    """Generates a new class that subclasses Annotated and proxies a given
    element type.

    The new class is named ``Annotated<cls.__name__>``, is recorded in
    ``annotated_classes`` and is also placed into this module's globals
    under that name.

    :param cls: the element class to generate an annotated version of.
     If it is already an ``Annotated`` subclass it is returned as-is; if
     a class was already generated for it, that class is returned.
    :param base_cls: the Annotated class to derive from, unless a class
     in ``cls.__mro__`` already has a registered annotated class, in which
     case the first such registered class is used as the base instead.
    :return: the Annotated subclass for ``cls``.

    """
    if issubclass(cls, Annotated):
        return cls
    elif cls in annotated_classes:
        return annotated_classes[cls]

    for super_ in cls.__mro__:
        # check if an Annotated subclass more specific than
        # the given base_cls is already registered, such
        # as AnnotatedColumnElement.
        if super_ in annotated_classes:
            base_cls = annotated_classes[super_]
            break

    anno_name = "Annotated%s" % cls.__name__
    annotated_classes[cls] = anno_cls = cast(
        Type[Annotated],
        type(anno_name, (base_cls, cls), {}),
    )
    # make the generated class available as a module-level name
    globals()[anno_name] = anno_cls

    # only what is declared directly on cls (not inherited) is considered
    inherits_cache = cls.__dict__.get("inherit_cache", False)

    if "_traverse_internals" in cls.__dict__ or inherits_cache:
        # the annotated class traverses everything cls does, plus the
        # annotations themselves
        anno_cls._traverse_internals = list(cls._traverse_internals) + [
            ("_annotations", InternalTraversal.dp_annotations_key)
        ]

    # some classes include this even if they have traverse_internals
    # e.g. BindParameter, add it if present.
    if inherits_cache:
        anno_cls.inherit_cache = True  # type: ignore
    elif "inherit_cache" in cls.__dict__:
        anno_cls.inherit_cache = cls.__dict__["inherit_cache"]  # type: ignore

    anno_cls._is_column_operators = issubclass(cls, operators.ColumnOperators)

    return anno_cls
580
581
def _prepare_annotations(
    target_hierarchy: Type[SupportsWrappingAnnotations],
    base_cls: Type[Annotated],
) -> None:
    """Generate an Annotated subclass, via ``_new_annotation_type()``, for
    each class yielded by ``util.walk_subclasses(target_hierarchy)``.

    :param target_hierarchy: the class handed to
     ``util.walk_subclasses()``.
    :param base_cls: the Annotated base class passed along to
     ``_new_annotation_type()`` for each class.

    """
    for cls in util.walk_subclasses(target_hierarchy):
        _new_annotation_type(cls, base_cls)