1# orm/mapped_collection.py
2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8from __future__ import annotations
9
10import operator
11from typing import Any
12from typing import Callable
13from typing import Dict
14from typing import Generic
15from typing import List
16from typing import Optional
17from typing import Sequence
18from typing import Tuple
19from typing import Type
20from typing import TYPE_CHECKING
21from typing import TypeVar
22from typing import Union
23
24from . import base
25from .collections import collection
26from .collections import collection_adapter
27from .. import exc as sa_exc
28from .. import util
29from ..sql import coercions
30from ..sql import expression
31from ..sql import roles
32from ..util.typing import Literal
33
34if TYPE_CHECKING:
35 from . import AttributeEventToken
36 from . import Mapper
37 from .collections import CollectionAdapter
38 from ..sql.elements import ColumnElement
39
# Type variables used by the dictionary collection classes and key getters
# in this module: ``_KT`` / ``_VT`` parameterize ``KeyFuncDict`` as the
# dictionary key and value types.
_KT = TypeVar("_KT", bound=Any)
_VT = TypeVar("_VT", bound=Any)

# Any single-argument callable; used to annotate "keyfunc" parameters.
_F = TypeVar("_F", bound=Callable[[Any], Any])
44
45
class _PlainColumnGetter(Generic[_KT]):
    """Key function that reads mapped column values from an instance.

    The :class:`.Column` objects are held directly on the getter.  When
    pickled, the getter is replaced by a
    :class:`._SerializableColumnGetterV2`, which has more expensive
    ``__call__()`` performance and some rare caveats.

    """

    __slots__ = ("cols", "composite")

    def __init__(self, cols: Sequence[ColumnElement[_KT]]) -> None:
        self.cols = cols
        # with more than one column, __call__() produces a tuple key
        self.composite = len(cols) > 1

    def __reduce__(
        self,
    ) -> Tuple[
        Type[_SerializableColumnGetterV2[_KT]],
        Tuple[Sequence[Tuple[Optional[str], Optional[str]]]],
    ]:
        # pickle support: delegate to the key-based getter, which builds
        # its constructor arguments from the column and table keys
        return _SerializableColumnGetterV2._reduce_from_cols(self.cols)

    def _cols(self, mapper: Mapper[_KT]) -> Sequence[ColumnElement[_KT]]:
        """Return the columns to read; ``mapper`` is not consulted here."""
        return self.cols

    def __call__(self, value: _KT) -> Union[_KT, Tuple[_KT, ...]]:
        """Return the dictionary key for the mapped instance ``value``.

        A composite getter returns a tuple with one element per column.
        A single-column getter returns the column value itself, or the
        ``_UNMAPPED_AMBIGUOUS_NONE`` sentinel when that value is ``None``.

        """
        state = base.instance_state(value)
        mapper = base._state_mapper(state)

        values: List[_KT] = []
        for column in self._cols(mapper):
            values.append(
                mapper._get_state_attr_by_column(state, state.dict, column)
            )

        if self.composite:
            return tuple(values)

        single = values[0]
        if single is not None:
            return single
        return _UNMAPPED_AMBIGUOUS_NONE
