Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/util.py: 34%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# orm/util.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
9from __future__ import annotations
11import enum
12import functools
13import re
14import types
15import typing
16from typing import AbstractSet
17from typing import Any
18from typing import Callable
19from typing import cast
20from typing import Dict
21from typing import FrozenSet
22from typing import Generic
23from typing import Iterable
24from typing import Iterator
25from typing import List
26from typing import Match
27from typing import Optional
28from typing import Sequence
29from typing import Tuple
30from typing import Type
31from typing import TYPE_CHECKING
32from typing import TypeVar
33from typing import Union
34import weakref
36from . import attributes # noqa
37from . import exc
38from . import exc as orm_exc
39from ._typing import _O
40from ._typing import insp_is_aliased_class
41from ._typing import insp_is_mapper
42from ._typing import prop_is_relationship
43from .base import _class_to_mapper as _class_to_mapper
44from .base import _MappedAnnotationBase
45from .base import _never_set as _never_set # noqa: F401
46from .base import _none_only_set as _none_only_set # noqa: F401
47from .base import _none_set as _none_set # noqa: F401
48from .base import attribute_str as attribute_str # noqa: F401
49from .base import class_mapper as class_mapper
50from .base import DynamicMapped
51from .base import InspectionAttr as InspectionAttr
52from .base import instance_str as instance_str # noqa: F401
53from .base import Mapped
54from .base import object_mapper as object_mapper
55from .base import object_state as object_state # noqa: F401
56from .base import opt_manager_of_class
57from .base import ORMDescriptor
58from .base import state_attribute_str as state_attribute_str # noqa: F401
59from .base import state_class_str as state_class_str # noqa: F401
60from .base import state_str as state_str # noqa: F401
61from .base import WriteOnlyMapped
62from .interfaces import CriteriaOption
63from .interfaces import MapperProperty as MapperProperty
64from .interfaces import ORMColumnsClauseRole
65from .interfaces import ORMEntityColumnsClauseRole
66from .interfaces import ORMFromClauseRole
67from .path_registry import PathRegistry as PathRegistry
68from .. import event
69from .. import exc as sa_exc
70from .. import inspection
71from .. import sql
72from .. import util
73from ..engine.result import result_tuple
74from ..sql import coercions
75from ..sql import expression
76from ..sql import lambdas
77from ..sql import roles
78from ..sql import util as sql_util
79from ..sql import visitors
80from ..sql._typing import is_selectable
81from ..sql.annotation import SupportsCloneAnnotations
82from ..sql.base import ColumnCollection
83from ..sql.cache_key import HasCacheKey
84from ..sql.cache_key import MemoizedHasCacheKey
85from ..sql.elements import ColumnElement
86from ..sql.elements import KeyedColumnElement
87from ..sql.selectable import FromClause
88from ..util.langhelpers import MemoizedSlots
89from ..util.typing import de_stringify_annotation as _de_stringify_annotation
90from ..util.typing import eval_name_only as _eval_name_only
91from ..util.typing import fixup_container_fwd_refs
92from ..util.typing import get_origin
93from ..util.typing import is_origin_of_cls
94from ..util.typing import Literal
95from ..util.typing import Protocol
97if typing.TYPE_CHECKING:
98 from ._typing import _EntityType
99 from ._typing import _IdentityKeyType
100 from ._typing import _InternalEntityType
101 from ._typing import _ORMCOLEXPR
102 from .context import _MapperEntity
103 from .context import ORMCompileState
104 from .mapper import Mapper
105 from .path_registry import AbstractEntityRegistry
106 from .query import Query
107 from .relationships import RelationshipProperty
108 from ..engine import Row
109 from ..engine import RowMapping
110 from ..sql._typing import _CE
111 from ..sql._typing import _ColumnExpressionArgument
112 from ..sql._typing import _EquivalentColumnMap
113 from ..sql._typing import _FromClauseArgument
114 from ..sql._typing import _OnClauseArgument
115 from ..sql._typing import _PropagateAttrsType
116 from ..sql.annotation import _SA
117 from ..sql.base import ReadOnlyColumnCollection
118 from ..sql.elements import BindParameter
119 from ..sql.selectable import _ColumnsClauseElement
120 from ..sql.selectable import Select
121 from ..sql.selectable import Selectable
122 from ..sql.visitors import anon_map
123 from ..util.typing import _AnnotationScanType
125_T = TypeVar("_T", bound=Any)
127all_cascades = frozenset(
128 (
129 "delete",
130 "delete-orphan",
131 "all",
132 "merge",
133 "expunge",
134 "save-update",
135 "refresh-expire",
136 "none",
137 )
138)
# A "partial of partial": calling ``_de_stringify_partial(fn)`` yields
# ``functools.partial(fn, locals_=...)`` so that the ORM annotation
# container names are always supplied as ``locals_`` to ``fn``.
_de_stringify_partial = functools.partial(
    functools.partial,
    locals_=util.immutabledict(
        {
            "Mapped": Mapped,
            "WriteOnlyMapped": WriteOnlyMapped,
            "DynamicMapped": DynamicMapped,
        }
    ),
)
151# partial is practically useless as we have to write out the whole
152# function and maintain the signature anyway
155class _DeStringifyAnnotation(Protocol):
156 def __call__(
157 self,
158 cls: Type[Any],
159 annotation: _AnnotationScanType,
160 originating_module: str,
161 *,
162 str_cleanup_fn: Optional[Callable[[str, str], str]] = None,
163 include_generic: bool = False,
164 ) -> Type[Any]: ...
# ``_de_stringify_annotation`` with the ORM container names pre-bound as
# ``locals_``; the cast restores a checkable signature for callers.
de_stringify_annotation = cast(
    _DeStringifyAnnotation,
    _de_stringify_partial(_de_stringify_annotation),
)
172class _EvalNameOnly(Protocol):
173 def __call__(self, name: str, module_name: str) -> Any: ...
# ``_eval_name_only`` with the ORM container names pre-bound as ``locals_``.
eval_name_only = cast(
    _EvalNameOnly,
    _de_stringify_partial(_eval_name_only),
)
class CascadeOptions(FrozenSet[str]):
    """Immutable set of the cascade names given to
    :paramref:`.relationship.cascade`.

    The set itself holds the effective cascade names once the ``"all"``
    and ``"none"`` shorthands have been resolved; in addition, one boolean
    attribute per individual cascade (``save_update``, ``delete``, ...)
    is set on the instance.
    """

    # cascades switched on by "all"; "delete-orphan" is never implied
    _add_w_all_cascades = all_cascades.difference(
        ["all", "none", "delete-orphan"]
    )
    _allowed_cascades = all_cascades

    _viewonly_cascades = ["expunge", "all", "none", "refresh-expire", "merge"]

    __slots__ = (
        "save_update",
        "delete",
        "refresh_expire",
        "merge",
        "expunge",
        "delete_orphan",
    )

    save_update: bool
    delete: bool
    refresh_expire: bool
    merge: bool
    expunge: bool
    delete_orphan: bool

    def __new__(
        cls, value_list: Optional[Union[Iterable[str], str]]
    ) -> CascadeOptions:
        """Build the option set from an iterable of names, a
        comma-separated string, or ``None``.

        :raises sqlalchemy.exc.ArgumentError: if any name is not one of
         the allowed cascades.
        """
        # strings (and None) are tokenized first, then come back through
        # this constructor as a list
        if value_list is None or isinstance(value_list, str):
            return cls.from_string(value_list)  # type: ignore

        values = set(value_list)
        unknown = values.difference(cls._allowed_cascades)
        if unknown:
            raise sa_exc.ArgumentError(
                "Invalid cascade option(s): %s"
                % ", ".join(repr(name) for name in sorted(unknown))
            )

        # "all" expands to the individual cascades; "none" then wipes
        # everything, so "none" wins when both are present
        if "all" in values:
            values.update(cls._add_w_all_cascades)
        if "none" in values:
            values.clear()
        values.discard("all")

        self = super().__new__(cls, values)
        self.save_update = "save-update" in values
        self.delete = "delete" in values
        self.refresh_expire = "refresh-expire" in values
        self.merge = "merge" in values
        self.expunge = "expunge" in values
        self.delete_orphan = "delete-orphan" in values

        if self.delete_orphan and not self.delete:
            util.warn("The 'delete-orphan' cascade option requires 'delete'.")
        return self

    def __repr__(self):
        names = ",".join(sorted(self))
        return "CascadeOptions(%r)" % names

    @classmethod
    def from_string(cls, arg):
        """Build the option set from a comma-separated string; ``None``
        or an empty string produce an empty set."""
        tokens = re.split(r"\s*,\s*", arg or "")
        return cls([token for token in tokens if token])
def _validator_events(desc, key, validator, include_removes, include_backrefs):
    """Attach attribute event listeners that pass incoming values through
    ``validator``.

    :param desc: the target handed to :func:`.event.listen`.
    :param key: attribute name; given to ``validator`` and used to locate
     ``state.manager[key].impl`` when checking for backref events.
    :param validator: invoked as ``validator(obj, key, value)``, or as
     ``validator(obj, key, value, is_remove)`` when ``include_removes``
     is true.
    :param include_removes: when true, a "remove" listener is installed
     as well and the validator receives the extra ``is_remove`` flag.
    :param include_backrefs: when false, events whose initiator's ``impl``
     is not this attribute's own ``impl`` skip the validator.
    """

    if not include_backrefs:

        def from_backref(state, initiator):
            own_impl = state.manager[key].impl
            return initiator.impl is not own_impl

    # NOTE: from_backref() only exists when include_backrefs is false;
    # every call below is guarded by ``not include_backrefs``.

    if include_removes:

        def append(state, value, initiator):
            # bulk replacement is handled by bulk_set()
            if initiator.op is attributes.OP_BULK_REPLACE:
                return value
            if not include_backrefs and from_backref(state, initiator):
                return value
            return validator(state.obj(), key, value, False)

        def bulk_set(state, values, initiator):
            if not include_backrefs and from_backref(state, initiator):
                return
            obj = state.obj()
            # validated values are written back into the same list
            values[:] = [
                validator(obj, key, item, False) for item in values
            ]

        def set_(state, value, oldvalue, initiator):
            if not include_backrefs and from_backref(state, initiator):
                return value
            return validator(state.obj(), key, value, False)

        def remove(state, value, initiator):
            if not include_backrefs and from_backref(state, initiator):
                return
            validator(state.obj(), key, value, True)

    else:

        def append(state, value, initiator):
            # bulk replacement is handled by bulk_set()
            if initiator.op is attributes.OP_BULK_REPLACE:
                return value
            if not include_backrefs and from_backref(state, initiator):
                return value
            return validator(state.obj(), key, value)

        def bulk_set(state, values, initiator):
            if not include_backrefs and from_backref(state, initiator):
                return
            obj = state.obj()
            # validated values are written back into the same list
            values[:] = [validator(obj, key, item) for item in values]

        def set_(state, value, oldvalue, initiator):
            if not include_backrefs and from_backref(state, initiator):
                return value
            return validator(state.obj(), key, value)

    event.listen(desc, "append", append, raw=True, retval=True)
    event.listen(desc, "bulk_replace", bulk_set, raw=True)
    event.listen(desc, "set", set_, raw=True, retval=True)
    if include_removes:
        event.listen(desc, "remove", remove, raw=True, retval=True)
