1# sql/annotation.py
2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8"""The :class:`.Annotated` class and related routines; creates hash-equivalent
9copies of SQL constructs which contain context-specific markers and
10associations.
11
Note that the :class:`.Annotated` concept as implemented in this module is not
related in any way to the :pep:`593` concept of ``typing.Annotated``.
14
15
16"""
17
18from __future__ import annotations
19
20from operator import itemgetter
21import typing
22from typing import Any
23from typing import Callable
24from typing import cast
25from typing import Dict
26from typing import FrozenSet
27from typing import Mapping
28from typing import Optional
29from typing import overload
30from typing import Sequence
31from typing import Tuple
32from typing import Type
33from typing import TYPE_CHECKING
34from typing import TypeVar
35
36from . import operators
37from .cache_key import HasCacheKey
38from .visitors import anon_map
39from .visitors import ExternallyTraversible
40from .visitors import InternalTraversal
41from .. import util
42from ..util.typing import Literal
43from ..util.typing import Self
44
45if TYPE_CHECKING:
46 from .base import _EntityNamespace
47 from .visitors import _TraverseInternalsType
48
# read-only mapping of annotation names to arbitrary values; this is the
# type accepted by the ``_annotate()`` / ``_with_annotations()`` methods
# and by the module-level annotate functions below.
_AnnotationDict = Mapping[str, Any]

# shared empty annotations collection; used as the class-level default
# of ``SupportsAnnotations._annotations``.
EMPTY_ANNOTATIONS: util.immutabledict[str, Any] = util.EMPTY_DICT
52
53
class SupportsAnnotations(ExternallyTraversible):
    """Interface for objects that carry an ``_annotations`` dictionary.

    ``_annotate()`` and ``_deannotate()`` are left for subclasses to
    implement; what is provided here is the generation of a cache key
    fragment from whatever annotations are present.

    """

    __slots__ = ()

    _annotations: util.immutabledict[str, Any] = EMPTY_ANNOTATIONS

    proxy_set: util.generic_fn_descriptor[FrozenSet[Any]]

    _is_immutable: bool

    def _annotate(self, values: _AnnotationDict) -> Self:
        """Not implemented at this level; raises ``NotImplementedError``."""
        raise NotImplementedError()

    @overload
    def _deannotate(
        self,
        values: Literal[None] = ...,
        clone: bool = ...,
    ) -> Self: ...

    @overload
    def _deannotate(
        self,
        values: Sequence[str] = ...,
        clone: bool = ...,
    ) -> SupportsAnnotations: ...

    def _deannotate(
        self,
        values: Optional[Sequence[str]] = None,
        clone: bool = False,
    ) -> SupportsAnnotations:
        """Not implemented at this level; raises ``NotImplementedError``."""
        raise NotImplementedError()

    @util.memoized_property
    def _annotations_cache_key(self) -> Tuple[Any, ...]:
        """Annotations cache key generated against a new ``anon_map``."""
        return self._gen_annotations_cache_key(anon_map())

    def _gen_annotations_cache_key(
        self, anon_map: anon_map
    ) -> Tuple[Any, ...]:
        """Return the tuple ``("_annotations", entries)``.

        ``entries`` holds one ``(key, value)`` pair per annotation, ordered
        by key.  A value that is a :class:`.HasCacheKey` is represented by
        the result of its ``_gen_cache_key()`` method; any other value is
        used directly.

        """
        # order by annotation key so that the result does not depend on
        # the order in which the annotations were established
        ordered_items = sorted(self._annotations.items(), key=_get_item0)
        entries = tuple(
            (
                name,
                (
                    item._gen_cache_key(anon_map, [])
                    if isinstance(item, HasCacheKey)
                    else item
                ),
            )
            for name, item in ordered_items
        )
        return ("_annotations", entries)