89
90
class _SerializableColumnGetterV2(_PlainColumnGetter[_KT]):
    """Serializable column getter that also handles multi-table
    mapped classes.

    Columns are stored as ``(column key, table key)`` pairs and are
    resolved against the mapper each time they are needed.

    Two extremely unusual cases are not supported: mappings whose tables
    span multiple metadata objects, and mappings to non-Table selectables
    linked across inheriting mappers, may fail to function here.

    """

    __slots__ = ("colkeys",)

    def __init__(
        self, colkeys: Sequence[Tuple[Optional[str], Optional[str]]]
    ) -> None:
        self.colkeys = colkeys
        # with more than one column, __call__() produces a tuple key
        self.composite = len(colkeys) > 1

    def __reduce__(
        self,
    ) -> Tuple[
        Type[_SerializableColumnGetterV2[_KT]],
        Tuple[Sequence[Tuple[Optional[str], Optional[str]]]],
    ]:
        constructor_args = (self.colkeys,)
        return self.__class__, constructor_args

    @classmethod
    def _reduce_from_cols(cls, cols: Sequence[ColumnElement[_KT]]) -> Tuple[
        Type[_SerializableColumnGetterV2[_KT]],
        Tuple[Sequence[Tuple[Optional[str], Optional[str]]]],
    ]:
        """Build a ``__reduce__()`` result from a sequence of columns."""

        def _table_key(c: ColumnElement[_KT]) -> Optional[str]:
            # only TableClause objects contribute a table key
            if isinstance(c.table, expression.TableClause):
                return c.table.key  # type: ignore
            return None

        colkeys = []
        for column in cols:
            colkeys.append((column.key, _table_key(column)))
        return _SerializableColumnGetterV2, (colkeys,)

    def _cols(self, mapper: Mapper[_KT]) -> Sequence[ColumnElement[_KT]]:
        """Resolve the stored key pairs into columns for ``mapper``."""
        metadata = getattr(mapper.local_table, "metadata", None)
        resolved: List[ColumnElement[_KT]] = []
        for ckey, tkey in self.colkeys:
            # use the metadata's table only when the table key is known
            # to it; otherwise look on the mapper's local table
            if tkey is not None and metadata is not None and tkey in metadata:
                resolved.append(metadata.tables[tkey].c[ckey])
            else:
                resolved.append(mapper.local_table.c[ckey])  # type: ignore
        return resolved
142
143
def column_keyed_dict(
    mapping_spec: Union[Type[_KT], Callable[[_KT], _VT]],
    *,
    ignore_unpopulated_attribute: bool = False,
) -> Type[KeyFuncDict[_KT, _KT]]:
    """A dictionary-based collection type with column-based keying.

    .. versionchanged:: 2.0 Renamed :data:`.column_mapped_collection` to
       :func:`.column_keyed_dict`.

    Returns a :class:`.KeyFuncDict` factory whose dictionary keys are taken
    from the value of a particular :class:`.Column`-mapped attribute on
    each ORM mapped instance added to the dictionary.

    .. note:: the target attribute must already hold its value at the
       time the object is added to the dictionary collection.
       Additionally, changes to the key attribute are **not tracked**, so
       the key in the dictionary is not automatically synchronized with
       the key value on the target object itself.  See
       :ref:`key_collections_mutations` for further details.

    .. seealso::

        :ref:`orm_dictionary_collection` - background on use

    :param mapping_spec: a :class:`_schema.Column` object that is expected
     to be mapped by the target mapper to a particular attribute on the
     mapped class; the value of that attribute on a particular instance
     is used as the key for that instance's dictionary entry.
    :param ignore_unpopulated_attribute:  if True, and the mapped attribute
     indicated by the given :class:`_schema.Column` target attribute
     on an object is not populated at all, the operation will be silently
     skipped.  By default, an error is raised.

     .. versionadded:: 2.0 an error is raised by default if the attribute
        being used for the dictionary key is determined to have never been
        populated with any value.  The
        :paramref:`_orm.column_keyed_dict.ignore_unpopulated_attribute`
        parameter may be set to indicate that this condition should
        instead be ignored and the append operation silently skipped.
        This is in contrast to the behavior of the 1.x series, which would
        erroneously populate the value in the dictionary with an arbitrary
        key value of ``None``.


    """
    # mapping_spec may be a single column or several; coerce each one
    columns = []
    for spec in util.to_list(mapping_spec):
        columns.append(
            coercions.expect(
                roles.ColumnArgumentRole, spec, argname="mapping_spec"
            )
        )

    return _mapped_collection_cls(
        _PlainColumnGetter(columns),
        ignore_unpopulated_attribute=ignore_unpopulated_attribute,
    )