def polymorphic_union(
    table_map, typecolname, aliasname="p_union", cast_nulls=True
):
    """Build the aliased ``UNION ALL`` selectable used by a polymorphic
    mapper.

    One SELECT is produced per entry of ``table_map``.  Every SELECT lists
    the combined set of column keys found across all the tables; where a
    table has no column for a key, a labeled NULL stands in for it.

    See :ref:`concrete_inheritance` for an example of how
    this is used.

    .. note:: ``table_map`` is modified in place; each value is replaced
       by its coerced FROM clause.

    :param table_map: mapping of polymorphic identities to
     :class:`_schema.Table` objects.
    :param typecolname: string name of a "discriminator" column, which will be
     derived from the query, producing the polymorphic identity for
     each row.  If ``None``, no polymorphic discriminator is generated.
    :param aliasname: name of the :func:`~sqlalchemy.sql.expression.alias()`
     construct generated.
    :param cast_nulls: if True, non-existent columns, which are represented
     as labeled NULLs, will be passed into CAST.   This is a legacy behavior
     that is problematic on some backends such as Oracle - in which case it
     can be set to False.
    :raises sqlalchemy.exc.InvalidRequestError: if one of the tables
     already has a column whose key equals ``typecolname``.

    """

    colnames: util.OrderedSet[str] = util.OrderedSet()
    columns_by_table = {}
    coltypes = {}

    for key in table_map:
        table = coercions.expect(
            roles.StrictFromClauseRole, table_map[key], allow_select=True
        )
        table_map[key] = table

        by_key = {}
        for column in table.c:
            if column.key == typecolname:
                raise sa_exc.InvalidRequestError(
                    "Polymorphic union can't use '%s' as the discriminator "
                    "column due to mapped column %r; please apply the "
                    "'typecolname' argument; this is available on "
                    "ConcreteBase as '_concrete_discriminator_name'"
                    % (typecolname, column)
                )
            colnames.add(column.key)
            by_key[column.key] = column
            coltypes[column.key] = column.type
        columns_by_table[table] = by_key

    def col(name, table):
        try:
            return columns_by_table[table][name]
        except KeyError:
            # this table lacks the column: emit a typed, labeled NULL
            wrap_null = sql.cast if cast_nulls else sql.type_coerce
            return wrap_null(sql.null(), coltypes[name]).label(name)

    selects = []
    for type_, table in table_map.items():
        select_cols = [col(name, table) for name in colnames]
        if typecolname is not None:
            # the polymorphic identity is rendered as a quoted literal
            select_cols.append(
                sql.literal_column(sql_util._quote_ddl_expr(type_)).label(
                    typecolname
                )
            )
        selects.append(sql.select(*select_cols).select_from(table))

    return sql.union_all(*selects).alias(aliasname)
def identity_key(
    class_: Optional[Type[_T]] = None,
    ident: Union[Any, Tuple[Any, ...]] = None,
    *,
    instance: Optional[_T] = None,
    row: Optional[Union[Row[Any], RowMapping]] = None,
    identity_token: Optional[Any] = None,
) -> _IdentityKeyType[_T]:
    r"""Produce an "identity key" tuple of the kind used as keys in the
    :attr:`.Session.identity_map` dictionary.

    Three call styles are accepted:

    * ``identity_key(class, ident, identity_token=token)``

      Receives a mapped class together with a primary key, given as a
      scalar or a tuple.

      E.g.::

        >>> identity_key(MyClass, (1, 2))
        (<class '__main__.MyClass'>, (1, 2), None)

      :param class: mapped class (must be a positional argument)
      :param ident: primary key, may be a scalar or tuple argument.
      :param identity_token: optional identity token

        .. versionadded:: 1.2 added identity_token


    * ``identity_key(instance=instance)``

      Produces the identity key for the given instance.  The instance
      need not be persistent, only that its primary key attributes
      are populated (else the key will contain ``None`` for those missing
      values).

      E.g.::

        >>> instance = MyClass(1, 2)
        >>> identity_key(instance=instance)
        (<class '__main__.MyClass'>, (1, 2), None)

      In this form, the given instance is ultimately run though
      :meth:`_orm.Mapper.identity_key_from_instance`, which will have the
      effect of performing a database check for the corresponding row
      if the object is expired.

      :param instance: object instance (must be given as a keyword arg)

    * ``identity_key(class, row=row, identity_token=token)``

      Like the class/tuple form, but the primary key is taken from a
      database result row given as a :class:`.Row` or
      :class:`.RowMapping` object.

      E.g.::

        >>> row = connection.execute(text("select * from table where a=1 and b=2")).first()
        >>> identity_key(MyClass, row=row)
        (<class '__main__.MyClass'>, (1, 2), None)

      :param class: mapped class (must be a positional argument)
      :param row: :class:`.Row` row returned by a :class:`_engine.CursorResult`
       (must be given as a keyword arg)
      :param identity_token: optional identity token

        .. versionadded:: 1.2 added identity_token

    :raises sqlalchemy.exc.ArgumentError: if neither a class nor an
     instance is given, or a class is given with neither ``ident`` nor
     ``row``.

    """  # noqa: E501
    if class_ is not None:
        mapper = class_mapper(class_)

        # a row takes precedence over ident when both are passed
        if row is not None:
            return mapper.identity_key_from_row(
                row, identity_token=identity_token
            )
        if ident is None:
            raise sa_exc.ArgumentError("ident or row is required")
        return mapper.identity_key_from_primary_key(
            tuple(util.to_list(ident)), identity_token=identity_token
        )

    if instance is not None:
        return object_mapper(instance).identity_key_from_instance(instance)

    raise sa_exc.ArgumentError("class or instance is required")
488class _TraceAdaptRole(enum.Enum):
489 """Enumeration of all the use cases for ORMAdapter.
491 ORMAdapter remains one of the most complicated aspects of the ORM, as it is
492 used for in-place adaption of column expressions to be applied to a SELECT,
493 replacing :class:`.Table` and other objects that are mapped to classes with
494 aliases of those tables in the case of joined eager loading, or in the case
495 of polymorphic loading as used with concrete mappings or other custom "with
496 polymorphic" parameters, with whole user-defined subqueries. The
497 enumerations provide an overview of all the use cases used by ORMAdapter, a
498 layer of formality as to the introduction of new ORMAdapter use cases (of
499 which none are anticipated), as well as a means to trace the origins of a
500 particular ORMAdapter within runtime debugging.
502 SQLAlchemy 2.0 has greatly scaled back ORM features which relied heavily on
503 open-ended statement adaption, including the ``Query.with_polymorphic()``
504 method and the ``Query.select_from_entity()`` methods, favoring
505 user-explicit aliasing schemes using the ``aliased()`` and
506 ``with_polymorphic()`` standalone constructs; these still use adaption,
507 however the adaption is applied in a narrower scope.
509 """
511 # aliased() use that is used to adapt individual attributes at query
512 # construction time
513 ALIASED_INSP = enum.auto()
515 # joinedload cases; typically adapt an ON clause of a relationship
516 # join
517 JOINEDLOAD_USER_DEFINED_ALIAS = enum.auto()
518 JOINEDLOAD_PATH_WITH_POLYMORPHIC = enum.auto()
519 JOINEDLOAD_MEMOIZED_ADAPTER = enum.auto()
521 # polymorphic cases - these are complex ones that replace FROM
522 # clauses, replacing tables with subqueries
523 MAPPER_POLYMORPHIC_ADAPTER = enum.auto()
524 WITH_POLYMORPHIC_ADAPTER = enum.auto()
525 WITH_POLYMORPHIC_ADAPTER_RIGHT_JOIN = enum.auto()
526 DEPRECATED_JOIN_ADAPT_RIGHT_SIDE = enum.auto()
528 # the from_statement() case, used only to adapt individual attributes
529 # from a given statement to local ORM attributes at result fetching
530 # time. assigned to ORMCompileState._from_obj_alias
531 ADAPT_FROM_STATEMENT = enum.auto()
533 # the joinedload for queries that have LIMIT/OFFSET/DISTINCT case;
534 # the query is placed inside of a subquery with the LIMIT/OFFSET/etc.,
535 # joinedloads are then placed on the outside.
536 # assigned to ORMCompileState.compound_eager_adapter
537 COMPOUND_EAGER_STATEMENT = enum.auto()
539 # the legacy Query._set_select_from() case.
540 # this is needed for Query's set operations (i.e. UNION, etc. )
541 # as well as "legacy from_self()", which while removed from 2.0 as
542 # public API, is used for the Query.count() method. this one
543 # still does full statement traversal
544 # assigned to ORMCompileState._from_obj_alias
545 LEGACY_SELECT_FROM_ALIAS = enum.auto()
class ORMStatementAdapter(sql_util.ColumnAdapter):
    """A ``ColumnAdapter`` that additionally records the
    :class:`._TraceAdaptRole` it was created for, as ``.role``."""

    __slots__ = ("role",)

    def __init__(
        self,
        role: _TraceAdaptRole,
        selectable: Selectable,
        *,
        equivalents: Optional[_EquivalentColumnMap] = None,
        adapt_required: bool = False,
        allow_label_resolve: bool = True,
        anonymize_labels: bool = False,
        adapt_on_names: bool = False,
        adapt_from_selectables: Optional[AbstractSet[FromClause]] = None,
    ):
        """Record ``role`` and hand every other argument, unchanged, to
        the ``ColumnAdapter`` constructor."""
        # the role is stored before the base constructor runs
        self.role = role

        super().__init__(
            selectable,
            equivalents=equivalents,
            adapt_required=adapt_required,
            allow_label_resolve=allow_label_resolve,
            anonymize_labels=anonymize_labels,
            adapt_on_names=adapt_on_names,
            adapt_from_selectables=adapt_from_selectables,
        )