112
113
# sort key returning element 0 of a (key, value) pair; used to order
# annotation items by key when the annotations cache key is generated.
_get_item0 = itemgetter(0)
115
116
class SupportsWrappingAnnotations(SupportsAnnotations):
    """Annotation support that works by wrapping the element in an
    :class:`.Annotated` instance.

    """

    __slots__ = ()

    _constructor: Callable[..., SupportsWrappingAnnotations]

    if TYPE_CHECKING:

        @util.ro_non_memoized_property
        def entity_namespace(self) -> _EntityNamespace: ...

    def _annotate(self, values: _AnnotationDict) -> Self:
        """Return an :class:`.Annotated` version of this element which
        carries the given annotations.

        """
        return Annotated._as_annotated_instance(self, values)  # type: ignore

    def _with_annotations(self, values: _AnnotationDict) -> Self:
        """Return an :class:`.Annotated` version of this element whose
        annotations are exactly the given dictionary.

        At this level the implementation is the same as that of
        ``_annotate()``.

        """
        return Annotated._as_annotated_instance(self, values)  # type: ignore

    @overload
    def _deannotate(
        self,
        values: Literal[None] = ...,
        clone: bool = ...,
    ) -> Self: ...

    @overload
    def _deannotate(
        self,
        values: Sequence[str] = ...,
        clone: bool = ...,
    ) -> SupportsAnnotations: ...

    def _deannotate(
        self,
        values: Optional[Sequence[str]] = None,
        clone: bool = False,
    ) -> SupportsAnnotations:
        """Return this element without annotations.

        There is nothing to remove at this level, so the element itself
        is returned, or the result of ``_clone()`` when ``clone`` is set.

        :param values: optional tuple of individual values
         to remove; not consulted by this implementation.

        :param clone: when True, return ``self._clone()`` rather than
         ``self``.

        """
        return self._clone() if clone else self
173
174
class SupportsCloneAnnotations(SupportsWrappingAnnotations):
    # SupportsCloneAnnotations extends from SupportsWrappingAnnotations
    # to support the structure of having the base ClauseElement
    # be a subclass of SupportsWrappingAnnotations.  Any ClauseElement
    # subclass that wants to extend from SupportsCloneAnnotations
    # will inherently also be subclassing SupportsWrappingAnnotations, so
    # make that specific here.

    if not typing.TYPE_CHECKING:
        __slots__ = ()

    # traversal entry that includes the annotations dictionary
    _clone_annotations_traverse_internals: _TraverseInternalsType = [
        ("_annotations", InternalTraversal.dp_annotations_key)
    ]

    def _annotate(self, values: _AnnotationDict) -> Self:
        """return a copy of this ClauseElement with annotations
        updated by the given dictionary.

        The copy is produced by ``_clone()``; its annotations are the
        union of the existing annotations and ``values``.

        """
        new = self._clone()
        new._annotations = new._annotations.union(values)
        # cached keys carried over by the clone would describe the
        # previous annotations; discard them so they are regenerated
        new.__dict__.pop("_annotations_cache_key", None)
        new.__dict__.pop("_generate_cache_key", None)
        return new

    def _with_annotations(self, values: _AnnotationDict) -> Self:
        """return a copy of this ClauseElement with annotations
        replaced by the given dictionary.

        The copy is produced by ``_clone()``; existing annotations are
        not retained.

        """
        new = self._clone()
        new._annotations = util.immutabledict(values)
        # cached keys carried over by the clone would describe the
        # previous annotations; discard them so they are regenerated
        new.__dict__.pop("_annotations_cache_key", None)
        new.__dict__.pop("_generate_cache_key", None)
        return new

    @overload
    def _deannotate(
        self,
        values: Literal[None] = ...,
        clone: bool = ...,
    ) -> Self: ...

    @overload
    def _deannotate(
        self,
        values: Sequence[str] = ...,
        clone: bool = ...,
    ) -> SupportsAnnotations: ...

    def _deannotate(
        self,
        values: Optional[Sequence[str]] = None,
        clone: bool = False,
    ) -> SupportsAnnotations:
        """return a copy of this :class:`_expression.ClauseElement`
        with annotations
        removed.

        When the element has no annotations and ``clone`` is False, the
        element itself is returned.

        :param values: optional tuple of individual values
         to remove.  NOTE: this implementation does not consult
         ``values``; all annotations are removed in every case.

        :param clone: when True, a clone is returned even if there are
         no annotations to remove.

        """
        if clone or self._annotations:
            # clone is used when we are also copying
            # the expression for a deep deannotation
            new = self._clone()
            new._annotations = util.immutabledict()
            new.__dict__.pop("_annotations_cache_key", None)
            # the annotations were replaced here just as they are in
            # _annotate() / _with_annotations(), so discard the same set
            # of cached keys; a "_generate_cache_key" entry carried over
            # by the clone would otherwise still describe the annotated
            # form of the element
            new.__dict__.pop("_generate_cache_key", None)
            return new
        else:
            return self