199
200
# Sentinel returned by the key getters in this module (_PlainColumnGetter for
# a single-column key, and _AttrGetter) when the key value they read is
# ``None``.  KeyFuncDict.set() / KeyFuncDict.remove() test for it by identity:
# unless ``ignore_unpopulated_attribute`` is set they emit a warning and then
# proceed with a key of ``None``; when it is set they return without acting.
_UNMAPPED_AMBIGUOUS_NONE = object()
202
203
class _AttrGetter:
    """Picklable key function that reads a named attribute.

    A value other than ``None`` is returned as is.  When the attribute
    reads as ``None``, the instance state is consulted: if the name is a
    mapped attribute, the entry from the state's ``dict`` is returned
    (``NO_VALUE`` when there is no entry), except that a stored ``None``
    yields ``_UNMAPPED_AMBIGUOUS_NONE``; if the name is not a mapped
    attribute, ``_UNMAPPED_AMBIGUOUS_NONE`` is returned.

    """

    __slots__ = ("attr_name", "getter")

    def __init__(self, attr_name: str):
        self.attr_name = attr_name
        self.getter = operator.attrgetter(attr_name)

    def __call__(self, mapped_object: Any) -> Any:
        value = self.getter(mapped_object)
        if value is not None:
            return value

        # a None result is ambiguous; look at the instance state to tell
        # "explicitly None" apart from "never populated"
        state = base.instance_state(mapped_object)
        if self.attr_name not in state.mapper.attrs:
            return _UNMAPPED_AMBIGUOUS_NONE

        stored = state.dict.get(self.attr_name, base.NO_VALUE)
        if stored is None:
            return _UNMAPPED_AMBIGUOUS_NONE
        return stored

    def __reduce__(self) -> Tuple[Type[_AttrGetter], Tuple[str]]:
        # only the attribute name is pickled; the attrgetter is rebuilt
        # by __init__ on unpickle
        constructor_args = (self.attr_name,)
        return _AttrGetter, constructor_args
228
229
def attribute_keyed_dict(
    attr_name: str, *, ignore_unpopulated_attribute: bool = False
) -> Type[KeyFuncDict[Any, Any]]:
    """A dictionary-based collection type with attribute-based keying.

    .. versionchanged:: 2.0 Renamed :data:`.attribute_mapped_collection` to
       :func:`.attribute_keyed_dict`.

    Returns a :class:`.KeyFuncDict` factory whose dictionary keys are taken
    from the value of a particular named attribute on each ORM mapped
    instance added to the dictionary.

    .. note:: the target attribute must already hold its value at the
       time the object is added to the dictionary collection.
       Additionally, changes to the key attribute are **not tracked**, so
       the key in the dictionary is not automatically synchronized with
       the key value on the target object itself.  See
       :ref:`key_collections_mutations` for further details.

    .. seealso::

        :ref:`orm_dictionary_collection` - background on use

    :param attr_name: string name of an ORM-mapped attribute
     on the mapped class; the value of that attribute on a particular
     instance is used as the key for that instance's dictionary entry.
    :param ignore_unpopulated_attribute:  if True, and the target attribute
     on an object is not populated at all, the operation will be silently
     skipped.  By default, an error is raised.

     .. versionadded:: 2.0 an error is raised by default if the attribute
        being used for the dictionary key is determined to have never been
        populated with any value.  The
        :paramref:`_orm.attribute_keyed_dict.ignore_unpopulated_attribute`
        parameter may be set to indicate that this condition should
        instead be ignored and the append operation silently skipped.
        This is in contrast to the behavior of the 1.x series, which would
        erroneously populate the value in the dictionary with an arbitrary
        key value of ``None``.


    """
    key_getter = _AttrGetter(attr_name)
    return _mapped_collection_cls(
        key_getter,
        ignore_unpopulated_attribute=ignore_unpopulated_attribute,
    )