class ORMAdapter(sql_util.ColumnAdapter):
    """A ``ColumnAdapter`` bound to one ORM entity, which can decline to
    adapt elements that belong to non-matching mappers.

    """

    __slots__ = ("role", "mapper", "is_aliased_class", "aliased_insp")

    is_aliased_class: bool
    aliased_insp: Optional[AliasedInsp[Any]]

    def __init__(
        self,
        role: _TraceAdaptRole,
        entity: _InternalEntityType[Any],
        *,
        equivalents: Optional[_EquivalentColumnMap] = None,
        adapt_required: bool = False,
        allow_label_resolve: bool = True,
        anonymize_labels: bool = False,
        selectable: Optional[Selectable] = None,
        limit_on_entity: bool = True,
        adapt_on_names: bool = False,
        adapt_from_selectables: Optional[AbstractSet[FromClause]] = None,
    ):
        """Set up the adapter for ``entity``.

        :param role: the :class:`._TraceAdaptRole` stored as ``.role``.
        :param entity: inspected entity supplying ``.mapper`` and, unless
         ``selectable`` is given, the selectable to adapt to.
        :param selectable: explicit selectable used instead of
         ``entity.selectable``.
        :param limit_on_entity: when true, ``_include_fn`` is installed as
         the adapter's ``include_fn``; when false no ``include_fn`` is
         used.

        The remaining keyword arguments are passed through to the
        ``ColumnAdapter`` constructor unchanged.
        """
        self.role = role
        self.mapper = entity.mapper

        if selectable is None:
            selectable = entity.selectable

        # keep a reference to the entity only when it is an aliased class
        self.aliased_insp = entity if insp_is_aliased_class(entity) else None
        self.is_aliased_class = self.aliased_insp is not None

        include_fn = self._include_fn if limit_on_entity else None
        super().__init__(
            selectable,
            equivalents,
            adapt_required=adapt_required,
            allow_label_resolve=allow_label_resolve,
            anonymize_labels=anonymize_labels,
            include_fn=include_fn,
            adapt_on_names=adapt_on_names,
            adapt_from_selectables=adapt_from_selectables,
        )

    def _include_fn(self, elem):
        """Filter for adaptable elements: accept ``elem`` when it has no
        "parentmapper" annotation, or when that mapper and this adapter's
        mapper are related via ``isa()`` in either direction."""
        parent = elem._annotations.get("parentmapper", None)
        if not parent:
            return True

        return parent.isa(self.mapper) or self.mapper.isa(parent)
class AliasedClass(
    inspection.Inspectable["AliasedInsp[_O]"], ORMColumnsClauseRole[_O]
):
    r"""An "aliased" form of a mapped class, for usage with Query.

    This is the ORM counterpart of the
    :func:`~sqlalchemy.sql.expression.alias` construct.  The object
    imitates the mapped class by means of a ``__getattr__`` scheme while
    holding on to a real :class:`~sqlalchemy.sql.expression.Alias` object.

    The main use of :class:`.AliasedClass` is to act as an alternate
    within an ORM-generated SQL statement, so that the same mapped entity
    can appear in several contexts.  A simple example::

        # find all pairs of users with the same name
        user_alias = aliased(User)
        session.query(User, user_alias).join(
            (user_alias, User.id > user_alias.id)
        ).filter(User.name == user_alias.name)

    :class:`.AliasedClass` can also map an existing mapped class to an
    entirely new selectable, provided that selectable is
    column-compatible with the existing mapped selectable, and it can be
    configured in a mapping as the target of a :func:`_orm.relationship`.
    See the links below for examples.

    Instances are typically created with the :func:`_orm.aliased`
    function; the :func:`_orm.with_polymorphic` function produces them as
    well, with additional configuration.

    The resulting object exposes the same attribute and method interface
    as the original mapped class, which keeps :class:`.AliasedClass`
    compatible with any attribute technique that works on the original
    class, including hybrid attributes (see :ref:`hybrids_toplevel`).

    The underlying :class:`_orm.Mapper`, the aliased selectable and other
    details are available through :func:`_sa.inspect`::

        from sqlalchemy import inspect

        my_alias = aliased(MyClass)
        insp = inspect(my_alias)

    The resulting inspection object is an instance of :class:`.AliasedInsp`.


    .. seealso::

        :func:`.aliased`

        :func:`.with_polymorphic`

        :ref:`relationship_aliased_class`

        :ref:`relationship_to_window_function`


    """

    __name__: str

    def __init__(
        self,
        mapped_class_or_ac: _EntityType[_O],
        alias: Optional[FromClause] = None,
        name: Optional[str] = None,
        flat: bool = False,
        adapt_on_names: bool = False,
        with_polymorphic_mappers: Optional[Sequence[Mapper[Any]]] = None,
        with_polymorphic_discriminator: Optional[ColumnElement[Any]] = None,
        base_alias: Optional[AliasedInsp[Any]] = None,
        use_mapper_path: bool = False,
        represents_outer_join: bool = False,
    ):
        insp = cast(
            "_InternalEntityType[_O]", inspection.inspect(mapped_class_or_ac)
        )
        mapper = insp.mapper

        if alias is not None:
            # an explicit selectable for something that is itself already
            # an aliased class: AliasedInsp is asked to nest the adapters
            nest_adapters = bool(insp.is_aliased_class)
        else:
            nest_adapters = False
            if insp.is_aliased_class and insp.selectable._is_subquery:
                alias = insp.selectable.alias()
            else:
                poly_selectable = mapper._with_polymorphic_selectable
                alias = poly_selectable._anonymous_fromclause(
                    name=name,
                    flat=flat,
                )

        assert alias is not None

        # fall back to the mapper's own polymorphic settings where the
        # caller did not supply any
        poly_mappers = (
            with_polymorphic_mappers or mapper.with_polymorphic_mappers
        )
        if with_polymorphic_discriminator is not None:
            discriminator = with_polymorphic_discriminator
        else:
            discriminator = mapper.polymorphic_on

        self._aliased_insp = AliasedInsp(
            self,
            insp,
            alias,
            name,
            poly_mappers,
            discriminator,
            base_alias,
            use_mapper_path,
            adapt_on_names,
            represents_outer_join,
            nest_adapters,
        )

        self.__name__ = f"aliased({mapper.class_.__name__})"

    @classmethod
    def _reconstitute_from_aliased_insp(
        cls, aliased_insp: AliasedInsp[_O]
    ) -> AliasedClass[_O]:
        """Create an :class:`.AliasedClass` around an existing
        :class:`.AliasedInsp` without running ``__init__``."""
        obj = cls.__new__(cls)
        obj.__name__ = f"aliased({aliased_insp.mapper.class_.__name__})"
        obj._aliased_insp = aliased_insp

        if aliased_insp._is_with_polymorphic:
            # re-attach each sub-entity under the name of its class
            for sub_insp in aliased_insp._with_polymorphic_entities:
                if sub_insp is aliased_insp:
                    continue
                sub_entity = AliasedClass._reconstitute_from_aliased_insp(
                    sub_insp
                )
                setattr(obj, sub_insp.class_.__name__, sub_entity)

        return obj

    def __getattr__(self, key: str) -> Any:
        # go through __dict__ directly; if "_aliased_insp" is not there
        # yet, a plain attribute access would recurse into __getattr__
        try:
            _aliased_insp = self.__dict__["_aliased_insp"]
        except KeyError:
            raise AttributeError()

        # regular getattr() on the target keeps all lookup mechanics
        attr = getattr(_aliased_insp._target, key)

        # bound method: hand back the same function bound to this
        # object instead
        if hasattr(attr, "__call__") and hasattr(attr, "__self__"):
            return types.MethodType(attr.__func__, self)

        # descriptor: invoke it with this object as the owner
        if hasattr(attr, "__get__"):
            attr = attr.__get__(None, self)

        # members of the QueryableAttribute system are adapted to this
        # entity, and the result is stored on the instance
        if hasattr(attr, "adapt_to_entity"):
            attr = attr.adapt_to_entity(_aliased_insp)
            setattr(self, key, attr)

        return attr

    def _get_from_serialized(
        self, key: str, mapped_class: _O, aliased_insp: AliasedInsp[_O]
    ) -> Any:
        """Variant of ``__getattr__`` that takes the mapped class and the
        :class:`.AliasedInsp` explicitly; used only by the
        ``sqlalchemy.ext.serializer`` extension."""
        attr = getattr(mapped_class, key)

        # bound method: hand back the same function bound to this
        # object instead
        if hasattr(attr, "__call__") and hasattr(attr, "__self__"):
            return types.MethodType(attr.__func__, self)

        # descriptor: invoke it with this object as the owner
        if hasattr(attr, "__get__"):
            attr = attr.__get__(None, self)

        # members of the QueryableAttribute system are adapted to the
        # given AliasedInsp, which is first pointed back at this object
        if hasattr(attr, "adapt_to_entity"):
            aliased_insp._weak_entity = weakref.ref(self)
            attr = attr.adapt_to_entity(aliased_insp)
            setattr(self, key, attr)

        return attr

    def __repr__(self) -> str:
        target_name = self._aliased_insp._target.__name__
        return "<AliasedClass at 0x%x; %s>" % (id(self), target_name)

    def __str__(self) -> str:
        return str(self._aliased_insp)