248
249
class Annotated(SupportsAnnotations):
    """clones a SupportsAnnotations and applies an 'annotations' dictionary.

    Unlike regular clones, this clone also mimics __hash__() and
    __eq__() of the original element so that it takes its place
    in hashed collections.

    A reference to the original element is maintained, for the important
    reason of keeping its hash value current.  When GC'ed, the
    hash value may be reused, causing conflicts.

    .. note::  The rationale for Annotated producing a brand new class,
       rather than placing the functionality directly within ClauseElement,
       is **performance**.  The __hash__() method is absent on plain
       ClauseElement which leads to significantly reduced function call
       overhead, as the use of sets and dictionaries against ClauseElement
       objects is prevalent, but most are not "annotated".

    """

    _is_column_operators = False

    @classmethod
    def _as_annotated_instance(
        cls, element: SupportsWrappingAnnotations, values: _AnnotationDict
    ) -> Annotated:
        """Wrap ``element`` using the Annotated subclass registered for
        its class, generating that subclass first if there is none yet.

        """
        element_type = element.__class__
        try:
            anno_type = annotated_classes[element_type]
        except KeyError:
            anno_type = _new_annotation_type(element_type, cls)
        return anno_type(element, values)

    _annotations: util.immutabledict[str, Any]
    __element: SupportsWrappingAnnotations
    _hash: int

    def __new__(cls: Type[Self], *args: Any) -> Self:
        return object.__new__(cls)

    def __init__(
        self, element: SupportsWrappingAnnotations, values: _AnnotationDict
    ):
        # start from a copy of the element's state, leaving out cache
        # keys that were generated for the element being wrapped
        state = element.__dict__.copy()
        state.pop("_annotations_cache_key", None)
        state.pop("_generate_cache_key", None)
        self.__dict__ = state
        self.__element = element
        self._annotations = util.immutabledict(values)
        self._hash = hash(element)

    def _annotate(self, values: _AnnotationDict) -> Self:
        """Return a copy with ``values`` merged into the annotations."""
        return self._with_annotations(self._annotations.union(values))

    def _with_annotations(self, values: _AnnotationDict) -> Self:
        """Return a copy whose annotations are exactly ``values``."""
        anno_type = self.__class__
        duplicate = anno_type.__new__(anno_type)
        state = self.__dict__.copy()
        state.pop("_annotations_cache_key", None)
        state.pop("_generate_cache_key", None)
        duplicate.__dict__ = state
        duplicate._annotations = util.immutabledict(values)
        return duplicate

    @overload
    def _deannotate(
        self,
        values: Literal[None] = ...,
        clone: bool = ...,
    ) -> Self: ...

    @overload
    def _deannotate(
        self,
        values: Sequence[str] = ...,
        clone: bool = ...,
    ) -> Annotated: ...

    def _deannotate(
        self,
        values: Optional[Sequence[str]] = None,
        clone: bool = True,
    ) -> SupportsAnnotations:
        """Remove annotations.

        With ``values`` of None, the wrapped element itself is returned.
        Otherwise an annotated copy is returned that omits the
        annotation keys present in ``values``.  The ``clone`` argument
        is not consulted by this implementation.

        """
        if values is None:
            return self.__element

        remaining = {
            name: item
            for name, item in self._annotations.items()
            if name not in values
        }
        return self._with_annotations(util.immutabledict(remaining))

    if not typing.TYPE_CHECKING:
        # manually proxy some methods that need extra attention
        def _compiler_dispatch(self, visitor: Any, **kw: Any) -> Any:
            element_type = self.__element.__class__
            return element_type._compiler_dispatch(self, visitor, **kw)

        @property
        def _constructor(self):
            return self.__element._constructor

    def _clone(self, **kw: Any) -> Self:
        """Clone the wrapped element and wrap the result with the same
        annotations; returns ``self`` when the element did not produce
        a new object.

        """
        inner = self.__element
        inner_copy = inner._clone(**kw)
        if inner_copy is inner:
            # detect immutable, don't change anything
            return self

        # update the clone with any changes that have occurred
        # to this object's __dict__.
        inner_copy.__dict__.update(self.__dict__)
        return self.__class__(inner_copy, self._annotations)

    def __reduce__(self) -> Tuple[Type[Annotated], Tuple[Any, ...]]:
        return self.__class__, (self.__element, self._annotations)

    def __hash__(self) -> int:
        return self._hash

    def __eq__(self, other: Any) -> bool:
        if not self._is_column_operators:
            return hash(other) == hash(self)

        # column-operator types use the __eq__() of the wrapped
        # element's class
        return self.__element.__class__.__eq__(self, other)

    @util.ro_non_memoized_property
    def entity_namespace(self) -> _EntityNamespace:
        """The entity namespace of the "entity_namespace" annotation if
        one is present, else that of the wrapped element.

        """
        annotations = self._annotations
        if "entity_namespace" not in annotations:
            return self.__element.entity_namespace

        return cast(
            SupportsWrappingAnnotations,
            annotations["entity_namespace"],
        ).entity_namespace