277
278
def keyfunc_mapping(
    keyfunc: _F,
    *,
    ignore_unpopulated_attribute: bool = False,
) -> Type[KeyFuncDict[_KT, Any]]:
    """A dictionary-based collection type with arbitrary keying.

    .. versionchanged:: 2.0 Renamed :data:`.mapped_collection` to
       :func:`.keyfunc_mapping`.

    Returns a :class:`.KeyFuncDict` factory whose keying function is the
    given ``keyfunc``, a callable that takes an entity and returns a key
    value.

    .. note:: the given keyfunc is called only once at the time that the
       target object is being added to the collection.   Changes to the
       effective value returned by the function are not tracked.


    .. seealso::

        :ref:`orm_dictionary_collection` - background on use

    :param keyfunc: a callable that will be passed the ORM-mapped instance
     and should return the key to use for it in the dictionary.
     If the value returned is :attr:`.LoaderCallableStatus.NO_VALUE`, an
     error is raised.
    :param ignore_unpopulated_attribute:  if True, and the callable returns
     :attr:`.LoaderCallableStatus.NO_VALUE` for a particular instance, the
     operation will be silently skipped.  By default, an error is raised.

     .. versionadded:: 2.0 an error is raised by default if the callable
        being used for the dictionary key returns
        :attr:`.LoaderCallableStatus.NO_VALUE`, which in an ORM attribute
        context indicates an attribute that was never populated with any
        value.  The
        :paramref:`_orm.keyfunc_mapping.ignore_unpopulated_attribute`
        parameter may be set to indicate that this condition should
        instead be ignored and the append operation silently skipped.
        This is in contrast to the behavior of the 1.x series, which would
        erroneously populate the value in the dictionary with an arbitrary
        key value of ``None``.


    """
    collection_cls = _mapped_collection_cls(
        keyfunc, ignore_unpopulated_attribute=ignore_unpopulated_attribute
    )
    return collection_cls