832@inspection._self_inspects
833class AliasedInsp(
834 ORMEntityColumnsClauseRole[_O],
835 ORMFromClauseRole,
836 HasCacheKey,
837 InspectionAttr,
838 MemoizedSlots,
839 inspection.Inspectable["AliasedInsp[_O]"],
840 Generic[_O],
841):
842 """Provide an inspection interface for an
843 :class:`.AliasedClass` object.
845 The :class:`.AliasedInsp` object is returned
846 given an :class:`.AliasedClass` using the
847 :func:`_sa.inspect` function::
849 from sqlalchemy import inspect
850 from sqlalchemy.orm import aliased
852 my_alias = aliased(MyMappedClass)
853 insp = inspect(my_alias)
855 Attributes on :class:`.AliasedInsp`
856 include:
858 * ``entity`` - the :class:`.AliasedClass` represented.
859 * ``mapper`` - the :class:`_orm.Mapper` mapping the underlying class.
860 * ``selectable`` - the :class:`_expression.Alias`
861 construct which ultimately
862 represents an aliased :class:`_schema.Table` or
863 :class:`_expression.Select`
864 construct.
865 * ``name`` - the name of the alias. Also is used as the attribute
866 name when returned in a result tuple from :class:`_query.Query`.
867 * ``with_polymorphic_mappers`` - collection of :class:`_orm.Mapper`
868 objects
869 indicating all those mappers expressed in the select construct
870 for the :class:`.AliasedClass`.
871 * ``polymorphic_on`` - an alternate column or SQL expression which
872 will be used as the "discriminator" for a polymorphic load.
874 .. seealso::
876 :ref:`inspection_toplevel`
878 """
880 __slots__ = (
881 "__weakref__",
882 "_weak_entity",
883 "mapper",
884 "selectable",
885 "name",
886 "_adapt_on_names",
887 "with_polymorphic_mappers",
888 "polymorphic_on",
889 "_use_mapper_path",
890 "_base_alias",
891 "represents_outer_join",
892 "persist_selectable",
893 "local_table",
894 "_is_with_polymorphic",
895 "_with_polymorphic_entities",
896 "_adapter",
897 "_target",
898 "__clause_element__",
899 "_memoized_values",
900 "_all_column_expressions",
901 "_nest_adapters",
902 )
904 _cache_key_traversal = [
905 ("name", visitors.ExtendedInternalTraversal.dp_string),
906 ("_adapt_on_names", visitors.ExtendedInternalTraversal.dp_boolean),
907 ("_use_mapper_path", visitors.ExtendedInternalTraversal.dp_boolean),
908 ("_target", visitors.ExtendedInternalTraversal.dp_inspectable),
909 ("selectable", visitors.ExtendedInternalTraversal.dp_clauseelement),
910 (
911 "with_polymorphic_mappers",
912 visitors.InternalTraversal.dp_has_cache_key_list,
913 ),
914 ("polymorphic_on", visitors.InternalTraversal.dp_clauseelement),
915 ]
917 mapper: Mapper[_O]
918 selectable: FromClause
919 _adapter: ORMAdapter
920 with_polymorphic_mappers: Sequence[Mapper[Any]]
921 _with_polymorphic_entities: Sequence[AliasedInsp[Any]]
923 _weak_entity: weakref.ref[AliasedClass[_O]]
924 """the AliasedClass that refers to this AliasedInsp"""
926 _target: Union[Type[_O], AliasedClass[_O]]
927 """the thing referenced by the AliasedClass/AliasedInsp.
929 In the vast majority of cases, this is the mapped class. However
930 it may also be another AliasedClass (alias of alias).
932 """
934 def __init__(
935 self,
936 entity: AliasedClass[_O],
937 inspected: _InternalEntityType[_O],
938 selectable: FromClause,
939 name: Optional[str],
940 with_polymorphic_mappers: Optional[Sequence[Mapper[Any]]],
941 polymorphic_on: Optional[ColumnElement[Any]],
942 _base_alias: Optional[AliasedInsp[Any]],
943 _use_mapper_path: bool,
944 adapt_on_names: bool,
945 represents_outer_join: bool,
946 nest_adapters: bool,
947 ):
948 mapped_class_or_ac = inspected.entity
949 mapper = inspected.mapper
951 self._weak_entity = weakref.ref(entity)
952 self.mapper = mapper
953 self.selectable = self.persist_selectable = self.local_table = (
954 selectable
955 )
956 self.name = name
957 self.polymorphic_on = polymorphic_on
958 self._base_alias = weakref.ref(_base_alias or self)
959 self._use_mapper_path = _use_mapper_path
960 self.represents_outer_join = represents_outer_join
961 self._nest_adapters = nest_adapters
963 if with_polymorphic_mappers:
964 self._is_with_polymorphic = True
965 self.with_polymorphic_mappers = with_polymorphic_mappers
966 self._with_polymorphic_entities = []
967 for poly in self.with_polymorphic_mappers:
968 if poly is not mapper:
969 ent = AliasedClass(
970 poly.class_,
971 selectable,
972 base_alias=self,
973 adapt_on_names=adapt_on_names,
974 use_mapper_path=_use_mapper_path,
975 )
977 setattr(self.entity, poly.class_.__name__, ent)
978 self._with_polymorphic_entities.append(ent._aliased_insp)
980 else:
981 self._is_with_polymorphic = False
982 self.with_polymorphic_mappers = [mapper]
984 self._adapter = ORMAdapter(
985 _TraceAdaptRole.ALIASED_INSP,
986 mapper,
987 selectable=selectable,
988 equivalents=mapper._equivalent_columns,
989 adapt_on_names=adapt_on_names,
990 anonymize_labels=True,
991 # make sure the adapter doesn't try to grab other tables that
992 # are not even the thing we are mapping, such as embedded
993 # selectables in subqueries or CTEs. See issue #6060
994 adapt_from_selectables={
995 m.selectable
996 for m in self.with_polymorphic_mappers
997 if not adapt_on_names
998 },
999 limit_on_entity=False,
1000 )
1002 if nest_adapters:
1003 # supports "aliased class of aliased class" use case
1004 assert isinstance(inspected, AliasedInsp)
1005 self._adapter = inspected._adapter.wrap(self._adapter)
1007 self._adapt_on_names = adapt_on_names
1008 self._target = mapped_class_or_ac
1010 @property
1011 def _post_inspect(self): # type: ignore[override]
1012 self.mapper._check_configure()
@classmethod
def _alias_factory(
    cls,
    element: Union[_EntityType[_O], FromClause],
    alias: Optional[FromClause] = None,
    name: Optional[str] = None,
    flat: bool = False,
    adapt_on_names: bool = False,
) -> Union[AliasedClass[_O], FromClause]:
    """Produce an alias of either a plain FROM clause or an ORM entity.

    :param element: a :class:`.FromClause`, or anything else, which is
     handed to :class:`.AliasedClass` as the entity to alias.
    :param alias: forwarded to :class:`.AliasedClass`; not consulted when
     ``element`` is a :class:`.FromClause`.
    :param name: optional name for the alias.
    :param flat: forwarded to the aliasing call.
    :param adapt_on_names: forwarded to :class:`.AliasedClass`; rejected
     when ``element`` is a :class:`.FromClause`.
    :raises sa_exc.ArgumentError: if ``adapt_on_names`` is set for a
     :class:`.FromClause`.
    """
    if not isinstance(element, FromClause):
        # anything that is not a plain FROM clause is treated as ORM
        return AliasedClass(
            element,
            alias=alias,
            flat=flat,
            name=name,
            adapt_on_names=adapt_on_names,
        )

    if adapt_on_names:
        raise sa_exc.ArgumentError(
            "adapt_on_names only applies to ORM elements"
        )

    if name:
        return element.alias(name=name, flat=flat)

    # no explicit name given: produce an anonymous alias
    return coercions.expect(
        roles.AnonymizedFromClauseRole, element, flat=flat
    )
@classmethod
def _with_polymorphic_factory(
    cls,
    base: Union[Type[_O], Mapper[_O]],
    classes: Union[Literal["*"], Iterable[_EntityType[Any]]],
    selectable: Union[Literal[False, None], FromClause] = False,
    flat: bool = False,
    polymorphic_on: Optional[ColumnElement[Any]] = None,
    aliased: bool = False,
    innerjoin: bool = False,
    adapt_on_names: bool = False,
    name: Optional[str] = None,
    _use_mapper_path: bool = False,
) -> AliasedClass[_O]:
    """Build the :class:`.AliasedClass` for a with_polymorphic() request.

    The mappers and selectable are obtained from the base mapper's
    ``_with_polymorphic_args()``; when ``aliased`` or ``flat`` is set the
    selectable is replaced by its anonymous FROM clause.

    :raises sa_exc.ArgumentError: if an explicit ``selectable`` is combined
     with ``flat``.
    """
    base_mapper = _class_to_mapper(base)

    selectable_was_given = selectable not in (None, False)
    if selectable_was_given and flat:
        raise sa_exc.ArgumentError(
            "the 'flat' and 'selectable' arguments cannot be passed "
            "simultaneously to with_polymorphic()"
        )

    poly_mappers, poly_selectable = base_mapper._with_polymorphic_args(
        classes, selectable, innerjoin=innerjoin
    )

    if aliased or flat:
        assert poly_selectable is not None
        poly_selectable = poly_selectable._anonymous_fromclause(flat=flat)

    return AliasedClass(
        base,
        poly_selectable,
        name=name,
        with_polymorphic_mappers=poly_mappers,
        adapt_on_names=adapt_on_names,
        with_polymorphic_discriminator=polymorphic_on,
        use_mapper_path=_use_mapper_path,
        # an inner join cannot produce the NULL rows of an outer join
        represents_outer_join=not innerjoin,
    )
@property
def entity(self) -> AliasedClass[_O]:
    """Return the :class:`.AliasedClass` associated with this object.

    The AliasedClass is referenced weakly in order to avoid reference
    cycles.  That means it can disappear, notably when it was created
    internally and only the AliasedInsp is passed around.  In that case a
    replacement is reconstituted from this object and remembered (again
    weakly), which is acceptable as the class carries very little initial
    state.
    """
    existing = self._weak_entity()
    if existing is not None:
        return existing

    rebuilt = AliasedClass._reconstitute_from_aliased_insp(self)
    self._weak_entity = weakref.ref(rebuilt)
    return rebuilt
# Class-level marker identifying this inspection object as an aliased
# class; it is a constant, not computed per instance.
is_aliased_class = True
"always returns True"
def _memoized_method___clause_element__(self) -> FromClause:
    """Return this object's selectable, annotated for ORM use.

    The selectable is annotated with the mapper and with this object as
    both parent entity and entity namespace, and its propagated attributes
    select the "orm" compile state plugin with this object as subject.
    """
    orm_annotations = {
        "parentmapper": self.mapper,
        "parententity": self,
        "entity_namespace": self,
    }
    propagated = {"compile_state_plugin": "orm", "plugin_subject": self}

    annotated = self.selectable._annotate(orm_annotations)
    return annotated._set_propagate_attrs(propagated)
@property
def entity_namespace(self) -> AliasedClass[_O]:
    """Return the namespace for attribute lookups: the ``entity``."""
    aliased_class = self.entity
    return aliased_class
@property
def class_(self) -> Type[_O]:
    """Return the mapped class ultimately represented by this
    :class:`.AliasedInsp`."""
    underlying_mapper = self.mapper
    return underlying_mapper.class_
1121 @property
1122 def _path_registry(self) -> AbstractEntityRegistry:
1123 if self._use_mapper_path:
1124 return self.mapper._path_registry
1125 else:
1126 return PathRegistry.per_mapper(self)
1128 def __getstate__(self) -> Dict[str, Any]:
1129 return {
1130 "entity": self.entity,
1131 "mapper": self.mapper,
1132 "alias": self.selectable,
1133 "name": self.name,
1134 "adapt_on_names": self._adapt_on_names,
1135 "with_polymorphic_mappers": self.with_polymorphic_mappers,
1136 "with_polymorphic_discriminator": self.polymorphic_on,
1137 "base_alias": self._base_alias(),
1138 "use_mapper_path": self._use_mapper_path,
1139 "represents_outer_join": self.represents_outer_join,
1140 "nest_adapters": self._nest_adapters,
1141 }
1143 def __setstate__(self, state: Dict[str, Any]) -> None:
1144 self.__init__( # type: ignore
1145 state["entity"],
1146 state["mapper"],
1147 state["alias"],
1148 state["name"],
1149 state["with_polymorphic_mappers"],
1150 state["with_polymorphic_discriminator"],
1151 state["base_alias"],
1152 state["use_mapper_path"],
1153 state["adapt_on_names"],
1154 state["represents_outer_join"],
1155 state["nest_adapters"],
1156 )
def _merge_with(self, other: AliasedInsp[_O]) -> AliasedInsp[_O]:
    """Combine the polymorphic classes of this object and ``other``.

    Both objects must share the same mapper.  If they cover the same set
    of classes, ``other`` is returned unchanged; otherwise a new
    :class:`.AliasedInsp` is built over the union of the classes, taking
    its discriminator, mapper-path and outer-join settings from ``other``.
    """
    shared_mapper = other.mapper
    assert self.mapper is shared_mapper

    own_classes = util.to_set(
        mp.class_ for mp in self.with_polymorphic_mappers
    )
    other_classes = {mp.class_ for mp in other.with_polymorphic_mappers}
    if own_classes == other_classes:
        return other

    combined_classes = own_classes.union(other_classes)
    outer = other.represents_outer_join

    merged_mappers, merged_selectable = shared_mapper._with_polymorphic_args(
        combined_classes, None, innerjoin=not outer
    )
    merged_selectable = merged_selectable._anonymous_fromclause(flat=True)

    merged_class = AliasedClass(
        shared_mapper,
        merged_selectable,
        with_polymorphic_mappers=merged_mappers,
        with_polymorphic_discriminator=other.polymorphic_on,
        use_mapper_path=other._use_mapper_path,
        represents_outer_join=outer,
    )
    return merged_class._aliased_insp
