1# sql/annotation.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8"""The :class:`.Annotated` class and related routines; creates hash-equivalent
9copies of SQL constructs which contain context-specific markers and
10associations.
11
12Note that the :class:`.Annotated` concept as implemented in this module is not
13related in any way to the pep-593 concept of "Annotated".
14
15
16"""
17
18from __future__ import annotations
19
20import typing
21from typing import Any
22from typing import Callable
23from typing import cast
24from typing import Dict
25from typing import FrozenSet
26from typing import Mapping
27from typing import Optional
28from typing import overload
29from typing import Sequence
30from typing import Tuple
31from typing import Type
32from typing import TYPE_CHECKING
33from typing import TypeVar
34
35from . import operators
36from .cache_key import HasCacheKey
37from .visitors import anon_map
38from .visitors import ExternallyTraversible
39from .visitors import InternalTraversal
40from .. import util
41from ..util.typing import Literal
42from ..util.typing import Self
43
44if TYPE_CHECKING:
45 from .base import _EntityNamespace
46 from .visitors import _TraverseInternalsType
47
48_AnnotationDict = Mapping[str, Any]
49
50EMPTY_ANNOTATIONS: util.immutabledict[str, Any] = util.EMPTY_DICT
51
52
53class SupportsAnnotations(ExternallyTraversible):
54 __slots__ = ()
55
56 _annotations: util.immutabledict[str, Any] = EMPTY_ANNOTATIONS
57
58 proxy_set: util.generic_fn_descriptor[FrozenSet[Any]]
59
60 _is_immutable: bool
61
62 def _annotate(self, values: _AnnotationDict) -> Self:
63 raise NotImplementedError()
64
65 @overload
66 def _deannotate(
67 self,
68 values: Literal[None] = ...,
69 clone: bool = ...,
70 ) -> Self: ...
71
72 @overload
73 def _deannotate(
74 self,
75 values: Sequence[str] = ...,
76 clone: bool = ...,
77 ) -> SupportsAnnotations: ...
78
79 def _deannotate(
80 self,
81 values: Optional[Sequence[str]] = None,
82 clone: bool = False,
83 ) -> SupportsAnnotations:
84 raise NotImplementedError()
85
86 @util.memoized_property
87 def _annotations_cache_key(self) -> Tuple[Any, ...]:
88 anon_map_ = anon_map()
89
90 return self._gen_annotations_cache_key(anon_map_)
91
92 def _gen_annotations_cache_key(
93 self, anon_map: anon_map
94 ) -> Tuple[Any, ...]:
95 return (
96 "_annotations",
97 tuple(
98 (
99 key,
100 (
101 value._gen_cache_key(anon_map, [])
102 if isinstance(value, HasCacheKey)
103 else value
104 ),
105 )
106 for key, value in [
107 (key, self._annotations[key])
108 for key in sorted(self._annotations)
109 ]
110 ),
111 )
112
113
114class SupportsWrappingAnnotations(SupportsAnnotations):
115 __slots__ = ()
116
117 _constructor: Callable[..., SupportsWrappingAnnotations]
118
119 if TYPE_CHECKING:
120
121 @util.ro_non_memoized_property
122 def entity_namespace(self) -> _EntityNamespace: ...
123
124 def _annotate(self, values: _AnnotationDict) -> Self:
125 """return a copy of this ClauseElement with annotations
126 updated by the given dictionary.
127
128 """
129 return Annotated._as_annotated_instance(self, values) # type: ignore
130
131 def _with_annotations(self, values: _AnnotationDict) -> Self:
132 """return a copy of this ClauseElement with annotations
133 replaced by the given dictionary.
134
135 """
136 return Annotated._as_annotated_instance(self, values) # type: ignore
137
138 @overload
139 def _deannotate(
140 self,
141 values: Literal[None] = ...,
142 clone: bool = ...,
143 ) -> Self: ...
144
145 @overload
146 def _deannotate(
147 self,
148 values: Sequence[str] = ...,
149 clone: bool = ...,
150 ) -> SupportsAnnotations: ...
151
152 def _deannotate(
153 self,
154 values: Optional[Sequence[str]] = None,
155 clone: bool = False,
156 ) -> SupportsAnnotations:
157 """return a copy of this :class:`_expression.ClauseElement`
158 with annotations
159 removed.
160
161 :param values: optional tuple of individual values
162 to remove.
163
164 """
165 if clone:
166 s = self._clone()
167 return s
168 else:
169 return self
170
171
172class SupportsCloneAnnotations(SupportsWrappingAnnotations):
173 # SupportsCloneAnnotations extends from SupportsWrappingAnnotations
174 # to support the structure of having the base ClauseElement
175 # be a subclass of SupportsWrappingAnnotations. Any ClauseElement
176 # subclass that wants to extend from SupportsCloneAnnotations
177 # will inherently also be subclassing SupportsWrappingAnnotations, so
178 # make that specific here.
179
180 if not typing.TYPE_CHECKING:
181 __slots__ = ()
182
183 _clone_annotations_traverse_internals: _TraverseInternalsType = [
184 ("_annotations", InternalTraversal.dp_annotations_key)
185 ]
186
187 def _annotate(self, values: _AnnotationDict) -> Self:
188 """return a copy of this ClauseElement with annotations
189 updated by the given dictionary.
190
191 """
192 new = self._clone()
193 new._annotations = new._annotations.union(values)
194 new.__dict__.pop("_annotations_cache_key", None)
195 new.__dict__.pop("_generate_cache_key", None)
196 return new
197
198 def _with_annotations(self, values: _AnnotationDict) -> Self:
199 """return a copy of this ClauseElement with annotations
200 replaced by the given dictionary.
201
202 """
203 new = self._clone()
204 new._annotations = util.immutabledict(values)
205 new.__dict__.pop("_annotations_cache_key", None)
206 new.__dict__.pop("_generate_cache_key", None)
207 return new
208
209 @overload
210 def _deannotate(
211 self,
212 values: Literal[None] = ...,
213 clone: bool = ...,
214 ) -> Self: ...
215
216 @overload
217 def _deannotate(
218 self,
219 values: Sequence[str] = ...,
220 clone: bool = ...,
221 ) -> SupportsAnnotations: ...
222
223 def _deannotate(
224 self,
225 values: Optional[Sequence[str]] = None,
226 clone: bool = False,
227 ) -> SupportsAnnotations:
228 """return a copy of this :class:`_expression.ClauseElement`
229 with annotations
230 removed.
231
232 :param values: optional tuple of individual values
233 to remove.
234
235 """
236 if clone or self._annotations:
237 # clone is used when we are also copying
238 # the expression for a deep deannotation
239 new = self._clone()
240 new._annotations = util.immutabledict()
241 new.__dict__.pop("_annotations_cache_key", None)
242 return new
243 else:
244 return self
245
246
247class Annotated(SupportsAnnotations):
248 """clones a SupportsAnnotations and applies an 'annotations' dictionary.
249
250 Unlike regular clones, this clone also mimics __hash__() and
251 __eq__() of the original element so that it takes its place
252 in hashed collections.
253
254 A reference to the original element is maintained, for the important
255 reason of keeping its hash value current. When GC'ed, the
256 hash value may be reused, causing conflicts.
257
258 .. note:: The rationale for Annotated producing a brand new class,
259 rather than placing the functionality directly within ClauseElement,
260 is **performance**. The __hash__() method is absent on plain
261 ClauseElement which leads to significantly reduced function call
262 overhead, as the use of sets and dictionaries against ClauseElement
263 objects is prevalent, but most are not "annotated".
264
265 """
266
267 _is_column_operators = False
268
269 @classmethod
270 def _as_annotated_instance(
271 cls, element: SupportsWrappingAnnotations, values: _AnnotationDict
272 ) -> Annotated:
273 try:
274 cls = annotated_classes[element.__class__]
275 except KeyError:
276 cls = _new_annotation_type(element.__class__, cls)
277 return cls(element, values)
278
279 _annotations: util.immutabledict[str, Any]
280 __element: SupportsWrappingAnnotations
281 _hash: int
282
283 def __new__(cls: Type[Self], *args: Any) -> Self:
284 return object.__new__(cls)
285
286 def __init__(
287 self, element: SupportsWrappingAnnotations, values: _AnnotationDict
288 ):
289 self.__dict__ = element.__dict__.copy()
290 self.__dict__.pop("_annotations_cache_key", None)
291 self.__dict__.pop("_generate_cache_key", None)
292 self.__element = element
293 self._annotations = util.immutabledict(values)
294 self._hash = hash(element)
295
296 def _annotate(self, values: _AnnotationDict) -> Self:
297 _values = self._annotations.union(values)
298 new = self._with_annotations(_values)
299 return new
300
301 def _with_annotations(self, values: _AnnotationDict) -> Self:
302 clone = self.__class__.__new__(self.__class__)
303 clone.__dict__ = self.__dict__.copy()
304 clone.__dict__.pop("_annotations_cache_key", None)
305 clone.__dict__.pop("_generate_cache_key", None)
306 clone._annotations = util.immutabledict(values)
307 return clone
308
309 @overload
310 def _deannotate(
311 self,
312 values: Literal[None] = ...,
313 clone: bool = ...,
314 ) -> Self: ...
315
316 @overload
317 def _deannotate(
318 self,
319 values: Sequence[str] = ...,
320 clone: bool = ...,
321 ) -> Annotated: ...
322
323 def _deannotate(
324 self,
325 values: Optional[Sequence[str]] = None,
326 clone: bool = True,
327 ) -> SupportsAnnotations:
328 if values is None:
329 return self.__element
330 else:
331 return self._with_annotations(
332 util.immutabledict(
333 {
334 key: value
335 for key, value in self._annotations.items()
336 if key not in values
337 }
338 )
339 )
340
341 if not typing.TYPE_CHECKING:
342 # manually proxy some methods that need extra attention
343 def _compiler_dispatch(self, visitor: Any, **kw: Any) -> Any:
344 return self.__element.__class__._compiler_dispatch(
345 self, visitor, **kw
346 )
347
348 @property
349 def _constructor(self):
350 return self.__element._constructor
351
352 def _clone(self, **kw: Any) -> Self:
353 clone = self.__element._clone(**kw)
354 if clone is self.__element:
355 # detect immutable, don't change anything
356 return self
357 else:
358 # update the clone with any changes that have occurred
359 # to this object's __dict__.
360 clone.__dict__.update(self.__dict__)
361 return self.__class__(clone, self._annotations)
362
363 def __reduce__(self) -> Tuple[Type[Annotated], Tuple[Any, ...]]:
364 return self.__class__, (self.__element, self._annotations)
365
366 def __hash__(self) -> int:
367 return self._hash
368
369 def __eq__(self, other: Any) -> bool:
370 if self._is_column_operators:
371 return self.__element.__class__.__eq__(self, other)
372 else:
373 return hash(other) == hash(self)
374
375 @util.ro_non_memoized_property
376 def entity_namespace(self) -> _EntityNamespace:
377 if "entity_namespace" in self._annotations:
378 return cast(
379 SupportsWrappingAnnotations,
380 self._annotations["entity_namespace"],
381 ).entity_namespace
382 else:
383 return self.__element.entity_namespace
384
385
386# hard-generate Annotated subclasses. this technique
387# is used instead of on-the-fly types (i.e. type.__new__())
388# so that the resulting objects are pickleable; additionally, other
389# decisions can be made up front about the type of object being annotated
390# just once per class rather than per-instance.
391annotated_classes: Dict[Type[SupportsWrappingAnnotations], Type[Annotated]] = (
392 {}
393)
394
395_SA = TypeVar("_SA", bound="SupportsAnnotations")
396
397
398def _safe_annotate(to_annotate: _SA, annotations: _AnnotationDict) -> _SA:
399 try:
400 _annotate = to_annotate._annotate
401 except AttributeError:
402 # skip objects that don't actually have an `_annotate`
403 # attribute, namely QueryableAttribute inside of a join
404 # condition
405 return to_annotate
406 else:
407 return _annotate(annotations)
408
409
410def _deep_annotate(
411 element: _SA,
412 annotations: _AnnotationDict,
413 exclude: Optional[Sequence[SupportsAnnotations]] = None,
414 *,
415 detect_subquery_cols: bool = False,
416 ind_cols_on_fromclause: bool = False,
417 annotate_callable: Optional[
418 Callable[[SupportsAnnotations, _AnnotationDict], SupportsAnnotations]
419 ] = None,
420) -> _SA:
421 """Deep copy the given ClauseElement, annotating each element
422 with the given annotations dictionary.
423
424 Elements within the exclude collection will be cloned but not annotated.
425
426 """
427
428 # annotated objects hack the __hash__() method so if we want to
429 # uniquely process them we have to use id()
430
431 cloned_ids: Dict[int, SupportsAnnotations] = {}
432
433 def clone(elem: SupportsAnnotations, **kw: Any) -> SupportsAnnotations:
434 # ind_cols_on_fromclause means make sure an AnnotatedFromClause
435 # has its own .c collection independent of that which its proxying.
436 # this is used specifically by orm.LoaderCriteriaOption to break
437 # a reference cycle that it's otherwise prone to building,
438 # see test_relationship_criteria->
439 # test_loader_criteria_subquery_w_same_entity. logic here was
440 # changed for #8796 and made explicit; previously it occurred
441 # by accident
442
443 kw["detect_subquery_cols"] = detect_subquery_cols
444 id_ = id(elem)
445
446 if id_ in cloned_ids:
447 return cloned_ids[id_]
448
449 if (
450 exclude
451 and hasattr(elem, "proxy_set")
452 and elem.proxy_set.intersection(exclude)
453 ):
454 newelem = elem._clone(clone=clone, **kw)
455 elif annotations != elem._annotations:
456 if detect_subquery_cols and elem._is_immutable:
457 to_annotate = elem._clone(clone=clone, **kw)
458 else:
459 to_annotate = elem
460 if annotate_callable:
461 newelem = annotate_callable(to_annotate, annotations)
462 else:
463 newelem = _safe_annotate(to_annotate, annotations)
464 else:
465 newelem = elem
466
467 newelem._copy_internals(
468 clone=clone, ind_cols_on_fromclause=ind_cols_on_fromclause
469 )
470
471 cloned_ids[id_] = newelem
472 return newelem
473
474 if element is not None:
475 element = cast(_SA, clone(element))
476 clone = None # type: ignore # remove gc cycles
477 return element
478
479
480@overload
481def _deep_deannotate(
482 element: Literal[None], values: Optional[Sequence[str]] = None
483) -> Literal[None]: ...
484
485
486@overload
487def _deep_deannotate(
488 element: _SA, values: Optional[Sequence[str]] = None
489) -> _SA: ...
490
491
492def _deep_deannotate(
493 element: Optional[_SA], values: Optional[Sequence[str]] = None
494) -> Optional[_SA]:
495 """Deep copy the given element, removing annotations."""
496
497 cloned: Dict[Any, SupportsAnnotations] = {}
498
499 def clone(elem: SupportsAnnotations, **kw: Any) -> SupportsAnnotations:
500 key: Any
501 if values:
502 key = id(elem)
503 else:
504 key = elem
505
506 if key not in cloned:
507 newelem = elem._deannotate(values=values, clone=True)
508 newelem._copy_internals(clone=clone)
509 cloned[key] = newelem
510 return newelem
511 else:
512 return cloned[key]
513
514 if element is not None:
515 element = cast(_SA, clone(element))
516 clone = None # type: ignore # remove gc cycles
517 return element
518
519
520def _shallow_annotate(element: _SA, annotations: _AnnotationDict) -> _SA:
521 """Annotate the given ClauseElement and copy its internals so that
522 internal objects refer to the new annotated object.
523
524 Basically used to apply a "don't traverse" annotation to a
525 selectable, without digging throughout the whole
526 structure wasting time.
527 """
528 element = element._annotate(annotations)
529 element._copy_internals()
530 return element
531
532
533def _new_annotation_type(
534 cls: Type[SupportsWrappingAnnotations], base_cls: Type[Annotated]
535) -> Type[Annotated]:
536 """Generates a new class that subclasses Annotated and proxies a given
537 element type.
538
539 """
540 if issubclass(cls, Annotated):
541 return cls
542 elif cls in annotated_classes:
543 return annotated_classes[cls]
544
545 for super_ in cls.__mro__:
546 # check if an Annotated subclass more specific than
547 # the given base_cls is already registered, such
548 # as AnnotatedColumnElement.
549 if super_ in annotated_classes:
550 base_cls = annotated_classes[super_]
551 break
552
553 annotated_classes[cls] = anno_cls = cast(
554 Type[Annotated],
555 type("Annotated%s" % cls.__name__, (base_cls, cls), {}),
556 )
557 globals()["Annotated%s" % cls.__name__] = anno_cls
558
559 if "_traverse_internals" in cls.__dict__:
560 anno_cls._traverse_internals = list(cls._traverse_internals) + [
561 ("_annotations", InternalTraversal.dp_annotations_key)
562 ]
563 elif cls.__dict__.get("inherit_cache", False):
564 anno_cls._traverse_internals = list(cls._traverse_internals) + [
565 ("_annotations", InternalTraversal.dp_annotations_key)
566 ]
567
568 # some classes include this even if they have traverse_internals
569 # e.g. BindParameter, add it if present.
570 if cls.__dict__.get("inherit_cache", False):
571 anno_cls.inherit_cache = True # type: ignore
572 elif "inherit_cache" in cls.__dict__:
573 anno_cls.inherit_cache = cls.__dict__["inherit_cache"] # type: ignore
574
575 anno_cls._is_column_operators = issubclass(cls, operators.ColumnOperators)
576
577 return anno_cls
578
579
580def _prepare_annotations(
581 target_hierarchy: Type[SupportsWrappingAnnotations],
582 base_cls: Type[Annotated],
583) -> None:
584 for cls in util.walk_subclasses(target_hierarchy):
585 _new_annotation_type(cls, base_cls)