326
327
class KeyFuncDict(Dict[_KT, _VT]):
    """Base for ORM mapped dictionary classes.

    Extends the ``dict`` type with additional methods needed by SQLAlchemy ORM
    collection classes. Use of :class:`_orm.KeyFuncDict` is most directly
    by using the :func:`.attribute_keyed_dict` or
    :func:`.column_keyed_dict` class factories.
    :class:`_orm.KeyFuncDict` may also serve as the base for user-defined
    custom dictionary classes.

    .. versionchanged:: 2.0 Renamed :class:`.MappedCollection` to
       :class:`.KeyFuncDict`.

    .. seealso::

        :func:`_orm.attribute_keyed_dict`

        :func:`_orm.column_keyed_dict`

        :ref:`orm_dictionary_collection`

        :ref:`orm_custom_collection`


    """

    def __init__(
        self,
        keyfunc: _F,
        *dict_args: Any,
        ignore_unpopulated_attribute: bool = False,
    ) -> None:
        """Create a new collection with keying provided by keyfunc.

        keyfunc may be any callable that takes an object and returns an object
        for use as a dictionary key.

        The keyfunc will be called every time the ORM needs to add a member by
        value-only (such as when loading instances from the database) or
        remove a member.  The usual cautions about dictionary keying apply-
        ``keyfunc(object)`` should return the same output for the life of the
        collection.  Keying based on mutable properties can result in
        unreachable instances "lost" in the collection.

        :param keyfunc: callable producing the dictionary key for a value.
        :param dict_args: positional arguments passed through to ``dict``.
        :param ignore_unpopulated_attribute: when True, :meth:`.set` and
         :meth:`.remove` return without acting (no error, no warning) if
         the keyfunc reports an unpopulated or ambiguous ``None`` key.

        """
        self.keyfunc = keyfunc
        self.ignore_unpopulated_attribute = ignore_unpopulated_attribute
        super().__init__(*dict_args)

    @classmethod
    def _unreduce(
        cls,
        keyfunc: _F,
        values: Dict[_KT, _KT],
        adapter: Optional[CollectionAdapter] = None,
        ignore_unpopulated_attribute: bool = False,
    ) -> "KeyFuncDict[_KT, _KT]":
        """Rebuild a collection from the arguments made by ``__reduce__()``.

        ``ignore_unpopulated_attribute`` defaults to ``False`` so that
        pickles written before this argument was part of the reduce tuple
        (three arguments only) continue to load.

        """
        mp: KeyFuncDict[_KT, _KT] = KeyFuncDict(
            keyfunc,
            ignore_unpopulated_attribute=ignore_unpopulated_attribute,
        )
        mp.update(values)
        # note that the adapter sets itself up onto this collection
        # when its `__setstate__` method is called
        return mp

    def __reduce__(
        self,
    ) -> Tuple[
        Callable[..., KeyFuncDict[_KT, _KT]],
        Tuple[Any, Dict[_KT, _KT], CollectionAdapter, bool],
    ]:
        """Pickle support.

        The collection is pickled as a plain :class:`.KeyFuncDict` together
        with its keyfunc, its contents, its collection adapter and the
        ``ignore_unpopulated_attribute`` setting, so that the setting is
        still in effect after a round trip.

        """
        return (
            KeyFuncDict._unreduce,
            (
                self.keyfunc,
                dict(self),
                collection_adapter(self),
                self.ignore_unpopulated_attribute,
            ),
        )

    @util.preload_module("sqlalchemy.orm.attributes")
    def _raise_for_unpopulated(
        self,
        value: _KT,
        initiator: Union[AttributeEventToken, Literal[None, False]] = None,
        *,
        warn_only: bool,
    ) -> None:
        """Report that no usable dictionary key was produced for ``value``.

        :param value: the mapped instance that was being added or removed.
        :param initiator: the event token for the operation, if any; used
         only to name the relationship in the message.
        :param warn_only: if True a warning is emitted, otherwise
         :class:`.InvalidRequestError` is raised.

        """
        mapper = base.instance_state(value).mapper

        attributes = util.preloaded.orm_attributes

        # work out the most descriptive name available for the message
        if not isinstance(initiator, attributes.AttributeEventToken):
            relationship = "unknown relationship"
        elif initiator.key in mapper.attrs:
            relationship = f"{mapper.attrs[initiator.key]}"
        else:
            relationship = initiator.key

        if warn_only:
            util.warn(
                f"Attribute keyed dictionary value for "
                f"attribute '{relationship}' was None; this will raise "
                "in a future release. "
                f"To skip this assignment entirely, "
                f'Set the "ignore_unpopulated_attribute=True" '
                f"parameter on the mapped collection factory."
            )
        else:
            raise sa_exc.InvalidRequestError(
                "In event triggered from population of "
                f"attribute '{relationship}' "
                "(potentially from a backref), "
                f"can't populate value in KeyFuncDict; "
                "dictionary key "
                f"derived from {base.instance_str(value)} is not "
                f"populated. Ensure appropriate state is set up on "
                f"the {base.instance_str(value)} object "
                f"before assigning to the {relationship} attribute. "
                f"To skip this assignment entirely, "
                f'Set the "ignore_unpopulated_attribute=True" '
                f"parameter on the mapped collection factory."
            )

    @collection.appender  # type: ignore[misc]
    @collection.internally_instrumented  # type: ignore[misc]
    def set(
        self,
        value: _KT,
        _sa_initiator: Union[AttributeEventToken, Literal[None, False]] = None,
    ) -> None:
        """Add an item by value, consulting the keyfunc for the key.

        If the keyfunc returns ``NO_VALUE``, an error is raised unless
        ``ignore_unpopulated_attribute`` is set, in which case nothing is
        added.  If it returns the ambiguous-``None`` sentinel, a warning is
        emitted and the value is stored under the key ``None``, unless
        ``ignore_unpopulated_attribute`` is set, in which case nothing is
        added.

        """

        key = self.keyfunc(value)

        if key is base.NO_VALUE:
            if not self.ignore_unpopulated_attribute:
                self._raise_for_unpopulated(
                    value, _sa_initiator, warn_only=False
                )
            else:
                return
        elif key is _UNMAPPED_AMBIGUOUS_NONE:
            if not self.ignore_unpopulated_attribute:
                self._raise_for_unpopulated(
                    value, _sa_initiator, warn_only=True
                )
                # legacy behavior: fall back to a key of None after warning
                key = None
            else:
                return

        self.__setitem__(key, value, _sa_initiator)  # type: ignore[call-arg]

    @collection.remover  # type: ignore[misc]
    @collection.internally_instrumented  # type: ignore[misc]
    def remove(
        self,
        value: _KT,
        _sa_initiator: Union[AttributeEventToken, Literal[None, False]] = None,
    ) -> None:
        """Remove an item by value, consulting the keyfunc for the key.

        Unpopulated and ambiguous-``None`` keys are handled as in
        :meth:`.set`.  :class:`.InvalidRequestError` is raised if the
        collection holds a different value under the computed key;
        ``KeyError`` propagates if the key is not present at all.

        """

        key = self.keyfunc(value)

        if key is base.NO_VALUE:
            if not self.ignore_unpopulated_attribute:
                self._raise_for_unpopulated(
                    value, _sa_initiator, warn_only=False
                )
            return
        elif key is _UNMAPPED_AMBIGUOUS_NONE:
            if not self.ignore_unpopulated_attribute:
                self._raise_for_unpopulated(
                    value, _sa_initiator, warn_only=True
                )
                # legacy behavior: fall back to a key of None after warning
                key = None
            else:
                return

        # Let self[key] raise if key is not in this collection
        # testlib.pragma exempt:__ne__
        if self[key] != value:
            raise sa_exc.InvalidRequestError(
                "Can not remove '%s': collection holds '%s' for key '%s'. "
                "Possible cause: is the KeyFuncDict key function "
                "based on mutable properties or properties that only obtain "
                "values after flush?" % (value, self[key], key)
            )
        self.__delitem__(key, _sa_initiator)  # type: ignore[call-arg]