def _adapt_element(
    self, expr: _ORMCOLEXPR, key: Optional[str] = None
) -> _ORMCOLEXPR:
    """Adapt a column expression to this aliased entity.

    The expression is run through this object's adapter, annotated with
    this object as parent entity and with its mapper, and given ORM
    propagation attributes.

    :param expr: the expression to adapt; must be a
     :class:`.ColumnElement`.
    :param key: when truthy, recorded as the ``proxy_key`` annotation.
    """
    assert isinstance(expr, ColumnElement)

    annotations: Dict[str, Any] = {
        "parententity": self,
        "parentmapper": self.mapper,
    }
    if key:
        annotations["proxy_key"] = key

    # IMO mypy should see this one also as returning the same type
    # we put into it, but it's not
    adapted = self._adapter.traverse(expr)
    annotated = adapted._annotate(annotations)
    return annotated._set_propagate_attrs(
        {"compile_state_plugin": "orm", "plugin_subject": self}
    )
if TYPE_CHECKING:
    # establish compatibility with the _ORMAdapterProto protocol,
    # which in turn is compatible with _CoreAdapterProto.
    # Only type checkers evaluate this branch; the stub merely declares
    # the signature under which _orm_adapt_element is exposed.

    def _orm_adapt_element(
        self,
        obj: _CE,
        key: Optional[str] = None,
    ) -> _CE: ...

else:
    # At runtime the name is simply a second binding of _adapt_element.
    _orm_adapt_element = _adapt_element
1222 def _entity_for_mapper(self, mapper):
1223 self_poly = self.with_polymorphic_mappers
1224 if mapper in self_poly:
1225 if mapper is self.mapper:
1226 return self
1227 else:
1228 return getattr(
1229 self.entity, mapper.class_.__name__
1230 )._aliased_insp
1231 elif mapper.isa(self.mapper):
1232 return self
1233 else:
1234 assert False, "mapper %s doesn't correspond to %s" % (mapper, self)
1236 def _memoized_attr__get_clause(self):
1237 onclause, replacemap = self.mapper._get_clause
1238 return (
1239 self._adapter.traverse(onclause),
1240 {
1241 self._adapter.traverse(col): param
1242 for col, param in replacemap.items()
1243 },
1244 )
1246 def _memoized_attr__memoized_values(self):
1247 return {}
def _memoized_attr__all_column_expressions(self):
    """Return a :class:`.ColumnCollection` of the mapper's columns, each
    adapted to this alias.

    For a with_polymorphic entity the mappers of the polymorphic
    sub-entities are passed to the mapper's ``_columns_plus_keys()``.
    """
    if self._is_with_polymorphic:
        extra_args = (
            [ent.mapper for ent in self._with_polymorphic_entities],
        )
    else:
        extra_args = ()

    keyed_columns = self.mapper._columns_plus_keys(*extra_args)
    adapted = [
        (key, self._adapt_element(column)) for key, column in keyed_columns
    ]
    return ColumnCollection(adapted)
1263 def _memo(self, key, callable_, *args, **kw):
1264 if key in self._memoized_values:
1265 return self._memoized_values[key]
1266 else:
1267 self._memoized_values[key] = value = callable_(*args, **kw)
1268 return value
1270 def __repr__(self):
1271 if self.with_polymorphic_mappers:
1272 with_poly = "(%s)" % ", ".join(
1273 mp.class_.__name__ for mp in self.with_polymorphic_mappers
1274 )
1275 else:
1276 with_poly = ""
1277 return "<AliasedInsp at 0x%x; %s%s>" % (
1278 id(self),
1279 self.class_.__name__,
1280 with_poly,
1281 )
1283 def __str__(self):
1284 if self._is_with_polymorphic:
1285 return "with_polymorphic(%s, [%s])" % (
1286 self._target.__name__,
1287 ", ".join(
1288 mp.class_.__name__
1289 for mp in self.with_polymorphic_mappers
1290 if mp is not self.mapper
1291 ),
1292 )
1293 else:
1294 return "aliased(%s)" % (self._target.__name__,)
class _WrapUserEntity:
    """A wrapper used within the loader_criteria lambda caller so that
    we can bypass declared_attr descriptors on unmapped mixins, which
    normally emit a warning for such use.

    might also be useful for other per-lambda instrumentations should
    the need arise.

    """

    __slots__ = ("subject",)

    def __init__(self, subject):
        self.subject = subject

    @util.preload_module("sqlalchemy.orm.decl_api")
    def __getattribute__(self, name):
        """Resolve ``name`` against the wrapped subject.

        When the subject's own ``__dict__`` holds a ``declared_attr``
        under that name, its ``fget`` is called directly with the subject;
        every other lookup is a plain ``getattr()`` on the subject.
        """
        # go through object.__getattribute__ so that fetching the slot
        # does not re-enter this method
        wrapped = object.__getattribute__(self, "subject")
        declared_attr = util.preloaded.orm.decl_api.declared_attr

        own_value = wrapped.__dict__.get(name)
        if isinstance(own_value, declared_attr):
            return own_value.fget(wrapped)
        return getattr(wrapped, name)
class LoaderCriteriaOption(CriteriaOption):
    """Add additional WHERE criteria to the load for all occurrences of
    a particular entity.

    :class:`_orm.LoaderCriteriaOption` is invoked using the
    :func:`_orm.with_loader_criteria` function; see that function for
    details.

    .. versionadded:: 1.4

    """

    __slots__ = (
        "root_entity",
        "entity",
        "deferred_where_criteria",
        "where_criteria",
        "_where_crit_orig",
        "include_aliases",
        "propagate_to_loaders",
    )

    _traverse_internals = [
        ("root_entity", visitors.ExtendedInternalTraversal.dp_plain_obj),
        ("entity", visitors.ExtendedInternalTraversal.dp_has_cache_key),
        ("where_criteria", visitors.InternalTraversal.dp_clauseelement),
        ("include_aliases", visitors.InternalTraversal.dp_boolean),
        ("propagate_to_loaders", visitors.InternalTraversal.dp_boolean),
    ]

    root_entity: Optional[Type[Any]]
    entity: Optional[_InternalEntityType[Any]]
    where_criteria: Union[ColumnElement[bool], lambdas.DeferredLambdaElement]
    deferred_where_criteria: bool
    include_aliases: bool
    propagate_to_loaders: bool

    _where_crit_orig: Any

    def __init__(
        self,
        entity_or_base: _EntityType[Any],
        where_criteria: Union[
            _ColumnExpressionArgument[bool],
            Callable[[Any], _ColumnExpressionArgument[bool]],
        ],
        loader_only: bool = False,
        include_aliases: bool = False,
        propagate_to_loaders: bool = True,
        track_closure_variables: bool = True,
    ):
        """Construct the option.

        :param entity_or_base: an inspectable entity, or otherwise a class
         whose subclasses are searched for mapped entities.
        :param where_criteria: a SQL expression, or a callable producing
         one; a callable is wrapped in a deferred lambda element.
        :param loader_only: accepted but not referenced by this
         constructor.
        :param include_aliases: stored on the option.
        :param propagate_to_loaders: stored on the option.
        :param track_closure_variables: passed to the lambda options when
         ``where_criteria`` is a callable.
        """
        inspected = cast(
            "_InternalEntityType[Any]",
            inspection.inspect(entity_or_base, False),
        )
        if inspected is None:
            # not inspectable; remember the class itself
            self.root_entity = cast("Type[Any]", entity_or_base)
            self.entity = None
        else:
            self.root_entity = None
            self.entity = inspected

        # the original argument is kept for __reduce__
        self._where_crit_orig = where_criteria

        if not callable(where_criteria):
            self.deferred_where_criteria = False
            self.where_criteria = coercions.expect(
                roles.WhereHavingRole, where_criteria
            )
        else:
            if self.root_entity is not None:
                lambda_subject = self.root_entity
            else:
                assert inspected is not None
                lambda_subject = inspected.entity

            self.deferred_where_criteria = True
            self.where_criteria = lambdas.DeferredLambdaElement(
                where_criteria,
                roles.WhereHavingRole,
                lambda_args=(_WrapUserEntity(lambda_subject),),
                opts=lambdas.LambdaOptions(
                    track_closure_variables=track_closure_variables
                ),
            )

        self.include_aliases = include_aliases
        self.propagate_to_loaders = propagate_to_loaders

    @classmethod
    def _unreduce(
        cls, entity, where_criteria, include_aliases, propagate_to_loaders
    ):
        """Rebuild an option from the arguments emitted by
        ``__reduce__``."""
        return LoaderCriteriaOption(
            entity,
            where_criteria,
            include_aliases=include_aliases,
            propagate_to_loaders=propagate_to_loaders,
        )

    def __reduce__(self):
        """Support pickling by way of :meth:`._unreduce`, passing the
        entity's class (or the root entity) and the criteria as
        originally given."""
        target = self.entity.class_ if self.entity else self.root_entity
        reduce_args = (
            target,
            self._where_crit_orig,
            self.include_aliases,
            self.propagate_to_loaders,
        )
        return (LoaderCriteriaOption._unreduce, reduce_args)

    def _all_mappers(self) -> Iterator[Mapper[Any]]:
        """Iterate the mappers this option applies to.

        With an inspected entity, that is its mapper plus descendants.
        Otherwise the subclasses of the root entity are scanned in
        first-in-first-out order; an inspectable subclass contributes its
        mapper plus descendants, any other subclass has its own
        subclasses queued for scanning.
        """
        if self.entity:
            yield from self.entity.mapper.self_and_descendants
            return

        assert self.root_entity
        pending = list(self.root_entity.__subclasses__())
        while pending:
            candidate = pending.pop(0)
            inspected = cast(
                "_InternalEntityType[Any]",
                inspection.inspect(candidate, raiseerr=False),
            )
            if inspected:
                yield from inspected.mapper.self_and_descendants
            else:
                pending.extend(candidate.__subclasses__())

    def _should_include(self, compile_state: ORMCompileState) -> bool:
        """Return False only when the statement being compiled carries a
        ``for_loader_criteria`` annotation that is this very option."""
        statement_annotations = compile_state.select_statement._annotations
        originating_option = statement_annotations.get(
            "for_loader_criteria", None
        )
        return originating_option is not self

    def _resolve_where_criteria(
        self, ext_info: _InternalEntityType[Any]
    ) -> ColumnElement[bool]:
        """Return the criteria for ``ext_info``, deep-annotated with this
        option under ``for_loader_criteria``.

        Deferred criteria are first resolved against the entity of
        ``ext_info``.
        """
        if not self.deferred_where_criteria:
            resolved = self.where_criteria  # type: ignore
        else:
            resolved = cast(
                "ColumnElement[bool]",
                self.where_criteria._resolve_with_args(ext_info.entity),
            )

        assert isinstance(resolved, ColumnElement)
        return sql_util._deep_annotate(
            resolved,
            {"for_loader_criteria": self},
            detect_subquery_cols=True,
            ind_cols_on_fromclause=True,
        )

    def process_compile_state_replaced_entities(
        self,
        compile_state: ORMCompileState,
        mapper_entities: Iterable[_MapperEntity],
    ) -> None:
        """Handled the same as :meth:`.process_compile_state`; the
        ``mapper_entities`` argument is not used."""
        self.process_compile_state(compile_state)

    def process_compile_state(self, compile_state: ORMCompileState) -> None:
        """Apply a modification to a given :class:`.CompileState`."""

        # if options to limit the criteria to immediate query only,
        # use compile_state.attributes instead

        global_attributes = compile_state.global_attributes
        self.get_global_criteria(global_attributes)

    def get_global_criteria(self, attributes: Dict[Any, Any]) -> None:
        """Append this option to the ``additional_entity_criteria`` list
        kept in ``attributes`` for each applicable mapper, creating the
        list where it does not exist yet."""
        for mapper in self._all_mappers():
            criteria_key = ("additional_entity_criteria", mapper)
            attributes.setdefault(criteria_key, []).append(self)
