1# orm/mapped_collection.py 
    2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 
    3# <see AUTHORS file> 
    4# 
    5# This module is part of SQLAlchemy and is released under 
    6# the MIT License: https://www.opensource.org/licenses/mit-license.php 
    7 
    8from __future__ import annotations 
    9 
    10import operator 
    11from typing import Any 
    12from typing import Callable 
    13from typing import Dict 
    14from typing import Generic 
    15from typing import List 
    16from typing import Literal 
    17from typing import Optional 
    18from typing import Sequence 
    19from typing import Tuple 
    20from typing import Type 
    21from typing import TYPE_CHECKING 
    22from typing import TypeVar 
    23from typing import Union 
    24 
    25from . import base 
    26from .collections import collection 
    27from .collections import collection_adapter 
    28from .. import exc as sa_exc 
    29from .. import util 
    30from ..sql import coercions 
    31from ..sql import expression 
    32from ..sql import roles 
    33from ..util.langhelpers import Missing 
    34from ..util.langhelpers import MissingOr 
    35 
    36if TYPE_CHECKING: 
    37    from . import AttributeEventToken 
    38    from . import Mapper 
    39    from .collections import CollectionAdapter 
    40    from ..sql.elements import ColumnElement 
    41 
    42_KT = TypeVar("_KT", bound=Any) 
    43_VT = TypeVar("_VT", bound=Any) 
    44 
    45 
    46class _PlainColumnGetter(Generic[_KT]): 
    47    """Plain column getter, stores collection of Column objects 
    48    directly. 
    49 
    50    Serializes to a :class:`._SerializableColumnGetterV2` 
    51    which has more expensive __call__() performance 
    52    and some rare caveats. 
    53 
    54    """ 
    55 
    56    __slots__ = ("cols", "composite") 
    57 
    58    def __init__(self, cols: Sequence[ColumnElement[_KT]]) -> None: 
    59        self.cols = cols 
    60        self.composite = len(cols) > 1 
    61 
    62    def __reduce__( 
    63        self, 
    64    ) -> Tuple[ 
    65        Type[_SerializableColumnGetterV2[_KT]], 
    66        Tuple[Sequence[Tuple[Optional[str], Optional[str]]]], 
    67    ]: 
    68        return _SerializableColumnGetterV2._reduce_from_cols(self.cols) 
    69 
    70    def _cols(self, mapper: Mapper[_KT]) -> Sequence[ColumnElement[_KT]]: 
    71        return self.cols 
    72 
    73    def __call__(self, value: _KT) -> MissingOr[Union[_KT, Tuple[_KT, ...]]]: 
    74        state = base.instance_state(value) 
    75        m = base._state_mapper(state) 
    76 
    77        key: List[_KT] = [ 
    78            m._get_state_attr_by_column(state, state.dict, col) 
    79            for col in self._cols(m) 
    80        ] 
    81        if self.composite: 
    82            return tuple(key) 
    83        else: 
    84            obj = key[0] 
    85            if obj is None: 
    86                return Missing 
    87            else: 
    88                return obj 
    89 
    90 
    91class _SerializableColumnGetterV2(_PlainColumnGetter[_KT]): 
    92    """Updated serializable getter which deals with 
    93    multi-table mapped classes. 
    94 
    95    Two extremely unusual cases are not supported. 
    96    Mappings which have tables across multiple metadata 
    97    objects, or which are mapped to non-Table selectables 
    98    linked across inheriting mappers may fail to function 
    99    here. 
    100 
    101    """ 
    102 
    103    __slots__ = ("colkeys",) 
    104 
    105    def __init__( 
    106        self, colkeys: Sequence[Tuple[Optional[str], Optional[str]]] 
    107    ) -> None: 
    108        self.colkeys = colkeys 
    109        self.composite = len(colkeys) > 1 
    110 
    111    def __reduce__( 
    112        self, 
    113    ) -> Tuple[ 
    114        Type[_SerializableColumnGetterV2[_KT]], 
    115        Tuple[Sequence[Tuple[Optional[str], Optional[str]]]], 
    116    ]: 
    117        return self.__class__, (self.colkeys,) 
    118 
    119    @classmethod 
    120    def _reduce_from_cols(cls, cols: Sequence[ColumnElement[_KT]]) -> Tuple[ 
    121        Type[_SerializableColumnGetterV2[_KT]], 
    122        Tuple[Sequence[Tuple[Optional[str], Optional[str]]]], 
    123    ]: 
    124        def _table_key(c: ColumnElement[_KT]) -> Optional[str]: 
    125            if not isinstance(c.table, expression.TableClause): 
    126                return None 
    127            else: 
    128                return c.table.key  # type: ignore 
    129 
    130        colkeys = [(c.key, _table_key(c)) for c in cols] 
    131        return _SerializableColumnGetterV2, (colkeys,) 
    132 
    133    def _cols(self, mapper: Mapper[_KT]) -> Sequence[ColumnElement[_KT]]: 
    134        cols: List[ColumnElement[_KT]] = [] 
    135        metadata = getattr(mapper.local_table, "metadata", None) 
    136        for ckey, tkey in self.colkeys: 
    137            if tkey is None or metadata is None or tkey not in metadata: 
    138                cols.append(mapper.local_table.c[ckey])  # type: ignore 
    139            else: 
    140                cols.append(metadata.tables[tkey].c[ckey]) 
    141        return cols 
    142 
    143 
    144def column_keyed_dict( 
    145    mapping_spec: Union[Type[_KT], Callable[[_KT], _VT]], 
    146    *, 
    147    ignore_unpopulated_attribute: bool = False, 
    148) -> Type[KeyFuncDict[_KT, _KT]]: 
    149    """A dictionary-based collection type with column-based keying. 
    150 
    151    .. versionchanged:: 2.0 Renamed :data:`.column_mapped_collection` to 
    152       :class:`.column_keyed_dict`. 
    153 
    154    Returns a :class:`.KeyFuncDict` factory which will produce new 
    155    dictionary keys based on the value of a particular :class:`.Column`-mapped 
    156    attribute on ORM mapped instances to be added to the dictionary. 
    157 
    158    .. note:: the value of the target attribute must be assigned with its 
    159       value at the time that the object is being added to the 
    160       dictionary collection.   Additionally, changes to the key attribute 
    161       are **not tracked**, which means the key in the dictionary is not 
    162       automatically synchronized with the key value on the target object 
    163       itself.  See :ref:`key_collections_mutations` for further details. 
    164 
    165    .. seealso:: 
    166 
    167        :ref:`orm_dictionary_collection` - background on use 
    168 
    169    :param mapping_spec: a :class:`_schema.Column` object that is expected 
    170     to be mapped by the target mapper to a particular attribute on the 
    171     mapped class, the value of which on a particular instance is to be used 
    172     as the key for a new dictionary entry for that instance. 
    173    :param ignore_unpopulated_attribute:  if True, and the mapped attribute 
    174     indicated by the given :class:`_schema.Column` target attribute 
    175     on an object is not populated at all, the operation will be silently 
    176     skipped.  By default, an error is raised. 
    177 
    178     .. versionadded:: 2.0 an error is raised by default if the attribute 
    179        being used for the dictionary key is determined that it was never 
    180        populated with any value.  The 
    181        :paramref:`_orm.column_keyed_dict.ignore_unpopulated_attribute` 
    182        parameter may be set which will instead indicate that this condition 
    183        should be ignored, and the append operation silently skipped. 
    184        This is in contrast to the behavior of the 1.x series which would 
    185        erroneously populate the value in the dictionary with an arbitrary key 
    186        value of ``None``. 
    187 
    188 
    189    """ 
    190    cols = [ 
    191        coercions.expect(roles.ColumnArgumentRole, q, argname="mapping_spec") 
    192        for q in util.to_list(mapping_spec) 
    193    ] 
    194    keyfunc = _PlainColumnGetter(cols) 
    195    return _mapped_collection_cls( 
    196        keyfunc, 
    197        ignore_unpopulated_attribute=ignore_unpopulated_attribute, 
    198    ) 
    199 
    200 
    201class _AttrGetter: 
    202    __slots__ = ("attr_name", "getter") 
    203 
    204    def __init__(self, attr_name: str): 
    205        self.attr_name = attr_name 
    206        self.getter = operator.attrgetter(attr_name) 
    207 
    208    def __call__(self, mapped_object: Any) -> Any: 
    209        obj = self.getter(mapped_object) 
    210        if obj is None: 
    211            state = base.instance_state(mapped_object) 
    212            mp = state.mapper 
    213            if self.attr_name in mp.attrs: 
    214                dict_ = state.dict 
    215                obj = dict_.get(self.attr_name, base.NO_VALUE) 
    216                if obj is None: 
    217                    return Missing 
    218            else: 
    219                return Missing 
    220 
    221        return obj 
    222 
    223    def __reduce__(self) -> Tuple[Type[_AttrGetter], Tuple[str]]: 
    224        return _AttrGetter, (self.attr_name,) 
    225 
    226 
    227def attribute_keyed_dict( 
    228    attr_name: str, *, ignore_unpopulated_attribute: bool = False 
    229) -> Type[KeyFuncDict[Any, Any]]: 
    230    """A dictionary-based collection type with attribute-based keying. 
    231 
    232    .. versionchanged:: 2.0 Renamed :data:`.attribute_mapped_collection` to 
    233       :func:`.attribute_keyed_dict`. 
    234 
    235    Returns a :class:`.KeyFuncDict` factory which will produce new 
    236    dictionary keys based on the value of a particular named attribute on 
    237    ORM mapped instances to be added to the dictionary. 
    238 
    239    .. note:: the value of the target attribute must be assigned with its 
    240       value at the time that the object is being added to the 
    241       dictionary collection.   Additionally, changes to the key attribute 
    242       are **not tracked**, which means the key in the dictionary is not 
    243       automatically synchronized with the key value on the target object 
    244       itself.  See :ref:`key_collections_mutations` for further details. 
    245 
    246    .. seealso:: 
    247 
    248        :ref:`orm_dictionary_collection` - background on use 
    249 
    250    :param attr_name: string name of an ORM-mapped attribute 
    251     on the mapped class, the value of which on a particular instance 
    252     is to be used as the key for a new dictionary entry for that instance. 
    253    :param ignore_unpopulated_attribute:  if True, and the target attribute 
    254     on an object is not populated at all, the operation will be silently 
    255     skipped.  By default, an error is raised. 
    256 
    257     .. versionadded:: 2.0 an error is raised by default if the attribute 
    258        being used for the dictionary key is determined that it was never 
    259        populated with any value.  The 
    260        :paramref:`_orm.attribute_keyed_dict.ignore_unpopulated_attribute` 
    261        parameter may be set which will instead indicate that this condition 
    262        should be ignored, and the append operation silently skipped. 
    263        This is in contrast to the behavior of the 1.x series which would 
    264        erroneously populate the value in the dictionary with an arbitrary key 
    265        value of ``None``. 
    266 
    267 
    268    """ 
    269 
    270    return _mapped_collection_cls( 
    271        _AttrGetter(attr_name), 
    272        ignore_unpopulated_attribute=ignore_unpopulated_attribute, 
    273    ) 
    274 
    275 
    276def keyfunc_mapping( 
    277    keyfunc: Callable[[Any], Any], 
    278    *, 
    279    ignore_unpopulated_attribute: bool = False, 
    280) -> Type[KeyFuncDict[_KT, Any]]: 
    281    """A dictionary-based collection type with arbitrary keying. 
    282 
    283    .. versionchanged:: 2.0 Renamed :data:`.mapped_collection` to 
    284       :func:`.keyfunc_mapping`. 
    285 
    286    Returns a :class:`.KeyFuncDict` factory with a keying function 
    287    generated from keyfunc, a callable that takes an entity and returns a 
    288    key value. 
    289 
    290    .. note:: the given keyfunc is called only once at the time that the 
    291       target object is being added to the collection.   Changes to the 
    292       effective value returned by the function are not tracked. 
    293 
    294 
    295    .. seealso:: 
    296 
    297        :ref:`orm_dictionary_collection` - background on use 
    298 
    299    :param keyfunc: a callable that will be passed the ORM-mapped instance 
    300     which should then generate a new key to use in the dictionary. 
    301     If the value returned is :attr:`.LoaderCallableStatus.NO_VALUE`, an error 
    302     is raised. 
    303    :param ignore_unpopulated_attribute:  if True, and the callable returns 
    304     :attr:`.LoaderCallableStatus.NO_VALUE` for a particular instance, the 
    305     operation will be silently skipped.  By default, an error is raised. 
    306 
    307     .. versionadded:: 2.0 an error is raised by default if the callable 
    308        being used for the dictionary key returns 
    309        :attr:`.LoaderCallableStatus.NO_VALUE`, which in an ORM attribute 
    310        context indicates an attribute that was never populated with any value. 
    311        The :paramref:`_orm.mapped_collection.ignore_unpopulated_attribute` 
    312        parameter may be set which will instead indicate that this condition 
    313        should be ignored, and the append operation silently skipped. This is 
    314        in contrast to the behavior of the 1.x series which would erroneously 
    315        populate the value in the dictionary with an arbitrary key value of 
    316        ``None``. 
    317 
    318 
    319    """ 
    320    return _mapped_collection_cls( 
    321        keyfunc, ignore_unpopulated_attribute=ignore_unpopulated_attribute 
    322    ) 
    323 
    324 
    325class KeyFuncDict(Dict[_KT, _VT]): 
    326    """Base for ORM mapped dictionary classes. 
    327 
    328    Extends the ``dict`` type with additional methods needed by SQLAlchemy ORM 
    329    collection classes. Use of :class:`_orm.KeyFuncDict` is most directly 
    330    by using the :func:`.attribute_keyed_dict` or 
    331    :func:`.column_keyed_dict` class factories. 
    332    :class:`_orm.KeyFuncDict` may also serve as the base for user-defined 
    333    custom dictionary classes. 
    334 
    335    .. versionchanged:: 2.0 Renamed :class:`.MappedCollection` to 
    336       :class:`.KeyFuncDict`. 
    337 
    338    .. seealso:: 
    339 
    340        :func:`_orm.attribute_keyed_dict` 
    341 
    342        :func:`_orm.column_keyed_dict` 
    343 
    344        :ref:`orm_dictionary_collection` 
    345 
    346        :ref:`orm_custom_collection` 
    347 
    348 
    349    """ 
    350 
    351    def __init__( 
    352        self, 
    353        keyfunc: Callable[[Any], Any], 
    354        *dict_args: Any, 
    355        ignore_unpopulated_attribute: bool = False, 
    356    ) -> None: 
    357        """Create a new collection with keying provided by keyfunc. 
    358 
    359        keyfunc may be any callable that takes an object and returns an object 
    360        for use as a dictionary key. 
    361 
    362        The keyfunc will be called every time the ORM needs to add a member by 
    363        value-only (such as when loading instances from the database) or 
    364        remove a member.  The usual cautions about dictionary keying apply- 
    365        ``keyfunc(object)`` should return the same output for the life of the 
    366        collection.  Keying based on mutable properties can result in 
    367        unreachable instances "lost" in the collection. 
    368 
    369        """ 
    370        self.keyfunc = keyfunc 
    371        self.ignore_unpopulated_attribute = ignore_unpopulated_attribute 
    372        super().__init__(*dict_args) 
    373 
    374    @classmethod 
    375    def _unreduce( 
    376        cls, 
    377        keyfunc: Callable[[Any], Any], 
    378        values: Dict[_KT, _KT], 
    379        adapter: Optional[CollectionAdapter] = None, 
    380    ) -> "KeyFuncDict[_KT, _KT]": 
    381        mp: KeyFuncDict[_KT, _KT] = KeyFuncDict(keyfunc) 
    382        mp.update(values) 
    383        # note that the adapter sets itself up onto this collection 
    384        # when its `__setstate__` method is called 
    385        return mp 
    386 
    387    def __reduce__( 
    388        self, 
    389    ) -> Tuple[ 
    390        Callable[[_KT, _KT], KeyFuncDict[_KT, _KT]], 
    391        Tuple[Any, Union[Dict[_KT, _KT], Dict[_KT, _KT]], CollectionAdapter], 
    392    ]: 
    393        return ( 
    394            KeyFuncDict._unreduce, 
    395            ( 
    396                self.keyfunc, 
    397                dict(self), 
    398                collection_adapter(self), 
    399            ), 
    400        ) 
    401 
    402    @util.preload_module("sqlalchemy.orm.attributes") 
    403    def _raise_for_unpopulated( 
    404        self, 
    405        value: _KT, 
    406        initiator: Union[AttributeEventToken, Literal[None, False]] = None, 
    407        *, 
    408        warn_only: bool, 
    409    ) -> None: 
    410        mapper = base.instance_state(value).mapper 
    411 
    412        attributes = util.preloaded.orm_attributes 
    413 
    414        if not isinstance(initiator, attributes.AttributeEventToken): 
    415            relationship = "unknown relationship" 
    416        elif initiator.key in mapper.attrs: 
    417            relationship = f"{mapper.attrs[initiator.key]}" 
    418        else: 
    419            relationship = initiator.key 
    420 
    421        if warn_only: 
    422            util.warn( 
    423                f"Attribute keyed dictionary value for " 
    424                f"attribute '{relationship}' was None; this will raise " 
    425                "in a future release. " 
    426                f"To skip this assignment entirely, " 
    427                f'Set the "ignore_unpopulated_attribute=True" ' 
    428                f"parameter on the mapped collection factory." 
    429            ) 
    430        else: 
    431            raise sa_exc.InvalidRequestError( 
    432                "In event triggered from population of " 
    433                f"attribute '{relationship}' " 
    434                "(potentially from a backref), " 
    435                f"can't populate value in KeyFuncDict; " 
    436                "dictionary key " 
    437                f"derived from {base.instance_str(value)} is not " 
    438                f"populated. Ensure appropriate state is set up on " 
    439                f"the {base.instance_str(value)} object " 
    440                f"before assigning to the {relationship} attribute. " 
    441                f"To skip this assignment entirely, " 
    442                f'Set the "ignore_unpopulated_attribute=True" ' 
    443                f"parameter on the mapped collection factory." 
    444            ) 
    445 
    446    @collection.appender  # type: ignore[misc] 
    447    @collection.internally_instrumented  # type: ignore[misc] 
    448    def set( 
    449        self, 
    450        value: _KT, 
    451        _sa_initiator: Union[AttributeEventToken, Literal[None, False]] = None, 
    452    ) -> None: 
    453        """Add an item by value, consulting the keyfunc for the key.""" 
    454 
    455        key = self.keyfunc(value) 
    456 
    457        if key is base.NO_VALUE: 
    458            if not self.ignore_unpopulated_attribute: 
    459                self._raise_for_unpopulated( 
    460                    value, _sa_initiator, warn_only=False 
    461                ) 
    462            else: 
    463                return 
    464        elif key is Missing: 
    465            if not self.ignore_unpopulated_attribute: 
    466                self._raise_for_unpopulated( 
    467                    value, _sa_initiator, warn_only=True 
    468                ) 
    469                key = None 
    470            else: 
    471                return 
    472 
    473        self.__setitem__(key, value, _sa_initiator)  # type: ignore[call-arg] 
    474 
    475    @collection.remover  # type: ignore[misc] 
    476    @collection.internally_instrumented  # type: ignore[misc] 
    477    def remove( 
    478        self, 
    479        value: _KT, 
    480        _sa_initiator: Union[AttributeEventToken, Literal[None, False]] = None, 
    481    ) -> None: 
    482        """Remove an item by value, consulting the keyfunc for the key.""" 
    483 
    484        key = self.keyfunc(value) 
    485 
    486        if key is base.NO_VALUE: 
    487            if not self.ignore_unpopulated_attribute: 
    488                self._raise_for_unpopulated( 
    489                    value, _sa_initiator, warn_only=False 
    490                ) 
    491            return 
    492        elif key is Missing: 
    493            if not self.ignore_unpopulated_attribute: 
    494                self._raise_for_unpopulated( 
    495                    value, _sa_initiator, warn_only=True 
    496                ) 
    497                key = None 
    498            else: 
    499                return 
    500 
    501        # Let self[key] raise if key is not in this collection 
    502        # testlib.pragma exempt:__ne__ 
    503        if self[key] != value: 
    504            raise sa_exc.InvalidRequestError( 
    505                "Can not remove '%s': collection holds '%s' for key '%s'. " 
    506                "Possible cause: is the KeyFuncDict key function " 
    507                "based on mutable properties or properties that only obtain " 
    508                "values after flush?" % (value, self[key], key) 
    509            ) 
    510        self.__delitem__(key, _sa_initiator)  # type: ignore[call-arg] 
    511 
    512 
    513def _mapped_collection_cls( 
    514    keyfunc: Callable[[Any], Any], ignore_unpopulated_attribute: bool 
    515) -> Type[KeyFuncDict[_KT, _KT]]: 
    516    class _MKeyfuncMapped(KeyFuncDict[_KT, _KT]): 
    517        def __init__(self, *dict_args: Any) -> None: 
    518            super().__init__( 
    519                keyfunc, 
    520                *dict_args, 
    521                ignore_unpopulated_attribute=ignore_unpopulated_attribute, 
    522            ) 
    523 
    524    return _MKeyfuncMapped 
    525 
    526 
    527MappedCollection = KeyFuncDict 
    528"""A synonym for :class:`.KeyFuncDict`. 
    529 
    530.. versionchanged:: 2.0 Renamed :class:`.MappedCollection` to 
    531   :class:`.KeyFuncDict`. 
    532 
    533""" 
    534 
    535mapped_collection = keyfunc_mapping 
    536"""A synonym for :func:`_orm.keyfunc_mapping`. 
    537 
    538.. versionchanged:: 2.0 Renamed :data:`.mapped_collection` to 
    539   :func:`_orm.keyfunc_mapping` 
    540 
    541""" 
    542 
    543attribute_mapped_collection = attribute_keyed_dict 
    544"""A synonym for :func:`_orm.attribute_keyed_dict`. 
    545 
    546.. versionchanged:: 2.0 Renamed :data:`.attribute_mapped_collection` to 
    547   :func:`_orm.attribute_keyed_dict` 
    548 
    549""" 
    550 
    551column_mapped_collection = column_keyed_dict 
    552"""A synonym for :func:`_orm.column_keyed_dict. 
    553 
    554.. versionchanged:: 2.0 Renamed :func:`.column_mapped_collection` to 
    555   :func:`_orm.column_keyed_dict` 
    556 
    557"""