387
388
# hard-generate Annotated subclasses.  this technique
# is used instead of on-the-fly types (i.e. type.__new__())
# so that the resulting objects are pickleable; additionally, other
# decisions can be made up front about the type of object being annotated
# just once per class rather than per-instance.
#
# the registry maps each annotatable class to its generated
# "Annotated<classname>" subclass; it is populated by
# _new_annotation_type() and consulted by
# Annotated._as_annotated_instance().
annotated_classes: Dict[Type[SupportsWrappingAnnotations], Type[Annotated]] = (
    {}
)

# type variable for functions which return the same kind of
# SupportsAnnotations object they were given.
_SA = TypeVar("_SA", bound="SupportsAnnotations")
399
400
def _safe_annotate(to_annotate: _SA, annotations: _AnnotationDict) -> _SA:
    """Annotate ``to_annotate`` if it supports being annotated.

    An object that has no ``_annotate`` attribute is returned unchanged;
    otherwise the result of calling its ``_annotate()`` with
    ``annotations`` is returned.

    """
    try:
        annotate_fn = to_annotate._annotate
    except AttributeError:
        # skip objects that don't actually have an `_annotate`
        # attribute, namely QueryableAttribute inside of a join
        # condition
        return to_annotate

    # called outside of the try so that an AttributeError raised within
    # the annotate call itself is not swallowed
    return annotate_fn(annotations)
411
412
def _deep_annotate(
    element: _SA,
    annotations: _AnnotationDict,
    exclude: Optional[Sequence[SupportsAnnotations]] = None,
    *,
    detect_subquery_cols: bool = False,
    ind_cols_on_fromclause: bool = False,
    annotate_callable: Optional[
        Callable[[SupportsAnnotations, _AnnotationDict], SupportsAnnotations]
    ] = None,
) -> _SA:
    """Deep copy the given ClauseElement, applying the given annotations
    dictionary to each element.

    Elements within the exclude collection will be cloned but not annotated.

    :param element: the element to process; ``None`` is returned as is.

    :param annotations: annotations to apply; an element whose
     ``_annotations`` already compare equal to this is not re-annotated.

    :param exclude: optional collection; an element having a ``proxy_set``
     that intersects it is cloned rather than annotated.

    :param detect_subquery_cols: passed along to ``_clone()``; when set,
     an element whose ``_is_immutable`` is true is cloned before it is
     annotated.

    :param ind_cols_on_fromclause: passed along to ``_copy_internals()``.

    :param annotate_callable: optional callable used in place of
     ``_safe_annotate()`` to annotate an individual element.

    """

    # annotated objects hack the __hash__() method so if we want to
    # uniquely process them we have to use id()

    seen_by_id: Dict[int, SupportsAnnotations] = {}

    def clone(elem: SupportsAnnotations, **kw: Any) -> SupportsAnnotations:
        # ind_cols_on_fromclause means make sure an AnnotatedFromClause
        # has its own .c collection independent of that which its proxying.
        # this is used specifically by orm.LoaderCriteriaOption to break
        # a reference cycle that it's otherwise prone to building,
        # see test_relationship_criteria->
        # test_loader_criteria_subquery_w_same_entity.  logic here was
        # changed for #8796 and made explicit; previously it occurred
        # by accident

        kw["detect_subquery_cols"] = detect_subquery_cols
        elem_id = id(elem)

        if elem_id in seen_by_id:
            return seen_by_id[elem_id]

        excluded = (
            exclude
            and hasattr(elem, "proxy_set")
            and elem.proxy_set.intersection(exclude)
        )

        if excluded:
            result = elem._clone(clone=clone, **kw)
        elif annotations != elem._annotations:
            target = (
                elem._clone(clone=clone, **kw)
                if detect_subquery_cols and elem._is_immutable
                else elem
            )
            annotate = annotate_callable or _safe_annotate
            result = annotate(target, annotations)
        else:
            result = elem

        result._copy_internals(
            clone=clone, ind_cols_on_fromclause=ind_cols_on_fromclause
        )

        # recorded only once the internals have been copied
        seen_by_id[elem_id] = result
        return result

    if element is not None:
        element = cast(_SA, clone(element))
    clone = None  # type: ignore  # remove gc cycles
    return element