# Register an inspection handler for AliasedClass: inspecting one returns
# the object held in its ``_aliased_insp`` attribute.
inspection._inspects(AliasedClass)(lambda target: target._aliased_insp)
@inspection._inspects(type)
def _inspect_mc(
    class_: Type[_O],
) -> Optional[Mapper[_O]]:
    """Inspection handler for plain classes.

    :return: the class's mapper, or ``None`` when the class has no class
     manager, the manager reports it as not mapped, or ``exc.NO_STATE``
     is raised during the lookup.
    """
    try:
        manager = opt_manager_of_class(class_)
        if manager is not None and manager.is_mapped:
            return manager.mapper
    except exc.NO_STATE:
        pass
    return None
# Runtime type of a subscripted typing generic (taken from ``List[Any]``);
# used below to register an inspection handler for such objects.
GenericAlias = type(List[Any])
@inspection._inspects(GenericAlias)
def _inspect_generic_alias(
    class_: Type[_O],
) -> Optional[Mapper[_O]]:
    """Inspection handler for subscripted generics: inspect the origin
    of the generic instead, using :func:`._inspect_mc`."""
    unsubscripted = cast("Type[_O]", get_origin(class_))
    return _inspect_mc(unsubscripted)
@inspection._self_inspects
class Bundle(
    ORMColumnsClauseRole[_T],
    SupportsCloneAnnotations,
    MemoizedHasCacheKey,
    inspection.Inspectable["Bundle[_T]"],
    InspectionAttr,
):
    """A grouping of SQL expressions that are returned by a :class:`.Query`
    under one namespace.

    The :class:`.Bundle` essentially allows nesting of the tuple-based
    results returned by a column-oriented :class:`_query.Query` object.
    It is also extensible via simple subclassing, where the primary
    capability to override is that of how the set of expressions should be
    returned, allowing post-processing as well as custom return types,
    without involving ORM identity-mapped classes.

    .. seealso::

        :ref:`bundles`


    """

    single_entity = False
    """If True, queries for a single Bundle will be returned as a single
    entity, rather than an element within a keyed tuple."""

    is_clause_element = False

    is_mapper = False

    is_aliased_class = False

    is_bundle = True

    _propagate_attrs: _PropagateAttrsType = util.immutabledict()

    proxy_set = util.EMPTY_SET

    exprs: List[_ColumnsClauseElement]

    def __init__(
        self, name: str, *exprs: _ColumnExpressionArgument[Any], **kw: Any
    ):
        r"""Construct a new :class:`.Bundle`.

        e.g.::

            bn = Bundle("mybundle", MyClass.x, MyClass.y)

            for row in session.query(bn).filter(bn.c.x == 5).filter(bn.c.y == 4):
                print(row.mybundle.x, row.mybundle.y)

        :param name: name of the bundle.
        :param \*exprs: columns or SQL expressions comprising the bundle.
        :param single_entity=False: if True, rows for this :class:`.Bundle`
         can be returned as a "single entity" outside of any enclosing tuple
         in the same manner as a mapped entity.

        """  # noqa: E501
        self.name = self._label = name

        coerced = [
            coercions.expect(
                roles.ColumnsClauseRole, expr, apply_propagate_attrs=self
            )
            for expr in exprs
        ]
        self.exprs = coerced

        # an expression annotated with a "bundle" is represented in the
        # column collection by that bundle
        members = [e._annotations.get("bundle", e) for e in coerced]
        keyed_members = (
            (getattr(member, "key", member._label), member)
            for member in members
        )
        readonly_columns = ColumnCollection(keyed_members).as_readonly()
        self.c = self.columns = readonly_columns

        self.single_entity = kw.pop("single_entity", self.single_entity)

    def _gen_cache_key(
        self, anon_map: anon_map, bindparams: List[BindParameter[Any]]
    ) -> Tuple[Any, ...]:
        """Return a cache key built from the class, name and
        ``single_entity`` flag, followed by the cache key of each
        expression."""
        identity = (self.__class__, self.name, self.single_entity)
        expr_keys = [
            expr._gen_cache_key(anon_map, bindparams) for expr in self.exprs
        ]
        return identity + tuple(expr_keys)

    @property
    def mapper(self) -> Optional[Mapper[Any]]:
        """The ``parentmapper`` annotation of the first expression, or
        ``None`` if it has none."""
        leading_expr = self.exprs[0]
        parent_mapper: Optional[Mapper[Any]] = leading_expr._annotations.get(
            "parentmapper", None
        )
        return parent_mapper

    @property
    def entity(self) -> Optional[_InternalEntityType[Any]]:
        """The ``parententity`` annotation of the first expression, or
        ``None`` if it has none."""
        leading_expr = self.exprs[0]
        parent_entity: Optional[_InternalEntityType[Any]] = (
            leading_expr._annotations.get("parententity", None)
        )
        return parent_entity

    @property
    def entity_namespace(
        self,
    ) -> ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]:
        """The namespace for attribute lookups; the same collection as
        :attr:`.Bundle.c`."""
        return self.c

    columns: ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]

    """A namespace of SQL expressions referred to by this :class:`.Bundle`.

        e.g.::

            bn = Bundle("mybundle", MyClass.x, MyClass.y)

            q = sess.query(bn).filter(bn.c.x == 5)

        Nesting of bundles is also supported::

            b1 = Bundle(
                "b1",
                Bundle("b2", MyClass.a, MyClass.b),
                Bundle("b3", MyClass.x, MyClass.y),
            )

            q = sess.query(b1).filter(b1.c.b2.c.a == 5).filter(b1.c.b3.c.y == 9)

    .. seealso::

        :attr:`.Bundle.c`

    """  # noqa: E501

    c: ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]
    """An alias for :attr:`.Bundle.columns`."""

    def _clone(self, **kw):
        """Return a new instance of the same class whose ``__dict__`` is
        updated from this one; keyword arguments are ignored."""
        duplicate = self.__class__.__new__(self.__class__)
        duplicate.__dict__.update(self.__dict__)
        return duplicate

    def __clause_element__(self):
        """Return a ``ClauseList`` of the bundle's expressions, annotated
        with this bundle and set up to use the ORM compile plugin."""
        # ensure existing entity_namespace remains
        annotations = {"bundle": self, "entity_namespace": self}
        annotations.update(self._annotations)

        plugin_subject = self.exprs[0]._propagate_attrs.get(
            "plugin_subject", self.entity
        )

        members = [e._annotations.get("bundle", e) for e in self.exprs]
        clause_list = expression.ClauseList(
            *members,
            _literal_as_text_role=roles.ColumnsClauseRole,
            group=False,
        )

        # the Bundle *must* use the orm plugin no matter what.  the
        # subject can be None but it's much better if it's not.
        return clause_list._annotate(annotations)._set_propagate_attrs(
            {
                "compile_state_plugin": "orm",
                "plugin_subject": plugin_subject,
            }
        )

    @property
    def clauses(self):
        """The ``clauses`` of this bundle's clause element."""
        clause_element = self.__clause_element__()
        return clause_element.clauses

    def label(self, name):
        """Provide a copy of this :class:`.Bundle` passing a new label."""

        relabeled = self._clone()
        relabeled.name = name
        return relabeled

    def create_row_processor(
        self,
        query: Select[Any],
        procs: Sequence[Callable[[Row[Any]], Any]],
        labels: Sequence[str],
    ) -> Callable[[Row[Any]], Any]:
        """Produce the "row processing" function for this :class:`.Bundle`.

        May be overridden by subclasses to provide custom behaviors when
        results are fetched. The method is passed the statement object and a
        set of "row processor" functions at query execution time; these
        processor functions when given a result row will return the individual
        attribute value, which can then be adapted into any kind of return data
        structure.

        The example below illustrates replacing the usual :class:`.Row`
        return structure with a straight Python dictionary::

            from sqlalchemy.orm import Bundle


            class DictBundle(Bundle):
                def create_row_processor(self, query, procs, labels):
                    "Override create_row_processor to return values as dictionaries"

                    def proc(row):
                        return dict(zip(labels, (proc(row) for proc in procs)))

                    return proc

        A result from the above :class:`_orm.Bundle` will return dictionary
        values::

            bn = DictBundle("mybundle", MyClass.data1, MyClass.data2)
            for row in session.execute(select(bn).where(bn.c.data1 == "d1")):
                print(row.mybundle["data1"], row.mybundle["data2"])

        """  # noqa: E501
        make_keyed_tuple = result_tuple(labels, [() for _ in labels])

        def proc(row: Row[Any]) -> Any:
            values = [getter(row) for getter in procs]
            return make_keyed_tuple(values)

        return proc
def _orm_annotate(element: _SA, exclude: Optional[Any] = None) -> _SA:
    """Deep copy the given ClauseElement, annotating each element with the
    "_orm_adapt" flag.

    Elements within the exclude collection will be cloned but not annotated.

    """
    orm_adapt_flag = {"_orm_adapt": True}
    return sql_util._deep_annotate(element, orm_adapt_flag, exclude)
