1# sql/annotation.py 
    2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 
    3# <see AUTHORS file> 
    4# 
    5# This module is part of SQLAlchemy and is released under 
    6# the MIT License: https://www.opensource.org/licenses/mit-license.php 
    7 
    8"""The :class:`.Annotated` class and related routines; creates hash-equivalent 
    9copies of SQL constructs which contain context-specific markers and 
    10associations. 
    11 
    12Note that the :class:`.Annotated` concept as implemented in this module is not 
    13related in any way to the pep-593 concept of "Annotated". 
    14 
    15 
    16""" 
    17 
    18from __future__ import annotations 
    19 
    20from operator import itemgetter 
    21import typing 
    22from typing import Any 
    23from typing import Callable 
    24from typing import cast 
    25from typing import Dict 
    26from typing import FrozenSet 
    27from typing import Literal 
    28from typing import Mapping 
    29from typing import Optional 
    30from typing import overload 
    31from typing import Sequence 
    32from typing import Tuple 
    33from typing import Type 
    34from typing import TYPE_CHECKING 
    35from typing import TypeVar 
    36 
    37from . import operators 
    38from .cache_key import HasCacheKey 
    39from .visitors import anon_map 
    40from .visitors import ExternallyTraversible 
    41from .visitors import InternalTraversal 
    42from .. import util 
    43from ..util.typing import Self 
    44 
    45if TYPE_CHECKING: 
    46    from .base import _EntityNamespace 
    47    from .visitors import _TraverseInternalsType 
    48 
    49_AnnotationDict = Mapping[str, Any] 
    50 
    51EMPTY_ANNOTATIONS: util.immutabledict[str, Any] = util.EMPTY_DICT 
    52 
    53 
    54class SupportsAnnotations(ExternallyTraversible): 
    55    __slots__ = () 
    56 
    57    _annotations: util.immutabledict[str, Any] = EMPTY_ANNOTATIONS 
    58 
    59    proxy_set: util.generic_fn_descriptor[FrozenSet[Any]] 
    60 
    61    _is_immutable: bool 
    62 
    63    def _annotate(self, values: _AnnotationDict) -> Self: 
    64        raise NotImplementedError() 
    65 
    66    @overload 
    67    def _deannotate( 
    68        self, 
    69        values: Literal[None] = ..., 
    70        clone: bool = ..., 
    71    ) -> Self: ... 
    72 
    73    @overload 
    74    def _deannotate( 
    75        self, 
    76        values: Sequence[str] = ..., 
    77        clone: bool = ..., 
    78    ) -> SupportsAnnotations: ... 
    79 
    80    def _deannotate( 
    81        self, 
    82        values: Optional[Sequence[str]] = None, 
    83        clone: bool = False, 
    84    ) -> SupportsAnnotations: 
    85        raise NotImplementedError() 
    86 
    87    @util.memoized_property 
    88    def _annotations_cache_key(self) -> Tuple[Any, ...]: 
    89        anon_map_ = anon_map() 
    90 
    91        return self._gen_annotations_cache_key(anon_map_) 
    92 
    93    def _gen_annotations_cache_key( 
    94        self, anon_map: anon_map 
    95    ) -> Tuple[Any, ...]: 
    96        return ( 
    97            "_annotations", 
    98            tuple( 
    99                ( 
    100                    key, 
    101                    ( 
    102                        value._gen_cache_key(anon_map, []) 
    103                        if isinstance(value, HasCacheKey) 
    104                        else value 
    105                    ), 
    106                ) 
    107                for key, value in sorted( 
    108                    self._annotations.items(), key=_get_item0 
    109                ) 
    110            ), 
    111        ) 
    112 
    113 
    114_get_item0 = itemgetter(0) 
    115 
    116 
    117class SupportsWrappingAnnotations(SupportsAnnotations): 
    118    __slots__ = () 
    119 
    120    _constructor: Callable[..., SupportsWrappingAnnotations] 
    121 
    122    if TYPE_CHECKING: 
    123 
    124        @util.ro_non_memoized_property 
    125        def entity_namespace(self) -> _EntityNamespace: ... 
    126 
    127    def _annotate(self, values: _AnnotationDict) -> Self: 
    128        """return a copy of this ClauseElement with annotations 
    129        updated by the given dictionary. 
    130 
    131        """ 
    132        return Annotated._as_annotated_instance(self, values)  # type: ignore 
    133 
    134    def _with_annotations(self, values: _AnnotationDict) -> Self: 
    135        """return a copy of this ClauseElement with annotations 
    136        replaced by the given dictionary. 
    137 
    138        """ 
    139        return Annotated._as_annotated_instance(self, values)  # type: ignore 
    140 
    141    @overload 
    142    def _deannotate( 
    143        self, 
    144        values: Literal[None] = ..., 
    145        clone: bool = ..., 
    146    ) -> Self: ... 
    147 
    148    @overload 
    149    def _deannotate( 
    150        self, 
    151        values: Sequence[str] = ..., 
    152        clone: bool = ..., 
    153    ) -> SupportsAnnotations: ... 
    154 
    155    def _deannotate( 
    156        self, 
    157        values: Optional[Sequence[str]] = None, 
    158        clone: bool = False, 
    159    ) -> SupportsAnnotations: 
    160        """return a copy of this :class:`_expression.ClauseElement` 
    161        with annotations 
    162        removed. 
    163 
    164        :param values: optional tuple of individual values 
    165         to remove. 
    166 
    167        """ 
    168        if clone: 
    169            s = self._clone() 
    170            return s 
    171        else: 
    172            return self 
    173 
    174 
    175class SupportsCloneAnnotations(SupportsWrappingAnnotations): 
    176    # SupportsCloneAnnotations extends from SupportsWrappingAnnotations 
    177    # to support the structure of having the base ClauseElement 
    178    # be a subclass of SupportsWrappingAnnotations.  Any ClauseElement 
    179    # subclass that wants to extend from SupportsCloneAnnotations 
    180    # will inherently also be subclassing SupportsWrappingAnnotations, so 
    181    # make that specific here. 
    182 
    183    if not typing.TYPE_CHECKING: 
    184        __slots__ = () 
    185 
    186    _clone_annotations_traverse_internals: _TraverseInternalsType = [ 
    187        ("_annotations", InternalTraversal.dp_annotations_key) 
    188    ] 
    189 
    190    def _annotate(self, values: _AnnotationDict) -> Self: 
    191        """return a copy of this ClauseElement with annotations 
    192        updated by the given dictionary. 
    193 
    194        """ 
    195        new = self._clone() 
    196        new._annotations = new._annotations.union(values) 
    197        new.__dict__.pop("_annotations_cache_key", None) 
    198        new.__dict__.pop("_generate_cache_key", None) 
    199        return new 
    200 
    201    def _with_annotations(self, values: _AnnotationDict) -> Self: 
    202        """return a copy of this ClauseElement with annotations 
    203        replaced by the given dictionary. 
    204 
    205        """ 
    206        new = self._clone() 
    207        new._annotations = util.immutabledict(values) 
    208        new.__dict__.pop("_annotations_cache_key", None) 
    209        new.__dict__.pop("_generate_cache_key", None) 
    210        return new 
    211 
    212    @overload 
    213    def _deannotate( 
    214        self, 
    215        values: Literal[None] = ..., 
    216        clone: bool = ..., 
    217    ) -> Self: ... 
    218 
    219    @overload 
    220    def _deannotate( 
    221        self, 
    222        values: Sequence[str] = ..., 
    223        clone: bool = ..., 
    224    ) -> SupportsAnnotations: ... 
    225 
    226    def _deannotate( 
    227        self, 
    228        values: Optional[Sequence[str]] = None, 
    229        clone: bool = False, 
    230    ) -> SupportsAnnotations: 
    231        """return a copy of this :class:`_expression.ClauseElement` 
    232        with annotations 
    233        removed. 
    234 
    235        :param values: optional tuple of individual values 
    236         to remove. 
    237 
    238        """ 
    239        if clone or self._annotations: 
    240            # clone is used when we are also copying 
    241            # the expression for a deep deannotation 
    242            new = self._clone() 
    243            new._annotations = util.immutabledict() 
    244            new.__dict__.pop("_annotations_cache_key", None) 
    245            return new 
    246        else: 
    247            return self 
    248 
    249 
    250class Annotated(SupportsAnnotations): 
    251    """clones a SupportsAnnotations and applies an 'annotations' dictionary. 
    252 
    253    Unlike regular clones, this clone also mimics __hash__() and 
    254    __eq__() of the original element so that it takes its place 
    255    in hashed collections. 
    256 
    257    A reference to the original element is maintained, for the important 
    258    reason of keeping its hash value current.  When GC'ed, the 
    259    hash value may be reused, causing conflicts. 
    260 
    261    .. note::  The rationale for Annotated producing a brand new class, 
    262       rather than placing the functionality directly within ClauseElement, 
    263       is **performance**.  The __hash__() method is absent on plain 
    264       ClauseElement which leads to significantly reduced function call 
    265       overhead, as the use of sets and dictionaries against ClauseElement 
    266       objects is prevalent, but most are not "annotated". 
    267 
    268    """ 
    269 
    270    _is_column_operators = False 
    271 
    272    @classmethod 
    273    def _as_annotated_instance( 
    274        cls, element: SupportsWrappingAnnotations, values: _AnnotationDict 
    275    ) -> Annotated: 
    276        try: 
    277            cls = annotated_classes[element.__class__] 
    278        except KeyError: 
    279            cls = _new_annotation_type(element.__class__, cls) 
    280        return cls(element, values) 
    281 
    282    _annotations: util.immutabledict[str, Any] 
    283    __element: SupportsWrappingAnnotations 
    284    _hash: int 
    285 
    286    def __new__(cls: Type[Self], *args: Any) -> Self: 
    287        return object.__new__(cls) 
    288 
    289    def __init__( 
    290        self, element: SupportsWrappingAnnotations, values: _AnnotationDict 
    291    ): 
    292        self.__dict__ = element.__dict__.copy() 
    293        self.__dict__.pop("_annotations_cache_key", None) 
    294        self.__dict__.pop("_generate_cache_key", None) 
    295        self.__element = element 
    296        self._annotations = util.immutabledict(values) 
    297        self._hash = hash(element) 
    298 
    299    def _annotate(self, values: _AnnotationDict) -> Self: 
    300        _values = self._annotations.union(values) 
    301        new = self._with_annotations(_values) 
    302        return new 
    303 
    304    def _with_annotations(self, values: _AnnotationDict) -> Self: 
    305        clone = self.__class__.__new__(self.__class__) 
    306        clone.__dict__ = self.__dict__.copy() 
    307        clone.__dict__.pop("_annotations_cache_key", None) 
    308        clone.__dict__.pop("_generate_cache_key", None) 
    309        clone._annotations = util.immutabledict(values) 
    310        return clone 
    311 
    312    @overload 
    313    def _deannotate( 
    314        self, 
    315        values: Literal[None] = ..., 
    316        clone: bool = ..., 
    317    ) -> Self: ... 
    318 
    319    @overload 
    320    def _deannotate( 
    321        self, 
    322        values: Sequence[str] = ..., 
    323        clone: bool = ..., 
    324    ) -> Annotated: ... 
    325 
    326    def _deannotate( 
    327        self, 
    328        values: Optional[Sequence[str]] = None, 
    329        clone: bool = True, 
    330    ) -> SupportsAnnotations: 
    331        if values is None: 
    332            return self.__element 
    333        else: 
    334            return self._with_annotations( 
    335                util.immutabledict( 
    336                    { 
    337                        key: value 
    338                        for key, value in self._annotations.items() 
    339                        if key not in values 
    340                    } 
    341                ) 
    342            ) 
    343 
    344    if not typing.TYPE_CHECKING: 
    345        # manually proxy some methods that need extra attention 
    346        def _compiler_dispatch(self, visitor: Any, **kw: Any) -> Any: 
    347            return self.__element.__class__._compiler_dispatch( 
    348                self, visitor, **kw 
    349            ) 
    350 
    351        @property 
    352        def _constructor(self): 
    353            return self.__element._constructor 
    354 
    355    def _clone(self, **kw: Any) -> Self: 
    356        clone = self.__element._clone(**kw) 
    357        if clone is self.__element: 
    358            # detect immutable, don't change anything 
    359            return self 
    360        else: 
    361            # update the clone with any changes that have occurred 
    362            # to this object's __dict__. 
    363            clone.__dict__.update(self.__dict__) 
    364            return self.__class__(clone, self._annotations) 
    365 
    366    def __reduce__(self) -> Tuple[Type[Annotated], Tuple[Any, ...]]: 
    367        return self.__class__, (self.__element, self._annotations) 
    368 
    369    def __hash__(self) -> int: 
    370        return self._hash 
    371 
    372    def __eq__(self, other: Any) -> bool: 
    373        if self._is_column_operators: 
    374            return self.__element.__class__.__eq__(self, other) 
    375        else: 
    376            return hash(other) == hash(self) 
    377 
    378    @util.ro_non_memoized_property 
    379    def entity_namespace(self) -> _EntityNamespace: 
    380        if "entity_namespace" in self._annotations: 
    381            return cast( 
    382                SupportsWrappingAnnotations, 
    383                self._annotations["entity_namespace"], 
    384            ).entity_namespace 
    385        else: 
    386            return self.__element.entity_namespace 
    387 
    388 
    389# hard-generate Annotated subclasses.  this technique 
    390# is used instead of on-the-fly types (i.e. type.__new__()) 
    391# so that the resulting objects are pickleable; additionally, other 
    392# decisions can be made up front about the type of object being annotated 
    393# just once per class rather than per-instance. 
    394annotated_classes: Dict[Type[SupportsWrappingAnnotations], Type[Annotated]] = ( 
    395    {} 
    396) 
    397 
    398_SA = TypeVar("_SA", bound="SupportsAnnotations") 
    399 
    400 
    401def _safe_annotate(to_annotate: _SA, annotations: _AnnotationDict) -> _SA: 
    402    try: 
    403        _annotate = to_annotate._annotate 
    404    except AttributeError: 
    405        # skip objects that don't actually have an `_annotate` 
    406        # attribute, namely QueryableAttribute inside of a join 
    407        # condition 
    408        return to_annotate 
    409    else: 
    410        return _annotate(annotations) 
    411 
    412 
    413def _deep_annotate( 
    414    element: _SA, 
    415    annotations: _AnnotationDict, 
    416    exclude: Optional[Sequence[SupportsAnnotations]] = None, 
    417    *, 
    418    detect_subquery_cols: bool = False, 
    419    ind_cols_on_fromclause: bool = False, 
    420    annotate_callable: Optional[ 
    421        Callable[[SupportsAnnotations, _AnnotationDict], SupportsAnnotations] 
    422    ] = None, 
    423) -> _SA: 
    424    """Deep copy the given ClauseElement, annotating each element 
    425    with the given annotations dictionary. 
    426 
    427    Elements within the exclude collection will be cloned but not annotated. 
    428 
    429    """ 
    430 
    431    # annotated objects hack the __hash__() method so if we want to 
    432    # uniquely process them we have to use id() 
    433 
    434    cloned_ids: Dict[int, SupportsAnnotations] = {} 
    435 
    436    def clone(elem: SupportsAnnotations, **kw: Any) -> SupportsAnnotations: 
    437        # ind_cols_on_fromclause means make sure an AnnotatedFromClause 
    438        # has its own .c collection independent of that which its proxying. 
    439        # this is used specifically by orm.LoaderCriteriaOption to break 
    440        # a reference cycle that it's otherwise prone to building, 
    441        # see test_relationship_criteria-> 
    442        # test_loader_criteria_subquery_w_same_entity.  logic here was 
    443        # changed for #8796 and made explicit; previously it occurred 
    444        # by accident 
    445 
    446        kw["detect_subquery_cols"] = detect_subquery_cols 
    447        id_ = id(elem) 
    448 
    449        if id_ in cloned_ids: 
    450            return cloned_ids[id_] 
    451 
    452        if ( 
    453            exclude 
    454            and hasattr(elem, "proxy_set") 
    455            and elem.proxy_set.intersection(exclude) 
    456        ): 
    457            newelem = elem._clone(clone=clone, **kw) 
    458        elif annotations != elem._annotations: 
    459            if detect_subquery_cols and elem._is_immutable: 
    460                to_annotate = elem._clone(clone=clone, **kw) 
    461            else: 
    462                to_annotate = elem 
    463            if annotate_callable: 
    464                newelem = annotate_callable(to_annotate, annotations) 
    465            else: 
    466                newelem = _safe_annotate(to_annotate, annotations) 
    467        else: 
    468            newelem = elem 
    469 
    470        newelem._copy_internals( 
    471            clone=clone, 
    472            ind_cols_on_fromclause=ind_cols_on_fromclause, 
    473            _annotations_traversal=True, 
    474        ) 
    475 
    476        cloned_ids[id_] = newelem 
    477        return newelem 
    478 
    479    if element is not None: 
    480        element = cast(_SA, clone(element)) 
    481    clone = None  # type: ignore  # remove gc cycles 
    482    return element 
    483 
    484 
    485@overload 
    486def _deep_deannotate( 
    487    element: Literal[None], values: Optional[Sequence[str]] = None 
    488) -> Literal[None]: ... 
    489 
    490 
    491@overload 
    492def _deep_deannotate( 
    493    element: _SA, values: Optional[Sequence[str]] = None 
    494) -> _SA: ... 
    495 
    496 
    497def _deep_deannotate( 
    498    element: Optional[_SA], values: Optional[Sequence[str]] = None 
    499) -> Optional[_SA]: 
    500    """Deep copy the given element, removing annotations.""" 
    501 
    502    cloned: Dict[Any, SupportsAnnotations] = {} 
    503 
    504    def clone(elem: SupportsAnnotations, **kw: Any) -> SupportsAnnotations: 
    505        key: Any 
    506        if values: 
    507            key = id(elem) 
    508        else: 
    509            key = elem 
    510 
    511        if key not in cloned: 
    512            newelem = elem._deannotate(values=values, clone=True) 
    513            newelem._copy_internals(clone=clone, _annotations_traversal=True) 
    514            cloned[key] = newelem 
    515            return newelem 
    516        else: 
    517            return cloned[key] 
    518 
    519    if element is not None: 
    520        element = cast(_SA, clone(element)) 
    521    clone = None  # type: ignore  # remove gc cycles 
    522    return element 
    523 
    524 
    525def _shallow_annotate(element: _SA, annotations: _AnnotationDict) -> _SA: 
    526    """Annotate the given ClauseElement and copy its internals so that 
    527    internal objects refer to the new annotated object. 
    528 
    529    Basically used to apply a "don't traverse" annotation to a 
    530    selectable, without digging throughout the whole 
    531    structure wasting time. 
    532    """ 
    533    element = element._annotate(annotations) 
    534    element._copy_internals(_annotations_traversal=True) 
    535    return element 
    536 
    537 
    538def _new_annotation_type( 
    539    cls: Type[SupportsWrappingAnnotations], base_cls: Type[Annotated] 
    540) -> Type[Annotated]: 
    541    """Generates a new class that subclasses Annotated and proxies a given 
    542    element type. 
    543 
    544    """ 
    545    if issubclass(cls, Annotated): 
    546        return cls 
    547    elif cls in annotated_classes: 
    548        return annotated_classes[cls] 
    549 
    550    for super_ in cls.__mro__: 
    551        # check if an Annotated subclass more specific than 
    552        # the given base_cls is already registered, such 
    553        # as AnnotatedColumnElement. 
    554        if super_ in annotated_classes: 
    555            base_cls = annotated_classes[super_] 
    556            break 
    557 
    558    annotated_classes[cls] = anno_cls = cast( 
    559        Type[Annotated], 
    560        type("Annotated%s" % cls.__name__, (base_cls, cls), {}), 
    561    ) 
    562    globals()["Annotated%s" % cls.__name__] = anno_cls 
    563 
    564    if "_traverse_internals" in cls.__dict__: 
    565        anno_cls._traverse_internals = list(cls._traverse_internals) + [ 
    566            ("_annotations", InternalTraversal.dp_annotations_key) 
    567        ] 
    568    elif cls.__dict__.get("inherit_cache", False): 
    569        anno_cls._traverse_internals = list(cls._traverse_internals) + [ 
    570            ("_annotations", InternalTraversal.dp_annotations_key) 
    571        ] 
    572 
    573    # some classes include this even if they have traverse_internals 
    574    # e.g. BindParameter, add it if present. 
    575    if cls.__dict__.get("inherit_cache", False): 
    576        anno_cls.inherit_cache = True  # type: ignore 
    577    elif "inherit_cache" in cls.__dict__: 
    578        anno_cls.inherit_cache = cls.__dict__["inherit_cache"]  # type: ignore 
    579 
    580    anno_cls._is_column_operators = issubclass(cls, operators.ColumnOperators) 
    581 
    582    return anno_cls 
    583 
    584 
    585def _prepare_annotations( 
    586    target_hierarchy: Type[SupportsWrappingAnnotations], 
    587    base_cls: Type[Annotated], 
    588) -> None: 
    589    for cls in util.walk_subclasses(target_hierarchy): 
    590        _new_annotation_type(cls, base_cls)