481
482
@overload
def _deep_deannotate(
    element: Literal[None], values: Optional[Sequence[str]] = None
) -> Literal[None]: ...


@overload
def _deep_deannotate(
    element: _SA, values: Optional[Sequence[str]] = None
) -> _SA: ...


def _deep_deannotate(
    element: Optional[_SA], values: Optional[Sequence[str]] = None
) -> Optional[_SA]:
    """Deep copy the given element, removing annotations.

    :param element: the element to process; ``None`` is returned as is.

    :param values: optional sequence of annotation keys, passed along to
     the ``_deannotate()`` method of each element.

    """

    memo: Dict[Any, SupportsAnnotations] = {}

    def clone(elem: SupportsAnnotations, **kw: Any) -> SupportsAnnotations:
        # when specific values are given, elements are tracked by id();
        # otherwise the element itself serves as the key
        memo_key: Any = id(elem) if values else elem

        if memo_key in memo:
            return memo[memo_key]

        stripped = elem._deannotate(values=values, clone=True)
        stripped._copy_internals(clone=clone)
        memo[memo_key] = stripped
        return stripped

    if element is not None:
        element = cast(_SA, clone(element))
    clone = None  # type: ignore  # remove gc cycles
    return element
521
522
def _shallow_annotate(element: _SA, annotations: _AnnotationDict) -> _SA:
    """Annotate the given ClauseElement, then copy the internals of the
    annotated result so that internal objects refer to the new annotated
    object.

    Basically used to apply a "don't traverse" annotation to a
    selectable, without digging throughout the whole
    structure wasting time.
    """
    annotated = element._annotate(annotations)
    annotated._copy_internals()
    return annotated
534
535
def _new_annotation_type(
    cls: Type[SupportsWrappingAnnotations], base_cls: Type[Annotated]
) -> Type[Annotated]:
    """Generates a new class that subclasses Annotated and proxies a given
    element type.

    The new class is named ``Annotated<classname>``, is recorded in
    ``annotated_classes`` and is also placed in this module's globals.
    A class that is already an Annotated subclass is returned as is, and
    a class that was already registered returns its registered type.

    """
    if issubclass(cls, Annotated):
        return cls
    if cls in annotated_classes:
        return annotated_classes[cls]

    # check if an Annotated subclass more specific than
    # the given base_cls is already registered, such
    # as AnnotatedColumnElement; the first match along the MRO wins.
    base_cls = next(
        (
            annotated_classes[ancestor]
            for ancestor in cls.__mro__
            if ancestor in annotated_classes
        ),
        base_cls,
    )

    anno_name = "Annotated%s" % cls.__name__
    anno_cls = cast(Type[Annotated], type(anno_name, (base_cls, cls), {}))
    annotated_classes[cls] = anno_cls
    globals()[anno_name] = anno_cls

    cls_vars = cls.__dict__
    inherits_cache = cls_vars.get("inherit_cache", False)

    # a class that states its own _traverse_internals, or that sets
    # inherit_cache to a true value, gets a traversal that also
    # includes the annotations
    if "_traverse_internals" in cls_vars or inherits_cache:
        anno_cls._traverse_internals = [
            *cls._traverse_internals,
            ("_annotations", InternalTraversal.dp_annotations_key),
        ]

    # some classes include this even if they have traverse_internals
    # e.g. BindParameter, add it if present.
    if inherits_cache:
        anno_cls.inherit_cache = True  # type: ignore
    elif "inherit_cache" in cls_vars:
        anno_cls.inherit_cache = cls_vars["inherit_cache"]  # type: ignore

    anno_cls._is_column_operators = issubclass(cls, operators.ColumnOperators)

    return anno_cls
581
582
def _prepare_annotations(
    target_hierarchy: Type[SupportsWrappingAnnotations],
    base_cls: Type[Annotated],
) -> None:
    """Generate an Annotated type, via ``_new_annotation_type()``, for
    each class produced by ``util.walk_subclasses(target_hierarchy)``.

    """
    for target_cls in util.walk_subclasses(target_hierarchy):
        _new_annotation_type(target_cls, base_cls)