def _orm_deannotate(element: _SA) -> _SA:
    """Remove annotations that link a column to a particular mapping.

    Note this doesn't affect "remote" and "foreign" annotations
    passed by the :func:`_orm.foreign` and :func:`_orm.remote`
    annotators.

    """
    mapping_annotation_keys = ("_orm_adapt", "parententity")
    return sql_util._deep_deannotate(element, values=mapping_annotation_keys)
def _orm_full_deannotate(element: _SA) -> _SA:
    """Deep-deannotate ``element`` without restricting the set of
    annotation keys, unlike :func:`._orm_deannotate`."""
    deannotated = sql_util._deep_deannotate(element)
    return deannotated
class _ORMJoin(expression.Join):
    """Extend Join to support ORM constructs as input."""

    __visit_name__ = expression.Join.__visit_name__

    inherit_cache = True

    def __init__(
        self,
        left: _FromClauseArgument,
        right: _FromClauseArgument,
        onclause: Optional[_OnClauseArgument] = None,
        isouter: bool = False,
        full: bool = False,
        _left_memo: Optional[Any] = None,
        _right_memo: Optional[Any] = None,
        _extra_criteria: Tuple[ColumnElement[bool], ...] = (),
    ):
        """Construct the join.

        ``onclause`` may be a relationship-bound attribute or a
        :class:`.MapperProperty`, in which case the join condition is
        produced by the property's ``_create_joins()``; anything else is
        passed to ``Join.__init__`` as given.
        """
        left_info = cast(
            "Union[FromClause, _InternalEntityType[Any]]",
            inspection.inspect(left),
        )
        right_info = cast(
            "Union[FromClause, _InternalEntityType[Any]]",
            inspection.inspect(right),
        )
        adapt_to = right_info.selectable

        # used by joined eager loader
        self._left_memo = _left_memo
        self._right_memo = _right_memo

        prop = None
        on_selectable = None
        if isinstance(onclause, attributes.QueryableAttribute):
            if TYPE_CHECKING:
                assert isinstance(
                    onclause.comparator, RelationshipProperty.Comparator
                )
            on_selectable = onclause.comparator._source_selectable()
            prop = onclause.property
            _extra_criteria += onclause._extra_criteria
        elif isinstance(onclause, MapperProperty):
            # used internally by joined eager loader...possibly not ideal
            prop = onclause
            on_selectable = prop.parent.selectable

        left_selectable = left_info.selectable

        if prop:
            adapt_from: Optional[FromClause]
            if sql_util.clause_is_present(on_selectable, left_selectable):
                adapt_from = on_selectable
            else:
                assert isinstance(left_selectable, FromClause)
                adapt_from = left_selectable

            (
                primaryjoin,
                secondaryjoin,
                _source,
                _dest,
                secondary,
                target_adapter,
            ) = prop._create_joins(
                source_selectable=adapt_from,
                dest_selectable=adapt_to,
                source_polymorphic=True,
                of_type_entity=right_info,
                alias_secondary=True,
                extra_criteria=_extra_criteria,
            )

            if secondaryjoin is None:
                onclause = primaryjoin
            elif isouter:
                # note this is an inner join from secondary->right
                right = sql.join(secondary, right, secondaryjoin)
                onclause = primaryjoin
            else:
                left = sql.join(left, secondary, primaryjoin, isouter)
                onclause = secondaryjoin

            self._target_adapter = target_adapter

        # we don't use the normal coercions logic for _ORMJoin
        # (probably should), so do some gymnastics to get the entity.
        # logic here is for #8721, which was a major bug in 1.4
        # for almost two years, not reported/fixed until 1.4.43 (!)
        parententity = None
        if is_selectable(left_info):
            parententity = left_selectable._annotations.get(
                "parententity", None
            )
        elif insp_is_mapper(left_info) or insp_is_aliased_class(left_info):
            parententity = left_info

        if parententity is not None:
            self._annotations = self._annotations.union(
                {"parententity": parententity}
            )

        # extra criteria not consumed by a relationship property are
        # added to the ON clause once the base Join is constructed
        augment_onclause = bool(_extra_criteria) and not prop
        expression.Join.__init__(self, left, right, onclause, isouter, full)

        assert self.onclause is not None

        if augment_onclause:
            self.onclause &= sql.and_(*_extra_criteria)

        if (
            not prop
            and getattr(right_info, "mapper", None)
            and right_info.mapper.single  # type: ignore
        ):
            right_info = cast("_InternalEntityType[Any]", right_info)
            # if single inheritance target and we are using a manual
            # or implicit ON clause, augment it the same way we'd augment the
            # WHERE.
            single_crit = right_info.mapper._single_table_criterion
            if single_crit is not None:
                if insp_is_aliased_class(right_info):
                    single_crit = right_info._adapter.traverse(single_crit)
                self.onclause = self.onclause & single_crit

    def _splice_into_center(self, other):
        """Splice a join into the center.

        Given join(a, b) and join(b, c), return join(a, b).join(c)

        """
        # descend to the left-most element of ``other``
        leftmost = other
        while isinstance(leftmost, sql.Join):
            leftmost = leftmost.left

        assert self.right is leftmost

        inner_join = _ORMJoin(
            self.left,
            other.left,
            self.onclause,
            isouter=self.isouter,
            _left_memo=self._left_memo,
            _right_memo=other._left_memo._path_registry,
        )
        outer_join = _ORMJoin(
            inner_join,
            other.right,
            other.onclause,
            isouter=other.isouter,
            _right_memo=other._right_memo,
        )
        return outer_join

    def join(
        self,
        right: _FromClauseArgument,
        onclause: Optional[_OnClauseArgument] = None,
        isouter: bool = False,
        full: bool = False,
    ) -> _ORMJoin:
        """Return a new :class:`._ORMJoin` with this join as its left
        side."""
        return _ORMJoin(self, right, onclause, isouter=isouter, full=full)

    def outerjoin(
        self,
        right: _FromClauseArgument,
        onclause: Optional[_OnClauseArgument] = None,
        full: bool = False,
    ) -> _ORMJoin:
        """Return a new :class:`._ORMJoin` with this join as its left
        side and ``isouter=True``."""
        return _ORMJoin(self, right, onclause, full=full, isouter=True)
def with_parent(
    instance: object,
    prop: attributes.QueryableAttribute[Any],
    from_entity: Optional[_EntityType[Any]] = None,
) -> ColumnElement[bool]:
    """Create filtering criterion that relates this query's primary entity
    to the given related instance, using established
    :func:`_orm.relationship()`
    configuration.

    E.g.::

        stmt = select(Address).where(with_parent(some_user, User.addresses))

    The SQL rendered is the same as that rendered when a lazy loader
    would fire off from the given parent on that attribute, meaning
    that the appropriate state is taken from the parent object in
    Python without the need to render joins to the parent table
    in the rendered statement.

    The given property may also make use of :meth:`_orm.PropComparator.of_type`
    to indicate the left side of the criteria::


        a1 = aliased(Address)
        a2 = aliased(Address)
        stmt = select(a1, a2).where(with_parent(u1, User.addresses.of_type(a2)))

    The above use is equivalent to using the
    :func:`_orm.with_parent.from_entity` argument::

        a1 = aliased(Address)
        a2 = aliased(Address)
        stmt = select(a1, a2).where(
            with_parent(u1, User.addresses, from_entity=a2)
        )

    :param instance:
      An instance which has some :func:`_orm.relationship`.

    :param prop:
      Class-bound attribute, which indicates
      what relationship from the instance should be used to reconcile the
      parent/child relationship.

    :param from_entity:
      Entity in which to consider as the left side.  This defaults to the
      "zero" entity of the :class:`_query.Query` itself.  It is replaced
      by the ``of_type()`` entity when ``prop`` carries one.

      .. versionadded:: 1.2

    """  # noqa: E501
    if isinstance(prop, str):
        raise sa_exc.ArgumentError(
            "with_parent() accepts class-bound mapped attributes, not strings"
        )

    relationship_prop: RelationshipProperty[Any]

    if not isinstance(prop, attributes.QueryableAttribute):
        # taken to already be the relationship property itself
        relationship_prop = prop
    else:
        if prop._of_type:
            from_entity = prop._of_type

        mapper_property = prop.property
        if mapper_property is None or not prop_is_relationship(
            mapper_property
        ):
            raise sa_exc.ArgumentError(
                f"Expected relationship property for with_parent(), "
                f"got {mapper_property}"
            )
        relationship_prop = mapper_property

    return relationship_prop._with_parent(instance, from_entity=from_entity)
def has_identity(object_: object) -> bool:
    """Return True if the given object has a database
    identity.

    This typically corresponds to the object being
    in either the persistent or detached state.

    .. seealso::

        :func:`.was_deleted`

    """
    return attributes.instance_state(object_).has_identity
def was_deleted(object_: object) -> bool:
    """Return True if the given object was deleted
    within a session flush.

    This is regardless of whether or not the object is
    persistent or detached.

    .. seealso::

        :attr:`.InstanceState.was_deleted`

    """
    return attributes.instance_state(object_).was_deleted
def _entity_corresponds_to(
    given: _InternalEntityType[Any], entity: _InternalEntityType[Any]
) -> bool:
    """determine if 'given' corresponds to 'entity', in terms
    of an entity passed to Query that would match the same entity
    being referred to elsewhere in the query.

    """
    if insp_is_aliased_class(entity):
        # an aliased entity only matches an alias sharing its base alias
        if not insp_is_aliased_class(given):
            return False
        return entity._base_alias() is given._base_alias()

    if insp_is_aliased_class(given):
        if given._use_mapper_path:
            return entity in given.with_polymorphic_mappers
        return entity is given

    # both are plain mappers at this point
    assert insp_is_mapper(given)
    return entity.common_parent(given)
def _entity_corresponds_to_use_path_impl(
    given: _InternalEntityType[Any], entity: _InternalEntityType[Any]
) -> bool:
    """determine if 'given' corresponds to 'entity', in terms
    of a path of loader options where a mapped attribute is taken to
    be a member of a parent entity.

    e.g.::

        someoption(A).someoption(A.b)  # -> fn(A, A) -> True
        someoption(A).someoption(C.d)  # -> fn(A, C) -> False

        a1 = aliased(A)
        someoption(a1).someoption(A.b)  # -> fn(a1, A) -> False
        someoption(a1).someoption(a1.b)  # -> fn(a1, a1) -> True

        wp = with_polymorphic(A, [A1, A2])
        someoption(wp).someoption(A1.foo)  # -> fn(wp, A1) -> False
        someoption(wp).someoption(wp.A1.foo)  # -> fn(wp, wp.A1) -> True

    """
    given_is_alias = insp_is_aliased_class(given)
    entity_is_alias = insp_is_aliased_class(entity)

    if not given_is_alias and not entity_is_alias:
        # mapper against mapper
        return given.isa(entity.mapper)

    if not given_is_alias:
        # mapper against alias
        return (
            entity._use_mapper_path
            and given in entity.with_polymorphic_mappers
        )

    # alias against anything
    return (
        entity_is_alias
        and not entity._use_mapper_path
        and (given is entity or entity in given._with_polymorphic_entities)
    )