514
515
def _mapped_collection_cls(
    keyfunc: _F, ignore_unpopulated_attribute: bool
) -> Type[KeyFuncDict[_KT, _KT]]:
    """Build a :class:`.KeyFuncDict` subclass bound to ``keyfunc``.

    The returned class takes only the positional ``dict`` arguments;
    ``keyfunc`` and ``ignore_unpopulated_attribute`` are captured from
    this call and passed to :class:`.KeyFuncDict` on every instantiation.

    """
    fixed_options = {
        "ignore_unpopulated_attribute": ignore_unpopulated_attribute
    }

    class _MKeyfuncMapped(KeyFuncDict[_KT, _KT]):
        def __init__(self, *dict_args: Any) -> None:
            super().__init__(keyfunc, *dict_args, **fixed_options)

    return _MKeyfuncMapped
528
529
# Legacy 1.x names, kept as module-level synonyms for the 2.0 names.

MappedCollection = KeyFuncDict
"""A synonym for :class:`.KeyFuncDict`.

.. versionchanged:: 2.0 Renamed :class:`.MappedCollection` to
   :class:`.KeyFuncDict`.

"""

mapped_collection = keyfunc_mapping
"""A synonym for :func:`_orm.keyfunc_mapping`.

.. versionchanged:: 2.0 Renamed :data:`.mapped_collection` to
   :func:`_orm.keyfunc_mapping`

"""

attribute_mapped_collection = attribute_keyed_dict
"""A synonym for :func:`_orm.attribute_keyed_dict`.

.. versionchanged:: 2.0 Renamed :data:`.attribute_mapped_collection` to
   :func:`_orm.attribute_keyed_dict`

"""

column_mapped_collection = column_keyed_dict
"""A synonym for :func:`_orm.column_keyed_dict`.

.. versionchanged:: 2.0 Renamed :func:`.column_mapped_collection` to
   :func:`_orm.column_keyed_dict`

"""