def _entity_isa(given: _InternalEntityType[Any], mapper: Mapper[Any]) -> bool:
    """decide whether 'given' "is a" mapper, i.e. whether 'given' would
    load rows of type 'mapper'.

    """
    if given.is_aliased_class:
        # aliased: consult the polymorphic mappers first, then fall back to
        # the isa() test of the mapper behind the alias
        return mapper in given.with_polymorphic_mappers or given.mapper.isa(
            mapper
        )

    # not aliased: a non-empty set of polymorphic mappers that contains
    # 'mapper' settles it; in every other case isa() decides
    poly_mappers = given.with_polymorphic_mappers
    if poly_mappers and mapper in poly_mappers:
        return True
    return given.isa(mapper)
def _getitem(iterable_query: Query[Any], item: Any) -> Any:
    """implement ``__getitem__`` for an iterable query object that also
    provides a ``slice()`` method.

    A slice yields a list; any other ``item`` yields the single element
    found at that position.  Negative start / stop values, as well as the
    index ``-1``, raise ``IndexError``.

    """
    negative_index_msg = (
        "negative indexes are not accepted by SQL "
        "index / slice operators"
    )

    if not isinstance(item, slice):
        if item == -1:
            raise IndexError(negative_index_msg)
        # fetch a one-element slice through the query's own indexing and
        # unwrap it
        return list(iterable_query[item : item + 1])[0]

    start, stop, step = util.decode_slice(item)
    start_is_int = isinstance(start, int)
    stop_is_int = isinstance(stop, int)

    # a range that is empty or runs backwards selects nothing; this test
    # comes before the negative-value test below
    if stop_is_int and start_is_int and stop - start <= 0:
        return []

    if (start_is_int and start < 0) or (stop_is_int and stop < 0):
        raise IndexError(negative_index_msg)

    rows = list(iterable_query.slice(start, stop))
    if step is None:
        return rows
    # the step is applied to the already-fetched rows
    return rows[:: item.step]
def _is_mapped_annotation(
    raw_annotation: _AnnotationScanType,
    cls: Type[Any],
    originating_cls: Type[Any],
) -> bool:
    """report whether the given raw annotation resolves to a generic whose
    origin is a ``_MappedAnnotationBase`` type.

    An annotation that can't be resolved due to ``NameError`` is reported
    as not mapped.

    """
    try:
        resolved = de_stringify_annotation(
            cls, raw_annotation, originating_cls.__module__
        )
    except NameError:
        # raising here would be more accurate in most cases, at least
        # within our own tests, since it prevents false negatives.  In the
        # real world, however, try to stay out of end-user annotations
        # that have nothing to do with us.  see issue #8888 where use of
        # this function is bypassed when an unresolvable Mapped[] type
        # needs to be detected.
        return False

    return is_origin_of_cls(resolved, _MappedAnnotationBase)
class _CleanupError(Exception):
    """Raised by ``_cleanup_mapped_str_annotation()`` when the container
    type named in a string annotation can't be resolved to a symbol."""
def _cleanup_mapped_str_annotation(
    annotation: str, originating_module: str
) -> str:
    """rewrite a string annotation of the form ``'Mapped[List[Address]]'``
    into ``'Mapped[List["Address"]]'``, so that ``"Address"`` can be
    obtained as a string.

    The outermost container name is also resolved to a real symbol, as
    this is the place where that has to happen.  Annotations that don't
    have the ``name[...]`` shape, or whose container is neither
    ``ClassVar`` nor a ``_MappedAnnotationBase`` subclass, come back
    unchanged.

    :raises _CleanupError: if the container name can't be resolved.

    """
    generic_pattern = r"^([^ \|]+?)\[(.+)\]$"

    outer = re.match(generic_pattern, annotation)
    if not outer:
        return annotation

    container_name = outer.group(1)

    # ticket #8759. Resolve the Mapped name to a real symbol.
    # originally this just checked the name.
    try:
        obj = eval_name_only(container_name, originating_module)
    except NameError as ne:
        raise _CleanupError(
            f'For annotation "{annotation}", could not resolve '
            f'container type "{container_name}". '
            "Please ensure this type is imported at the module level "
            "outside of TYPE_CHECKING blocks"
        ) from ne

    if obj is typing.ClassVar:
        real_symbol = "ClassVar"
    else:
        try:
            if not issubclass(obj, _MappedAnnotationBase):
                return annotation
            real_symbol = obj.__name__
        except TypeError:
            # avoid isinstance(obj, type) check, just catch TypeError
            return annotation

    # note for future modification: every codepath above must either bind
    # real_symbol or return.  An unbound real_symbol raises
    # UnboundLocalError, which is a NameError, and the calling routines
    # won't notice since they catch NameError anyway.

    # peel off one "name[...]" layer at a time; the outermost name is
    # replaced by the resolved symbol, and whatever is left once nothing
    # more matches becomes the last entry
    stack = [real_symbol]
    remainder = outer.group(2)
    inner = re.match(generic_pattern, remainder)
    while inner is not None:
        stack.append(inner.group(1))
        remainder = inner.group(2)
        inner = re.match(generic_pattern, remainder)
    stack.append(remainder)

    # stacks we want to rewrite, that is, quote the last entry which
    # we think is a relationship class name:
    #
    #   ['Mapped', 'List', 'Address']
    #   ['Mapped', 'A']
    #
    # stacks we don't want to rewrite, which are generally MappedColumn
    # use cases:
    #
    #   ['Mapped', "'Optional[Dict[str, str]]'"]
    #   ['Mapped', 'dict[str, str] | None']
    innermost = stack[-1]
    if (
        # already quoted symbols such as
        # ['Mapped', "'Optional[Dict[str, str]]'"]
        re.match(r"""^["'].*["']$""", innermost)
        # further generics like Dict[] such as
        # ['Mapped', 'dict[str, str] | None'],
        # ['Mapped', 'list[int] | list[str]'],
        # ['Mapped', 'Union[list[int], list[str]]'],
        or re.search(r"[\[\]]", innermost)
    ):
        return annotation

    stripchars = "\"' "
    stack[-1] = ", ".join(
        f'"{elem.strip(stripchars)}"' for elem in innermost.split(",")
    )
    return "[".join(stack) + ("]" * (len(stack) - 1))
def _extract_mapped_subtype(
    raw_annotation: Optional[_AnnotationScanType],
    cls: type,
    originating_module: str,
    key: str,
    attr_cls: Type[Any],
    required: bool,
    is_dataclass_field: bool,
    expect_mapped: bool = True,
    raiseerr: bool = True,
) -> Optional[Tuple[Union[_AnnotationScanType, str], Optional[type]]]:
    """inspect an annotation and, when it has the form
    ``Mapped[something]``, hand back the ``something`` part along with
    the origin of the generic.

    Outcomes:

    * no annotation: ``None``, or ``MappedAnnotationError`` if
      ``required`` is set
    * ``is_dataclass_field``: ``(annotation, None)``
    * not a ``Mapped[]``-style generic and ``expect_mapped`` is False:
      ``(annotation, None)``
    * not a ``Mapped[]``-style generic and ``expect_mapped`` is True:
      ``None`` if ``raiseerr`` is False, or if the origin is ``ClassVar``
      or an ``ORMDescriptor`` subclass; ``MappedAnnotationError``
      otherwise
    * a ``Mapped[]``-style generic: ``(inner type, origin)``; anything
      other than exactly one argument raises ``MappedAnnotationError``

    """

    def _uninterpretable() -> Exception:
        # shared by both failure modes of the destringify step below
        return orm_exc.MappedAnnotationError(
            f"Could not interpret annotation {raw_annotation}. "
            "Check that it uses names that are correctly imported at the "
            "module level. See chained stack trace for more hints."
        )

    if raw_annotation is None:
        if not required:
            return None
        raise orm_exc.MappedAnnotationError(
            f"Python typing annotation is required for attribute "
            f'"{cls.__name__}.{key}" when primary argument(s) for '
            f'"{attr_cls.__name__}" construct are None or not present'
        )

    try:
        # destringify the "outside" of the annotation. note we are not
        # adding include_generic so it will *not* dig into generic contents,
        # which will remain as ForwardRef or plain str under future
        # annotations mode. The full destringify happens later when
        # mapped_column goes to do a full lookup in the registry
        # type_annotations_map.
        annotated = de_stringify_annotation(
            cls,
            raw_annotation,
            originating_module,
            str_cleanup_fn=_cleanup_mapped_str_annotation,
        )
    except _CleanupError as ce:
        raise _uninterpretable() from ce
    except NameError as ne:
        if raiseerr and "Mapped[" in raw_annotation:  # type: ignore
            raise _uninterpretable() from ne

        # otherwise carry on with the annotation exactly as it was given
        annotated = raw_annotation  # type: ignore

    if is_dataclass_field:
        return annotated, None

    looks_mapped = hasattr(annotated, "__origin__") and is_origin_of_cls(
        annotated, _MappedAnnotationBase
    )

    if not looks_mapped:
        if not expect_mapped:
            return annotated, None

        if not raiseerr:
            return None

        origin = getattr(annotated, "__origin__", None)
        if origin is typing.ClassVar:
            return None

        # check for other kind of ORM descriptor like AssociationProxy,
        # don't raise for that (issue #9957)
        if isinstance(origin, type) and issubclass(origin, ORMDescriptor):
            return None

        raise orm_exc.MappedAnnotationError(
            f'Type annotation for "{cls.__name__}.{key}" '
            "can't be correctly interpreted for "
            "Annotated Declarative Table form. ORM annotations "
            "should normally make use of the ``Mapped[]`` generic "
            "type, or other ORM-compatible generic type, as a "
            "container for the actual type, which indicates the "
            "intent that the attribute is mapped. "
            "Class variables that are not intended to be mapped "
            "by the ORM should use ClassVar[]. "
            "To allow Annotated Declarative to disregard legacy "
            "annotations which don't use Mapped[] to pass, set "
            '"__allow_unmapped__ = True" on the class or a '
            "superclass this class.",
            code="zlpr",
        )

    if len(annotated.__args__) != 1:
        raise orm_exc.MappedAnnotationError(
            "Expected sub-type for Mapped[] annotation"
        )

    return (
        # fix dict/list/set args to be ForwardRef, see #11814
        fixup_container_fwd_refs(annotated.__args__[0]),
        annotated.__origin__,
    )
def _mapper_property_as_plain_name(prop: Type[Any]) -> str:
    """produce a plain name for the given property class, passing along
    the result of its ``_mapper_property_name()`` hook when it has one.

    """
    name = (
        prop._mapper_property_name()
        if hasattr(prop, "_mapper_property_name")
        else None
    )
    return util.clsname_as_plain_name(prop, name)